using System.IO.MemoryMappedFiles;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using SixLabors.ImageSharp.Formats;
using SixLabors.ImageSharp.Formats.Png;
using SixLabors.ImageSharp.PixelFormats;
using StitchATon2.Infra.Buffers;
using StitchATon2.Infra.Synchronization;

namespace StitchATon2.Infra;

public class ImageIntegral : IDisposable
{
    private const int MaxProcessingQueue = 4;

    private readonly string _imagePath;
    private readonly string _outputDirectory;
    private readonly int _width;
    private readonly int _height;

    private ManualResetEventSlim[]? _rowLocks;
    private MemoryMappedFile? _memoryMappedFile;

    private readonly Lock _lock = new();
    private readonly ManualResetEventSlim _queueLock = new(true);
    private readonly ManualResetEventSlim _initializationLock = new(false);

    private volatile int _processedRows;
    private volatile int _queueCounter;

    public ImageIntegral(string imagePath, string outputDirectory, int width, int height)
    {
        _imagePath = imagePath;
        _outputDirectory = outputDirectory;
        _width = width;
        _height = height;
    }

    ~ImageIntegral() => Dispose();

    public void Acquire(int row, Int32Pixel[] buffer, CancellationToken cancellationToken = default)
    {
        Acquire(row, cancellationToken);
        ReadRow(row, buffer);
    }

    public void Acquire(int row, Span<Int32Pixel> buffer, CancellationToken cancellationToken = default)
    {
        Acquire(row, cancellationToken);
        ReadRow(row, buffer);
    }

    private void Acquire(int row, CancellationToken cancellationToken)
    {
        if (_memoryMappedFile is null)
        {
            lock (_lock)
            {
                if (_memoryMappedFile is null)
                {
                    Task.Run(() => Initialize(cancellationToken), cancellationToken);
                    _initializationLock.Wait(cancellationToken);
                    _initializationLock.Dispose();
                }
            }
        }

        _rowLocks?[row].Wait(cancellationToken);
    }
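    // Initialization handshake: the first caller to reach the double-checked lock above
    // starts Initialize on a background task and blocks on _initializationLock until the
    // memory-mapped backing file is ready. After that, each requested row waits on its
    // own gate in _rowLocks, which the writer task opens once the row has been persisted.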
    private void Initialize(CancellationToken cancellationToken)
    {
        var fileName = Path.GetFileNameWithoutExtension(_imagePath);
        var path = Path.Combine(_outputDirectory, $"{fileName}.mmf");
        var backedFileStream = InitializeBackedFile(path, out var header);

        _processedRows = header.ProcessedRows;
        if (header.ProcessedRows >= _height)
        {
            // When the condition above holds, the backing file is guaranteed
            // to be valid and fully processed.
            _memoryMappedFile = MemoryMappedFile.CreateFromFile(path, FileMode.Open);
            _initializationLock.Set();
            return;
        }

        var taskQueue = backedFileStream == null
            ? TaskHelper.SynchronizedTaskFactory.StartNew(() => { }, cancellationToken)
            : AllocateBackedFile(backedFileStream, header, cancellationToken);

        taskQueue = taskQueue.ContinueWith(
            _ =>
            {
                _memoryMappedFile = MemoryMappedFile.CreateFromFile(path, FileMode.Open);
                _initializationLock.Set();
            },
            cancellationToken);

        // Initialize resource gating: every unprocessed row starts locked.
        // If the backing file still needs to be allocated, it is safe to set
        // this up while the allocation runs asynchronously.
        var rowLocks = new ManualResetEventSlim[_height];
        for (int i = 0; i < _height; i++)
        {
            var isOpen = i < header.ProcessedRows;
            rowLocks[i] = new ManualResetEventSlim(isOpen);
        }
        _rowLocks = rowLocks;

        ProcessIntegral(taskQueue, cancellationToken);
    }

    private void ProcessIntegral(Task taskQueue, CancellationToken cancellationToken)
    {
        PngDecoderOptions decoderOptions = new()
        {
            PngCrcChunkHandling = PngCrcChunkHandling.IgnoreAll,
            GeneralOptions = new DecoderOptions
            {
                MaxFrames = 1,
                SkipMetadata = true,
            }
        };

        using var fileStream = File.OpenRead(_imagePath);
        using var image = PngDecoder.Instance.Decode(decoderOptions, fileStream);
        var imageBuffer = image.Frames.RootFrame.PixelBuffer;

        var accumulator = Int32Pixel.Zero;
        var buffer = MemoryAllocator.Allocate<Int32Pixel>(_width);
        var processedRows = _processedRows;
        Interlocked.Exchange(ref _queueCounter, 0);

        // First row: a plain horizontal prefix sum.
        if (processedRows == 0)
        {
            var sourceRow = imageBuffer.DangerousGetRowSpan(0);
            for (var x = 0; x < sourceRow.Length; x++)
            {
                accumulator.Accumulate(sourceRow[x]);
                buffer[x] = accumulator;
            }

            taskQueue = QueueWriterTask(taskQueue, 0, buffer.Clone(), cancellationToken);
            processedRows++;
        }
        else
        {
            // Resume: reload the last processed integral row as the previous row.
            ReadRow(processedRows - 1, buffer.Span);
        }

        if (cancellationToken.IsCancellationRequested)
            return;

        var prevBuffer = buffer;
        buffer = MemoryAllocator.Allocate<Int32Pixel>(_width);
        try
        {
            for (int y = processedRows; y < image.Height; y++)
            {
                var sourceRow = imageBuffer.DangerousGetRowSpan(y);
                accumulator = (Int32Pixel)sourceRow[0];
                buffer[0] = accumulator + prevBuffer[0];

                // Process all other columns: running prefix sum of the current
                // source row added to the previous integral row.
                for (var x = 1; x < sourceRow.Length; x++)
                {
                    accumulator.Accumulate(sourceRow[x]);
                    buffer[x] = accumulator + prevBuffer[x];
                }

                if (_queueCounter >= MaxProcessingQueue)
                {
                    _queueLock.Reset();
                    _queueLock.Wait(cancellationToken);
                }

                if (cancellationToken.IsCancellationRequested)
                    break;

                var writeBuffer = prevBuffer;
                buffer.Copy(writeBuffer, _width);
                taskQueue = QueueWriterTask(taskQueue, y, writeBuffer, cancellationToken);

                prevBuffer = buffer;
                buffer = MemoryAllocator.Allocate<Int32Pixel>(_width);
            }
        }
        finally
        {
            buffer.Dispose();
            // prevBuffer is never handed to a writer task at this point, so it has to be
            // released here as well to avoid leaking the last row buffer.
            prevBuffer.Dispose();
        }

        if (cancellationToken.IsCancellationRequested)
            return;

        taskQueue = taskQueue.ContinueWith(task =>
        {
            if (task.IsCompletedSuccessfully)
            {
                DisposeRowLocks();
                _queueLock.Dispose();
            }
        }, cancellationToken);
        taskQueue.Wait(cancellationToken);
    }

    private Task QueueWriterTask(
        Task taskQueue,
        int row,
        IBuffer<Int32Pixel> writeBuffer,
        CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref _queueCounter);
        cancellationToken.Register(writeBuffer.Dispose);
        return taskQueue.ContinueWith(_ =>
        {
            using (var view = AcquireView(row, MemoryMappedFileAccess.Write))
                view.DangerousWriteSpan(0, writeBuffer.Span, 0, _width);
            writeBuffer.Dispose();

            _rowLocks![row].Set();
            Interlocked.Increment(ref _processedRows);

            // Persist progress into the header's ProcessedRows field.
            using (var view = AcquireHeaderView(MemoryMappedFileAccess.Write))
                view.Write(16, _processedRows);

            Interlocked.Decrement(ref _queueCounter);
            _queueLock.Set();
        }, cancellationToken);
    }

    private MemoryMappedViewAccessor AcquireHeaderView(MemoryMappedFileAccess access)
        => _memoryMappedFile!.CreateViewAccessor(0, Header.Size, access);
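    // Layout of the memory-mapped backing file, as implied by the offset arithmetic
    // in AcquireHeaderView above and AcquireView below:
    //
    //     [ Header (Header.Size bytes) ]
    //     [ row 0:          _width * Int32Pixel.Size bytes ]
    //     [ row 1:          _width * Int32Pixel.Size bytes ]
    //     ...
    //     [ row _height-1:  _width * Int32Pixel.Size bytes ]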
    private MemoryMappedViewAccessor AcquireView(int row, MemoryMappedFileAccess access)
    {
        var size = _width * Int32Pixel.Size;
        var offset = row * size + Header.Size;
        return _memoryMappedFile!.CreateViewAccessor(offset, size, access);
    }

    private void ReadRow(int row, Int32Pixel[] readBuffer)
    {
        using var view = AcquireView(row, MemoryMappedFileAccess.Read);
        view.ReadArray(0, readBuffer, 0, _width);
    }

    private void ReadRow(int row, Span<Int32Pixel> buffer)
    {
        using var view = AcquireView(row, MemoryMappedFileAccess.Read);
        view.DangerousReadSpan(0, buffer, 0, _width);
    }

    private FileStream? InitializeBackedFile(string path, out Header header)
    {
        var expectedHeader = Header.CreateInitial(_width, _height);

        // Expectations when the file already exists:
        // - throws IOException when it is being processed by another instance; handle it if possible
        // - returns null when the existing file is valid
        // - deletes the existing file when it is not valid (modified externally)
        FileStream fs;
        if (File.Exists(path))
        {
            fs = File.OpenRead(path);
            if (fs.Length < expectedHeader.Length + Header.Size)
            {
                // Too short to be a complete backing file; discard it and recreate below.
                fs.Dispose();
                File.Delete(path);
            }
            else
            {
                Span<byte> headerBytes = stackalloc byte[Header.Size];
                fs.ReadExactly(headerBytes);
                header = MemoryMarshal.Cast<byte, Header>(headerBytes)[0];

                var isValid = expectedHeader.Identifier == header.Identifier
                    && expectedHeader.Width == header.Width
                    && expectedHeader.Height == header.Height
                    && expectedHeader.Length == header.Length;
                fs.Dispose();

                Console.WriteLine($"Image integral file found: {path}");
                if (!isValid)
                    File.Delete(path);
                else
                    return null;
            }
        }

        // Expected process:
        // - Create the file
        // - Write the header
        // - Extend the file to hold Header.Length bytes of row data; this is done
        //   asynchronously by AllocateBackedFile, which returns the task handle.
        var fsOptions = new FileStreamOptions
        {
            Access = FileAccess.Write,
            Share = FileShare.None,
            Mode = FileMode.CreateNew,
            PreallocationSize = Header.Size,
        };

        Console.WriteLine($"Create image integral file: {path}");
        Directory.CreateDirectory(_outputDirectory);
        fs = File.Open(path, fsOptions);
        fs.Write(MemoryMarshal.AsBytes([expectedHeader]));
        header = expectedHeader;
        return fs;
    }

    private static Task AllocateBackedFile(FileStream fileStream, Header header, CancellationToken cancellationToken)
    {
        return TaskHelper.SynchronizedTaskFactory.StartNew(() =>
        {
            // The input file stream is expected to contain only the freshly written
            // header, with the cursor positioned just past it. No other process
            // should access the file while it is being allocated.
            // The allocated bytes do not need to be zeroed.

            // const int writeBufferSize = 4 * 1024;
            // using var writeBuffer = MemoryPool<byte>.Shared.Rent(writeBufferSize);
            //
            // var written = 0;
            // while (written + writeBufferSize < header.Length)
            // {
            //     await fileStream.WriteAsync(writeBuffer.Memory, cancellationToken);
            //     written += writeBufferSize;
            // }
            //
            // if (written < header.Length)
            // {
            //     await fileStream.WriteAsync(writeBuffer.Memory[..(header.Length - written)], cancellationToken);
            // }

            fileStream.SetLength(header.Length + Header.Size);
            fileStream.Dispose();
        }, cancellationToken);
    }
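    // On-disk header written at the start of the backing file. With the sequential layout
    // below, the fields sit at byte offsets 0 (Identifier), 4 (Width), 8 (Height),
    // 12 (Length) and 16 (ProcessedRows); QueueWriterTask updates the progress counter
    // through that last offset.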
    [StructLayout(LayoutKind.Sequential)]
    private struct Header
    {
        private const int Signature = 0x47544e49; // "INTG"

        public static int Size => Unsafe.SizeOf<Header>();

        public uint Identifier;
        public int Width;
        public int Height;
        public int Length;
        public int ProcessedRows;

        public static Header CreateInitial(int width, int height) => new()
        {
            Identifier = Signature,
            Width = width,
            Height = height,
            Length = width * height * Int32Pixel.Size,
            ProcessedRows = 0,
        };
    }

    private void DisposeRowLocks()
    {
        if (_rowLocks is { } locks)
        {
            _rowLocks = null;
            for (int i = 0; i < _height; i++)
                locks[i].Dispose();
        }
    }

    public void Dispose()
    {
        DisposeRowLocks();
        _memoryMappedFile?.Dispose();
        _queueLock.Dispose();
        _initializationLock.Dispose();
        GC.SuppressFinalize(this);
    }
}
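// A minimal usage sketch (illustrative only: the path, cache directory and dimensions
// below are assumptions, and width/height must match the PNG being integrated):
//
//     using var integral = new ImageIntegral("tiles/0001.png", "cache", width: 4096, height: 4096);
//     var row = new Int32Pixel[4096];
//     integral.Acquire(42, row);   // blocks until integral row 42 has been computed and persisted
//
// The first Acquire call triggers initialization and the background integral pass;
// later calls only wait on the per-row gate before reading the row back from the
// memory-mapped backing file.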