[1.1.0] Rewrite Module to reconnect itselfs, so you dont need to watch over the the state of the connection

This commit is contained in:
BlubbFish 2019-05-27 17:24:54 +02:00
parent e8812fa80b
commit 4c4deb4a13
2 changed files with 180 additions and 119 deletions

View File

@ -1,83 +1,139 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using BlubbFish.Utils.IoT.Events; using System.Threading;
using uPLibrary.Networking.M2Mqtt; using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt.Messages; using BlubbFish.Utils.IoT.Events;
using uPLibrary.Networking.M2Mqtt;
namespace BlubbFish.Utils.IoT.Connector.Data { using uPLibrary.Networking.M2Mqtt.Messages;
public class Mqtt : ADataBackend, IDisposable {
private MqttClient client; namespace BlubbFish.Utils.IoT.Connector.Data {
public class Mqtt : ADataBackend, IDisposable {
public Mqtt(Dictionary<String, String> settings) : base(settings) { private MqttClient client;
Int32 port = 1883; private Thread connectionWatcher;
if(this.settings.ContainsKey("port")) {
port = Int32.Parse(this.settings["port"]); public Mqtt(Dictionary<String, String> settings) : base(settings) {
} Console.WriteLine("BlubbFish.Utils.IoT.Connector.Data.Mqtt.Mqtt()");
this.client = new MqttClient(this.settings["server"], port, false, null, null, MqttSslProtocols.None); Int32 port = 1883;
Connect(); if(this.settings.ContainsKey("port")) {
} port = Int32.Parse(this.settings["port"]);
}
private void Connect() { this.client = new MqttClient(this.settings["server"], port, false, null, null, MqttSslProtocols.None);
this.client.MqttMsgPublishReceived += this.Client_MqttMsgPublishReceived; this.ConnectionWatcher();
if (this.settings.ContainsKey("user") && this.settings.ContainsKey("pass")) { }
this.client.Connect(Guid.NewGuid().ToString(), this.settings["user"], this.settings["pass"]);
} else { #region ConectionManage
this.client.Connect(Guid.NewGuid().ToString()); private void ConnectionWatcher() {
} this.connectionWatcher = new Thread(this.ConnectionWatcherRunner);
if (this.settings.ContainsKey("topic")) { this.connectionWatcher.Start();
Int32 l = this.settings["topic"].Split(';').Length; }
Byte[] qos = new Byte[l];
for (Int32 i = 0; i < qos.Length; i++) { private void ConnectionWatcherRunner() {
qos[i] = MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE; while(true) {
} try {
this.client.Subscribe(this.settings["topic"].Split(';'), qos); if(!this.IsConnected) {
} else { this.Reconnect();
this.client.Subscribe(new String[] { "#" }, new Byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE }); }
} Thread.Sleep(500);
} } catch(Exception) { }
}
private void Client_MqttMsgPublishReceived(Object sender, MqttMsgPublishEventArgs e) { }
this.NotifyClientIncomming(new DataEvent(Encoding.UTF8.GetString(e.Message), e.Topic, DateTime.Now));
} private void Reconnect() {
Console.WriteLine("BlubbFish.Utils.IoT.Connector.Data.Mqtt.Reconnect()");
public override void Send(String topic, String data) { if(this.IsConnected) {
this.client.Publish(topic, Encoding.UTF8.GetBytes(data)); this.Disconnect(true);
this.NotifyClientSending(new DataEvent(data, topic, DateTime.Now)); } else {
} this.Disconnect(false);
}
#region IDisposable Support this.Connect();
private Boolean disposedValue = false; }
public override Boolean IsConnected { private void Disconnect(Boolean complete) {
get { Console.WriteLine("BlubbFish.Utils.IoT.Connector.Data.Mqtt.Disconnect()");
if(this.client != null) { this.client.MqttMsgPublishReceived -= this.Client_MqttMsgPublishReceived;
return this.client.IsConnected; this.Unsubscripe();
} else { if(complete) {
return false; this.client.Disconnect();
} }
} }
}
private void Connect() {
protected virtual void Dispose(Boolean disposing) { Console.WriteLine("BlubbFish.Utils.IoT.Connector.Data.Mqtt.Connect()");
if(!this.disposedValue) { this.client.MqttMsgPublishReceived += this.Client_MqttMsgPublishReceived;
if(disposing) {try { if (this.settings.ContainsKey("user") && this.settings.ContainsKey("pass")) {
this.client.MqttMsgPublishReceived -= this.Client_MqttMsgPublishReceived; this.client.Connect(Guid.NewGuid().ToString(), this.settings["user"], this.settings["pass"]);
this.client.Unsubscribe(new String[] { "#" }); } else {
this.client.Disconnect(); this.client.Connect(Guid.NewGuid().ToString());
} catch (Exception) { } }
} this.Subscripe();
}
this.client = null; #endregion
this.disposedValue = true; #region Subscription
} private void Unsubscripe() {
} if(this.settings.ContainsKey("topic")) {
this.client.Unsubscribe(this.settings["topic"].Split(';'));
public override void Dispose() { } else {
Dispose(true); this.client.Unsubscribe(new String[] { "#" });
GC.SuppressFinalize(this); }
} }
#endregion
} private void Subscripe() {
} if(this.settings.ContainsKey("topic")) {
Int32 l = this.settings["topic"].Split(';').Length;
Byte[] qos = new Byte[l];
for(Int32 i = 0; i < qos.Length; i++) {
qos[i] = MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE;
}
this.client.Subscribe(this.settings["topic"].Split(';'), qos);
} else {
this.client.Subscribe(new String[] { "#" }, new Byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
}
}
#endregion
private async void Client_MqttMsgPublishReceived(Object sender, MqttMsgPublishEventArgs e) => await Task.Run(() => {
this.NotifyClientIncomming(new DataEvent(Encoding.UTF8.GetString(e.Message), e.Topic, DateTime.Now));
});
public override void Send(String topic, String data) {
this.client.Publish(topic, Encoding.UTF8.GetBytes(data));
this.NotifyClientSending(new DataEvent(data, topic, DateTime.Now));
}
#region IDisposable Support
private Boolean disposedValue = false;
public override Boolean IsConnected {
get {
if(this.client != null) {
return this.client.IsConnected;
} else {
return false;
}
}
}
protected virtual void Dispose(Boolean disposing) {
if(!this.disposedValue) {
if(disposing) {try {
try {
this.connectionWatcher.Abort();
this.connectionWatcher = null;
} catch { }
this.Disconnect(true);
} catch (Exception) { }
}
this.client = null;
this.disposedValue = true;
}
}
public override void Dispose() {
this.Dispose(true);
GC.SuppressFinalize(this);
}
#endregion
}
}

View File

@ -1,36 +1,41 @@
using System.Reflection; using System.Reflection;
using System.Runtime.CompilerServices; using System.Resources;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
// Allgemeine Informationen über eine Assembly werden über die folgenden // Allgemeine Informationen über eine Assembly werden über die folgenden
// Attribute gesteuert. Ändern Sie diese Attributwerte, um die Informationen zu ändern, // Attribute gesteuert. Ändern Sie diese Attributwerte, um die Informationen zu ändern,
// die einer Assembly zugeordnet sind. // die einer Assembly zugeordnet sind.
[assembly: AssemblyTitle("ConnectorDataMqtt")] [assembly: AssemblyTitle("ConnectorDataMqtt")]
[assembly: AssemblyDescription("")] [assembly: AssemblyDescription("ADataBackend Connector that connects to mqtt using M2Mqtt")]
[assembly: AssemblyConfiguration("")] [assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")] [assembly: AssemblyCompany("BlubbFish")]
[assembly: AssemblyProduct("ConnectorDataMqtt")] [assembly: AssemblyProduct("ConnectorDataMqtt")]
[assembly: AssemblyCopyright("Copyright © 2017")] [assembly: AssemblyCopyright("Copyright © 2017 - 27.05.2019")]
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("© BlubbFish")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]
[assembly: NeutralResourcesLanguage("de-DE")]
// Durch Festlegen von ComVisible auf FALSE werden die Typen in dieser Assembly
// für COM-Komponenten unsichtbar. Wenn Sie auf einen Typ in dieser Assembly von // Durch Festlegen von ComVisible auf FALSE werden die Typen in dieser Assembly
// COM aus zugreifen müssen, sollten Sie das ComVisible-Attribut für diesen Typ auf "True" festlegen. // für COM-Komponenten unsichtbar. Wenn Sie auf einen Typ in dieser Assembly von
[assembly: ComVisible(false)] // COM aus zugreifen müssen, sollten Sie das ComVisible-Attribut für diesen Typ auf "True" festlegen.
[assembly: ComVisible(false)]
// Die folgende GUID bestimmt die ID der Typbibliothek, wenn dieses Projekt für COM verfügbar gemacht wird
[assembly: Guid("ee6c8f68-ed46-4c1c-abdd-cfcdf75104f2")] // Die folgende GUID bestimmt die ID der Typbibliothek, wenn dieses Projekt für COM verfügbar gemacht wird
[assembly: Guid("ee6c8f68-ed46-4c1c-abdd-cfcdf75104f2")]
// Versionsinformationen für eine Assembly bestehen aus den folgenden vier Werten:
// // Versionsinformationen für eine Assembly bestehen aus den folgenden vier Werten:
// Hauptversion //
// Nebenversion // Hauptversion
// Buildnummer // Nebenversion
// Revision // Buildnummer
// // Revision
// Sie können alle Werte angeben oder Standardwerte für die Build- und Revisionsnummern verwenden, //
// indem Sie "*" wie unten gezeigt eingeben: // Sie können alle Werte angeben oder Standardwerte für die Build- und Revisionsnummern verwenden,
// [assembly: AssemblyVersion("1.0.*")] // indem Sie "*" wie unten gezeigt eingeben:
[assembly: AssemblyVersion("1.0.0.0")] // [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyFileVersion("1.0.0.0")] [assembly: AssemblyVersion("1.1.0")]
[assembly: AssemblyFileVersion("1.1.0")]
/*
* 1.1.0 Rewrite Module to reconnect itselfs, so you dont need to watch over the the state of the connection
*/