ConnectorDataMosquitto/ConnectorDataMosquitto/Mosquitto.cs

126 lines
4.4 KiB
C#

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text.RegularExpressions;
namespace BlubbFish.Utils.IoT.Connector.Data {
/// <summary>
/// MQTT backend that shells out to the <c>mosquitto_sub</c> and <c>mosquitto_pub</c>
/// command line clients. One long-running <c>mosquitto_sub</c> process is started by the
/// constructor and its standard output is parsed for received messages; every call to
/// <see cref="Send"/> runs one short-lived <c>mosquitto_pub</c> process.
/// </summary>
/// <remarks>
/// Recognised settings keys: <c>server</c> (required), and optionally <c>port</c>,
/// <c>cafile</c>, <c>cert</c>, <c>key</c> and <c>topic</c> (defaults to <c>#</c>).
/// </remarks>
public class Mosquitto : ADataBackend, IDisposable {
  /// <summary>
  /// Matches one buffered record: a "Client mosqsub… received PUBLISH (…, '…', …))" line
  /// followed by a second line. Group 1 is the quoted text of the first line (used as topic),
  /// group 2 is everything after the first space of the second line (used as payload).
  /// </summary>
  private static readonly Regex PublishPattern = new Regex("^Client mosqsub[\\|/].*received PUBLISH \\(.*,.*,.*,.*, '(.*)'.*\\)\\)\n[^ ]* (.*)$", RegexOptions.IgnoreCase | RegexOptions.Singleline);

  /// <summary>Optional settings keys and the command line switch each one maps to.</summary>
  private static readonly String[,] OptionalSwitches = {
    { "port", "-p" },
    { "cafile", "--cafile" },
    { "cert", "--cert" },
    { "key", "--key" }
  };

  private readonly Dictionary<String, String> settings;
  private Process p;
  // Lines collected from mosquitto_sub since the last line starting with "Client mosqsub".
  private String message;
  private Boolean disposedValue = false; // Used to detect redundant calls.

  /// <summary>Raised for every message parsed from the subscriber's output.</summary>
  public override event MqttMessage MessageIncomming;

  /// <summary>Raised after <c>mosquitto_pub</c> has exited for a call to <see cref="Send"/>.</summary>
  public override event MqttMessage MessageSending;

  /// <summary>
  /// Starts the <c>mosquitto_sub</c> process and begins reading its output asynchronously.
  /// </summary>
  /// <param name="mqtt_settings">Connection settings; must contain the key <c>server</c>.</param>
  /// <exception cref="KeyNotFoundException">The <c>server</c> key is missing.</exception>
  public Mosquitto(Dictionary<String, String> mqtt_settings) {
    this.settings = mqtt_settings;
    this.message = "";

    String topic = "#";
    if (this.settings.ContainsKey("topic")) {
      topic = this.settings["topic"];
    }

    //mosquitto_sub --cafile ca.pem --cert cert.pem --key cert.key -h swb.broker.flex4grid.eu -p 8883 -t "#" -v -d
    this.p = CreateProcess("mosquitto_sub", this.BuildConnectionArguments() + "-t " + QuoteArgument(topic) + " -v -d");
    this.p.OutputDataReceived += this.P_OutputDataReceived;
    this.p.ErrorDataReceived += this.P_ErrorDataReceived;
    this.p.Start();
    this.p.BeginOutputReadLine();
    // stderr is redirected, so it has to be drained as well; otherwise the child
    // blocks as soon as the pipe buffer is full.
    this.p.BeginErrorReadLine();
  }

  /// <summary>
  /// Publishes <paramref name="data"/> on <paramref name="topic"/> by running
  /// <c>mosquitto_pub</c> and waiting for it to exit, then raises <see cref="MessageSending"/>.
  /// The exit code of the child process is not inspected.
  /// </summary>
  /// <param name="topic">Topic to publish on.</param>
  /// <param name="data">Payload to publish.</param>
  public override void Send(String topic, String data) {
    String args = this.BuildConnectionArguments() + "-m " + QuoteArgument(data) + " -t " + QuoteArgument(topic) + " -d";
    using (Process send = CreateProcess("mosquitto_pub", args)) {
      send.Start();
      // Both streams are redirected; read them asynchronously (and discard the data)
      // so that WaitForExit cannot deadlock on a full pipe.
      send.BeginOutputReadLine();
      send.BeginErrorReadLine();
      send.WaitForExit();
    }
    this.MessageSending?.Invoke(this, new MqttEventArgs(data, topic));
  }

  /// <summary>
  /// Builds the argument prefix shared by both clients: <c>-h server</c> plus every optional
  /// switch whose key is present in the settings. The result ends with a space.
  /// </summary>
  /// <remarks>Values are inserted verbatim (unquoted), exactly as configured.</remarks>
  private String BuildConnectionArguments() {
    String args = "-h " + this.settings["server"] + " ";
    for (Int32 i = 0; i < OptionalSwitches.GetLength(0); i++) {
      String value;
      if (this.settings.TryGetValue(OptionalSwitches[i, 0], out value)) {
        args += OptionalSwitches[i, 1] + " " + value + " ";
      }
    }
    return args;
  }

  /// <summary>
  /// Creates a not yet started process without a window, without shell execution and with
  /// standard output and standard error redirected.
  /// </summary>
  private static Process CreateProcess(String fileName, String arguments) {
    Process process = new Process();
    process.StartInfo.FileName = fileName;
    process.StartInfo.Arguments = arguments;
    process.StartInfo.CreateNoWindow = true;
    process.StartInfo.UseShellExecute = false;
    process.StartInfo.RedirectStandardOutput = true;
    process.StartInfo.RedirectStandardError = true;
    return process;
  }

  /// <summary>
  /// Wraps <paramref name="value"/> in double quotes so that it is passed as a single
  /// argument. Embedded quotes are escaped with a backslash; backslashes are doubled only
  /// where they directly precede a quote (including the closing one), because elsewhere
  /// the command line convention treats them literally.
  /// </summary>
  private static String QuoteArgument(String value) {
    System.Text.StringBuilder quoted = new System.Text.StringBuilder("\"");
    Int32 pendingBackslashes = 0;
    foreach (Char c in value) {
      if (c == '\\') {
        pendingBackslashes++;
        continue;
      }
      if (c == '"') {
        // 2n backslashes stay n literal backslashes, the extra one escapes the quote.
        quoted.Append('\\', pendingBackslashes * 2 + 1);
      } else {
        quoted.Append('\\', pendingBackslashes);
      }
      pendingBackslashes = 0;
      quoted.Append(c);
    }
    // Trailing backslashes would otherwise escape the closing quote.
    quoted.Append('\\', pendingBackslashes * 2);
    quoted.Append('"');
    return quoted.ToString();
  }

  /// <summary>
  /// Receives one line of the subscriber's standard error and forwards it to the trace
  /// listeners. It must not throw: it runs on a worker thread where an unhandled
  /// exception would terminate the whole application.
  /// </summary>
  private void P_ErrorDataReceived(Object sender, DataReceivedEventArgs e) {
    if (e.Data != null) {
      Trace.TraceError("mosquitto_sub: " + e.Data);
    }
  }

  /// <summary>
  /// Receives one line of the subscriber's standard output. Lines are buffered until the
  /// next line starting with "Client mosqsub" arrives; at that point the buffered record is
  /// parsed and, if it is a PUBLISH record, reported through <see cref="MessageIncomming"/>.
  /// A record is therefore only delivered once the following "Client mosqsub" line is seen.
  /// </summary>
  private void P_OutputDataReceived(Object sender, DataReceivedEventArgs e) {
    if (e.Data == null) {
      return;
    }
    if (!e.Data.StartsWith("Client mosqsub", StringComparison.Ordinal)) {
      // Continuation line of the current record.
      this.message += e.Data + "\n";
      return;
    }
    if (this.message != "" && this.message.IndexOf(" received PUBLISH ", StringComparison.Ordinal) > 0) {
      Match match = PublishPattern.Match(this.message);
      // Skip records that do not have the expected shape instead of failing on the reader thread.
      if (match.Success) {
        String topic = match.Groups[1].Value;
        String payload = match.Groups[2].Value.Trim();
        this.MessageIncomming?.Invoke(this, new MqttEventArgs(payload, topic));
      }
    }
    // The current line opens the next record.
    this.message = e.Data + "\n";
  }

  #region IDisposable Support
  /// <summary>
  /// Stops reading from the subscriber process, kills it if it is still running and
  /// releases it. Subsequent calls do nothing.
  /// </summary>
  /// <param name="disposing">
  /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the
  /// finalizer, in which case the process object is not touched.
  /// </param>
  protected virtual void Dispose(Boolean disposing) {
    if (this.disposedValue) {
      return;
    }
    if (disposing && this.p != null) {
      this.p.CancelOutputRead();
      this.p.CancelErrorRead();
      this.p.OutputDataReceived -= this.P_OutputDataReceived;
      this.p.ErrorDataReceived -= this.P_ErrorDataReceived;
      try {
        if (!this.p.HasExited) {
          this.p.Kill();
        }
      } catch (InvalidOperationException) {
        // The process exited between the check and the kill; nothing left to do.
      } catch (System.ComponentModel.Win32Exception) {
        // The process is already terminating or could not be terminated; Dispose must not throw.
      }
      this.p.Close();
    }
    this.p = null;
    this.disposedValue = true;
  }

  ~Mosquitto() {
    Dispose(false);
  }

  /// <summary>Releases the subscriber process and suppresses finalization.</summary>
  public override void Dispose() {
    Dispose(true);
    GC.SuppressFinalize(this);
  }
  #endregion
}
}