diff --git a/IMPLEMENTATION.md b/IMPLEMENTATION.md index a83ed20..1dcf9e5 100644 --- a/IMPLEMENTATION.md +++ b/IMPLEMENTATION.md @@ -151,7 +151,7 @@ Each type lives in `src/FrameProcessor/Domain/`. Tests in `tests/FrameProcessor. ## Phase 6 — MQTT -### [ ] 6.1 `MqttPublisher` hosted service +### [x] 6.1 `MqttPublisher` hosted service - Singleton `IHostedService` wrapping `IMqttClient` (MQTTnet v4). - `WithReconnectDelay`, `WithCleanSession(false)`, credentials/TLS from `MqttOptions`. - Exposes `bool IsConnected` for `/health`. diff --git a/src/FrameProcessor/Mqtt/MqttPublisher.cs b/src/FrameProcessor/Mqtt/MqttPublisher.cs new file mode 100644 index 0000000..7940d20 --- /dev/null +++ b/src/FrameProcessor/Mqtt/MqttPublisher.cs @@ -0,0 +1,132 @@ +using FrameProcessor.Configuration; +using Microsoft.Extensions.Options; +using MQTTnet; +using MQTTnet.Client; + +namespace FrameProcessor.Mqtt; + +/// +/// Persistent MQTT client wrapped in a hosted service. Connects on startup and +/// reconnects in the background after disconnects. Connection state is surfaced +/// via for the /health endpoint. +/// +public sealed class MqttPublisher : BackgroundService +{ + private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5); + + private readonly IOptions _options; + private readonly ILogger _logger; + private readonly IMqttClient _client; + + public MqttPublisher(IOptions options, ILogger logger) + { + _options = options; + _logger = logger; + _client = new MqttFactory().CreateMqttClient(); + } + + public bool IsConnected => _client.IsConnected; + + public override async Task StartAsync(CancellationToken cancellationToken) + { + try + { + await _client.ConnectAsync(BuildClientOptions(), cancellationToken).ConfigureAwait(false); + _logger.LogInformation( + "MQTT connected to {Host}:{Port}", + _options.Value.Host, + _options.Value.Port); + } + catch (Exception ex) when (!cancellationToken.IsCancellationRequested) + { + _logger.LogWarning( + ex, + "Initial MQTT connect to {Host}:{Port} failed; background reconnect will retry", + _options.Value.Host, + _options.Value.Port); + } + + await base.StartAsync(cancellationToken).ConfigureAwait(false); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + await Task.Delay(ReconnectDelay, stoppingToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } + + if (_client.IsConnected) + { + continue; + } + + try + { + await _client.ConnectAsync(BuildClientOptions(), stoppingToken).ConfigureAwait(false); + _logger.LogInformation( + "MQTT reconnected to {Host}:{Port}", + _options.Value.Host, + _options.Value.Port); + } + catch (Exception ex) when (!stoppingToken.IsCancellationRequested) + { + _logger.LogWarning( + ex, + "MQTT reconnect to {Host}:{Port} failed", + _options.Value.Host, + _options.Value.Port); + } + } + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + await base.StopAsync(cancellationToken).ConfigureAwait(false); + + if (_client.IsConnected) + { + try + { + await _client.DisconnectAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "MQTT disconnect on shutdown failed"); + } + } + } + + public override void Dispose() + { + _client.Dispose(); + base.Dispose(); + } + + private MqttClientOptions BuildClientOptions() + { + var opts = _options.Value; + var builder = new MqttClientOptionsBuilder() + .WithTcpServer(opts.Host, opts.Port) + .WithClientId(opts.ClientId) + .WithCleanSession(false); + + if (!string.IsNullOrEmpty(opts.Username)) + { + builder = builder.WithCredentials(opts.Username, opts.Password); + } + + if (opts.UseTls) + { + builder = builder.WithTlsOptions(_ => { }); + } + + return builder.Build(); + } +} diff --git a/src/FrameProcessor/Program.cs b/src/FrameProcessor/Program.cs index be676dc..e0712a3 100644 --- a/src/FrameProcessor/Program.cs +++ b/src/FrameProcessor/Program.cs @@ -1,5 +1,6 @@ using FrameProcessor.Configuration; using FrameProcessor.ImagePipeline; +using FrameProcessor.Mqtt; using FrameProcessor.Storage; var builder = WebApplication.CreateBuilder(args); @@ -38,6 +39,8 @@ builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddHostedService(sp => sp.GetRequiredService()); var app = builder.Build();