8.3 Wrap full pipeline in lock
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -202,7 +202,7 @@ Each type lives in `src/FrameProcessor/Domain/`. Tests in `tests/FrameProcessor.
|
|||||||
- `ConcurrentDictionary<FrameName, SemaphoreSlim>` (each `SemaphoreSlim(1, 1)`).
|
- `ConcurrentDictionary<FrameName, SemaphoreSlim>` (each `SemaphoreSlim(1, 1)`).
|
||||||
- `Task<IDisposable> AcquireAsync(FrameName, CancellationToken)` returning a disposable that releases on dispose.
|
- `Task<IDisposable> AcquireAsync(FrameName, CancellationToken)` returning a disposable that releases on dispose.
|
||||||
|
|
||||||
### [ ] 8.3 Wrap full pipeline in lock
|
### [x] 8.3 Wrap full pipeline in lock
|
||||||
- In `FramesController`, acquire the frame's lock before fetch/decode and release after publish-attempt completes (`CLAUDE.md` "Per-frame serialization").
|
- In `FramesController`, acquire the frame's lock before fetch/decode and release after publish-attempt completes (`CLAUDE.md` "Per-frame serialization").
|
||||||
- **Manual check:** fire two concurrent uploads to the same frame → both return 200, only one PNG on disk reflects whichever finished last, two MQTT publishes (or one if collapsed by retry queue).
|
- **Manual check:** fire two concurrent uploads to the same frame → both return 200, only one PNG on disk reflects whichever finished last, two MQTT publishes (or one if collapsed by retry queue).
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using FrameProcessor.Concurrency;
|
||||||
using FrameProcessor.Configuration;
|
using FrameProcessor.Configuration;
|
||||||
using FrameProcessor.Domain;
|
using FrameProcessor.Domain;
|
||||||
using FrameProcessor.ImagePipeline;
|
using FrameProcessor.ImagePipeline;
|
||||||
@@ -17,19 +18,22 @@ public sealed class FramesController : ControllerBase
|
|||||||
private readonly ImageStore _store;
|
private readonly ImageStore _store;
|
||||||
private readonly MqttPublisher _mqtt;
|
private readonly MqttPublisher _mqtt;
|
||||||
private readonly IImageUrlFetcher _urlFetcher;
|
private readonly IImageUrlFetcher _urlFetcher;
|
||||||
|
private readonly FrameLockProvider _locks;
|
||||||
|
|
||||||
public FramesController(
|
public FramesController(
|
||||||
FramesRegistry frames,
|
FramesRegistry frames,
|
||||||
IImagePipeline pipeline,
|
IImagePipeline pipeline,
|
||||||
ImageStore store,
|
ImageStore store,
|
||||||
MqttPublisher mqtt,
|
MqttPublisher mqtt,
|
||||||
IImageUrlFetcher urlFetcher)
|
IImageUrlFetcher urlFetcher,
|
||||||
|
FrameLockProvider locks)
|
||||||
{
|
{
|
||||||
_frames = frames;
|
_frames = frames;
|
||||||
_pipeline = pipeline;
|
_pipeline = pipeline;
|
||||||
_store = store;
|
_store = store;
|
||||||
_mqtt = mqtt;
|
_mqtt = mqtt;
|
||||||
_urlFetcher = urlFetcher;
|
_urlFetcher = urlFetcher;
|
||||||
|
_locks = locks;
|
||||||
}
|
}
|
||||||
|
|
||||||
[HttpPost("{name}/image")]
|
[HttpPost("{name}/image")]
|
||||||
@@ -46,13 +50,16 @@ public sealed class FramesController : ControllerBase
|
|||||||
return BadRequest(new { error = "Missing 'image' file part." });
|
return BadRequest(new { error = "Missing 'image' file part." });
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] pngBytes;
|
using (await _locks.AcquireAsync(frame.Name, cancellationToken).ConfigureAwait(false))
|
||||||
await using (var stream = file.OpenReadStream())
|
|
||||||
{
|
{
|
||||||
pngBytes = _pipeline.Process(stream, frame);
|
byte[] pngBytes;
|
||||||
}
|
await using (var stream = file.OpenReadStream())
|
||||||
|
{
|
||||||
|
pngBytes = _pipeline.Process(stream, frame);
|
||||||
|
}
|
||||||
|
|
||||||
return await FinishUploadAsync(frame, pngBytes, cancellationToken).ConfigureAwait(false);
|
return await FinishUploadAsync(frame, pngBytes, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[HttpPost("{name}/image-url")]
|
[HttpPost("{name}/image-url")]
|
||||||
@@ -73,23 +80,26 @@ public sealed class FramesController : ControllerBase
|
|||||||
return BadRequest(new { error = "Missing or invalid 'url'." });
|
return BadRequest(new { error = "Missing or invalid 'url'." });
|
||||||
}
|
}
|
||||||
|
|
||||||
Stream source;
|
using (await _locks.AcquireAsync(frame.Name, cancellationToken).ConfigureAwait(false))
|
||||||
try
|
|
||||||
{
|
{
|
||||||
source = await _urlFetcher.FetchAsync(uri, cancellationToken).ConfigureAwait(false);
|
Stream source;
|
||||||
}
|
try
|
||||||
catch (ImageFetchException ex)
|
{
|
||||||
{
|
source = await _urlFetcher.FetchAsync(uri, cancellationToken).ConfigureAwait(false);
|
||||||
return StatusCode(StatusCodes.Status502BadGateway, new { error = ex.Message });
|
}
|
||||||
}
|
catch (ImageFetchException ex)
|
||||||
|
{
|
||||||
|
return StatusCode(StatusCodes.Status502BadGateway, new { error = ex.Message });
|
||||||
|
}
|
||||||
|
|
||||||
byte[] pngBytes;
|
byte[] pngBytes;
|
||||||
await using (source)
|
await using (source)
|
||||||
{
|
{
|
||||||
pngBytes = _pipeline.Process(source, frame);
|
pngBytes = _pipeline.Process(source, frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
return await FinishUploadAsync(frame, pngBytes, cancellationToken).ConfigureAwait(false);
|
return await FinishUploadAsync(frame, pngBytes, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<IActionResult> FinishUploadAsync(Frame frame, byte[] pngBytes, CancellationToken cancellationToken)
|
private async Task<IActionResult> FinishUploadAsync(Frame frame, byte[] pngBytes, CancellationToken cancellationToken)
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using FrameProcessor.Concurrency;
|
||||||
using FrameProcessor.Configuration;
|
using FrameProcessor.Configuration;
|
||||||
using FrameProcessor.ImagePipeline;
|
using FrameProcessor.ImagePipeline;
|
||||||
using FrameProcessor.Middleware;
|
using FrameProcessor.Middleware;
|
||||||
@@ -44,6 +45,7 @@ builder.Services.AddSingleton<IImagePipeline, FrameProcessor.ImagePipeline.Image
|
|||||||
builder.Services.AddSingleton<ImageStore>();
|
builder.Services.AddSingleton<ImageStore>();
|
||||||
builder.Services.AddSingleton<MqttPublisher>();
|
builder.Services.AddSingleton<MqttPublisher>();
|
||||||
builder.Services.AddHostedService(sp => sp.GetRequiredService<MqttPublisher>());
|
builder.Services.AddHostedService(sp => sp.GetRequiredService<MqttPublisher>());
|
||||||
|
builder.Services.AddSingleton<FrameLockProvider>();
|
||||||
|
|
||||||
builder.Services.AddHttpClient<IImageUrlFetcher, ImageUrlFetcher>((sp, client) =>
|
builder.Services.AddHttpClient<IImageUrlFetcher, ImageUrlFetcher>((sp, client) =>
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user