dangerous release (possibly memory leak and deadlock)
This commit is contained in:
parent
a1cb6592eb
commit
741d34a5e0
10 changed files with 164 additions and 115 deletions
|
|
@ -6,6 +6,7 @@ using SixLabors.ImageSharp.Formats;
|
|||
using SixLabors.ImageSharp.Formats.Png;
|
||||
using SixLabors.ImageSharp.PixelFormats;
|
||||
using StitchATon2.Infra.Buffers;
|
||||
using StitchATon2.Infra.Synchronization;
|
||||
|
||||
namespace StitchATon2.Infra;
|
||||
|
||||
|
|
@ -58,7 +59,7 @@ public class ImageIntegral : IDisposable
|
|||
{
|
||||
if (_memoryMappedFile is null)
|
||||
{
|
||||
Task.Factory.StartNew(() => Initialize(cancellationToken), cancellationToken);
|
||||
Task.Run(() => Initialize(cancellationToken), cancellationToken);
|
||||
_initializationLock.Wait(cancellationToken);
|
||||
_initializationLock.Dispose();
|
||||
}
|
||||
|
|
@ -86,8 +87,8 @@ public class ImageIntegral : IDisposable
|
|||
}
|
||||
|
||||
var taskQueue = backedFileStream == null
|
||||
? Task.CompletedTask
|
||||
: AllocateBackedFile(backedFileStream, header);
|
||||
? TaskHelper.SynchronizedTaskFactory.StartNew(() => { }, cancellationToken)
|
||||
: AllocateBackedFile(backedFileStream, header, cancellationToken);
|
||||
|
||||
taskQueue = taskQueue.ContinueWith(
|
||||
_ =>
|
||||
|
|
@ -129,7 +130,7 @@ public class ImageIntegral : IDisposable
|
|||
var imageBuffer = image.Frames.RootFrame.PixelBuffer;
|
||||
|
||||
var accumulator = Int32Pixel.Zero;
|
||||
var buffer = MemoryAllocator.AllocateArray<Int32Pixel>(_width);
|
||||
var buffer = MemoryAllocator.Allocate<Int32Pixel>(_width);
|
||||
var processedRows = _processedRows;
|
||||
Interlocked.Exchange(ref _queueCounter, 0);
|
||||
|
||||
|
|
@ -143,50 +144,55 @@ public class ImageIntegral : IDisposable
|
|||
buffer[x] = accumulator;
|
||||
}
|
||||
|
||||
taskQueue = QueueWriterTask(taskQueue, 0, buffer.Clone(_width), cancellationToken);
|
||||
taskQueue = QueueWriterTask(taskQueue, 0, buffer.Clone(), cancellationToken);
|
||||
processedRows++;
|
||||
}
|
||||
else
|
||||
{
|
||||
ReadRow(processedRows - 1, buffer);
|
||||
ReadRow(processedRows - 1, buffer.Span);
|
||||
}
|
||||
|
||||
if(cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
var prevBuffer = buffer;
|
||||
buffer = MemoryAllocator.AllocateArray<Int32Pixel>(_width);
|
||||
|
||||
for (int y = processedRows; y < image.Height; y++)
|
||||
buffer = MemoryAllocator.Allocate<Int32Pixel>(_width);
|
||||
try
|
||||
{
|
||||
var sourceRow = imageBuffer.DangerousGetRowSpan(y);
|
||||
accumulator = (Int32Pixel)sourceRow[0];
|
||||
buffer[0] = accumulator + prevBuffer[0];
|
||||
|
||||
// Process all other columns
|
||||
for (var x = 1; x < sourceRow.Length; x++)
|
||||
for (int y = processedRows; y < image.Height; y++)
|
||||
{
|
||||
accumulator.Accumulate(sourceRow[x]);
|
||||
buffer[x] = accumulator + prevBuffer[x];
|
||||
var sourceRow = imageBuffer.DangerousGetRowSpan(y);
|
||||
accumulator = (Int32Pixel)sourceRow[0];
|
||||
buffer[0] = accumulator + prevBuffer[0];
|
||||
|
||||
// Process all other columns
|
||||
for (var x = 1; x < sourceRow.Length; x++)
|
||||
{
|
||||
accumulator.Accumulate(sourceRow[x]);
|
||||
buffer[x] = accumulator + prevBuffer[x];
|
||||
}
|
||||
|
||||
if (_queueCounter >= MaxProcessingQueue)
|
||||
{
|
||||
_queueLock.Reset();
|
||||
_queueLock.Wait(cancellationToken);
|
||||
}
|
||||
|
||||
if(cancellationToken.IsCancellationRequested)
|
||||
break;
|
||||
|
||||
var writeBuffer = prevBuffer;
|
||||
buffer.Copy(writeBuffer, _width);
|
||||
taskQueue = QueueWriterTask(taskQueue, y, writeBuffer, cancellationToken);
|
||||
prevBuffer = buffer;
|
||||
buffer = MemoryAllocator.Allocate<Int32Pixel>(_width);
|
||||
}
|
||||
|
||||
if (_queueCounter >= MaxProcessingQueue)
|
||||
{
|
||||
_queueLock.Reset();
|
||||
_queueLock.Wait(cancellationToken);
|
||||
}
|
||||
|
||||
if(cancellationToken.IsCancellationRequested)
|
||||
break;
|
||||
|
||||
var writeBuffer = prevBuffer;
|
||||
Array.Copy(buffer.Array, writeBuffer.Array, image.Width);
|
||||
taskQueue = QueueWriterTask(taskQueue, y, writeBuffer, cancellationToken);
|
||||
prevBuffer = buffer;
|
||||
buffer = MemoryAllocator.AllocateArray<Int32Pixel>(_width);
|
||||
}
|
||||
finally
|
||||
{
|
||||
buffer.Dispose();
|
||||
}
|
||||
|
||||
buffer.Dispose();
|
||||
if(cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
|
|
@ -205,7 +211,7 @@ public class ImageIntegral : IDisposable
|
|||
private Task QueueWriterTask(
|
||||
Task taskQueue,
|
||||
int row,
|
||||
ArrayOwner<Int32Pixel> writeBuffer,
|
||||
IBuffer<Int32Pixel> writeBuffer,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
Interlocked.Increment(ref _queueCounter);
|
||||
|
|
@ -213,7 +219,7 @@ public class ImageIntegral : IDisposable
|
|||
return taskQueue.ContinueWith(_ =>
|
||||
{
|
||||
using (var view = AcquireView(row, MemoryMappedFileAccess.Write))
|
||||
view.WriteArray(0, writeBuffer.Array, 0, _width);
|
||||
view.DangerousWriteSpan(0, writeBuffer.Span, 0, _width);
|
||||
|
||||
writeBuffer.Dispose();
|
||||
_rowLocks!.Memory.Span[row].Set();
|
||||
|
|
@ -249,12 +255,6 @@ public class ImageIntegral : IDisposable
|
|||
view.DangerousReadSpan(0, buffer, 0, _width);
|
||||
}
|
||||
|
||||
private void ReadRow(int row, ArrayOwner<Int32Pixel> buffer)
|
||||
{
|
||||
using var view = AcquireView(row, MemoryMappedFileAccess.Read);
|
||||
view.DangerousReadSpan(0, buffer.Span, 0, _width);
|
||||
}
|
||||
|
||||
private FileStream? InitializeBackedFile(string path, out Header header)
|
||||
{
|
||||
var expectedHeader = Header.CreateInitial(_width, _height);
|
||||
|
|
@ -311,33 +311,35 @@ public class ImageIntegral : IDisposable
|
|||
return fs;
|
||||
}
|
||||
|
||||
private static async Task AllocateBackedFile(FileStream fileStream, Header header)
|
||||
private static Task AllocateBackedFile(FileStream fileStream, Header header, CancellationToken cancellationToken)
|
||||
{
|
||||
// The input filestream is expected to be empty with
|
||||
// initial cursor at the beginning of the file and the content
|
||||
// is pre-allocated for at least Header.Length bytes
|
||||
// No other process should be accessed the file while being
|
||||
// allocated.
|
||||
// Allocated bytes is not necessary to be zeroed.
|
||||
return TaskHelper.SynchronizedTaskFactory.StartNew(() =>
|
||||
{
|
||||
// The input filestream is expected to be empty with
|
||||
// initial cursor at the beginning of the file and the content
|
||||
// is pre-allocated for at least Header.Length bytes
|
||||
// No other process should be accessed the file while being
|
||||
// allocated.
|
||||
// Allocated bytes is not necessary to be zeroed.
|
||||
|
||||
// const int writeBufferSize = 4 * 1024;
|
||||
// using var writeBuffer = MemoryPool<byte>.Shared.Rent(writeBufferSize);
|
||||
//
|
||||
// var written = 0;
|
||||
// while (written + writeBufferSize < header.Length)
|
||||
// {
|
||||
// await fileStream.WriteAsync(writeBuffer.Memory, cancellationToken);
|
||||
// written += writeBufferSize;
|
||||
// }
|
||||
//
|
||||
// if (written < header.Length)
|
||||
// {
|
||||
// await fileStream.WriteAsync(writeBuffer.Memory[..(header.Length - written)], cancellationToken);
|
||||
// }
|
||||
// const int writeBufferSize = 4 * 1024;
|
||||
// using var writeBuffer = MemoryPool<byte>.Shared.Rent(writeBufferSize);
|
||||
//
|
||||
// var written = 0;
|
||||
// while (written + writeBufferSize < header.Length)
|
||||
// {
|
||||
// await fileStream.WriteAsync(writeBuffer.Memory, cancellationToken);
|
||||
// written += writeBufferSize;
|
||||
// }
|
||||
//
|
||||
// if (written < header.Length)
|
||||
// {
|
||||
// await fileStream.WriteAsync(writeBuffer.Memory[..(header.Length - written)], cancellationToken);
|
||||
// }
|
||||
|
||||
fileStream.SetLength(header.Length + Header.Size);
|
||||
|
||||
await fileStream.DisposeAsync();
|
||||
fileStream.SetLength(header.Length + Header.Size);
|
||||
fileStream.Dispose();
|
||||
}, cancellationToken);
|
||||
}
|
||||
|
||||
[StructLayout(LayoutKind.Sequential)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue