diff --git a/App/Controllers/ImageController.cs b/App/Controllers/ImageController.cs index d2fda76..e04cfb0 100644 --- a/App/Controllers/ImageController.cs +++ b/App/Controllers/ImageController.cs @@ -6,7 +6,11 @@ namespace StitchATon2.App.Controllers; public static class ImageController { - public static async Task GenerateImage(HttpResponse response, GenerateImageDto dto, TileManager tileManager) + public static async Task GenerateImage( + HttpResponse response, + GenerateImageDto dto, + TileManager tileManager, + CancellationToken cancellationToken) { if (dto.GetErrors() is { Count: > 0 } errors) { @@ -14,7 +18,7 @@ public static class ImageController response.ContentType = "text/json"; var errorBody = JsonSerializer.Serialize(errors, AppJsonSerializerContext.Default.DictionaryStringListString); response.ContentLength = errorBody.Length; - await response.WriteAsync(errorBody); + await response.WriteAsync(errorBody, cancellationToken: cancellationToken); await response.CompleteAsync(); return; } @@ -24,12 +28,15 @@ public static class ImageController await tileManager .CreateSection(dto) - .DangerousWriteToPipe(response.BodyWriter, dto.OutputScale); + .DangerousWriteToPipe(response.BodyWriter, dto.OutputScale, cancellationToken); await response.CompleteAsync(); } - public static async Task GenerateRandomImage(HttpResponse response, TileManager tileManager) + public static async Task GenerateRandomImage( + HttpResponse response, + TileManager tileManager, + CancellationToken cancellationToken) { response.StatusCode = 200; response.ContentType = "image/png"; @@ -47,7 +54,7 @@ public static class ImageController var scale = float.Clamp(480f / int.Max(section.Width, section.Height), 0.01f, 1f); Console.WriteLine($"Generate random image for {coordinatePair} scale: {scale}"); - await section.DangerousWriteToPipe(response.BodyWriter, scale); + await section.DangerousWriteToPipe(response.BodyWriter, scale, cancellationToken); await response.CompleteAsync(); } } \ No newline at end of file diff --git a/App/Utils.cs b/App/Utils.cs index 2b96e01..edc1087 100644 --- a/App/Utils.cs +++ b/App/Utils.cs @@ -21,9 +21,13 @@ public static class Utils await imageCreator.WriteToStream(stream, scale!.Value); } - public static async Task DangerousWriteToPipe(this GridSection section, PipeWriter pipeWriter, float? scale) + public static async Task DangerousWriteToPipe( + this GridSection section, + PipeWriter pipeWriter, + float? scale, + CancellationToken cancellationToken = default) { var imageCreator = new DangerousImageCreator(section); - await imageCreator.WriteToPipe(pipeWriter, scale!.Value); + await imageCreator.WriteToPipe(pipeWriter, scale!.Value, cancellationToken); } } \ No newline at end of file diff --git a/Domain/ImageCreators/DangerousImageCreator.cs b/Domain/ImageCreators/DangerousImageCreator.cs index 2346479..ebf6441 100644 --- a/Domain/ImageCreators/DangerousImageCreator.cs +++ b/Domain/ImageCreators/DangerousImageCreator.cs @@ -57,6 +57,12 @@ public sealed class DangerousImageCreator : IDisposable using var yEndMap = MemoryAllocator.Allocate(targetWidth); var yStart = OffsetY; + + var pxInt32 = Int32Pixel.Zero; + ref var px = ref pxInt32; + ref var rChannel = ref Unsafe.As(ref px); + ref var gChannel = ref Unsafe.Add(ref rChannel, 4); + ref var bChannel = ref Unsafe.Add(ref rChannel, 8); var outputTaskQueue = TaskHelper.SynchronizedTaskFactory.StartNew(() => { }, cancellationToken); for (var y = 0; y < targetHeight; y++) @@ -75,15 +81,10 @@ public sealed class DangerousImageCreator : IDisposable } int xStart = OffsetX, x0 = 0; - - var pxInt32 = Int32Pixel.Zero; - ref var px = ref pxInt32; - ref var rChannel = ref Unsafe.As(ref px); - ref var gChannel = ref Unsafe.Add(ref rChannel, 4); - ref var bChannel = ref Unsafe.Add(ref rChannel, 8); var outputBuffer = MemoryAllocator.Allocate(outputBufferSize); ref var outputChannel = ref outputBuffer.Span[0]; + var boxHeight = yEnd - yStart; for (int x1 = 0; x1 < targetWidth; x1++) { var xEnd = xLookup[x1]; @@ -92,7 +93,7 @@ public sealed class DangerousImageCreator : IDisposable px += yStartMap[x0]; px -= yEndMap[x0]; px -= yStartMap[x1]; - px /= Math.Max(1, (xEnd - xStart) * (yEnd - yStart)); + px /= Math.Max(1, (xEnd - xStart) * boxHeight); outputChannel = rChannel; outputChannel = ref Unsafe.Add(ref outputChannel, 1); @@ -108,13 +109,16 @@ public sealed class DangerousImageCreator : IDisposable } outputTaskQueue = outputTaskQueue - .ContinueWith(_ => encoder.WriteData(outputBuffer, cancellationToken: cancellationToken), cancellationToken); + .ContinueWith(async _ => + { + await encoder.WriteDataAsync(outputBuffer, cancellationToken: cancellationToken); + }, cancellationToken); yStart = yEnd; } await outputTaskQueue; - encoder.WriteEndOfFile(cancellationToken); + await encoder.WriteEndOfFileAsync(cancellationToken); } private void MapRow( diff --git a/Infra/Buffers/ArrayOwner.cs b/Infra/Buffers/ArrayOwner.cs index 2c17cb5..f1e30fb 100644 --- a/Infra/Buffers/ArrayOwner.cs +++ b/Infra/Buffers/ArrayOwner.cs @@ -7,10 +7,11 @@ public class ArrayOwner : IBuffer where T : unmanaged private readonly ArrayPool _owner; private readonly T[] _buffer; - public ArrayOwner(ArrayPool owner, int size) + public ArrayOwner(ArrayPool owner, int length) { _owner = owner; - _buffer = owner.Rent(size); + _buffer = owner.Rent(length); + Length = length; } ~ArrayOwner() => Dispose(); @@ -26,4 +27,6 @@ public class ArrayOwner : IBuffer where T : unmanaged public Span Span => _buffer; public T[] Array => _buffer; + + public int Length { get; } } \ No newline at end of file diff --git a/Infra/Buffers/IBuffer.cs b/Infra/Buffers/IBuffer.cs index 2eff74b..de13308 100644 --- a/Infra/Buffers/IBuffer.cs +++ b/Infra/Buffers/IBuffer.cs @@ -5,4 +5,6 @@ public interface IBuffer : IDisposable where T : unmanaged ref T this[int index] { get; } Span Span { get; } + + int Length { get; } } \ No newline at end of file diff --git a/Infra/Buffers/MemoryAllocator.cs b/Infra/Buffers/MemoryAllocator.cs index 75787f0..0d40d63 100644 --- a/Infra/Buffers/MemoryAllocator.cs +++ b/Infra/Buffers/MemoryAllocator.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Runtime.CompilerServices; namespace StitchATon2.Infra.Buffers; @@ -15,4 +16,29 @@ public static class MemoryAllocator public static MemoryManager AllocateImmovable(int count) where T : unmanaged => new ImmovableMemory(count); + + public static unsafe IBuffer Clone(this IBuffer buffer) where T : unmanaged + { + if (buffer is UnmanagedMemory unmanagedMemory) + { + var newBuffer = new UnmanagedMemory(buffer.Length); + var byteCount = (uint)(Unsafe.SizeOf() * buffer.Length); + Unsafe.CopyBlock(newBuffer.Pointer, unmanagedMemory.Pointer, byteCount); + return newBuffer; + } + + throw new NotSupportedException(); + } + + public static unsafe void Copy(this IBuffer source, IBuffer destination, int count) where T : unmanaged + { + if (source is UnmanagedMemory sourceBuffer && destination is UnmanagedMemory destinationBuffer) + { + var byteCount = (uint)(Unsafe.SizeOf() * count); + Unsafe.CopyBlock(destinationBuffer.Pointer, sourceBuffer.Pointer, byteCount); + return; + } + + throw new NotSupportedException(); + } } \ No newline at end of file diff --git a/Infra/Buffers/UnmanagedMemory.cs b/Infra/Buffers/UnmanagedMemory.cs index 7f61b3d..6b97f1d 100644 --- a/Infra/Buffers/UnmanagedMemory.cs +++ b/Infra/Buffers/UnmanagedMemory.cs @@ -9,18 +9,19 @@ namespace StitchATon2.Infra.Buffers; /// internal sealed unsafe class UnmanagedMemory : IBuffer where T : unmanaged { - private readonly T* _pointer; - private readonly int _count; + internal readonly T* Pointer; private bool _disposed; + + public int Length { get; } - public ref T this[int index] => ref Unsafe.AsRef(_pointer + index); + public ref T this[int index] => ref Unsafe.AsRef(Pointer + index); - public Span Span => new(_pointer, _count); + public Span Span => new(Pointer, Length); - public UnmanagedMemory(int count) + public UnmanagedMemory(int length) { - _pointer = (T*)NativeMemory.Alloc((nuint)count, (nuint)Unsafe.SizeOf()); - _count = count; + Pointer = (T*)NativeMemory.Alloc((nuint)length, (nuint)Unsafe.SizeOf()); + Length = length; } ~UnmanagedMemory() => Dispose(); @@ -29,7 +30,7 @@ internal sealed unsafe class UnmanagedMemory : IBuffer where T : unmanaged { if (!_disposed) { - NativeMemory.Free(_pointer); + NativeMemory.Free(Pointer); GC.SuppressFinalize(this); _disposed = true; } diff --git a/Infra/Encoders/PngPipeEncoder.cs b/Infra/Encoders/PngPipeEncoder.cs index 8aef896..965602d 100644 --- a/Infra/Encoders/PngPipeEncoder.cs +++ b/Infra/Encoders/PngPipeEncoder.cs @@ -10,6 +10,7 @@ public class PngPipeEncoder : IDisposable { private const int BufferSize = 8 * 1024; private const int FlushThreshold = 1024; + private const int PipeChunkThreshold = 16 * 1024; private readonly PipeWriter _outputPipe; private readonly MemoryStream _memoryStream; @@ -60,33 +61,33 @@ public class PngPipeEncoder : IDisposable _outputPipe.Write(headerBytes); } - public void WriteData(IBuffer buffer, bool disposeBuffer = true, CancellationToken cancellationToken = default) + public async Task WriteDataAsync(IBuffer buffer, bool disposeBuffer = true, CancellationToken cancellationToken = default) { _zlibStream.Write([0]); - var dataSlice = buffer.Span; - while (dataSlice.Length > FlushThreshold) + var offset = 0; + while (buffer.Length - offset > FlushThreshold) { - _zlibStream.Write(dataSlice[..FlushThreshold]); - _zlibStream.Flush(); - dataSlice = dataSlice[FlushThreshold..]; - if(_memoryStream.Length >= BufferSize) - Flush(cancellationToken); + _zlibStream.Write(buffer.Span.Slice(offset, FlushThreshold)); + await _zlibStream.FlushAsync(cancellationToken); + offset += FlushThreshold; + if(_outputPipe.UnflushedBytes >= PipeChunkThreshold) + await FlushAsync(cancellationToken); } - if (dataSlice.Length > 0) + if (buffer.Length > offset) { - _zlibStream.Write(dataSlice); - _zlibStream.Flush(); + _zlibStream.Write(buffer.Span[offset..]); + await _zlibStream.FlushAsync(cancellationToken); _shouldFlush = true; } if(disposeBuffer) buffer.Dispose(); } - private void Flush(CancellationToken cancellationToken) + private async Task FlushAsync(CancellationToken cancellationToken) { - _zlibStream.Flush(); + await _zlibStream.FlushAsync(cancellationToken); var dataSize = (int)(_memoryStream.Length - 8); _memoryStream.Write("\0\0\0\0"u8); @@ -102,15 +103,17 @@ public class PngPipeEncoder : IDisposable BinaryPrimitives.WriteUInt32BigEndian(buffer.AsSpan(dataSize + 8), crc); _outputPipe.Write(buffer.AsSpan(0, dataSize + 12)); + await _outputPipe.FlushAsync(cancellationToken); + _memoryStream.SetLength(8); _memoryStream.Position = 8; _shouldFlush = false; } - public void WriteEndOfFile(CancellationToken cancellationToken = default) + public async Task WriteEndOfFileAsync(CancellationToken cancellationToken = default) { if(_shouldFlush) - Flush(cancellationToken); + await FlushAsync(cancellationToken); Span endChunk = [ 0x00, 0x00, 0x00, 0x00, // Length diff --git a/Infra/ImageIntegral.cs b/Infra/ImageIntegral.cs index f3d20e4..8819ea2 100644 --- a/Infra/ImageIntegral.cs +++ b/Infra/ImageIntegral.cs @@ -6,6 +6,7 @@ using SixLabors.ImageSharp.Formats; using SixLabors.ImageSharp.Formats.Png; using SixLabors.ImageSharp.PixelFormats; using StitchATon2.Infra.Buffers; +using StitchATon2.Infra.Synchronization; namespace StitchATon2.Infra; @@ -58,7 +59,7 @@ public class ImageIntegral : IDisposable { if (_memoryMappedFile is null) { - Task.Factory.StartNew(() => Initialize(cancellationToken), cancellationToken); + Task.Run(() => Initialize(cancellationToken), cancellationToken); _initializationLock.Wait(cancellationToken); _initializationLock.Dispose(); } @@ -86,8 +87,8 @@ public class ImageIntegral : IDisposable } var taskQueue = backedFileStream == null - ? Task.CompletedTask - : AllocateBackedFile(backedFileStream, header); + ? TaskHelper.SynchronizedTaskFactory.StartNew(() => { }, cancellationToken) + : AllocateBackedFile(backedFileStream, header, cancellationToken); taskQueue = taskQueue.ContinueWith( _ => @@ -129,7 +130,7 @@ public class ImageIntegral : IDisposable var imageBuffer = image.Frames.RootFrame.PixelBuffer; var accumulator = Int32Pixel.Zero; - var buffer = MemoryAllocator.AllocateArray(_width); + var buffer = MemoryAllocator.Allocate(_width); var processedRows = _processedRows; Interlocked.Exchange(ref _queueCounter, 0); @@ -143,50 +144,55 @@ public class ImageIntegral : IDisposable buffer[x] = accumulator; } - taskQueue = QueueWriterTask(taskQueue, 0, buffer.Clone(_width), cancellationToken); + taskQueue = QueueWriterTask(taskQueue, 0, buffer.Clone(), cancellationToken); processedRows++; } else { - ReadRow(processedRows - 1, buffer); + ReadRow(processedRows - 1, buffer.Span); } if(cancellationToken.IsCancellationRequested) return; var prevBuffer = buffer; - buffer = MemoryAllocator.AllocateArray(_width); - - for (int y = processedRows; y < image.Height; y++) + buffer = MemoryAllocator.Allocate(_width); + try { - var sourceRow = imageBuffer.DangerousGetRowSpan(y); - accumulator = (Int32Pixel)sourceRow[0]; - buffer[0] = accumulator + prevBuffer[0]; - - // Process all other columns - for (var x = 1; x < sourceRow.Length; x++) + for (int y = processedRows; y < image.Height; y++) { - accumulator.Accumulate(sourceRow[x]); - buffer[x] = accumulator + prevBuffer[x]; + var sourceRow = imageBuffer.DangerousGetRowSpan(y); + accumulator = (Int32Pixel)sourceRow[0]; + buffer[0] = accumulator + prevBuffer[0]; + + // Process all other columns + for (var x = 1; x < sourceRow.Length; x++) + { + accumulator.Accumulate(sourceRow[x]); + buffer[x] = accumulator + prevBuffer[x]; + } + + if (_queueCounter >= MaxProcessingQueue) + { + _queueLock.Reset(); + _queueLock.Wait(cancellationToken); + } + + if(cancellationToken.IsCancellationRequested) + break; + + var writeBuffer = prevBuffer; + buffer.Copy(writeBuffer, _width); + taskQueue = QueueWriterTask(taskQueue, y, writeBuffer, cancellationToken); + prevBuffer = buffer; + buffer = MemoryAllocator.Allocate(_width); } - - if (_queueCounter >= MaxProcessingQueue) - { - _queueLock.Reset(); - _queueLock.Wait(cancellationToken); - } - - if(cancellationToken.IsCancellationRequested) - break; - - var writeBuffer = prevBuffer; - Array.Copy(buffer.Array, writeBuffer.Array, image.Width); - taskQueue = QueueWriterTask(taskQueue, y, writeBuffer, cancellationToken); - prevBuffer = buffer; - buffer = MemoryAllocator.AllocateArray(_width); + } + finally + { + buffer.Dispose(); } - buffer.Dispose(); if(cancellationToken.IsCancellationRequested) return; @@ -205,7 +211,7 @@ public class ImageIntegral : IDisposable private Task QueueWriterTask( Task taskQueue, int row, - ArrayOwner writeBuffer, + IBuffer writeBuffer, CancellationToken cancellationToken) { Interlocked.Increment(ref _queueCounter); @@ -213,7 +219,7 @@ public class ImageIntegral : IDisposable return taskQueue.ContinueWith(_ => { using (var view = AcquireView(row, MemoryMappedFileAccess.Write)) - view.WriteArray(0, writeBuffer.Array, 0, _width); + view.DangerousWriteSpan(0, writeBuffer.Span, 0, _width); writeBuffer.Dispose(); _rowLocks!.Memory.Span[row].Set(); @@ -249,12 +255,6 @@ public class ImageIntegral : IDisposable view.DangerousReadSpan(0, buffer, 0, _width); } - private void ReadRow(int row, ArrayOwner buffer) - { - using var view = AcquireView(row, MemoryMappedFileAccess.Read); - view.DangerousReadSpan(0, buffer.Span, 0, _width); - } - private FileStream? InitializeBackedFile(string path, out Header header) { var expectedHeader = Header.CreateInitial(_width, _height); @@ -311,33 +311,35 @@ public class ImageIntegral : IDisposable return fs; } - private static async Task AllocateBackedFile(FileStream fileStream, Header header) + private static Task AllocateBackedFile(FileStream fileStream, Header header, CancellationToken cancellationToken) { - // The input filestream is expected to be empty with - // initial cursor at the beginning of the file and the content - // is pre-allocated for at least Header.Length bytes - // No other process should be accessed the file while being - // allocated. - // Allocated bytes is not necessary to be zeroed. + return TaskHelper.SynchronizedTaskFactory.StartNew(() => + { + // The input filestream is expected to be empty with + // initial cursor at the beginning of the file and the content + // is pre-allocated for at least Header.Length bytes + // No other process should be accessed the file while being + // allocated. + // Allocated bytes is not necessary to be zeroed. - // const int writeBufferSize = 4 * 1024; - // using var writeBuffer = MemoryPool.Shared.Rent(writeBufferSize); - // - // var written = 0; - // while (written + writeBufferSize < header.Length) - // { - // await fileStream.WriteAsync(writeBuffer.Memory, cancellationToken); - // written += writeBufferSize; - // } - // - // if (written < header.Length) - // { - // await fileStream.WriteAsync(writeBuffer.Memory[..(header.Length - written)], cancellationToken); - // } + // const int writeBufferSize = 4 * 1024; + // using var writeBuffer = MemoryPool.Shared.Rent(writeBufferSize); + // + // var written = 0; + // while (written + writeBufferSize < header.Length) + // { + // await fileStream.WriteAsync(writeBuffer.Memory, cancellationToken); + // written += writeBufferSize; + // } + // + // if (written < header.Length) + // { + // await fileStream.WriteAsync(writeBuffer.Memory[..(header.Length - written)], cancellationToken); + // } - fileStream.SetLength(header.Length + Header.Size); - - await fileStream.DisposeAsync(); + fileStream.SetLength(header.Length + Header.Size); + fileStream.Dispose(); + }, cancellationToken); } [StructLayout(LayoutKind.Sequential)] diff --git a/Infra/Utils.cs b/Infra/Utils.cs index 4ca34c3..3b92da5 100644 --- a/Infra/Utils.cs +++ b/Infra/Utils.cs @@ -1,16 +1,13 @@ using System.IO.MemoryMappedFiles; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using StitchATon2.Infra.Buffers; namespace StitchATon2.Infra; -public static class Utils +internal static class Utils { private static unsafe uint AlignedSizeOf() where T : unmanaged { uint size = (uint)sizeof(T); - return size is 1 or 2 ? size : (uint)((size + 3) & (~3)); + return size is 1 or 2 ? size : (uint)((size + 3) & ~3); } internal static void DangerousReadSpan(this MemoryMappedViewAccessor view, long position, Span span, int offset, int count) @@ -36,10 +33,10 @@ public static class Utils view.SafeMemoryMappedViewHandle.ReadSpan(byteOffset, span.Slice(offset, n)); } - public static ArrayOwner Clone(this ArrayOwner arrayOwner, int length) where T : unmanaged + internal static void DangerousWriteSpan(this MemoryMappedViewAccessor view, long position, Span span, int offset, int count) + where T : unmanaged { - var newArrayOwner = MemoryAllocator.AllocateArray(length); - Array.Copy(arrayOwner.Array, 0, newArrayOwner.Array, 0, length); - return newArrayOwner; + var byteOffset = (ulong)(view.PointerOffset + position); + view.SafeMemoryMappedViewHandle.WriteSpan(byteOffset, span.Slice(offset, count)); } } \ No newline at end of file