diff --git a/IMPLEMENTATION.md b/IMPLEMENTATION.md index f588a6c..3619bf9 100644 --- a/IMPLEMENTATION.md +++ b/IMPLEMENTATION.md @@ -167,7 +167,7 @@ Each type lives in `src/FrameProcessor/Domain/`. Tests in `tests/FrameProcessor. ### [x] 6.4 `/health` reports MQTT status - Replace hardcoded `mqttConnected` with `MqttPublisher.IsConnected`. -### [ ] 6.5 Background retry queue +### [x] 6.5 Background retry queue - In-memory `Channel` (one slot per frame; newer publish supersedes older — per `SPEC.md` §5.1 "Multiple queued publishes for the same frame collapse to the most recent one"). - Background loop drains with backoff sequence from `MqttOptions.RetryBackoffSeconds`. - On reconnect, drain immediately. diff --git a/src/FrameProcessor/Mqtt/MqttPublisher.cs b/src/FrameProcessor/Mqtt/MqttPublisher.cs index 95e5f63..83d20ca 100644 --- a/src/FrameProcessor/Mqtt/MqttPublisher.cs +++ b/src/FrameProcessor/Mqtt/MqttPublisher.cs @@ -1,3 +1,5 @@ +using System.Collections.Concurrent; +using System.Threading.Channels; using FrameProcessor.Configuration; using FrameProcessor.Domain; using Microsoft.Extensions.Options; @@ -11,6 +13,11 @@ namespace FrameProcessor.Mqtt; /// Persistent MQTT client wrapped in a hosted service. Connects on startup and /// reconnects in the background after disconnects. Connection state is surfaced /// via for the /health endpoint. +/// +/// On publish failure the (mac → topic, payload) pair is enqueued for retry; a +/// background loop drains the queue using . +/// Multiple failed publishes for the same MAC collapse to a single pending entry +/// per SPEC.md §5.1. /// public sealed class MqttPublisher : BackgroundService { @@ -20,6 +27,10 @@ public sealed class MqttPublisher : BackgroundService private readonly ILogger _logger; private readonly IMqttClient _client; + private readonly ConcurrentDictionary _pending = new(); + private readonly Channel _wakeup = Channel.CreateBounded( + new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.DropWrite }); + public MqttPublisher(IOptions options, ILogger logger) { _options = options; @@ -53,39 +64,50 @@ public sealed class MqttPublisher : BackgroundService protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - while (!stoppingToken.IsCancellationRequested) + var retryLoop = RunRetryLoopAsync(stoppingToken); + + try { - try + while (!stoppingToken.IsCancellationRequested) { - await Task.Delay(ReconnectDelay, stoppingToken).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - return; - } + try + { + await Task.Delay(ReconnectDelay, stoppingToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } - if (_client.IsConnected) - { - continue; - } + if (_client.IsConnected) + { + continue; + } - try - { - await _client.ConnectAsync(BuildClientOptions(), stoppingToken).ConfigureAwait(false); - _logger.LogInformation( - "MQTT reconnected to {Host}:{Port}", - _options.Value.Host, - _options.Value.Port); - } - catch (Exception ex) when (!stoppingToken.IsCancellationRequested) - { - _logger.LogWarning( - ex, - "MQTT reconnect to {Host}:{Port} failed", - _options.Value.Host, - _options.Value.Port); + try + { + await _client.ConnectAsync(BuildClientOptions(), stoppingToken).ConfigureAwait(false); + _logger.LogInformation( + "MQTT reconnected to {Host}:{Port}", + _options.Value.Host, + _options.Value.Port); + // Drain any queued publishes immediately now that the link is back up. + _wakeup.Writer.TryWrite(true); + } + catch (Exception ex) when (!stoppingToken.IsCancellationRequested) + { + _logger.LogWarning( + ex, + "MQTT reconnect to {Host}:{Port} failed", + _options.Value.Host, + _options.Value.Port); + } } } + finally + { + await retryLoop.ConfigureAwait(false); + } } /// @@ -94,11 +116,30 @@ public sealed class MqttPublisher : BackgroundService /// transport fault) — per SPEC.md §3.5, publish failure must not fail the upload, /// so this method never throws on MQTT errors. /// still propagates so callers can honor cooperative cancellation. + /// + /// On failure, the MAC is enqueued for background retry (SPEC.md §5.1). /// public async Task PublishAsync(MacAddress mac, CancellationToken cancellationToken) + { + var result = await PublishCoreAsync(mac, cancellationToken).ConfigureAwait(false); + if (result == PublishResult.Failure) + { + EnqueueRetry(mac); + } + return result; + } + + private async Task PublishCoreAsync(MacAddress mac, CancellationToken cancellationToken) { var opts = _options.Value; var topic = $"{opts.BaseTopic}/{mac}"; + + if (!_client.IsConnected) + { + _logger.LogWarning("MQTT publish to {Topic} skipped — client not connected", topic); + return PublishResult.Failure; + } + var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload("update") @@ -125,6 +166,94 @@ public sealed class MqttPublisher : BackgroundService } } + private void EnqueueRetry(MacAddress mac) + { + _pending[mac] = 0; + _wakeup.Writer.TryWrite(true); + } + + private async Task RunRetryLoopAsync(CancellationToken stoppingToken) + { + var backoff = _options.Value.RetryBackoffSeconds; + var backoffIndex = 0; + + while (!stoppingToken.IsCancellationRequested) + { + try + { + if (_pending.IsEmpty) + { + // Idle: wait indefinitely for an enqueue or a reconnect signal. + await _wakeup.Reader.ReadAsync(stoppingToken).ConfigureAwait(false); + backoffIndex = 0; + } + else + { + var delaySeconds = backoff[Math.Min(backoffIndex, backoff.Length - 1)]; + using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + delayCts.CancelAfter(TimeSpan.FromSeconds(delaySeconds)); + try + { + // A signal during the backoff (new failure, reconnect) wakes us early. + await _wakeup.Reader.ReadAsync(delayCts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested) + { + // Backoff window elapsed; fall through to attempt. + } + } + } + catch (OperationCanceledException) + { + return; + } + + if (stoppingToken.IsCancellationRequested) + { + return; + } + + var anyAttempted = false; + var anyFailed = false; + foreach (var mac in _pending.Keys.ToArray()) + { + if (!_client.IsConnected) + { + // Wait for the reconnect loop to bring the link back up before retrying. + anyFailed = true; + break; + } + + anyAttempted = true; + var result = await PublishCoreAsync(mac, stoppingToken).ConfigureAwait(false); + if (result == PublishResult.Success) + { + _pending.TryRemove(mac, out _); + } + else + { + anyFailed = true; + } + } + + if (!anyAttempted || anyFailed) + { + if (!_pending.IsEmpty) + { + backoffIndex = Math.Min(backoffIndex + 1, backoff.Length - 1); + } + else + { + backoffIndex = 0; + } + } + else + { + backoffIndex = 0; + } + } + } + public override async Task StopAsync(CancellationToken cancellationToken) { await base.StopAsync(cancellationToken).ConfigureAwait(false);