I'm running a Windows Service project in VS2019, with Mqtt 4.1.4 .Net
framework client.
My .net client keeps Connecting and Disconnecting
over and over again.
I recently discovered that my OnSubscriberDisconnected
method is passing the following args
values:
args.Reason = SessionTakenOver
args.ReasonString = "Another client connected with the same client id."
Initially I had been creating a new random ClientID upon each HiveMq Broker connection (the free cloud version), But I changed it to :
clientId = ".netWinSvc-" + this.machineName;
this way the machine which is running my Win Service code will ALWAYS conn with the same ClientID.
I believe I was piling up lots of new ClientID connections, and asking the broker to persist the session (CleanSession = false
). And the free cloud subscription allows for 100 device connections
.
QUESTION IS: What do I do to clean up all of these clientID connections, and how avoid this Disconnect/Reconnect issue? Is re-using the same ClientID with CleanSession = false
the best way to go ? In other words, shouldn't I ask the broker to persist
my ClientID connection?
Here is a good portion of my .Net Window Service code:
using log4net.Ext.EventID;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Timer = System.Timers.Timer;
using MqttNotificationService.Models;
using Newtonsoft.Json;
namespace MqttNotificationService
{
public partial class MqttService : ServiceBase
{
public static readonly IEventIDLog applog = EventIDLogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private IManagedMqttClient managedMqttClientPublisher;
private IManagedMqttClient managedMqttClientSubscriber;
private string mqttClientUser = "";
private byte[] mqttClientPswd;
private string mqttBrokerAddress;
private string mqttProtocol = "wss";
private int? mqttPort;
private string defaultMessage;
private string topicThisHost = ""; // get topic from app.config
private string heartBeatPubMsg;
private double heartBeatTimerMs;
public MqttService()
{
InitializeComponent();
}
protected override void OnStart(string[] args)
{
Init();
CreateThreadAndRun();
StartHeartBeatTimer();
}
private void Init()
{
log4net.Config.XmlConfigurator.Configure();
mqttClientUser = ConfigurationManager.AppSettings["MqttClientUser"];
mqttBrokerAddress = ConfigurationManager.AppSettings["MqttBrokerAddress"];
mqttProtocol = ConfigurationManager.AppSettings["MqttProtocol"];
mqttPort = Int16.Parse(ConfigurationManager.AppSettings["MqttPort"]);
MqttUseTls = bool.Parse(ConfigurationManager.AppSettings["UseTlsCertificate"]);
var MqttQos = Int16.Parse(ConfigurationManager.AppSettings["QualityOfService"]);
mqttRetained = bool.Parse(ConfigurationManager.AppSettings["MqttRetained"]);
mqttLastWillRetained = bool.Parse(ConfigurationManager.AppSettings["MqttLastWillRetained"]);
mqttLastWillMessage = ConfigurationManager.AppSettings["MqttLastWillMessage"];
mqttKeepAliveSeconds = Int16.Parse(ConfigurationManager.AppSettings["MqttLastWillKeepAliveSeconds"]);
CertificateFileName = ConfigurationManager.AppSettings["CertificateFileName"];
CertificatePwd = ConfigurationManager.AppSettings["CertificatePswd"];
defaultMessage = ConfigurationManager.AppSettings["DefaultPubMessage"];
topicSubFromHar = ConfigurationManager.AppSettings["MqttTopicSubFromHar"];
topicThisHost = ConfigurationManager.AppSettings["MqttTopicThisHost"];
heartBeatPubMsg = ConfigurationManager.AppSettings["HeartBeatPubMessage"];
heartBeatTimerMs = Double.Parse(ConfigurationManager.AppSettings["HeartBeatTimerMs"]);
pingDicom = bool.Parse(ConfigurationManager.AppSettings["CheckDicomServers"]);
SynergyHostName = ConfigurationManager.AppSettings["SynergyHostName"];
machineName = Dns.GetHostName();
hostIp = Dns.GetHostEntry(machineName)
.AddressList
.FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork)
.ToString();
clientId = ".netWinSvc-" + this.machineName;
QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
switch (MqttQos)
{
case 0:
QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
break;
case 1:
QosThisHost = MqttQualityOfServiceLevel.AtMostOnce;
break;
case 2:
QosThisHost = MqttQualityOfServiceLevel.ExactlyOnce;
break;
}
}
public void CreateThreadAndRun()
{
Thread m_Thread = new Thread(new ThreadStart(StartPublisherAndSubscriber));
m_Thread.SetApartmentState(ApartmentState.STA);
m_Thread.Name = "MT";
m_Thread.Priority = ThreadPriority.Highest;
m_Thread.Start();
}
private void StartPublisherAndSubscriber()
{
StartSubscriber();
_ = StartPublisher();
CheckOtherServers();
}
private void StartHeartBeatTimer()
{
TimeSpan ts = new TimeSpan(0, 0, 5);
Thread.Sleep(ts);
Timer timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(PublishHeartBeat);
timer.Interval = heartBeatTimerMs;
timer.Enabled = true;
}
private void PublishHeartBeat(object source, ElapsedEventArgs e)
{
var message = $"{ this.heartBeatPubMsg}: { MyHostName} {hostIp}";
_ = this.Publish(message, topicThisHost);
this.CheckOtherServers();
}
private async void StartSubscriber()
{
applog.Debug($"In StartSubscriber()");
var mqttFactory = new MQTTnet.MqttFactory();
managedMqttClientSubscriber = mqttFactory.CreateManagedMqttClient();
managedMqttClientSubscriber.ConnectedAsync += OnSubscriberConnected;
managedMqttClientSubscriber.DisconnectedAsync += OnSubscriberDisconnected;
managedMqttClientSubscriber.ApplicationMessageReceivedAsync += this.OnSubscriberMessageReceived;
// If tls is enabled in app.config, we use wss with cert file
if (MqttUseTls)
{
var managedClientOptions = WsSecureClientOptions();
await managedMqttClientSubscriber.StartAsync(managedClientOptions);
}
else
{
var insecureOptions = WsInsecureOptions();
await this.managedMqttClientSubscriber.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = insecureOptions
});
}
List<MqttTopicFilter> topicFilter = new List<MqttTopicFilter>();
topicFilter.Add(new MqttTopicFilter { Topic = topicThisHost });
topicFilter.Add(new MqttTopicFilter { Topic = topicSubFromHar });
Console.WriteLine("We have subscribed to multiple !");
await this.managedMqttClientSubscriber.SubscribeAsync(topicFilter);
}
public async Task StartPublisher()
{
var mqttFactory = new MqttFactory();
this.managedMqttClientPublisher = mqttFactory.CreateManagedMqttClient();
// If tls is enabled in app.config, we use wss with cert file
if (MqttUseTls)
{
var managedClientOptions = WsSecureClientOptions();
managedClientOptions.AutoReconnectDelay = TimeSpan.FromSeconds(10);
await this.managedMqttClientPublisher.StartAsync(managedClientOptions);
}
else
{
var insecureOptions = WsInsecureOptions();
await this.managedMqttClientPublisher.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = insecureOptions
});
}
applog.Debug($"In StartPublisher()");
await Publish($"{defaultMessage} - Machine: {this.machineName}, Host: {this.SynergyHostName}", this.topicThisHost);
}
public async Task Publish(string messageIn, string topic, IManagedMqttClient pubClient = null)
{
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce;
switch (MqttQos)
{
case 0:
qos = MqttQualityOfServiceLevel.AtLeastOnce;
break;
case 1:
qos = MqttQualityOfServiceLevel.AtMostOnce;
break;
case 2:
qos = MqttQualityOfServiceLevel.ExactlyOnce;
break;
}
MqttModel message = new MqttModel();
message.message = messageIn;
message.datestamp = DateTime.Now;
message.source = "";
message.status = "";
var payload = JsonConvert.SerializeObject(message, Formatting.Indented);
var send = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.WithRetainFlag(false)
.Build();
if (this.managedMqttClientPublisher == null)
{
this.managedMqttClientPublisher = pubClient;
}
if (this.managedMqttClientPublisher != null)
{
try
{
applog.Debug($"Mqtt Service Publish() method - about to pub mqtt message EnqueueAsync() - {messageIn} / {topic} ");
await this.managedMqttClientPublisher.EnqueueAsync(send);
MonitoringLogs logs = new MonitoringLogs();
logs.InsertIntoLog(message);
}
catch (Exception ex)
{
string errorMessage = $"Exception occured in Publish() method. {ex.Message}";
applog.Error(errorMessage);
throw new Exception(errorMessage);
}
}
else
{
applog.Info($"Mqtt Service Publish() method - managedMqttClientPublisher object appears to be NULL");
}
}
public ManagedMqttClientOptions WsSecureClientOptions()
{
string assemblyPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(MqttService)).CodeBase);
// Building out the secure wss url (both pfx/crt certificate file types appear to work here)
var url = $"{mqttBrokerAddress}:{mqttPort}/mqtt";
X509Certificate2 x509Cert = null;
var file = CertificateFileName;
var filePath = Path.Combine(assemblyPath, file).Remove(0, 6);
// pfx file contains both pub and priv keys (needs pswd); crt file only has pub key (no pswd req'd)
if (Path.GetExtension(CertificateFileName.ToLower()) == ".pfx")
{
// using a PFX cert file via the X509 class
x509Cert = new X509Certificate2(filePath, CertificatePwd);
}
else if (Path.GetExtension(CertificateFileName.ToLower()) == ".crt")
{
x509Cert = new X509Certificate2(filePath);
}
applog.Debug($"In WsSecureClientOptions(), Certificate Path - {filePath}");
var clientOptionsBldr = new MqttClientOptionsBuilder()
.WithProtocolVersion(MqttProtocolVersion.V500)
.WithWebSocketServer(url)
.WithCredentials(mqttClientUser, mqttClientPswd)
.WithClientId(clientId)
.WithCleanSession()
.WithCredentials(mqttClientUser, mqttClientPswd)
.WithTls(
new MqttClientOptionsBuilderTlsParameters()
{
UseTls = true,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
Certificates = new List<X509Certificate2>() { x509Cert }
});
ManagedMqttClientOptions managedClientOptions = null;
try
{
applog.Debug($"In WsSecureClientOptions(), about to Build Publisher - ${url}");
managedClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(clientOptionsBldr)
.Build();
}
catch (Exception ex)
{
applog.Error("CERT ERROR ! Exception in WsSecureClientOptions() " + ex.Message);
}
return managedClientOptions;
}
private Task OnSubscriberConnected(MqttClientConnectedEventArgs _)
{
return Task.CompletedTask;
}
private Task OnSubscriberDisconnected(MqttClientDisconnectedEventArgs _)
{
return Task.CompletedTask;
}
}
As per the comments your code was creating two connections to the broker; once each in:
StartSubscriber()
StartPublisher()
Both functions end up creating a connection to the broker with the same client id (as they both use the same WsSecureClientOptions()
/WsInsecureOptions()
).
MQTT-3.1.4-2 states:
If the ClientId represents a Client already connected to the Server then the Server MUST disconnect the existing Client.
So the two connections will end up "fighting" - one will connect, leading to the other disconnecting, it will try to reconnect etc.
To solve this either: