using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Text.RegularExpressions;

namespace BlubbFish.Utils.IoT.Connector.Data {
  /// <summary>
  /// MQTT backend that shells out to the <c>mosquitto_sub</c> and <c>mosquitto_pub</c>
  /// command line clients instead of using an in-process MQTT library.
  /// </summary>
  /// <remarks>
  /// A single long-running <c>mosquitto_sub</c> process is started in the constructor and its
  /// standard output is parsed for received messages. Every call to <see cref="Send"/> starts a
  /// short-lived <c>mosquitto_pub</c> process and waits for it to exit.
  /// Recognised settings keys: <c>server</c> (required), <c>port</c>, <c>cafile</c>, <c>cert</c>,
  /// <c>key</c> and <c>topic</c> (subscription filter, defaults to <c>#</c>).
  /// </remarks>
  public class Mosquitto : ADataBackend, IDisposable {
    // Matches one buffered debug block: the "Client mosqsub... received PUBLISH (...)" header line
    // followed by the "<topic> <payload>" line(s). Group 1 = topic, group 2 = payload.
    // Built once; the original code re-created it for every message.
    private static readonly Regex PublishPattern = new Regex(
      "^Client mosqsub[\\|/].*received PUBLISH \\(.*,.*,.*,.*, '(.*)'.*\\)\\)\n[^ ]* (.*)$",
      RegexOptions.IgnoreCase | RegexOptions.Singleline);

    // NOTE(review): the type arguments were missing in the reviewed text (bare "Dictionary" does
    // not compile); <String, String> is inferred from how the values are used - confirm.
    private readonly Dictionary<String, String> settings;

    // The long-running mosquitto_sub process.
    private Process p;

    // Output lines collected since the last "Client mosqsub" header line.
    private String message;

    /// <summary>Raised for every PUBLISH block that could be parsed from the subscriber output.</summary>
    public override event MqttMessage MessageIncomming;

    /// <summary>Raised after <c>mosquitto_pub</c> has exited for a call to <see cref="Send"/>.</summary>
    public override event MqttMessage MessageSending;

    /// <summary>
    /// Starts <c>mosquitto_sub</c> with the given connection settings and begins reading its output.
    /// </summary>
    /// <param name="mqtt_settings">Connection settings; must contain at least <c>server</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="mqtt_settings"/> is null.</exception>
    /// <exception cref="KeyNotFoundException">The <c>server</c> key is missing.</exception>
    public Mosquitto(Dictionary<String, String> mqtt_settings) {
      if (mqtt_settings == null) {
        throw new ArgumentNullException(nameof(mqtt_settings));
      }
      this.settings = mqtt_settings;
      this.message = "";

      String topic;
      if (!this.settings.TryGetValue("topic", out topic)) {
        topic = "#";
      }

      // Resulting command line looks like:
      // mosquitto_sub --cafile ca.pem --cert cert.pem --key cert.key -h swb.broker.flex4grid.eu -p 8883 -t "#" -v -d
      List<String> args = this.BuildConnectionArguments();
      args.Add("-t");
      args.Add(QuoteArgument(topic));
      args.Add("-v");
      args.Add("-d");

      this.p = new Process();
      this.p.StartInfo.FileName = "mosquitto_sub";
      this.p.StartInfo.Arguments = String.Join(" ", args);
      this.p.StartInfo.CreateNoWindow = true;
      this.p.StartInfo.UseShellExecute = false;
      this.p.StartInfo.RedirectStandardOutput = true;
      this.p.StartInfo.RedirectStandardError = true;
      this.p.OutputDataReceived += this.P_OutputDataReceived;
      this.p.ErrorDataReceived += this.P_ErrorDataReceived;
      this.p.Start();
      this.p.BeginOutputReadLine();
      // stderr is redirected, so it has to be drained as well; otherwise the child can block
      // once the pipe buffer is full.
      this.p.BeginErrorReadLine();
    }

    /// <summary>
    /// Publishes <paramref name="data"/> on <paramref name="topic"/> by running <c>mosquitto_pub</c>
    /// and blocking until it exits, then raises <see cref="MessageSending"/>.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="data">Payload to publish.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topic"/> or <paramref name="data"/> is null.
    /// </exception>
    public override void Send(String topic, String data) {
      if (topic == null) {
        throw new ArgumentNullException(nameof(topic));
      }
      if (data == null) {
        throw new ArgumentNullException(nameof(data));
      }

      List<String> args = this.BuildConnectionArguments();
      args.Add("-m");
      args.Add(QuoteArgument(data));
      args.Add("-t");
      args.Add(QuoteArgument(topic));
      args.Add("-d");

      using (Process send = new Process()) {
        send.StartInfo.FileName = "mosquitto_pub";
        send.StartInfo.Arguments = String.Join(" ", args);
        send.StartInfo.CreateNoWindow = true;
        send.StartInfo.UseShellExecute = false;
        send.StartInfo.RedirectStandardOutput = true;
        send.StartInfo.RedirectStandardError = true;
        send.Start();
        // Drain both redirected streams (output is discarded). Waiting without reading can
        // deadlock when the child fills a pipe buffer.
        send.BeginOutputReadLine();
        send.BeginErrorReadLine();
        send.WaitForExit();
      }

      this.MessageSending?.Invoke(this, new MqttEventArgs(data, topic));
    }

    /// <summary>
    /// Builds the arguments shared by <c>mosquitto_sub</c> and <c>mosquitto_pub</c>:
    /// <c>-h</c>, and when configured <c>-p</c>, <c>--cafile</c>, <c>--cert</c>, <c>--key</c>.
    /// </summary>
    private List<String> BuildConnectionArguments() {
      List<String> args = new List<String>();
      args.Add("-h");
      args.Add(QuoteArgument(this.settings["server"]));
      this.AddOptionalArgument(args, "-p", "port");
      this.AddOptionalArgument(args, "--cafile", "cafile");
      this.AddOptionalArgument(args, "--cert", "cert");
      this.AddOptionalArgument(args, "--key", "key");
      return args;
    }

    /// <summary>Appends <c>option value</c> to <paramref name="args"/> when the setting exists.</summary>
    private void AddOptionalArgument(List<String> args, String option, String key) {
      String value;
      if (this.settings.TryGetValue(key, out value)) {
        args.Add(option);
        args.Add(QuoteArgument(value));
      }
    }

    /// <summary>
    /// Wraps a value in double quotes so that it is passed as exactly one argument, escaping
    /// embedded quotes and the backslashes that precede a quote.
    /// </summary>
    /// <remarks>
    /// Follows the Microsoft C runtime argument rules: backslashes are literal unless they are
    /// directly followed by a double quote.
    /// NOTE(review): runtimes that split <c>Arguments</c> with shell-like rules may treat doubled
    /// backslashes differently - verify on the target runtime.
    /// </remarks>
    private static String QuoteArgument(String value) {
      StringBuilder quoted = new StringBuilder("\"");
      Int32 pendingBackslashes = 0;
      foreach (Char c in value ?? "") {
        if (c == '\\') {
          pendingBackslashes++;
          continue;
        }
        if (c == '"') {
          // Double the backslashes in front of the quote and add one to escape the quote itself.
          quoted.Append('\\', pendingBackslashes * 2 + 1);
        } else {
          quoted.Append('\\', pendingBackslashes);
        }
        quoted.Append(c);
        pendingBackslashes = 0;
      }
      // Trailing backslashes sit in front of the closing quote and must be doubled.
      quoted.Append('\\', pendingBackslashes * 2);
      quoted.Append('"');
      return quoted.ToString();
    }

    /// <summary>
    /// Receives stderr lines of the subscriber process and forwards them to the trace listeners.
    /// </summary>
    /// <remarks>
    /// This runs on a callback thread; throwing here (as the previous implementation did) would
    /// be an unhandled exception and take the host process down.
    /// </remarks>
    private void P_ErrorDataReceived(Object sender, DataReceivedEventArgs e) {
      if (e.Data != null) {
        Trace.TraceWarning("mosquitto_sub: " + e.Data);
      }
    }

    /// <summary>
    /// Collects stdout lines of the subscriber process. A line starting with "Client mosqsub"
    /// begins a new block; the previously buffered block is parsed and published first.
    /// </summary>
    /// <remarks>
    /// Because a block is only flushed when the next header line arrives, a received message is
    /// reported with a delay of one debug line.
    /// </remarks>
    private void P_OutputDataReceived(Object sender, DataReceivedEventArgs e) {
      if (e.Data == null) {
        return;
      }
      if (!e.Data.StartsWith("Client mosqsub", StringComparison.Ordinal)) {
        // Continuation line (topic/payload output) of the current block.
        this.message += e.Data + "\n";
        return;
      }
      this.PublishBufferedMessage();
      this.message = e.Data + "\n";
    }

    /// <summary>
    /// Parses the buffered block and raises <see cref="MessageIncomming"/> when it is a PUBLISH
    /// block that matches the expected layout. Blocks that do not match are dropped instead of
    /// failing with an index error.
    /// </summary>
    private void PublishBufferedMessage() {
      if (this.message == "" || this.message.IndexOf(" received PUBLISH ", StringComparison.Ordinal) <= 0) {
        return;
      }
      Match match = PublishPattern.Match(this.message);
      if (!match.Success) {
        return;
      }
      String topic = match.Groups[1].Value;
      String payload = match.Groups[2].Value.Trim();
      this.MessageIncomming?.Invoke(this, new MqttEventArgs(payload, topic));
    }

    #region IDisposable Support
    private Boolean disposedValue = false; // Used to detect redundant calls.

    /// <summary>
    /// Stops reading from the subscriber process, kills it if it is still running and releases it.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; false when called from the finalizer, in
    /// which case the process object is not touched.
    /// </param>
    protected virtual void Dispose(Boolean disposing) {
      if (this.disposedValue) {
        return;
      }
      if (disposing && this.p != null) {
        this.p.OutputDataReceived -= this.P_OutputDataReceived;
        this.p.ErrorDataReceived -= this.P_ErrorDataReceived;
        try {
          this.p.CancelOutputRead();
          this.p.CancelErrorRead();
          if (!this.p.HasExited) {
            this.p.Kill();
          }
        } catch (InvalidOperationException) {
          // The process was never started or exited between the check and the kill.
        } catch (System.ComponentModel.Win32Exception) {
          // The process could not be terminated (for example it is already terminating).
        }
        this.p.Close();
      }
      this.p = null;
      this.disposedValue = true;
    }

    ~Mosquitto() {
      Dispose(false);
    }

    /// <summary>Releases the subscriber process and suppresses finalization.</summary>
    public override void Dispose() {
      Dispose(true);
      GC.SuppressFinalize(this);
    }
    #endregion
  }
}