using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

using BlubbFish.BosmonMqtt.Models;

using BosMon.Data;
using BosMon.Data.Telegrams;
using BosMon.Plugins;
using BosMon.Utils;

using LitJson;

using TelegramFilter.Filter;

using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace BlubbFish.BosmonMqtt.Controller {
  /// <summary>
  /// Publishes BosMon telegrams (POCSAG, FMS, ZVEI, ZVEI-DTMF) as JSON to an MQTT broker.
  /// While the plugin is enabled, a background watcher thread periodically checks the
  /// broker connection and re-establishes it when it is lost.
  /// </summary>
  class MqttEventProcessor : IDisposable {
    /// <summary>Source name used for every entry written to the BosMon log.</summary>
    private const String LogSource = "BosmonMqtt.MqttEventProcessor";

    /// <summary>Pause between two connection checks of the watcher thread.</summary>
    private const Int32 WatcherIntervalMs = 10000;

    /// <summary>Upper bound for waiting on the watcher thread to finish when it is stopped.</summary>
    private const Int32 WatcherJoinTimeoutMs = 1000;

    private readonly PluginConfiguration pluginconfig;
    private readonly IBosMonHost pluginhost;
    private MqttClient client;
    private FilterList filter;
    private readonly List<PocsagFilterItem> pfilter = new List<PocsagFilterItem>();
    private readonly List<FmsFilterItem> ffilter = new List<FmsFilterItem>();
    private readonly List<ZveiFilterItem> zfilter = new List<ZveiFilterItem>();
    private Thread connectionWatcher;
    // Stop signal that belongs to the thread currently stored in connectionWatcher.
    private ManualResetEvent connectionWatcherStopSignal;

    /// <summary>
    /// Creates the processor, connects to the broker (if the plugin is enabled), starts the
    /// connection watcher and subscribes to configuration changes.
    /// </summary>
    /// <param name="pluginconfiguration">Plugin settings (server, port, credentials, topic, filter storage).</param>
    /// <param name="pluginHost">BosMon host used to show status messages.</param>
    public MqttEventProcessor(PluginConfiguration pluginconfiguration, IBosMonHost pluginHost) {
      this.pluginconfig = pluginconfiguration;
      this.pluginhost = pluginHost;
      this.Connect();
      if (this.pluginconfig.Enable) {
        this.ConnectionWatcherStart();
      }
      this.pluginconfig.ConfigChanged += this.Pluginconfig_ConfigChanged;
    }

    /// <summary>
    /// Body of the watcher thread: every <see cref="WatcherIntervalMs"/> milliseconds it
    /// triggers a reconnect if there is no connected client. Ends once the stop signal is set.
    /// </summary>
    /// <param name="state">The <see cref="ManualResetEvent"/> that signals this thread to stop.</param>
    private void ConnectionWatcherRunner(Object state) {
      ManualResetEvent stopSignal = state as ManualResetEvent;
      if (stopSignal == null) {
        return;
      }
      // WaitOne returns false when the interval elapsed and true as soon as the signal is set,
      // so a stop request interrupts the pause immediately.
      while (!stopSignal.WaitOne(WatcherIntervalMs)) {
        try {
          // A missing client (e.g. after a failed connect) needs a reconnect just like a
          // disconnected one; dereferencing it unchecked would fail on every pass.
          MqttClient current = this.client;
          if (current == null || !current.IsConnected) {
            this.Reconnect();
          }
        } catch (Exception ex) {
          this.LogException("ConnectionWatcherRunner", ex);
        }
      }
    }

    /// <summary>Starts the watcher thread unless one is already running.</summary>
    private void ConnectionWatcherStart() {
      if (this.connectionWatcher != null && this.connectionWatcher.IsAlive) {
        return;
      }
      // Each thread gets its own signal so that a quick stop/start cannot reset the
      // signal of a thread that has not observed it yet.
      ManualResetEvent stopSignal = new ManualResetEvent(false);
      Thread watcher = new Thread(new ParameterizedThreadStart(this.ConnectionWatcherRunner));
      // Background thread: it must not keep the host process alive on shutdown.
      watcher.IsBackground = true;
      this.connectionWatcherStopSignal = stopSignal;
      this.connectionWatcher = watcher;
      watcher.Start(stopSignal);
      this.Log("ConnectionWatcherStart", "Überwachungsthread gestartet.");
    }

    /// <summary>Signals the watcher thread to stop and waits a bounded time for it to finish.</summary>
    private void ConnectionWatcherStop() {
      Thread watcher = this.connectionWatcher;
      if (watcher == null) {
        return;
      }
      ManualResetEvent stopSignal = this.connectionWatcherStopSignal;
      this.connectionWatcher = null;
      this.connectionWatcherStopSignal = null;
      try {
        if (stopSignal != null) {
          // The signal is intentionally not disposed here: the thread may still be
          // about to wait on it if the join below times out.
          stopSignal.Set();
        }
        watcher.Join(WatcherJoinTimeoutMs);
      } catch (Exception ex) {
        this.LogException("ConnectionWatcherStop", ex);
      }
      this.Log("ConnectionWatcherStop", "Überwachungsthread gestoppt.");
    }

    /// <summary>
    /// Applies a changed configuration: reconnects and starts or stops the watcher
    /// depending on whether the plugin is enabled.
    /// </summary>
    private void Pluginconfig_ConfigChanged(Object sender, EventArgs e) {
      this.Reconnect();
      if (this.pluginconfig.Enable) {
        this.ConnectionWatcherStart();
      } else {
        this.ConnectionWatcherStop();
      }
    }

    /// <summary>Drops the current connection and connects again.</summary>
    private void Reconnect() {
      if (this.pluginconfig.Enable) {
        this.Log("Reconnect", "Reconnect ausgelößt.");
      }
      this.Disconnect();
      this.Connect();
    }

    /// <summary>
    /// Connects to the configured broker, subscribes to the configured topic and loads the
    /// telegram filters. Does nothing when the plugin is disabled or this instance is disposed.
    /// Connection failures are logged and leave the processor without a connected client.
    /// </summary>
    private void Connect() {
      if (!this.pluginconfig.Enable || this.disposedValue) {
        return;
      }
      try {
        MqttClient mqtt = new MqttClient(this.pluginconfig.Server, this.pluginconfig.Port, false, null, null, MqttSslProtocols.None);
        this.client = mqtt;
        String clientId = "bosmon-" + Guid.NewGuid().ToString();
        // Connect anonymously unless both user and password are configured.
        if (String.IsNullOrEmpty(this.pluginconfig.User) || String.IsNullOrEmpty(this.pluginconfig.Password)) {
          mqtt.Connect(clientId);
        } else {
          mqtt.Connect(clientId, this.pluginconfig.User, this.pluginconfig.Password);
        }
        // Connect() can complete without an established session (the broker may refuse it);
        // do not report success in that case.
        if (!mqtt.IsConnected) {
          this.WriteLog("Connect", "Verbindung zum Broker fehlgeschlagen.");
          return;
        }
        mqtt.Subscribe(new String[] { this.pluginconfig.Topic }, new Byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
      } catch (Exception ex) {
        this.LogException("Connect", ex);
        return;
      }

      this.filter = new FilterList(this.pluginconfig.PluginStorage.GetSection("filter"));
      // Start from empty lists so that repeated connects can never duplicate filter entries.
      this.pfilter.Clear();
      this.ffilter.Clear();
      this.zfilter.Clear();
      foreach (FilterItem item in this.filter) {
        // Exact type comparison: each item is sorted into the list of its concrete type.
        Type itemType = item.GetType();
        if (itemType == typeof(PocsagFilterItem)) {
          this.pfilter.Add((PocsagFilterItem)item);
        } else if (itemType == typeof(FmsFilterItem)) {
          this.ffilter.Add((FmsFilterItem)item);
        } else if (itemType == typeof(ZveiFilterItem)) {
          this.zfilter.Add((ZveiFilterItem)item);
        }
      }
      this.Log("Connect", "Erfolgreich verbunden.");
    }

    /// <summary>
    /// Clears the filter lists and closes the broker connection if there is one.
    /// The client reference is always released; errors while closing are logged, not thrown.
    /// </summary>
    private void Disconnect() {
      this.pfilter.Clear();
      this.ffilter.Clear();
      this.zfilter.Clear();

      MqttClient mqtt = this.client;
      this.client = null;
      if (mqtt == null || !mqtt.IsConnected) {
        return;
      }
      try {
        mqtt.Unsubscribe(new String[] { this.pluginconfig.Topic });
        mqtt.Disconnect();
        this.Log("Disconnect", "Verbindung getrennt.");
      } catch (Exception ex) {
        this.LogException("Disconnect", ex);
      }
    }

    /// <summary>
    /// Handles a received telegram: if the plugin is enabled, a broker connection exists and
    /// the telegram passes the filters, it is serialized to JSON and published.
    /// Errors are logged and never propagate to the caller.
    /// </summary>
    /// <param name="sender">Event source (unused).</param>
    /// <param name="e">Event data carrying the telegram.</param>
    internal void TelegramEvent(Object sender, TelegramEventArgs e) {
      if (!this.pluginconfig.Enable) {
        return;
      }
      try {
        Telegram t = e.Telegram as Telegram;
        // Work on a local copy: the watcher thread may replace or clear the field meanwhile.
        MqttClient mqtt = this.client;
        if (t == null || mqtt == null || !mqtt.IsConnected || this.filter == null) {
          return;
        }
        if (!this.FilterMatch(t)) {
          return;
        }
        String topic;
        String text;
        // Unsupported telegram types are skipped instead of being published
        // with an empty topic and an empty payload.
        if (!this.TryBuildMessage(t, out topic, out text)) {
          return;
        }
        mqtt.Publish(topic, Encoding.UTF8.GetBytes(text));
      } catch (Exception ex) {
        this.LogException("TelegramEvent", ex);
      }
    }

    /// <summary>Builds the MQTT topic and JSON payload for a telegram.</summary>
    /// <param name="t">Telegram to convert.</param>
    /// <param name="topic">Receives the configured topic prefix plus a type specific suffix.</param>
    /// <param name="text">Receives the telegram serialized as JSON.</param>
    /// <returns><c>true</c> if the telegram type is supported; otherwise <c>false</c>.</returns>
    private Boolean TryBuildMessage(Telegram t, out String topic, out String text) {
      topic = null;
      text = null;
      if (t.Type == PocsagTelegram.TYPE_POCSAG) {
        PocsagTelegram p = t as PocsagTelegram;
        if (p == null) {
          return false;
        }
        text = JsonMapper.ToJson(p);
        topic = this.pluginconfig.Topic + "pocsag/" + p.Address + p.Func;
        return true;
      }
      if (t.Type == FmsTelegram.TYPE_FMS) {
        FmsTelegram f = t as FmsTelegram;
        if (f == null) {
          return false;
        }
        text = JsonMapper.ToJson(f);
        topic = this.pluginconfig.Topic + "fms/" + f.Address;
        return true;
      }
      if (t.Type == ZveiTelegram.TYPE_ZVEI) {
        ZveiTelegram z = t as ZveiTelegram;
        if (z == null) {
          return false;
        }
        text = JsonMapper.ToJson(z);
        topic = this.pluginconfig.Topic + "zvei/" + z.Address;
        return true;
      }
      if (t.Type == ZveiDtmfTelegram.TYPE_ZVEIDTMF) {
        ZveiDtmfTelegram d = t as ZveiDtmfTelegram;
        if (d == null) {
          return false;
        }
        text = JsonMapper.ToJson(d);
        // NOTE(review): "zveidmf/" looks like a typo for "zveidtmf/". It is kept unchanged
        // because existing subscribers may rely on this topic - confirm before renaming.
        topic = this.pluginconfig.Topic + "zveidmf/" + d.Address;
        return true;
      }
      return false;
    }

    /// <summary>
    /// Checks a telegram against the filter list of its type. POCSAG and FMS have their own
    /// lists; every other type is checked against the ZVEI list.
    /// </summary>
    /// <param name="t">Telegram to check.</param>
    /// <returns><c>true</c> if the telegram may be published.</returns>
    private Boolean FilterMatch(Telegram t) {
      if (t.Type == PocsagTelegram.TYPE_POCSAG) {
        return this.FilterMatchType(this.pfilter, t);
      }
      if (t.Type == FmsTelegram.TYPE_FMS) {
        return this.FilterMatchType(this.ffilter, t);
      }
      return this.FilterMatchType(this.zfilter, t);
    }

    /// <summary>
    /// Evaluates a filter list in order. An empty list accepts everything. The first matching
    /// item decides: negated items reject, all others accept. If no item matches, the telegram
    /// is accepted only when the list contains at least one negated item.
    /// </summary>
    /// <typeparam name="T">Concrete filter item type of the list.</typeparam>
    /// <param name="filter">Filter items to evaluate.</param>
    /// <param name="t">Telegram to check.</param>
    /// <returns><c>true</c> if the telegram is accepted.</returns>
    private Boolean FilterMatchType<T>(List<T> filter, Telegram t) where T : FilterItem {
      if (filter.Count == 0) {
        return true;
      }
      Boolean negatedInList = false;
      foreach (FilterItem item in filter) {
        // Evaluate the match once per item; the first match decides the outcome.
        if (item.IsMatching(t)) {
          return !item.Negated;
        }
        if (item.Negated) {
          negatedInList = true;
        }
      }
      return negatedInList;
    }

    /// <summary>Writes a status text to the BosMon log and shows it as a host message.</summary>
    /// <param name="method">Name of the reporting method.</param>
    /// <param name="text">Text to log and display.</param>
    private void Log(String method, String text) {
      this.WriteLog(method, text);
      this.pluginhost.AddMessage(DateTime.Now.Ticks, MessageType.Message, "BosmonMqtt " + text);
    }

    /// <summary>Writes a text to the BosMon log only.</summary>
    /// <param name="method">Name of the reporting method.</param>
    /// <param name="text">Text to log.</param>
    private void WriteLog(String method, String text) {
      BosMonLog.Logger.WriteLine(LogSeverity.Info, LogSource, method, text);
    }

    /// <summary>
    /// Records an exception in the BosMon log. Never throws, so it is safe to call from
    /// catch blocks on the watcher thread.
    /// </summary>
    /// <param name="method">Name of the method in which the exception was caught.</param>
    /// <param name="ex">The caught exception.</param>
    private void LogException(String method, Exception ex) {
      try {
        this.WriteLog(method, ex.ToString());
      } catch (Exception) {
        // A failing logger must not take down the caller; there is nowhere left to report to.
      }
    }

    #region IDisposable Support
    // volatile: read by the watcher thread (via Connect) while Dispose runs on another thread.
    private volatile Boolean disposedValue = false;

    /// <summary>
    /// Releases the resources of this instance: detaches from configuration changes, stops
    /// the watcher thread and closes the broker connection.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(Boolean disposing) {
      if (this.disposedValue) {
        return;
      }
      // Set first so that a reconnect still running on the watcher thread does not
      // open a new connection while this instance is being torn down.
      this.disposedValue = true;
      if (disposing) {
        // Without detaching, a later configuration change would reconnect a disposed instance.
        this.pluginconfig.ConfigChanged -= this.Pluginconfig_ConfigChanged;
        this.ConnectionWatcherStop();
        this.Disconnect();
      }
    }

    /// <summary>Stops the watcher and disconnects from the broker.</summary>
    public void Dispose() {
      this.Dispose(true);
      GC.SuppressFinalize(this);
    }
    #endregion
  }
}