using System.Collections.Concurrent;
using System.Threading.Channels;
using FrameProcessor.Configuration;
using FrameProcessor.Domain;
using Microsoft.Extensions.Options;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

namespace FrameProcessor.Mqtt;

/// <summary>
/// Persistent MQTT client wrapped in a hosted service. Connects on startup and
/// reconnects in the background after disconnects. Connection state is surfaced
/// via <see cref="IsConnected"/> for the /health endpoint.
/// </summary>
/// <remarks>
/// On publish failure the MAC is enqueued for retry (topic and payload are rebuilt
/// from the MAC at retry time); a background loop drains the queue using the
/// configured <c>RetryBackoffSeconds</c> schedule. Multiple failed publishes for the
/// same MAC collapse to a single pending entry per SPEC.md §5.1.
/// </remarks>
public sealed class MqttPublisher : BackgroundService
{
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

    // NOTE(review): options type name restored from context (generic arguments were
    // missing in the reviewed copy) — confirm it matches FrameProcessor.Configuration.
    private readonly IOptions<MqttOptions> _options;
    private readonly ILogger<MqttPublisher> _logger;
    private readonly IMqttClient _client;

    // MAC -> generation stamp of the most recent failed publish for that MAC.
    // The stamp lets the retry loop remove an entry only if no newer failure
    // re-enqueued the same MAC while the retry publish was in flight.
    private readonly ConcurrentDictionary<MacAddress, long> _pending = new();
    private long _generation;

    // Single-slot wake-up signal; DropWrite coalesces bursts of signals into one.
    private readonly Channel<bool> _wakeup = Channel.CreateBounded<bool>(
        new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.DropWrite });

    public MqttPublisher(IOptions<MqttOptions> options, ILogger<MqttPublisher> logger)
    {
        _options = options;
        _logger = logger;
        _client = new MqttFactory().CreateMqttClient();
    }

    /// <summary>Whether the underlying MQTT client currently reports a live connection.</summary>
    public bool IsConnected => _client.IsConnected;

    /// <summary>
    /// Attempts an initial connection before the background loop starts. A failed
    /// connect is logged and left to the reconnect loop in <see cref="ExecuteAsync"/>.
    /// </summary>
    public override async Task StartAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _client.ConnectAsync(BuildClientOptions(), cancellationToken).ConfigureAwait(false);
            _logger.LogInformation(
                "MQTT connected to {Host}:{Port}",
                _options.Value.Host,
                _options.Value.Port);
        }
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            _logger.LogWarning(
                ex,
                "Initial MQTT connect to {Host}:{Port} failed; background reconnect will retry",
                _options.Value.Host,
                _options.Value.Port);
        }

        await base.StartAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Reconnect loop: every <see cref="ReconnectDelay"/> checks the link and reconnects
    /// if needed. Runs the retry loop concurrently and awaits it on the way out.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var retryLoop = RunRetryLoopAsync(stoppingToken);
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(ReconnectDelay, stoppingToken).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    return;
                }

                if (_client.IsConnected)
                {
                    continue;
                }

                try
                {
                    await _client.ConnectAsync(BuildClientOptions(), stoppingToken).ConfigureAwait(false);
                    _logger.LogInformation(
                        "MQTT reconnected to {Host}:{Port}",
                        _options.Value.Host,
                        _options.Value.Port);

                    // Drain any queued publishes immediately now that the link is back up.
                    _wakeup.Writer.TryWrite(true);
                }
                catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogWarning(
                        ex,
                        "MQTT reconnect to {Host}:{Port} failed",
                        _options.Value.Host,
                        _options.Value.Port);
                }
            }
        }
        finally
        {
            await retryLoop.ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Publish an update notification for the given device MAC.
    /// </summary>
    /// <param name="mac">Device whose topic (<c>{BaseTopic}/{mac}</c>) receives the "update" payload.</param>
    /// <param name="cancellationToken">Cooperative cancellation for the publish.</param>
    /// <returns>
    /// <see cref="PublishResult.Failure"/> on any error (disconnected, broker reject,
    /// transport fault) — per SPEC.md §3.5, publish failure must not fail the upload,
    /// so MQTT errors are reported through the result rather than thrown.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// Still propagates so callers can honor cooperative cancellation.
    /// </exception>
    /// <remarks>On failure, the MAC is enqueued for background retry (SPEC.md §5.1).</remarks>
    public async Task<PublishResult> PublishAsync(MacAddress mac, CancellationToken cancellationToken)
    {
        var result = await PublishCoreAsync(mac, cancellationToken).ConfigureAwait(false);
        if (result == PublishResult.Failure)
        {
            EnqueueRetry(mac);
        }

        return result;
    }

    /// <summary>
    /// Single publish attempt without any retry bookkeeping. Shared by the foreground
    /// path and the retry loop.
    /// </summary>
    private async Task<PublishResult> PublishCoreAsync(MacAddress mac, CancellationToken cancellationToken)
    {
        var opts = _options.Value;
        var topic = $"{opts.BaseTopic}/{mac}";

        if (!_client.IsConnected)
        {
            _logger.LogWarning("MQTT publish to {Topic} skipped — client not connected", topic);
            return PublishResult.Failure;
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload("update")
            .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)opts.PublishQos)
            .WithRetainFlag(false)
            .Build();

        try
        {
            var result = await _client.PublishAsync(message, cancellationToken).ConfigureAwait(false);
            if (result.IsSuccess)
            {
                _logger.LogInformation("MQTT publish to {Topic} succeeded", topic);
                return PublishResult.Success;
            }

            _logger.LogWarning("MQTT publish to {Topic} returned {ReasonCode}", topic, result.ReasonCode);
            return PublishResult.Failure;
        }
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            _logger.LogWarning(ex, "MQTT publish to {Topic} failed", topic);
            return PublishResult.Failure;
        }
    }

    /// <summary>
    /// Records (or refreshes) the pending entry for <paramref name="mac"/> and wakes
    /// the retry loop. Repeated failures for one MAC collapse into a single entry.
    /// </summary>
    private void EnqueueRetry(MacAddress mac)
    {
        // A fresh stamp marks this as the newest failure for the MAC, so an in-flight
        // retry that started before it will not remove it on success.
        _pending[mac] = Interlocked.Increment(ref _generation);
        _wakeup.Writer.TryWrite(true);
    }

    /// <summary>
    /// Drains <see cref="_pending"/>: waits for a wake-up signal when idle, otherwise
    /// for the current backoff step (cut short by new failures or a reconnect), then
    /// retries every pending MAC while the client is connected.
    /// </summary>
    private async Task RunRetryLoopAsync(CancellationToken stoppingToken)
    {
        var backoff = _options.Value.RetryBackoffSeconds;
        // An empty schedule previously produced index -1; clamp so the index is always valid.
        var lastBackoffIndex = backoff is { Length: > 0 } ? backoff.Length - 1 : 0;
        var backoffIndex = 0;

        // NOTE(review): the round that follows an idle wake-up runs immediately and a
        // failed round then bumps the index, so the first wait after a fresh failure uses
        // entry [1]; entry [0] is only used when new work arrives during a successful
        // round. Confirm this matches SPEC.md §5.1.
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                if (_pending.IsEmpty)
                {
                    // Idle: wait indefinitely for an enqueue or a reconnect signal.
                    await _wakeup.Reader.ReadAsync(stoppingToken).ConfigureAwait(false);
                    backoffIndex = 0;
                }
                else
                {
                    var delay = backoff is { Length: > 0 }
                        ? TimeSpan.FromSeconds(backoff[Math.Min(backoffIndex, backoff.Length - 1)])
                        : ReconnectDelay;
                    if (delay < TimeSpan.Zero)
                    {
                        // CancelAfter rejects negative spans (other than -1 ms = infinite).
                        delay = TimeSpan.Zero;
                    }

                    using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
                    delayCts.CancelAfter(delay);
                    try
                    {
                        // A signal during the backoff (new failure, reconnect) wakes us early.
                        await _wakeup.Reader.ReadAsync(delayCts.Token).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested)
                    {
                        // Backoff window elapsed; fall through to attempt.
                    }
                }
            }
            catch (OperationCanceledException)
            {
                return;
            }

            if (stoppingToken.IsCancellationRequested)
            {
                return;
            }

            var anyAttempted = false;
            var anyFailed = false;

            // Snapshot key/stamp pairs so a success only clears the exact entry it retried.
            foreach (var entry in _pending.ToArray())
            {
                if (!_client.IsConnected)
                {
                    // Wait for the reconnect loop to bring the link back up before retrying.
                    anyFailed = true;
                    break;
                }

                anyAttempted = true;

                PublishResult result;
                try
                {
                    result = await PublishCoreAsync(entry.Key, stoppingToken).ConfigureAwait(false);
                }
                catch (Exception) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown interrupted the publish; exit cleanly instead of faulting the loop.
                    return;
                }

                if (result == PublishResult.Success)
                {
                    // Removes only if the stamp is unchanged; a newer failure keeps its entry.
                    _pending.TryRemove(entry);
                }
                else
                {
                    anyFailed = true;
                }
            }

            var roundFailed = anyFailed || !anyAttempted;
            backoffIndex = roundFailed && !_pending.IsEmpty
                ? Math.Min(backoffIndex + 1, lastBackoffIndex)
                : 0;
        }
    }

    /// <summary>Stops the background loops, then disconnects gracefully if still connected.</summary>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await base.StopAsync(cancellationToken).ConfigureAwait(false);

        if (_client.IsConnected)
        {
            try
            {
                await _client.DisconnectAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "MQTT disconnect on shutdown failed");
            }
        }
    }

    public override void Dispose()
    {
        _client.Dispose();
        base.Dispose();
    }

    /// <summary>
    /// Builds client options from the current configuration: TCP endpoint, client id,
    /// persistent session, optional credentials and optional TLS.
    /// </summary>
    private MqttClientOptions BuildClientOptions()
    {
        var opts = _options.Value;
        var builder = new MqttClientOptionsBuilder()
            .WithTcpServer(opts.Host, opts.Port)
            .WithClientId(opts.ClientId)
            .WithCleanSession(false);

        if (!string.IsNullOrEmpty(opts.Username))
        {
            builder = builder.WithCredentials(opts.Username, opts.Password);
        }

        if (opts.UseTls)
        {
            builder = builder.WithTlsOptions(_ => { });
        }

        return builder.Build();
    }
}