From d6ebf8946860cb5135d35ed059c408a6e8023493 Mon Sep 17 00:00:00 2001 From: Fritiof Hedman Date: Sun, 7 Jun 2026 16:05:02 +0200 Subject: [PATCH] 8.3 Wrap full pipeline in lock Co-Authored-By: Claude Opus 4.7 (1M context) --- IMPLEMENTATION.md | 2 +- .../Controllers/FramesController.cs | 50 +++++++++++-------- src/FrameProcessor/Program.cs | 2 + 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/IMPLEMENTATION.md b/IMPLEMENTATION.md index 069777b..04534f6 100644 --- a/IMPLEMENTATION.md +++ b/IMPLEMENTATION.md @@ -202,7 +202,7 @@ Each type lives in `src/FrameProcessor/Domain/`. Tests in `tests/FrameProcessor. - `ConcurrentDictionary` (each `SemaphoreSlim(1, 1)`). - `Task AcquireAsync(FrameName, CancellationToken)` returning a disposable that releases on dispose. -### [ ] 8.3 Wrap full pipeline in lock +### [x] 8.3 Wrap full pipeline in lock - In `FramesController`, acquire the frame's lock before fetch/decode and release after publish-attempt completes (`CLAUDE.md` "Per-frame serialization"). - **Manual check:** fire two concurrent uploads to the same frame → both return 200, only one PNG on disk reflects whichever finished last, two MQTT publishes (or one if collapsed by retry queue). diff --git a/src/FrameProcessor/Controllers/FramesController.cs b/src/FrameProcessor/Controllers/FramesController.cs index d6fca03..bfb2fd2 100644 --- a/src/FrameProcessor/Controllers/FramesController.cs +++ b/src/FrameProcessor/Controllers/FramesController.cs @@ -1,3 +1,4 @@ +using FrameProcessor.Concurrency; using FrameProcessor.Configuration; using FrameProcessor.Domain; using FrameProcessor.ImagePipeline; @@ -17,19 +18,22 @@ public sealed class FramesController : ControllerBase private readonly ImageStore _store; private readonly MqttPublisher _mqtt; private readonly IImageUrlFetcher _urlFetcher; + private readonly FrameLockProvider _locks; public FramesController( FramesRegistry frames, IImagePipeline pipeline, ImageStore store, MqttPublisher mqtt, - IImageUrlFetcher urlFetcher) + IImageUrlFetcher urlFetcher, + FrameLockProvider locks) { _frames = frames; _pipeline = pipeline; _store = store; _mqtt = mqtt; _urlFetcher = urlFetcher; + _locks = locks; } [HttpPost("{name}/image")] @@ -46,13 +50,16 @@ public sealed class FramesController : ControllerBase return BadRequest(new { error = "Missing 'image' file part." }); } - byte[] pngBytes; - await using (var stream = file.OpenReadStream()) + using (await _locks.AcquireAsync(frame.Name, cancellationToken).ConfigureAwait(false)) { - pngBytes = _pipeline.Process(stream, frame); - } + byte[] pngBytes; + await using (var stream = file.OpenReadStream()) + { + pngBytes = _pipeline.Process(stream, frame); + } - return await FinishUploadAsync(frame, pngBytes, cancellationToken).ConfigureAwait(false); + return await FinishUploadAsync(frame, pngBytes, cancellationToken).ConfigureAwait(false); + } } [HttpPost("{name}/image-url")] @@ -73,23 +80,26 @@ public sealed class FramesController : ControllerBase return BadRequest(new { error = "Missing or invalid 'url'." }); } - Stream source; - try + using (await _locks.AcquireAsync(frame.Name, cancellationToken).ConfigureAwait(false)) { - source = await _urlFetcher.FetchAsync(uri, cancellationToken).ConfigureAwait(false); - } - catch (ImageFetchException ex) - { - return StatusCode(StatusCodes.Status502BadGateway, new { error = ex.Message }); - } + Stream source; + try + { + source = await _urlFetcher.FetchAsync(uri, cancellationToken).ConfigureAwait(false); + } + catch (ImageFetchException ex) + { + return StatusCode(StatusCodes.Status502BadGateway, new { error = ex.Message }); + } - byte[] pngBytes; - await using (source) - { - pngBytes = _pipeline.Process(source, frame); - } + byte[] pngBytes; + await using (source) + { + pngBytes = _pipeline.Process(source, frame); + } - return await FinishUploadAsync(frame, pngBytes, cancellationToken).ConfigureAwait(false); + return await FinishUploadAsync(frame, pngBytes, cancellationToken).ConfigureAwait(false); + } } private async Task FinishUploadAsync(Frame frame, byte[] pngBytes, CancellationToken cancellationToken) diff --git a/src/FrameProcessor/Program.cs b/src/FrameProcessor/Program.cs index 4486b17..e6a81f5 100644 --- a/src/FrameProcessor/Program.cs +++ b/src/FrameProcessor/Program.cs @@ -1,3 +1,4 @@ +using FrameProcessor.Concurrency; using FrameProcessor.Configuration; using FrameProcessor.ImagePipeline; using FrameProcessor.Middleware; @@ -44,6 +45,7 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddHostedService(sp => sp.GetRequiredService()); +builder.Services.AddSingleton(); builder.Services.AddHttpClient((sp, client) => {