6.2 MqttPublisher.PublishAsync

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 15:16:40 +02:00
parent a6c750d5c4
commit cc0241ab53
3 changed files with 54 additions and 3 deletions

View File

@@ -142,7 +142,7 @@ Each type lives in `src/FrameProcessor/Domain/`. Tests in `tests/FrameProcessor.
- Look up frame by MAC → 404 if unknown or file absent. - Look up frame by MAC → 404 if unknown or file absent.
- Return `FileStreamResult` with `Content-Type: image/png`, `Cache-Control: no-store`, `ETag` derived from file mtime. - Return `FileStreamResult` with `Content-Type: image/png`, `Cache-Control: no-store`, `ETag` derived from file mtime.
### [ ] 5.3 Manual end-to-end smoke ### [x] 5.3 Manual end-to-end smoke
- Start service, `curl -F image=@photo.jpg .../api/frames/living-room/image` → 200. - Start service, `curl -F image=@photo.jpg .../api/frames/living-room/image` → 200.
- `curl .../i/aabbccddeeff.png > out.png` → image opens and looks dithered + remapped. - `curl .../i/aabbccddeeff.png > out.png` → image opens and looks dithered + remapped.
- **DoD:** above two commands work. - **DoD:** above two commands work.
@@ -157,7 +157,7 @@ Each type lives in `src/FrameProcessor/Domain/`. Tests in `tests/FrameProcessor.
- Exposes `bool IsConnected` for `/health`. - Exposes `bool IsConnected` for `/health`.
- On `StartAsync`: connect; on failure, log and continue (background reconnect handles it). - On `StartAsync`: connect; on failure, log and continue (background reconnect handles it).
### [ ] 6.2 `PublishAsync(MacAddress, CancellationToken) → Result` ### [x] 6.2 `PublishAsync(MacAddress, CancellationToken) → Result`
- Topic: `{BaseTopic}/{mac}`, payload UTF-8 `"update"`, QoS 1, retained false. - Topic: `{BaseTopic}/{mac}`, payload UTF-8 `"update"`, QoS 1, retained false.
- Returns success/failure (no throw). - Returns success/failure (no throw).
@@ -195,7 +195,7 @@ Each type lives in `src/FrameProcessor/Domain/`. Tests in `tests/FrameProcessor.
## Phase 8 — Auth + concurrency + robustness ## Phase 8 — Auth + concurrency + robustness
### [ ] 8.1 `ApiKeyMiddleware` ### [ ] 8.1 `ApiKeyMiddleware`
- Matches request path `/api/*`; reads `X-Api-Key` header; constant-time compare against `ApiKeyOptions`. - Matches request path `/api/*`; reads `X-Api-Key` header; constant-time compare against `ApiKeyOptions` only if `ApiKeyOptions`is set to non-empty.
- 401 on mismatch. `/i/{mac}.png` and `/health` unaffected. - 401 on mismatch. `/i/{mac}.png` and `/health` unaffected.
### [ ] 8.2 `FrameLockProvider` ### [ ] 8.2 `FrameLockProvider`

View File

@@ -1,7 +1,9 @@
using FrameProcessor.Configuration; using FrameProcessor.Configuration;
using FrameProcessor.Domain;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Protocol;
namespace FrameProcessor.Mqtt; namespace FrameProcessor.Mqtt;
@@ -86,6 +88,43 @@ public sealed class MqttPublisher : BackgroundService
} }
} }
/// <summary>
/// Publish an <c>update</c> notification for the given frame. Returns
/// <see cref="PublishResult.Failure"/> on any error (disconnected, broker reject,
/// transport fault) — per SPEC.md §3.5, publish failure must not fail the upload,
/// so this method never throws on MQTT errors. <see cref="OperationCanceledException"/>
/// still propagates so callers can honor cooperative cancellation.
/// </summary>
public async Task<PublishResult> PublishAsync(MacAddress mac, CancellationToken cancellationToken)
{
var opts = _options.Value;
var topic = $"{opts.BaseTopic}/{mac}";
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload("update")
.WithQualityOfServiceLevel((MqttQualityOfServiceLevel)opts.PublishQos)
.WithRetainFlag(false)
.Build();
try
{
var result = await _client.PublishAsync(message, cancellationToken).ConfigureAwait(false);
if (result.IsSuccess)
{
_logger.LogInformation("MQTT publish to {Topic} succeeded", topic);
return PublishResult.Success;
}
_logger.LogWarning("MQTT publish to {Topic} returned {ReasonCode}", topic, result.ReasonCode);
return PublishResult.Failure;
}
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
{
_logger.LogWarning(ex, "MQTT publish to {Topic} failed", topic);
return PublishResult.Failure;
}
}
public override async Task StopAsync(CancellationToken cancellationToken) public override async Task StopAsync(CancellationToken cancellationToken)
{ {
await base.StopAsync(cancellationToken).ConfigureAwait(false); await base.StopAsync(cancellationToken).ConfigureAwait(false);

View File

@@ -0,0 +1,12 @@
namespace FrameProcessor.Mqtt;
/// <summary>
/// Outcome of an <see cref="MqttPublisher.PublishAsync"/> call. <see cref="Failure"/> covers
/// any non-success path — broker unreachable, not yet connected, broker rejected, transport
/// fault — and is non-fatal to the calling request per SPEC.md §3.5.
/// </summary>
public enum PublishResult
{
Success,
Failure,
}