StitchATon2/Infra/ImageIntegral.cs

384 lines
13 KiB
C#
Raw Permalink Normal View History

2025-07-30 07:30:00 +07:00
using System.IO.MemoryMappedFiles;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using SixLabors.ImageSharp.Formats;
using SixLabors.ImageSharp.Formats.Png;
using SixLabors.ImageSharp.PixelFormats;
using StitchATon2.Infra.Buffers;
using StitchATon2.Infra.Synchronization;
2025-07-30 07:30:00 +07:00
namespace StitchATon2.Infra;
public class ImageIntegral : IDisposable
{
/// <summary>
/// Number of queued row writes at which the producer in <c>ProcessIntegral</c>
/// pauses until a writer finishes.
/// </summary>
private const int MaxProcessingQueue = 4;

// Configuration captured once by the constructor.
private readonly string _imagePath;
private readonly string _outputDirectory;
private readonly int _width;
private readonly int _height;

// Guards the one-time start of Initialize.
private readonly Lock _lock = new Lock();

// Starts signalled; reset by the producer while the write queue is full.
private readonly ManualResetEventSlim _queueLock = new ManualResetEventSlim(initialState: true);

// Starts unsignalled; set once the memory-mapped file has been opened.
private readonly ManualResetEventSlim _initializationLock = new ManualResetEventSlim(initialState: false);

// One gate per image row, set when that row has been written. Null when no gating is needed.
private ManualResetEventSlim[]? _rowLocks;

// Mapping over the "<image name>.mmf" backing file; null until initialization has run.
private MemoryMappedFile? _memoryMappedFile;

// Number of rows already written to the backing file.
private volatile int _processedRows;

// Number of row writes queued but not yet completed.
private volatile int _queueCounter;
/// <summary>
/// Records where the source image lives, where the backing file goes and the image
/// dimensions. No I/O happens here; work starts on the first <c>Acquire</c> call.
/// </summary>
/// <param name="imagePath">Path of the source image.</param>
/// <param name="outputDirectory">Directory that receives the <c>.mmf</c> backing file.</param>
/// <param name="width">Image width in pixels.</param>
/// <param name="height">Image height in pixels.</param>
public ImageIntegral(string imagePath, string outputDirectory, int width, int height)
{
    (_imagePath, _outputDirectory) = (imagePath, outputDirectory);
    (_width, _height) = (width, height);
}
/// <summary>Finalizer; forwards to <see cref="Dispose"/>.</summary>
// NOTE(review): Dispose touches other managed objects, which is unusual from a
// finalizer - confirm this finalizer is actually needed.
~ImageIntegral()
{
    Dispose();
}
/// <summary>
/// Waits until <paramref name="row"/> is available, then copies it into an array.
/// </summary>
/// <param name="row">Zero-based row index.</param>
/// <param name="buffer">Destination array; <c>_width</c> elements are read into it.</param>
/// <param name="cancellationToken">Cancels the wait.</param>
public void Acquire(
    int row,
    Int32Pixel[] buffer,
    CancellationToken cancellationToken = default)
{
    // Block on the row gate first, then read the row from the mapped file.
    Acquire(row, cancellationToken);
    ReadRow(row, buffer);
}
/// <summary>
/// Waits until <paramref name="row"/> is available, then copies it into a span.
/// </summary>
/// <param name="row">Zero-based row index.</param>
/// <param name="buffer">Destination span; <c>_width</c> elements are read into it.</param>
/// <param name="cancellationToken">Cancels the wait.</param>
public void Acquire(
    int row,
    Span<Int32Pixel> buffer,
    CancellationToken cancellationToken = default)
{
    // Block on the row gate first, then read the row from the mapped file.
    Acquire(row, cancellationToken);
    ReadRow(row, buffer);
}
/// <summary>
/// Starts initialization on first use and then blocks until <paramref name="row"/>
/// has been written to the backing file.
/// </summary>
/// <param name="row">Zero-based row index to wait for.</param>
/// <param name="cancellationToken">Cancels both the initialization wait and the row wait.</param>
/// <remarks>
/// If the background <c>Initialize</c> call fails or is cancelled before it signals
/// readiness, its exception is rethrown here instead of leaving the caller blocked.
/// </remarks>
private void Acquire(int row, CancellationToken cancellationToken)
{
    if (_memoryMappedFile is null)
    {
        lock (_lock)
        {
            if (_memoryMappedFile is null)
            {
                // Interval at which the background task is checked for failure.
                const int pollIntervalMilliseconds = 50;

                var initialization = Task.Run(() => Initialize(cancellationToken), cancellationToken);

                // Initialize only sets _initializationLock on success. Waiting on the
                // event alone would hang forever if it threw (e.g. an IOException while
                // opening the backing file), so also watch the task itself.
                while (!_initializationLock.Wait(pollIntervalMilliseconds, cancellationToken))
                {
                    if (initialization.IsFaulted || initialization.IsCanceled)
                    {
                        // Rethrows the original exception (or a cancellation exception).
                        initialization.GetAwaiter().GetResult();
                    }
                }

                _initializationLock.Dispose();
            }
        }
    }

    // _rowLocks is null when every row is already available; otherwise wait for this row.
    _rowLocks?[row].Wait(cancellationToken);
}
/// <summary>
/// Opens or creates the backing file, publishes the per-row gates, maps the file and
/// then computes every row that has not been processed yet.
/// </summary>
/// <param name="cancellationToken">Passed to the queued tasks and to <c>ProcessIntegral</c>.</param>
private void Initialize(CancellationToken cancellationToken)
{
    var fileName = Path.GetFileNameWithoutExtension(_imagePath);
    var path = Path.Combine(_outputDirectory, $"{fileName}.mmf");
    var backedFileStream = InitializeBackedFile(path, out var header);
    _processedRows = header.ProcessedRows;

    if (header.ProcessedRows >= _height)
    {
        // When statement above is true
        // then it is guaranteed that backed file is valid and fully processed
        _memoryMappedFile = MemoryMappedFile.CreateFromFile(path, FileMode.Open);
        _initializationLock.Set();
        return;
    }

    // Start sizing the file (when it was just created) so that it overlaps with the
    // gate allocation below.
    var taskQueue = backedFileStream == null
        ? TaskHelper.SynchronizedTaskFactory.StartNew(() => { }, cancellationToken)
        : AllocateBackedFile(backedFileStream, header, cancellationToken);

    // Initialize resource gating: rows already processed start open, all others locked.
    // The gates MUST be published before the continuation below can signal
    // _initializationLock; otherwise a waiting reader could wake up, observe
    // _rowLocks == null, skip the row gate and read a row that has not been written.
    var rowLocks = new ManualResetEventSlim[_height];
    for (int i = 0; i < _height; i++)
    {
        var isOpen = i < header.ProcessedRows;
        rowLocks[i] = new ManualResetEventSlim(isOpen);
    }

    _rowLocks = rowLocks;

    // Once the file has its final size, map it and release callers blocked in Acquire.
    taskQueue = taskQueue.ContinueWith(
        _ =>
        {
            _memoryMappedFile = MemoryMappedFile.CreateFromFile(path, FileMode.Open);
            _initializationLock.Set();
        },
        cancellationToken);

    ProcessIntegral(taskQueue, cancellationToken);
}
/// <summary>
/// Decodes the source PNG and computes the remaining integral rows, queueing each
/// finished row for writing behind <paramref name="taskQueue"/>.
/// </summary>
/// <param name="taskQueue">
/// Tail of the serialized task chain; the caller passes the task that maps the backing file.
/// </param>
/// <param name="cancellationToken">Stops row production and cancels waits.</param>
/// <remarks>
/// Each output element is the running sum of the current source row up to that column
/// plus the element directly above it in the previous output row.
/// </remarks>
private void ProcessIntegral(Task taskQueue, CancellationToken cancellationToken)
{
    PngDecoderOptions decoderOptions = new()
    {
        PngCrcChunkHandling = PngCrcChunkHandling.IgnoreAll,
        GeneralOptions = new DecoderOptions
        {
            MaxFrames = 1,
            SkipMetadata = true,
        }
    };

    using var fileStream = File.OpenRead(_imagePath);
    using var image = PngDecoder.Instance.Decode<Rgb24>(decoderOptions, fileStream);
    var imageBuffer = image.Frames.RootFrame.PixelBuffer;

    var accumulator = Int32Pixel.Zero;
    var processedRows = _processedRows;
    Interlocked.Exchange(ref _queueCounter, 0);

    // prevBuffer holds the most recent integral row, buffer is scratch for the next one.
    // Neither is ever handed to a writer task (writers receive a clone or the buffer
    // that prevBuffer is about to stop referencing), so both are released in the
    // finally block on every exit path.
    var prevBuffer = MemoryAllocator.Allocate<Int32Pixel>(_width);
    var buffer = MemoryAllocator.Allocate<Int32Pixel>(_width);

    try
    {
        if (processedRows == 0)
        {
            // First row: plain running sum, there is no row above it.
            var sourceRow = imageBuffer.DangerousGetRowSpan(0);
            for (var x = 0; x < sourceRow.Length; x++)
            {
                accumulator.Accumulate(sourceRow[x]);
                prevBuffer[x] = accumulator;
            }

            taskQueue = QueueWriterTask(taskQueue, 0, prevBuffer.Clone(), cancellationToken);
            processedRows++;
        }
        else
        {
            // Resuming: the last stored row seeds the computation. The mapping is
            // created by the task at the tail of taskQueue, so wait for it before
            // reading; otherwise _memoryMappedFile may still be null here.
            taskQueue.Wait(cancellationToken);
            ReadRow(processedRows - 1, prevBuffer.Span);
        }

        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        for (int y = processedRows; y < image.Height; y++)
        {
            var sourceRow = imageBuffer.DangerousGetRowSpan(y);
            accumulator = (Int32Pixel)sourceRow[0];
            buffer[0] = accumulator + prevBuffer[0];

            // Process all other columns
            for (var x = 1; x < sourceRow.Length; x++)
            {
                accumulator.Accumulate(sourceRow[x]);
                buffer[x] = accumulator + prevBuffer[x];
            }

            // Back-pressure: pause while too many writes are pending.
            while (_queueCounter >= MaxProcessingQueue)
            {
                _queueLock.Reset();

                // A writer decrements the counter and then calls Set. If that happened
                // between the check above and Reset, the signal was just erased, so
                // look at the counter again before blocking.
                if (_queueCounter < MaxProcessingQueue)
                {
                    break;
                }

                _queueLock.Wait(cancellationToken);
            }

            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            // Reuse the old previous-row buffer as the write buffer; the writer task
            // owns it from here on.
            // NOTE(review): assumes Copy(destination, count) copies from buffer into
            // writeBuffer - confirm against the IBuffer helper.
            var writeBuffer = prevBuffer;
            buffer.Copy(writeBuffer, _width);
            taskQueue = QueueWriterTask(taskQueue, y, writeBuffer, cancellationToken);

            prevBuffer = buffer;
            buffer = MemoryAllocator.Allocate<Int32Pixel>(_width);
        }
    }
    finally
    {
        buffer.Dispose();
        prevBuffer.Dispose();
    }

    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    // After the last write succeeded the gates and the queue lock are no longer needed.
    taskQueue = taskQueue.ContinueWith(task =>
    {
        if (task.IsCompletedSuccessfully)
        {
            DisposeRowLocks();
            _queueLock.Dispose();
        }
    }, cancellationToken);

    taskQueue.Wait(cancellationToken);
}
/// <summary>
/// Appends a task to <paramref name="taskQueue"/> that writes one row to the backing
/// file, opens that row's gate and stores the new processed-row count in the header.
/// </summary>
/// <param name="taskQueue">Task the write is chained after.</param>
/// <param name="row">Zero-based row index being written.</param>
/// <param name="writeBuffer">Row data; disposed once written, or by the cancellation callback.</param>
/// <param name="cancellationToken">Cancels the queued write.</param>
/// <returns>The new tail of the task chain.</returns>
private Task QueueWriterTask(
    Task taskQueue,
    int row,
    IBuffer<Int32Pixel> writeBuffer,
    CancellationToken cancellationToken)
{
    // Byte offset of Header.ProcessedRows: four 4-byte fields precede it in the
    // sequentially laid out Header struct.
    const int processedRowsOffset = 16;

    Interlocked.Increment(ref _queueCounter);

    // A cancelled token prevents the continuation from running, so the callback
    // releases the buffer in that case.
    var registration = cancellationToken.Register(writeBuffer.Dispose);

    return taskQueue.ContinueWith(_ =>
    {
        using (var view = AcquireView(row, MemoryMappedFileAccess.Write))
        {
            view.DangerousWriteSpan(0, writeBuffer.Span, 0, _width);
        }

        // Remove the callback so the token does not keep one registration (and one
        // buffer) per row, and so a later cancellation cannot dispose this buffer a
        // second time. Unregister returns false when the callback already ran or is
        // running (the buffer is then released by it) and also for tokens that can
        // never be cancelled, hence the second condition.
        if (registration.Unregister() || !cancellationToken.CanBeCanceled)
        {
            writeBuffer.Dispose();
        }

        _rowLocks![row].Set();

        var processed = Interlocked.Increment(ref _processedRows);
        using (var view = AcquireHeaderView(MemoryMappedFileAccess.Write))
        {
            view.Write(processedRowsOffset, processed);
        }

        Interlocked.Decrement(ref _queueCounter);
        _queueLock.Set();
    }, cancellationToken);
}
/// <summary>Creates a view accessor over the first <c>Header.Size</c> bytes of the mapped file.</summary>
/// <param name="access">Access mode requested for the view.</param>
/// <returns>A view accessor the caller must dispose.</returns>
private MemoryMappedViewAccessor AcquireHeaderView(MemoryMappedFileAccess access)
{
    // The header sits at the very start of the file.
    const long headerOffset = 0;
    return _memoryMappedFile!.CreateViewAccessor(headerOffset, Header.Size, access);
}
/// <summary>Creates a view accessor over the bytes of a single row in the mapped file.</summary>
/// <param name="row">Zero-based row index.</param>
/// <param name="access">Access mode requested for the view.</param>
/// <returns>A view accessor the caller must dispose.</returns>
private MemoryMappedViewAccessor AcquireView(int row, MemoryMappedFileAccess access)
{
    // 64-bit arithmetic: row * rowBytes exceeds int.MaxValue for large images, and
    // in 32-bit math it would wrap around to a bogus (possibly negative) offset.
    long rowBytes = (long)_width * Int32Pixel.Size;
    long offset = row * rowBytes + Header.Size;
    return _memoryMappedFile!.CreateViewAccessor(offset, rowBytes, access);
}
/// <summary>Reads <c>_width</c> elements of <paramref name="row"/> into an array.</summary>
/// <param name="row">Zero-based row index.</param>
/// <param name="readBuffer">Destination array, filled starting at index 0.</param>
private void ReadRow(int row, Int32Pixel[] readBuffer)
{
    using (var rowView = AcquireView(row, MemoryMappedFileAccess.Read))
    {
        rowView.ReadArray(0, readBuffer, 0, _width);
    }
}
/// <summary>Reads <c>_width</c> elements of <paramref name="row"/> into a span.</summary>
/// <param name="row">Zero-based row index.</param>
/// <param name="buffer">Destination span, filled starting at index 0.</param>
private void ReadRow(int row, Span<Int32Pixel> buffer)
{
    using (var rowView = AcquireView(row, MemoryMappedFileAccess.Read))
    {
        rowView.DangerousReadSpan(0, buffer, 0, _width);
    }
}
/// <summary>
/// Validates an existing backing file or creates a fresh one containing only the header.
/// </summary>
/// <param name="path">Full path of the backing file.</param>
/// <param name="header">
/// The header read from a valid existing file, otherwise the freshly written initial header.
/// </param>
/// <returns>
/// <c>null</c> when a valid file already exists; otherwise the open write stream of the
/// newly created file, positioned after the header.
/// </returns>
private FileStream? InitializeBackedFile(string path, out Header header)
{
    var expectedHeader = Header.CreateInitial(_width, _height);

    // Expectation when file exists:
    // - throws IOException when it's being processed, handle it if possible
    // - returns null if file is valid
    // - delete the existing file if it's not valid (modified from external)
    if (File.Exists(path))
    {
        bool isValid;
        using (var existing = File.OpenRead(path))
        {
            if (existing.Length < expectedHeader.Length + Header.Size)
            {
                // Too short to hold the header plus all rows. Do not read from it;
                // it is deleted below and recreated from scratch.
                header = default;
                isValid = false;
            }
            else
            {
                Span<byte> headerBytes = stackalloc byte[Header.Size];
                existing.ReadExactly(headerBytes);
                header = MemoryMarshal.Read<Header>(headerBytes);

                isValid = expectedHeader.Identifier == header.Identifier
                          && expectedHeader.Width == header.Width
                          && expectedHeader.Height == header.Height
                          && expectedHeader.Length == header.Length;
            }
        }

        Console.WriteLine($"Image integral file found: {path}");

        if (isValid)
        {
            return null;
        }

        // The stream is closed at this point, so the file can be removed.
        File.Delete(path);
    }

    // Expected process:
    // - Initialize file creation
    // - Write the header
    // - Allocate initial content by Header.Length
    // - Action above should be done asynchronously and return the task handle.
    var fsOptions = new FileStreamOptions
    {
        Access = FileAccess.Write,
        Share = FileShare.None,
        Mode = FileMode.CreateNew,
        PreallocationSize = Header.Size,
    };

    Console.WriteLine($"Create image integral file: {path}");
    Directory.CreateDirectory(_outputDirectory);

    var fs = File.Open(path, fsOptions);
    fs.Write(MemoryMarshal.AsBytes([expectedHeader]));
    header = expectedHeader;
    return fs;
}
/// <summary>
/// Queues a task that grows the new backing file to its final size and closes the stream.
/// </summary>
/// <param name="fileStream">Open write stream of the new file; disposed by the queued task.</param>
/// <param name="header">Header whose <c>Length</c> gives the size of the row data in bytes.</param>
/// <param name="cancellationToken">Cancels the queued task.</param>
/// <returns>The queued allocation task.</returns>
private static Task AllocateBackedFile(FileStream fileStream, Header header, CancellationToken cancellationToken)
{
    return TaskHelper.SynchronizedTaskFactory.StartNew(() =>
    {
        // The input filestream is expected to contain only the header.
        // No other process should be accessing the file while it is being
        // allocated.
        // Allocated bytes are not required to be zeroed.

        // Dispose even when SetLength throws; otherwise the handle, opened by the
        // creator with FileShare.None, would keep the file locked.
        using (fileStream)
        {
            // Sum in 64-bit so that Length + Header.Size cannot wrap.
            fileStream.SetLength((long)header.Length + Header.Size);
        }
    }, cancellationToken);
}
/// <summary>
/// Fixed-layout record stored at the start of the backing file. The field order is
/// part of the file format and must not change.
/// </summary>
[StructLayout(LayoutKind.Sequential)]
private struct Header
{
    private const int Signature = 0x47544e49; // INTG

    /// <summary>Size of the header in bytes.</summary>
    public static int Size
    {
        get { return Unsafe.SizeOf<Header>(); }
    }

    /// <summary>File signature; compared against <c>Signature</c> during validation.</summary>
    public uint Identifier;

    /// <summary>Image width in pixels.</summary>
    public int Width;

    /// <summary>Image height in pixels.</summary>
    public int Height;

    /// <summary>Size of the row data in bytes (width * height * pixel size).</summary>
    public int Length;

    /// <summary>Number of rows already written to the file.</summary>
    public int ProcessedRows;

    /// <summary>Builds the header of a file in which no row has been processed yet.</summary>
    /// <param name="width">Image width in pixels.</param>
    /// <param name="height">Image height in pixels.</param>
    public static Header CreateInitial(int width, int height)
    {
        Header initial = default;
        initial.Identifier = Signature;
        initial.Width = width;
        initial.Height = height;
        initial.Length = width * height * Int32Pixel.Size;
        initial.ProcessedRows = 0;
        return initial;
    }
}
/// <summary>Detaches the per-row gates from the instance and disposes each of them.</summary>
private void DisposeRowLocks()
{
    var gates = _rowLocks;
    if (gates is null)
    {
        return;
    }

    // Clear the field first so later callers see that gating is gone.
    _rowLocks = null;

    // The array is created with exactly _height entries.
    foreach (var gate in gates)
    {
        gate.Dispose();
    }
}
/// <summary>
/// Releases the row gates, the memory-mapped file and both wait events, then
/// suppresses finalization.
/// </summary>
public void Dispose()
{
    DisposeRowLocks();

    if (_memoryMappedFile is { } mappedFile)
    {
        mappedFile.Dispose();
    }

    _queueLock.Dispose();
    _initializationLock.Dispose();

    GC.SuppressFinalize(this);
}
}