From 4c4deb4a1399775a02736a3d5c55e012b83384c4 Mon Sep 17 00:00:00 2001 From: BlubbFish Date: Mon, 27 May 2019 17:24:54 +0200 Subject: [PATCH] [1.1.0] Rewrite Module to reconnect itselfs, so you dont need to watch over the the state of the connection --- ConnectorDataMqtt/Mqtt.cs | 222 ++++++++++++------- ConnectorDataMqtt/Properties/AssemblyInfo.cs | 77 ++++--- 2 files changed, 180 insertions(+), 119 deletions(-) diff --git a/ConnectorDataMqtt/Mqtt.cs b/ConnectorDataMqtt/Mqtt.cs index 0ebd821..f22e6d7 100644 --- a/ConnectorDataMqtt/Mqtt.cs +++ b/ConnectorDataMqtt/Mqtt.cs @@ -1,83 +1,139 @@ -using System; -using System.Collections.Generic; -using System.Text; -using BlubbFish.Utils.IoT.Events; -using uPLibrary.Networking.M2Mqtt; -using uPLibrary.Networking.M2Mqtt.Messages; - -namespace BlubbFish.Utils.IoT.Connector.Data { - public class Mqtt : ADataBackend, IDisposable { - private MqttClient client; - - public Mqtt(Dictionary settings) : base(settings) { - Int32 port = 1883; - if(this.settings.ContainsKey("port")) { - port = Int32.Parse(this.settings["port"]); - } - this.client = new MqttClient(this.settings["server"], port, false, null, null, MqttSslProtocols.None); - Connect(); - } - - private void Connect() { - this.client.MqttMsgPublishReceived += this.Client_MqttMsgPublishReceived; - if (this.settings.ContainsKey("user") && this.settings.ContainsKey("pass")) { - this.client.Connect(Guid.NewGuid().ToString(), this.settings["user"], this.settings["pass"]); - } else { - this.client.Connect(Guid.NewGuid().ToString()); - } - if (this.settings.ContainsKey("topic")) { - Int32 l = this.settings["topic"].Split(';').Length; - Byte[] qos = new Byte[l]; - for (Int32 i = 0; i < qos.Length; i++) { - qos[i] = MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE; - } - this.client.Subscribe(this.settings["topic"].Split(';'), qos); - } else { - this.client.Subscribe(new String[] { "#" }, new Byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE }); - } - } - - private void Client_MqttMsgPublishReceived(Object sender, MqttMsgPublishEventArgs e) { - this.NotifyClientIncomming(new DataEvent(Encoding.UTF8.GetString(e.Message), e.Topic, DateTime.Now)); - } - - public override void Send(String topic, String data) { - this.client.Publish(topic, Encoding.UTF8.GetBytes(data)); - this.NotifyClientSending(new DataEvent(data, topic, DateTime.Now)); - } - - #region IDisposable Support - private Boolean disposedValue = false; - - public override Boolean IsConnected { - get { - if(this.client != null) { - return this.client.IsConnected; - } else { - return false; - } - } - } - - protected virtual void Dispose(Boolean disposing) { - if(!this.disposedValue) { - if(disposing) {try { - this.client.MqttMsgPublishReceived -= this.Client_MqttMsgPublishReceived; - this.client.Unsubscribe(new String[] { "#" }); - this.client.Disconnect(); - } catch (Exception) { } - } - - this.client = null; - - this.disposedValue = true; - } - } - - public override void Dispose() { - Dispose(true); - GC.SuppressFinalize(this); - } - #endregion - } -} +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using BlubbFish.Utils.IoT.Events; +using uPLibrary.Networking.M2Mqtt; +using uPLibrary.Networking.M2Mqtt.Messages; + +namespace BlubbFish.Utils.IoT.Connector.Data { + public class Mqtt : ADataBackend, IDisposable { + private MqttClient client; + private Thread connectionWatcher; + + public Mqtt(Dictionary settings) : base(settings) { + Console.WriteLine("BlubbFish.Utils.IoT.Connector.Data.Mqtt.Mqtt()"); + Int32 port = 1883; + if(this.settings.ContainsKey("port")) { + port = Int32.Parse(this.settings["port"]); + } + this.client = new MqttClient(this.settings["server"], port, false, null, null, MqttSslProtocols.None); + this.ConnectionWatcher(); + } + + #region ConectionManage + private void ConnectionWatcher() { + this.connectionWatcher = new Thread(this.ConnectionWatcherRunner); + this.connectionWatcher.Start(); + } + + private void ConnectionWatcherRunner() { + while(true) { + try { + if(!this.IsConnected) { + this.Reconnect(); + } + Thread.Sleep(500); + } catch(Exception) { } + } + } + + private void Reconnect() { + Console.WriteLine("BlubbFish.Utils.IoT.Connector.Data.Mqtt.Reconnect()"); + if(this.IsConnected) { + this.Disconnect(true); + } else { + this.Disconnect(false); + } + this.Connect(); + } + + private void Disconnect(Boolean complete) { + Console.WriteLine("BlubbFish.Utils.IoT.Connector.Data.Mqtt.Disconnect()"); + this.client.MqttMsgPublishReceived -= this.Client_MqttMsgPublishReceived; + this.Unsubscripe(); + if(complete) { + this.client.Disconnect(); + } + } + + private void Connect() { + Console.WriteLine("BlubbFish.Utils.IoT.Connector.Data.Mqtt.Connect()"); + this.client.MqttMsgPublishReceived += this.Client_MqttMsgPublishReceived; + if (this.settings.ContainsKey("user") && this.settings.ContainsKey("pass")) { + this.client.Connect(Guid.NewGuid().ToString(), this.settings["user"], this.settings["pass"]); + } else { + this.client.Connect(Guid.NewGuid().ToString()); + } + this.Subscripe(); + } + #endregion + + #region Subscription + private void Unsubscripe() { + if(this.settings.ContainsKey("topic")) { + this.client.Unsubscribe(this.settings["topic"].Split(';')); + } else { + this.client.Unsubscribe(new String[] { "#" }); + } + } + + private void Subscripe() { + if(this.settings.ContainsKey("topic")) { + Int32 l = this.settings["topic"].Split(';').Length; + Byte[] qos = new Byte[l]; + for(Int32 i = 0; i < qos.Length; i++) { + qos[i] = MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE; + } + this.client.Subscribe(this.settings["topic"].Split(';'), qos); + } else { + this.client.Subscribe(new String[] { "#" }, new Byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE }); + } + } + #endregion + + private async void Client_MqttMsgPublishReceived(Object sender, MqttMsgPublishEventArgs e) => await Task.Run(() => { + this.NotifyClientIncomming(new DataEvent(Encoding.UTF8.GetString(e.Message), e.Topic, DateTime.Now)); + }); + + public override void Send(String topic, String data) { + this.client.Publish(topic, Encoding.UTF8.GetBytes(data)); + this.NotifyClientSending(new DataEvent(data, topic, DateTime.Now)); + } + + #region IDisposable Support + private Boolean disposedValue = false; + + public override Boolean IsConnected { + get { + if(this.client != null) { + return this.client.IsConnected; + } else { + return false; + } + } + } + + protected virtual void Dispose(Boolean disposing) { + if(!this.disposedValue) { + if(disposing) {try { + try { + this.connectionWatcher.Abort(); + this.connectionWatcher = null; + } catch { } + this.Disconnect(true); + } catch (Exception) { } + } + this.client = null; + this.disposedValue = true; + } + } + + public override void Dispose() { + this.Dispose(true); + GC.SuppressFinalize(this); + } + #endregion + } +} diff --git a/ConnectorDataMqtt/Properties/AssemblyInfo.cs b/ConnectorDataMqtt/Properties/AssemblyInfo.cs index d73f1be..c7aacff 100644 --- a/ConnectorDataMqtt/Properties/AssemblyInfo.cs +++ b/ConnectorDataMqtt/Properties/AssemblyInfo.cs @@ -1,36 +1,41 @@ -using System.Reflection; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -// Allgemeine Informationen über eine Assembly werden über die folgenden -// Attribute gesteuert. Ändern Sie diese Attributwerte, um die Informationen zu ändern, -// die einer Assembly zugeordnet sind. -[assembly: AssemblyTitle("ConnectorDataMqtt")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("ConnectorDataMqtt")] -[assembly: AssemblyCopyright("Copyright © 2017")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -// Durch Festlegen von ComVisible auf FALSE werden die Typen in dieser Assembly -// für COM-Komponenten unsichtbar. Wenn Sie auf einen Typ in dieser Assembly von -// COM aus zugreifen müssen, sollten Sie das ComVisible-Attribut für diesen Typ auf "True" festlegen. -[assembly: ComVisible(false)] - -// Die folgende GUID bestimmt die ID der Typbibliothek, wenn dieses Projekt für COM verfügbar gemacht wird -[assembly: Guid("ee6c8f68-ed46-4c1c-abdd-cfcdf75104f2")] - -// Versionsinformationen für eine Assembly bestehen aus den folgenden vier Werten: -// -// Hauptversion -// Nebenversion -// Buildnummer -// Revision -// -// Sie können alle Werte angeben oder Standardwerte für die Build- und Revisionsnummern verwenden, -// indem Sie "*" wie unten gezeigt eingeben: -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] +using System.Reflection; +using System.Resources; +using System.Runtime.InteropServices; + +// Allgemeine Informationen über eine Assembly werden über die folgenden +// Attribute gesteuert. Ändern Sie diese Attributwerte, um die Informationen zu ändern, +// die einer Assembly zugeordnet sind. +[assembly: AssemblyTitle("ConnectorDataMqtt")] +[assembly: AssemblyDescription("ADataBackend Connector that connects to mqtt using M2Mqtt")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("BlubbFish")] +[assembly: AssemblyProduct("ConnectorDataMqtt")] +[assembly: AssemblyCopyright("Copyright © 2017 - 27.05.2019")] +[assembly: AssemblyTrademark("© BlubbFish")] +[assembly: AssemblyCulture("")] +[assembly: NeutralResourcesLanguage("de-DE")] + +// Durch Festlegen von ComVisible auf FALSE werden die Typen in dieser Assembly +// für COM-Komponenten unsichtbar. Wenn Sie auf einen Typ in dieser Assembly von +// COM aus zugreifen müssen, sollten Sie das ComVisible-Attribut für diesen Typ auf "True" festlegen. +[assembly: ComVisible(false)] + +// Die folgende GUID bestimmt die ID der Typbibliothek, wenn dieses Projekt für COM verfügbar gemacht wird +[assembly: Guid("ee6c8f68-ed46-4c1c-abdd-cfcdf75104f2")] + +// Versionsinformationen für eine Assembly bestehen aus den folgenden vier Werten: +// +// Hauptversion +// Nebenversion +// Buildnummer +// Revision +// +// Sie können alle Werte angeben oder Standardwerte für die Build- und Revisionsnummern verwenden, +// indem Sie "*" wie unten gezeigt eingeben: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.1.0")] +[assembly: AssemblyFileVersion("1.1.0")] + +/* + * 1.1.0 Rewrite Module to reconnect itselfs, so you dont need to watch over the the state of the connection + */ \ No newline at end of file