6.1 MqttPublisher hosted service
Persistent MQTTnet v4 IMqttClient wrapped in a BackgroundService: attempts to connect on StartAsync (logging and continuing on failure) and reconnects on a 5s loop while running. Exposes IsConnected so 6.4 can wire it into /health later. Honors username/password and UseTls from MqttOptions; clean session is disabled. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -151,7 +151,7 @@ Each type lives in `src/FrameProcessor/Domain/`. Tests in `tests/FrameProcessor.
|
|||||||
|
|
||||||
## Phase 6 — MQTT
|
## Phase 6 — MQTT
|
||||||
|
|
||||||
### [ ] 6.1 `MqttPublisher` hosted service
|
### [x] 6.1 `MqttPublisher` hosted service
|
||||||
- Singleton `IHostedService` wrapping `IMqttClient` (MQTTnet v4).
|
- Singleton `IHostedService` wrapping `IMqttClient` (MQTTnet v4).
|
||||||
- `WithReconnectDelay`, `WithCleanSession(false)`, credentials/TLS from `MqttOptions`.
|
- `WithReconnectDelay`, `WithCleanSession(false)`, credentials/TLS from `MqttOptions`.
|
||||||
- Exposes `bool IsConnected` for `/health`.
|
- Exposes `bool IsConnected` for `/health`.
|
||||||
|
|||||||
132
src/FrameProcessor/Mqtt/MqttPublisher.cs
Normal file
132
src/FrameProcessor/Mqtt/MqttPublisher.cs
Normal file
@@ -0,0 +1,132 @@
|
|||||||
|
using FrameProcessor.Configuration;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MQTTnet;
|
||||||
|
using MQTTnet.Client;
|
||||||
|
|
||||||
|
namespace FrameProcessor.Mqtt;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Persistent MQTT client wrapped in a hosted service. Connects on startup and
|
||||||
|
/// reconnects in the background after disconnects. Connection state is surfaced
|
||||||
|
/// via <see cref="IsConnected"/> for the <c>/health</c> endpoint.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class MqttPublisher : BackgroundService
|
||||||
|
{
|
||||||
|
private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
private readonly IOptions<MqttOptions> _options;
|
||||||
|
private readonly ILogger<MqttPublisher> _logger;
|
||||||
|
private readonly IMqttClient _client;
|
||||||
|
|
||||||
|
public MqttPublisher(IOptions<MqttOptions> options, ILogger<MqttPublisher> logger)
|
||||||
|
{
|
||||||
|
_options = options;
|
||||||
|
_logger = logger;
|
||||||
|
_client = new MqttFactory().CreateMqttClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsConnected => _client.IsConnected;
|
||||||
|
|
||||||
|
public override async Task StartAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _client.ConnectAsync(BuildClientOptions(), cancellationToken).ConfigureAwait(false);
|
||||||
|
_logger.LogInformation(
|
||||||
|
"MQTT connected to {Host}:{Port}",
|
||||||
|
_options.Value.Host,
|
||||||
|
_options.Value.Port);
|
||||||
|
}
|
||||||
|
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(
|
||||||
|
ex,
|
||||||
|
"Initial MQTT connect to {Host}:{Port} failed; background reconnect will retry",
|
||||||
|
_options.Value.Host,
|
||||||
|
_options.Value.Port);
|
||||||
|
}
|
||||||
|
|
||||||
|
await base.StartAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.Delay(ReconnectDelay, stoppingToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_client.IsConnected)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _client.ConnectAsync(BuildClientOptions(), stoppingToken).ConfigureAwait(false);
|
||||||
|
_logger.LogInformation(
|
||||||
|
"MQTT reconnected to {Host}:{Port}",
|
||||||
|
_options.Value.Host,
|
||||||
|
_options.Value.Port);
|
||||||
|
}
|
||||||
|
catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(
|
||||||
|
ex,
|
||||||
|
"MQTT reconnect to {Host}:{Port} failed",
|
||||||
|
_options.Value.Host,
|
||||||
|
_options.Value.Port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await base.StopAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (_client.IsConnected)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _client.DisconnectAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex, "MQTT disconnect on shutdown failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override void Dispose()
|
||||||
|
{
|
||||||
|
_client.Dispose();
|
||||||
|
base.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
private MqttClientOptions BuildClientOptions()
|
||||||
|
{
|
||||||
|
var opts = _options.Value;
|
||||||
|
var builder = new MqttClientOptionsBuilder()
|
||||||
|
.WithTcpServer(opts.Host, opts.Port)
|
||||||
|
.WithClientId(opts.ClientId)
|
||||||
|
.WithCleanSession(false);
|
||||||
|
|
||||||
|
if (!string.IsNullOrEmpty(opts.Username))
|
||||||
|
{
|
||||||
|
builder = builder.WithCredentials(opts.Username, opts.Password);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opts.UseTls)
|
||||||
|
{
|
||||||
|
builder = builder.WithTlsOptions(_ => { });
|
||||||
|
}
|
||||||
|
|
||||||
|
return builder.Build();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
using FrameProcessor.Configuration;
|
using FrameProcessor.Configuration;
|
||||||
using FrameProcessor.ImagePipeline;
|
using FrameProcessor.ImagePipeline;
|
||||||
|
using FrameProcessor.Mqtt;
|
||||||
using FrameProcessor.Storage;
|
using FrameProcessor.Storage;
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
@@ -38,6 +39,8 @@ builder.Services.AddSingleton<FramesOptionsValidator>();
|
|||||||
builder.Services.AddSingleton<FramesRegistry>();
|
builder.Services.AddSingleton<FramesRegistry>();
|
||||||
builder.Services.AddSingleton<IImagePipeline, FrameProcessor.ImagePipeline.ImagePipeline>();
|
builder.Services.AddSingleton<IImagePipeline, FrameProcessor.ImagePipeline.ImagePipeline>();
|
||||||
builder.Services.AddSingleton<ImageStore>();
|
builder.Services.AddSingleton<ImageStore>();
|
||||||
|
builder.Services.AddSingleton<MqttPublisher>();
|
||||||
|
builder.Services.AddHostedService(sp => sp.GetRequiredService<MqttPublisher>());
|
||||||
|
|
||||||
var app = builder.Build();
|
var app = builder.Build();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user