2018-03-31 22:42:28 +02:00
/ *
2018-04-01 19:31:18 +02:00
Copyright ( c ) 2013 , 2014 Paolo Patierno
2018-03-31 22:42:28 +02:00
2018-04-01 19:31:18 +02:00
All rights reserved . This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1 . 0
and Eclipse Distribution License v1 . 0 which accompany this distribution .
2018-03-31 22:42:28 +02:00
2018-04-01 19:31:18 +02:00
The Eclipse Public License is available at
http : //www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http : //www.eclipse.org/org/documents/edl-v10.php.
2018-03-31 22:42:28 +02:00
2018-04-01 19:31:18 +02:00
Contributors :
Paolo Patierno - initial API and implementation and / or initial documentation
2018-03-31 22:42:28 +02:00
* /
using System ;
using System.Net ;
2018-04-01 19:31:18 +02:00
#if ! ( WINDOWS_APP | | WINDOWS_PHONE_APP )
2018-03-31 22:42:28 +02:00
using System.Net.Sockets ;
2018-04-01 19:31:18 +02:00
using System.Security.Cryptography.X509Certificates ;
#endif
2018-03-31 22:42:28 +02:00
using System.Threading ;
using uPLibrary.Networking.M2Mqtt.Exceptions ;
using uPLibrary.Networking.M2Mqtt.Messages ;
2018-04-01 19:31:18 +02:00
using uPLibrary.Networking.M2Mqtt.Session ;
using uPLibrary.Networking.M2Mqtt.Utility ;
using uPLibrary.Networking.M2Mqtt.Internal ;
2018-03-31 22:42:28 +02:00
// if .Net Micro Framework
2019-11-26 15:34:16 +01:00
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3
2018-03-31 22:42:28 +02:00
using Microsoft.SPOT ;
#if SSL
using Microsoft.SPOT.Net.Security ;
#endif
// else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
#else
2018-04-01 19:31:18 +02:00
using System.Collections.Generic ;
2019-11-26 15:34:16 +01:00
#if SSL & & ! ( WINDOWS_APP | | WINDOWS_PHONE_APP )
2018-03-31 22:42:28 +02:00
using System.Security.Authentication ;
using System.Net.Security ;
#endif
#endif
2019-11-26 15:34:16 +01:00
#if WINDOWS_APP | | WINDOWS_PHONE_APP
2018-04-01 19:31:18 +02:00
using Windows.Networking.Sockets ;
#endif
using System.Collections ;
// alias needed due to Microsoft.SPOT.Trace in .Net Micro Framework
// (it's ambiguos with uPLibrary.Networking.M2Mqtt.Utility.Trace)
using MqttUtility = uPLibrary . Networking . M2Mqtt . Utility ;
using System.IO ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
namespace uPLibrary.Networking.M2Mqtt {
/// <summary>
/// MQTT Client
/// </summary>
public class MqttClient {
#if BROKER
#region Constants . . .
// thread names
private const string RECEIVE_THREAD_NAME = "ReceiveThread" ;
private const string RECEIVE_EVENT_THREAD_NAME = "DispatchEventThread" ;
private const string PROCESS_INFLIGHT_THREAD_NAME = "ProcessInflightThread" ;
private const string KEEP_ALIVE_THREAD = "KeepAliveThread" ;
#endregion
#endif
2018-03-31 22:42:28 +02:00
/// <summary>
2019-11-26 15:34:16 +01:00
/// Delagate that defines event handler for PUBLISH message received
2018-03-31 22:42:28 +02:00
/// </summary>
2019-11-26 15:34:16 +01:00
public delegate void MqttMsgPublishEventHandler ( Object sender , MqttMsgPublishEventArgs e ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Delegate that defines event handler for published message
/// </summary>
public delegate void MqttMsgPublishedEventHandler ( Object sender , MqttMsgPublishedEventArgs e ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Delagate that defines event handler for subscribed topic
/// </summary>
public delegate void MqttMsgSubscribedEventHandler ( Object sender , MqttMsgSubscribedEventArgs e ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Delagate that defines event handler for unsubscribed topic
/// </summary>
public delegate void MqttMsgUnsubscribedEventHandler ( Object sender , MqttMsgUnsubscribedEventArgs e ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
#if BROKER
/// <summary>
/// Delagate that defines event handler for SUBSCRIBE message received
/// </summary>
public delegate void MqttMsgSubscribeEventHandler ( object sender , MqttMsgSubscribeEventArgs e ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Delagate that defines event handler for UNSUBSCRIBE message received
/// </summary>
public delegate void MqttMsgUnsubscribeEventHandler ( object sender , MqttMsgUnsubscribeEventArgs e ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Delagate that defines event handler for CONNECT message received
/// </summary>
public delegate void MqttMsgConnectEventHandler ( object sender , MqttMsgConnectEventArgs e ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Delegate that defines event handler for client disconnection (DISCONNECT message or not)
/// </summary>
public delegate void MqttMsgDisconnectEventHandler ( object sender , EventArgs e ) ;
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Delegate that defines event handler for cliet/peer disconnection
/// </summary>
public delegate void ConnectionClosedEventHandler ( Object sender , EventArgs e ) ;
// broker hostname (or ip address) and port
private String brokerHostName ;
private Int32 brokerPort ;
// running status of threads
private Boolean isRunning ;
// event for raising received message event
private AutoResetEvent receiveEventWaitHandle ;
// event for starting process inflight queue asynchronously
private AutoResetEvent inflightWaitHandle ;
// event for signaling synchronous receive
AutoResetEvent syncEndReceiving ;
// message received
MqttMsgBase msgReceived ;
// exeption thrown during receiving
Exception exReceiving ;
// keep alive period (in ms)
private Int32 keepAlivePeriod ;
// events for signaling on keep alive thread
private AutoResetEvent keepAliveEvent ;
private AutoResetEvent keepAliveEventEnd ;
// last communication time in ticks
private Int32 lastCommTime ;
// event for PUBLISH message received
public event MqttMsgPublishEventHandler MqttMsgPublishReceived ;
// event for published message
public event MqttMsgPublishedEventHandler MqttMsgPublished ;
// event for subscribed topic
public event MqttMsgSubscribedEventHandler MqttMsgSubscribed ;
// event for unsubscribed topic
public event MqttMsgUnsubscribedEventHandler MqttMsgUnsubscribed ;
#if BROKER
// event for SUBSCRIBE message received
public event MqttMsgSubscribeEventHandler MqttMsgSubscribeReceived ;
// event for USUBSCRIBE message received
public event MqttMsgUnsubscribeEventHandler MqttMsgUnsubscribeReceived ;
// event for CONNECT message received
public event MqttMsgConnectEventHandler MqttMsgConnected ;
// event for DISCONNECT message received
public event MqttMsgDisconnectEventHandler MqttMsgDisconnected ;
#endif
// event for peer/client disconnection
public event ConnectionClosedEventHandler ConnectionClosed ;
// channel to communicate over the network
private IMqttNetworkChannel channel ;
// inflight messages queue
private Queue inflightQueue ;
// internal queue for received messages about inflight messages
private Queue internalQueue ;
// internal queue for dispatching events
private Queue eventQueue ;
// session
private MqttClientSession session ;
// current message identifier generated
private UInt16 messageIdCounter = 0 ;
// connection is closing due to peer
private Boolean isConnectionClosing ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Connection status between client and broker
/// </summary>
public Boolean IsConnected { get ; private set ; }
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Client identifier
/// </summary>
public String ClientId { get ; private set ; }
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Clean session flag
/// </summary>
public Boolean CleanSession { get ; private set ; }
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Will flag
/// </summary>
public Boolean WillFlag { get ; private set ; }
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Will QOS level
/// </summary>
public Byte WillQosLevel { get ; private set ; }
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Will topic
/// </summary>
public String WillTopic { get ; private set ; }
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Will message
/// </summary>
public String WillMessage { get ; private set ; }
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// MQTT protocol version
/// </summary>
public MqttProtocolVersion ProtocolVersion { get ; set ; }
2018-04-01 19:31:18 +02:00
#if BROKER
/// <summary>
/// MQTT Client Session
/// </summary>
public MqttClientSession Session
{
get { return this . session ; }
set { this . session = value ; }
}
#endif
2019-11-26 15:34:16 +01:00
/// <summary>
/// MQTT client settings
/// </summary>
public MqttSettings Settings { get ; private set ; }
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
#if ! ( WINDOWS_APP | | WINDOWS_PHONE_APP )
/// <summary>
/// Constructor
/// </summary>
/// <param name="brokerIpAddress">Broker IP address</param>
[Obsolete("Use this ctor MqttClient(string brokerHostName) insted")]
public MqttClient ( IPAddress brokerIpAddress ) : this ( brokerIpAddress , MqttSettings . MQTT_BROKER_DEFAULT_PORT , false , null , null , MqttSslProtocols . None ) {
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Constructor
/// </summary>
/// <param name="brokerIpAddress">Broker IP address</param>
/// <param name="brokerPort">Broker port</param>
/// <param name="secure">Using secure connection</param>
/// <param name="caCert">CA certificate for secure connection</param>
/// <param name="clientCert">Client certificate</param>
/// <param name="sslProtocol">SSL/TLS protocol version</param>
[Obsolete("Use this ctor MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert) insted")]
public MqttClient ( IPAddress brokerIpAddress , Int32 brokerPort , Boolean secure , X509Certificate caCert , X509Certificate clientCert , MqttSslProtocols sslProtocol ) = >
2018-04-01 19:31:18 +02:00
#if ! ( MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK )
2019-11-26 15:34:16 +01:00
this . Init ( brokerIpAddress . ToString ( ) , brokerPort , secure , caCert , clientCert , sslProtocol , null , null ) ;
2018-04-01 19:31:18 +02:00
#else
this . Init ( brokerIpAddress . ToString ( ) , brokerPort , secure , caCert , clientCert , sslProtocol ) ;
#endif
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Constructor
/// </summary>
/// <param name="brokerHostName">Broker Host Name or IP Address</param>
public MqttClient ( String brokerHostName ) :
2018-04-01 19:31:18 +02:00
#if ! ( WINDOWS_APP | | WINDOWS_PHONE_APP )
this ( brokerHostName , MqttSettings . MQTT_BROKER_DEFAULT_PORT , false , null , null , MqttSslProtocols . None )
#else
this ( brokerHostName , MqttSettings . MQTT_BROKER_DEFAULT_PORT , false , MqttSslProtocols . None )
#endif
2018-03-31 22:42:28 +02:00
{
2019-11-26 15:34:16 +01:00
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Constructor
/// </summary>
/// <param name="brokerHostName">Broker Host Name or IP Address</param>
/// <param name="brokerPort">Broker port</param>
/// <param name="secure">Using secure connection</param>
/// <param name="sslProtocol">SSL/TLS protocol version</param>
2018-04-01 19:31:18 +02:00
#if ! ( WINDOWS_APP | | WINDOWS_PHONE_APP )
2019-11-26 15:34:16 +01:00
/// <param name="caCert">CA certificate for secure connection</param>
/// <param name="clientCert">Client certificate</param>
public MqttClient ( String brokerHostName , Int32 brokerPort , Boolean secure , X509Certificate caCert , X509Certificate clientCert , MqttSslProtocols sslProtocol ) = >
2018-04-01 19:31:18 +02:00
#else
2019-11-26 15:34:16 +01:00
public MqttClient ( string brokerHostName , int brokerPort , bool secure , MqttSslProtocols sslProtocol ) = >
2018-04-01 19:31:18 +02:00
#endif
#if ! ( MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK | | WINDOWS_APP | | WINDOWS_PHONE_APP )
2019-11-26 15:34:16 +01:00
this . Init ( brokerHostName , brokerPort , secure , caCert , clientCert , sslProtocol , null , null ) ;
#elif WINDOWS_APP | | WINDOWS_PHONE_APP
2018-04-01 19:31:18 +02:00
this . Init ( brokerHostName , brokerPort , secure , sslProtocol ) ;
#else
this . Init ( brokerHostName , brokerPort , secure , caCert , clientCert , sslProtocol ) ;
#endif
2019-11-26 15:34:16 +01:00
2018-03-31 22:42:28 +02:00
2018-04-01 19:31:18 +02:00
#if ! ( MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK | | WINDOWS_APP | | WINDOWS_PHONE_APP )
2019-11-26 15:34:16 +01:00
/// <summary>
/// Constructor
/// </summary>
/// <param name="brokerHostName">Broker Host Name or IP Address</param>
/// <param name="brokerPort">Broker port</param>
/// <param name="secure">Using secure connection</param>
/// <param name="caCert">CA certificate for secure connection</param>
/// <param name="clientCert">Client certificate</param>
/// <param name="sslProtocol">SSL/TLS protocol version</param>
/// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
public MqttClient ( String brokerHostName , Int32 brokerPort , Boolean secure , X509Certificate caCert , X509Certificate clientCert , MqttSslProtocols sslProtocol ,
RemoteCertificateValidationCallback userCertificateValidationCallback )
: this ( brokerHostName , brokerPort , secure , caCert , clientCert , sslProtocol , userCertificateValidationCallback , null ) {
}
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Constructor
/// </summary>
/// <param name="brokerHostName">Broker Host Name or IP Address</param>
/// <param name="brokerPort">Broker port</param>
/// <param name="secure">Using secure connection</param>
/// <param name="sslProtocol">SSL/TLS protocol version</param>
/// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
/// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
public MqttClient ( String brokerHostName , Int32 brokerPort , Boolean secure , MqttSslProtocols sslProtocol ,
RemoteCertificateValidationCallback userCertificateValidationCallback ,
LocalCertificateSelectionCallback userCertificateSelectionCallback )
: this ( brokerHostName , brokerPort , secure , null , null , sslProtocol , userCertificateValidationCallback , userCertificateSelectionCallback ) {
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Constructor
/// </summary>
/// <param name="brokerHostName">Broker Host Name or IP Address</param>
/// <param name="brokerPort">Broker port</param>
/// <param name="secure">Using secure connection</param>
/// <param name="caCert">CA certificate for secure connection</param>
/// <param name="clientCert">Client certificate</param>
/// <param name="sslProtocol">SSL/TLS protocol version</param>
/// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
/// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
public MqttClient ( String brokerHostName , Int32 brokerPort , Boolean secure , X509Certificate caCert , X509Certificate clientCert , MqttSslProtocols sslProtocol ,
RemoteCertificateValidationCallback userCertificateValidationCallback ,
LocalCertificateSelectionCallback userCertificateSelectionCallback ) = > this . Init ( brokerHostName , brokerPort , secure , caCert , clientCert , sslProtocol , userCertificateValidationCallback , userCertificateSelectionCallback ) ;
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
#if BROKER
/// <summary>
/// Constructor
/// </summary>
2018-04-01 19:31:18 +02:00
/// <param name="channel">Network channel for communication</param>
public MqttClient ( IMqttNetworkChannel channel )
2018-03-31 22:42:28 +02:00
{
2018-04-01 19:31:18 +02:00
// set default MQTT protocol version (default is 3.1.1)
this . ProtocolVersion = MqttProtocolVersion . Version_3_1_1 ;
this . channel = channel ;
2018-03-31 22:42:28 +02:00
// reference to MQTT settings
this . settings = MqttSettings . Instance ;
// client not connected yet (CONNACK not send from client), some default values
this . IsConnected = false ;
this . ClientId = null ;
this . CleanSession = true ;
this . keepAliveEvent = new AutoResetEvent ( false ) ;
// queue for handling inflight messages (publishing and acknowledge)
this . inflightWaitHandle = new AutoResetEvent ( false ) ;
this . inflightQueue = new Queue ( ) ;
// queue for received message
this . receiveEventWaitHandle = new AutoResetEvent ( false ) ;
2018-04-01 19:31:18 +02:00
this . eventQueue = new Queue ( ) ;
2018-03-31 22:42:28 +02:00
this . internalQueue = new Queue ( ) ;
2018-04-01 19:31:18 +02:00
// session
this . session = null ;
2018-03-31 22:42:28 +02:00
}
#endif
2019-11-26 15:34:16 +01:00
/// <summary>
/// MqttClient initialization
/// </summary>
/// <param name="brokerHostName">Broker Host Name or IP Address</param>
/// <param name="brokerPort">Broker port</param>
/// <param name="secure">>Using secure connection</param>
/// <param name="caCert">CA certificate for secure connection</param>
/// <param name="clientCert">Client certificate</param>
/// <param name="sslProtocol">SSL/TLS protocol version</param>
2018-04-01 19:31:18 +02:00
#if ! ( MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK | | WINDOWS_APP | | WINDOWS_PHONE_APP )
2019-11-26 15:34:16 +01:00
/// <param name="userCertificateSelectionCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
/// <param name="userCertificateValidationCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
private void Init ( String brokerHostName , Int32 brokerPort , Boolean secure , X509Certificate caCert , X509Certificate clientCert , MqttSslProtocols sslProtocol ,
RemoteCertificateValidationCallback userCertificateValidationCallback ,
LocalCertificateSelectionCallback userCertificateSelectionCallback )
#elif WINDOWS_APP | | WINDOWS_PHONE_APP
2018-04-01 19:31:18 +02:00
private void Init ( string brokerHostName , int brokerPort , bool secure , MqttSslProtocols sslProtocol )
#else
private void Init ( string brokerHostName , int brokerPort , bool secure , X509Certificate caCert , X509Certificate clientCert , MqttSslProtocols sslProtocol )
#endif
2018-03-31 22:42:28 +02:00
{
2019-11-26 15:34:16 +01:00
// set default MQTT protocol version (default is 3.1.1)
this . ProtocolVersion = MqttProtocolVersion . Version_3_1_1 ;
2018-04-01 19:31:18 +02:00
#if ! SSL
2019-11-26 15:34:16 +01:00
// check security parameters
if ( secure ) {
throw new ArgumentException ( "Library compiled without SSL support" ) ;
}
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
this . brokerHostName = brokerHostName ;
this . brokerPort = brokerPort ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// reference to MQTT settings
this . Settings = MqttSettings . Instance ;
// set settings port based on secure connection or not
if ( ! secure ) {
this . Settings . Port = this . brokerPort ;
} else {
this . Settings . SslPort = this . brokerPort ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
this . syncEndReceiving = new AutoResetEvent ( false ) ;
this . keepAliveEvent = new AutoResetEvent ( false ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// queue for handling inflight messages (publishing and acknowledge)
this . inflightWaitHandle = new AutoResetEvent ( false ) ;
this . inflightQueue = new Queue ( ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// queue for received message
this . receiveEventWaitHandle = new AutoResetEvent ( false ) ;
this . eventQueue = new Queue ( ) ;
this . internalQueue = new Queue ( ) ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// session
this . session = null ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// create network channel
2018-04-01 19:31:18 +02:00
#if ! ( MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK | | WINDOWS_APP | | WINDOWS_PHONE_APP )
2019-11-26 15:34:16 +01:00
this . channel = new MqttNetworkChannel ( this . brokerHostName , this . brokerPort , secure , caCert , clientCert , sslProtocol , userCertificateValidationCallback , userCertificateSelectionCallback ) ;
#elif WINDOWS_APP | | WINDOWS_PHONE_APP
2018-04-01 19:31:18 +02:00
this . channel = new MqttNetworkChannel ( this . brokerHostName , this . brokerPort , secure , sslProtocol ) ;
#else
this . channel = new MqttNetworkChannel ( this . brokerHostName , this . brokerPort , secure , caCert , clientCert , sslProtocol ) ;
#endif
2019-11-26 15:34:16 +01:00
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Connect to broker
/// </summary>
/// <param name="clientId">Client identifier</param>
/// <returns>Return code of CONNACK message from broker</returns>
public Byte Connect ( String clientId ) = > this . Connect ( clientId , null , null , false , MqttMsgConnect . QOS_LEVEL_AT_MOST_ONCE , false , null , null , true , MqttMsgConnect . KEEP_ALIVE_PERIOD_DEFAULT ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Connect to broker
/// </summary>
/// <param name="clientId">Client identifier</param>
/// <param name="username">Username</param>
/// <param name="password">Password</param>
/// <returns>Return code of CONNACK message from broker</returns>
public Byte Connect ( String clientId ,
String username ,
String password ) = > this . Connect ( clientId , username , password , false , MqttMsgConnect . QOS_LEVEL_AT_MOST_ONCE , false , null , null , true , MqttMsgConnect . KEEP_ALIVE_PERIOD_DEFAULT ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Connect to broker
/// </summary>
/// <param name="clientId">Client identifier</param>
/// <param name="username">Username</param>
/// <param name="password">Password</param>
/// <param name="cleanSession">Clean sessione flag</param>
/// <param name="keepAlivePeriod">Keep alive period</param>
/// <returns>Return code of CONNACK message from broker</returns>
public Byte Connect ( String clientId ,
String username ,
String password ,
Boolean cleanSession ,
UInt16 keepAlivePeriod ) = > this . Connect ( clientId , username , password , false , MqttMsgConnect . QOS_LEVEL_AT_MOST_ONCE , false , null , null , cleanSession , keepAlivePeriod ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Connect to broker
/// </summary>
/// <param name="clientId">Client identifier</param>
/// <param name="username">Username</param>
/// <param name="password">Password</param>
/// <param name="willRetain">Will retain flag</param>
/// <param name="willQosLevel">Will QOS level</param>
/// <param name="willFlag">Will flag</param>
/// <param name="willTopic">Will topic</param>
/// <param name="willMessage">Will message</param>
/// <param name="cleanSession">Clean sessione flag</param>
/// <param name="keepAlivePeriod">Keep alive period</param>
/// <returns>Return code of CONNACK message from broker</returns>
public Byte Connect ( String clientId ,
String username ,
String password ,
Boolean willRetain ,
Byte willQosLevel ,
Boolean willFlag ,
String willTopic ,
String willMessage ,
Boolean cleanSession ,
UInt16 keepAlivePeriod ) {
// create CONNECT message
MqttMsgConnect connect = new MqttMsgConnect ( clientId ,
username ,
password ,
willRetain ,
willQosLevel ,
willFlag ,
willTopic ,
willMessage ,
cleanSession ,
keepAlivePeriod ,
( Byte ) this . ProtocolVersion ) ;
try {
// connect to the broker
this . channel . Connect ( ) ;
} catch ( Exception ex ) {
throw new MqttConnectionException ( "Exception connecting to the broker" , ex ) ;
}
this . lastCommTime = 0 ;
this . isRunning = true ;
this . isConnectionClosing = false ;
// start thread for receiving messages from broker
Fx . StartThread ( this . ReceiveThread ) ;
MqttMsgConnack connack = ( MqttMsgConnack ) this . SendReceive ( connect ) ;
// if connection accepted, start keep alive timer and
if ( connack . ReturnCode = = MqttMsgConnack . CONN_ACCEPTED ) {
// set all client properties
this . ClientId = clientId ;
this . CleanSession = cleanSession ;
this . WillFlag = willFlag ;
this . WillTopic = willTopic ;
this . WillMessage = willMessage ;
this . WillQosLevel = willQosLevel ;
this . keepAlivePeriod = keepAlivePeriod * 1000 ; // convert in ms
// restore previous session
this . RestoreSession ( ) ;
// keep alive period equals zero means turning off keep alive mechanism
if ( this . keepAlivePeriod ! = 0 ) {
// start thread for sending keep alive message to the broker
Fx . StartThread ( this . KeepAliveThread ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// start thread for raising received message event from broker
Fx . StartThread ( this . DispatchEventThread ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// start thread for handling inflight messages queue to broker asynchronously (publish and acknowledge)
Fx . StartThread ( this . ProcessInflightThread ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
this . IsConnected = true ;
}
return connack . ReturnCode ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Disconnect from broker
/// </summary>
public void Disconnect ( ) {
MqttMsgDisconnect disconnect = new MqttMsgDisconnect ( ) ;
this . Send ( disconnect ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// close client
this . OnConnectionClosing ( ) ;
}
2018-03-31 22:42:28 +02:00
#if BROKER
/// <summary>
/// Open client communication
/// </summary>
public void Open ( )
{
this . isRunning = true ;
// start thread for receiving messages from client
2018-04-01 19:31:18 +02:00
Fx . StartThread ( this . ReceiveThread ) ;
2018-03-31 22:42:28 +02:00
// start thread for raising received message event from client
2018-04-01 19:31:18 +02:00
Fx . StartThread ( this . DispatchEventThread ) ;
2018-03-31 22:42:28 +02:00
// start thread for handling inflight messages queue to client asynchronously (publish and acknowledge)
2018-04-01 19:31:18 +02:00
Fx . StartThread ( this . ProcessInflightThread ) ;
2018-03-31 22:42:28 +02:00
}
#endif
2019-11-26 15:34:16 +01:00
/// <summary>
/// Close client
/// </summary>
2018-03-31 22:42:28 +02:00
#if BROKER
public void Close ( )
#else
2019-11-26 15:34:16 +01:00
private void Close ( )
2018-03-31 22:42:28 +02:00
#endif
{
2019-11-26 15:34:16 +01:00
// stop receiving thread
this . isRunning = false ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// wait end receive event thread
if ( this . receiveEventWaitHandle ! = null ) {
this . receiveEventWaitHandle . Set ( ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// wait end process inflight thread
if ( this . inflightWaitHandle ! = null ) {
this . inflightWaitHandle . Set ( ) ;
}
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
// unlock keep alive thread
this . keepAliveEvent . Set ( ) ;
2018-03-31 22:42:28 +02:00
#else
2019-11-26 15:34:16 +01:00
// unlock keep alive thread and wait
this . keepAliveEvent . Set ( ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
if ( this . keepAliveEventEnd ! = null ) {
this . keepAliveEventEnd . WaitOne ( ) ;
}
2018-03-31 22:42:28 +02:00
#endif
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// clear all queues
this . inflightQueue . Clear ( ) ;
this . internalQueue . Clear ( ) ;
this . eventQueue . Clear ( ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// close network channel
this . channel . Close ( ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
this . IsConnected = false ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Execute ping to broker for keep alive
/// </summary>
/// <returns>PINGRESP message from broker</returns>
private MqttMsgPingResp Ping ( ) {
MqttMsgPingReq pingreq = new MqttMsgPingReq ( ) ;
try {
// broker must send PINGRESP within timeout equal to keep alive period
return ( MqttMsgPingResp ) this . SendReceive ( pingreq , this . keepAlivePeriod ) ;
} catch ( Exception e ) {
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Error , "Exception occurred: {0}" , e . ToString ( ) ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
// client must close connection
this . OnConnectionClosing ( ) ;
return null ;
}
}
2018-03-31 22:42:28 +02:00
#if BROKER
/// <summary>
/// Send CONNACK message to the client (connection accepted or not)
/// </summary>
/// <param name="connect">CONNECT message with all client information</param>
2018-04-01 19:31:18 +02:00
/// <param name="returnCode">Return code for CONNACK message</param>
/// <param name="clientId">If not null, client id assigned by broker</param>
/// <param name="sessionPresent">Session present on the broker</param>
public void Connack ( MqttMsgConnect connect , byte returnCode , string clientId , bool sessionPresent )
2018-03-31 22:42:28 +02:00
{
this . lastCommTime = 0 ;
// create CONNACK message and ...
MqttMsgConnack connack = new MqttMsgConnack ( ) ;
connack . ReturnCode = returnCode ;
2018-04-01 19:31:18 +02:00
// [v3.1.1] session present flag
if ( this . ProtocolVersion = = MqttProtocolVersion . Version_3_1_1 )
connack . SessionPresent = sessionPresent ;
2018-03-31 22:42:28 +02:00
// ... send it to the client
2018-04-01 19:31:18 +02:00
this . Send ( connack ) ;
2018-03-31 22:42:28 +02:00
// connection accepted, start keep alive thread checking
if ( connack . ReturnCode = = MqttMsgConnack . CONN_ACCEPTED )
{
2018-04-01 19:31:18 +02:00
// [v3.1.1] if client id isn't null, the CONNECT message has a cliend id with zero bytes length
// and broker assigned a unique identifier to the client
this . ClientId = ( clientId = = null ) ? connect . ClientId : clientId ;
2018-03-31 22:42:28 +02:00
this . CleanSession = connect . CleanSession ;
this . WillFlag = connect . WillFlag ;
this . WillTopic = connect . WillTopic ;
this . WillMessage = connect . WillMessage ;
this . WillQosLevel = connect . WillQosLevel ;
this . keepAlivePeriod = connect . KeepAlivePeriod * 1000 ; // convert in ms
// broker has a tolerance of 1.5 specified keep alive period
this . keepAlivePeriod + = ( this . keepAlivePeriod / 2 ) ;
// start thread for checking keep alive period timeout
2018-04-01 19:31:18 +02:00
Fx . StartThread ( this . KeepAliveThread ) ;
2018-03-31 22:42:28 +02:00
2018-04-01 19:31:18 +02:00
this . isConnectionClosing = false ;
2018-03-31 22:42:28 +02:00
this . IsConnected = true ;
}
// connection refused, close TCP/IP channel
else
{
this . Close ( ) ;
}
}
/// <summary>
/// Send SUBACK message to the client
/// </summary>
/// <param name="messageId">Message Id for the SUBSCRIBE message that is being acknowledged</param>
/// <param name="grantedQosLevels">Granted QoS Levels</param>
public void Suback ( ushort messageId , byte [ ] grantedQosLevels )
{
MqttMsgSuback suback = new MqttMsgSuback ( ) ;
suback . MessageId = messageId ;
suback . GrantedQoSLevels = grantedQosLevels ;
2018-04-01 19:31:18 +02:00
this . Send ( suback ) ;
2018-03-31 22:42:28 +02:00
}
/// <summary>
/// Send UNSUBACK message to the client
/// </summary>
/// <param name="messageId">Message Id for the UNSUBSCRIBE message that is being acknowledged</param>
public void Unsuback ( ushort messageId )
{
MqttMsgUnsuback unsuback = new MqttMsgUnsuback ( ) ;
unsuback . MessageId = messageId ;
2018-04-01 19:31:18 +02:00
this . Send ( unsuback ) ;
2018-03-31 22:42:28 +02:00
}
#endif
2019-11-26 15:34:16 +01:00
/// <summary>
/// Subscribe for message topics
/// </summary>
/// <param name="topics">List of topics to subscribe</param>
/// <param name="qosLevels">QOS levels related to topics</param>
/// <returns>Message Id related to SUBSCRIBE message</returns>
public UInt16 Subscribe ( String [ ] topics , Byte [ ] qosLevels ) {
MqttMsgSubscribe subscribe =
new MqttMsgSubscribe ( topics , qosLevels ) {
MessageId = this . GetMessageId ( )
} ;
// enqueue subscribe request into the inflight queue
_ = this . EnqueueInflight ( subscribe , MqttMsgFlow . ToPublish ) ;
return subscribe . MessageId ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Unsubscribe for message topics
/// </summary>
/// <param name="topics">List of topics to unsubscribe</param>
/// <returns>Message Id in UNSUBACK message from broker</returns>
public UInt16 Unsubscribe ( String [ ] topics ) {
MqttMsgUnsubscribe unsubscribe =
new MqttMsgUnsubscribe ( topics ) {
MessageId = this . GetMessageId ( )
} ;
// enqueue unsubscribe request into the inflight queue
_ = this . EnqueueInflight ( unsubscribe , MqttMsgFlow . ToPublish ) ;
return unsubscribe . MessageId ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Publish a message asynchronously (QoS Level 0 and not retained)
/// </summary>
/// <param name="topic">Message topic</param>
/// <param name="message">Message data (payload)</param>
/// <returns>Message Id related to PUBLISH message</returns>
public UInt16 Publish ( String topic , Byte [ ] message ) = > this . Publish ( topic , message , MqttMsgBase . QOS_LEVEL_AT_MOST_ONCE , false ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Publish a message asynchronously
/// </summary>
/// <param name="topic">Message topic</param>
/// <param name="message">Message data (payload)</param>
/// <param name="qosLevel">QoS Level</param>
/// <param name="retain">Retain flag</param>
/// <returns>Message Id related to PUBLISH message</returns>
public UInt16 Publish ( String topic , Byte [ ] message , Byte qosLevel , Boolean retain ) {
MqttMsgPublish publish =
new MqttMsgPublish ( topic , message , false , qosLevel , retain ) {
MessageId = this . GetMessageId ( )
} ;
// enqueue message to publish into the inflight queue
Boolean enqueue = this . EnqueueInflight ( publish , MqttMsgFlow . ToPublish ) ;
// message enqueued
if ( enqueue ) {
return publish . MessageId ;
}
// infligh queue full, message not enqueued
else {
throw new MqttClientException ( MqttClientErrorCode . InflightQueueFull ) ;
}
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Wrapper method for raising events
/// </summary>
/// <param name="internalEvent">Internal event</param>
private void OnInternalEvent ( InternalEvent internalEvent ) {
lock ( this . eventQueue ) {
this . eventQueue . Enqueue ( internalEvent ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
_ = this . receiveEventWaitHandle . Set ( ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Wrapper method for raising closing connection event
/// </summary>
private void OnConnectionClosing ( ) {
if ( ! this . isConnectionClosing ) {
this . isConnectionClosing = true ;
_ = this . receiveEventWaitHandle . Set ( ) ;
}
}
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Wrapper method for raising PUBLISH message received event
/// </summary>
/// <param name="publish">PUBLISH message received</param>
private void OnMqttMsgPublishReceived ( MqttMsgPublish publish ) = > this . MqttMsgPublishReceived ? . Invoke ( this ,
new MqttMsgPublishEventArgs ( publish . Topic , publish . Message , publish . DupFlag , publish . QosLevel , publish . Retain ) ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Wrapper method for raising published message event
/// </summary>
/// <param name="messageId">Message identifier for published message</param>
/// <param name="isPublished">Publish flag</param>
private void OnMqttMsgPublished ( UInt16 messageId , Boolean isPublished ) = > this . MqttMsgPublished ? . Invoke ( this ,
new MqttMsgPublishedEventArgs ( messageId , isPublished ) ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Wrapper method for raising subscribed topic event
/// </summary>
/// <param name="suback">SUBACK message received</param>
private void OnMqttMsgSubscribed ( MqttMsgSuback suback ) = > this . MqttMsgSubscribed ? . Invoke ( this ,
new MqttMsgSubscribedEventArgs ( suback . MessageId , suback . GrantedQoSLevels ) ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Wrapper method for raising unsubscribed topic event
/// </summary>
/// <param name="messageId">Message identifier for unsubscribed topic</param>
private void OnMqttMsgUnsubscribed ( UInt16 messageId ) = > this . MqttMsgUnsubscribed ? . Invoke ( this ,
new MqttMsgUnsubscribedEventArgs ( messageId ) ) ;
2018-03-31 22:42:28 +02:00
#if BROKER
/// <summary>
/// Wrapper method for raising SUBSCRIBE message event
/// </summary>
/// <param name="messageId">Message identifier for subscribe topics request</param>
/// <param name="topics">Topics requested to subscribe</param>
/// <param name="qosLevels">List of QOS Levels requested</param>
private void OnMqttMsgSubscribeReceived ( ushort messageId , string [ ] topics , byte [ ] qosLevels )
{
if ( this . MqttMsgSubscribeReceived ! = null )
{
this . MqttMsgSubscribeReceived ( this ,
new MqttMsgSubscribeEventArgs ( messageId , topics , qosLevels ) ) ;
}
}
/// <summary>
/// Wrapper method for raising UNSUBSCRIBE message event
/// </summary>
/// <param name="messageId">Message identifier for unsubscribe topics request</param>
/// <param name="topics">Topics requested to unsubscribe</param>
private void OnMqttMsgUnsubscribeReceived ( ushort messageId , string [ ] topics )
{
if ( this . MqttMsgUnsubscribeReceived ! = null )
{
this . MqttMsgUnsubscribeReceived ( this ,
new MqttMsgUnsubscribeEventArgs ( messageId , topics ) ) ;
}
}
/// <summary>
2018-04-01 19:31:18 +02:00
/// Wrapper method for raising CONNECT message event
2018-03-31 22:42:28 +02:00
/// </summary>
private void OnMqttMsgConnected ( MqttMsgConnect connect )
{
if ( this . MqttMsgConnected ! = null )
{
2018-04-01 19:31:18 +02:00
this . ProtocolVersion = ( MqttProtocolVersion ) connect . ProtocolVersion ;
2018-03-31 22:42:28 +02:00
this . MqttMsgConnected ( this , new MqttMsgConnectEventArgs ( connect ) ) ;
}
}
/// <summary>
2018-04-01 19:31:18 +02:00
/// Wrapper method for raising DISCONNECT message event
2018-03-31 22:42:28 +02:00
/// </summary>
private void OnMqttMsgDisconnected ( )
{
if ( this . MqttMsgDisconnected ! = null )
{
this . MqttMsgDisconnected ( this , EventArgs . Empty ) ;
}
}
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
/// <summary>
/// Wrapper method for peer/client disconnection
/// </summary>
private void OnConnectionClosed ( ) = > this . ConnectionClosed ? . Invoke ( this , EventArgs . Empty ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Send a message
/// </summary>
/// <param name="msgBytes">Message bytes</param>
private void Send ( Byte [ ] msgBytes ) {
try {
// send message
this . channel . Send ( msgBytes ) ;
2018-03-31 22:42:28 +02:00
#if ! BROKER
2019-11-26 15:34:16 +01:00
// update last message sent ticks
this . lastCommTime = Environment . TickCount ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
} catch ( Exception e ) {
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Error , "Exception occurred: {0}" , e . ToString ( ) ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
throw new MqttCommunicationException ( e ) ;
}
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Send a message
/// </summary>
/// <param name="msg">Message</param>
private void Send ( MqttMsgBase msg ) {
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "SEND {0}" , msg ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
this . Send ( msg . GetBytes ( ( Byte ) this . ProtocolVersion ) ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Send a message to the broker and wait answer
/// </summary>
/// <param name="msgBytes">Message bytes</param>
/// <returns>MQTT message response</returns>
private MqttMsgBase SendReceive ( Byte [ ] msgBytes ) = > this . SendReceive ( msgBytes , MqttSettings . MQTT_DEFAULT_TIMEOUT ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Send a message to the broker and wait answer
/// </summary>
/// <param name="msgBytes">Message bytes</param>
/// <param name="timeout">Timeout for receiving answer</param>
/// <returns>MQTT message response</returns>
private MqttMsgBase SendReceive ( Byte [ ] msgBytes , Int32 timeout ) {
// reset handle before sending
this . syncEndReceiving . Reset ( ) ;
try {
// send message
this . channel . Send ( msgBytes ) ;
// update last message sent ticks
this . lastCommTime = Environment . TickCount ;
} catch ( Exception e ) {
2018-04-01 19:31:18 +02:00
#if ! ( MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK | | WINDOWS_APP | | WINDOWS_PHONE_APP )
2019-11-26 15:34:16 +01:00
if ( typeof ( SocketException ) = = e . GetType ( ) ) {
// connection reset by broker
if ( ( ( SocketException ) e ) . SocketErrorCode = = SocketError . ConnectionReset ) {
this . IsConnected = false ;
}
}
2018-04-01 19:31:18 +02:00
#endif
#if TRACE
2019-11-26 15:34:16 +01:00
Trace . WriteLine ( TraceLevel . Error , "Exception occurred: {0}" , e . ToString ( ) ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
throw new MqttCommunicationException ( e ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-03-31 22:42:28 +02:00
// wait for answer from broker
if ( this . syncEndReceiving . WaitOne ( timeout , false ) )
#else
2019-11-26 15:34:16 +01:00
// wait for answer from broker
if ( this . syncEndReceiving . WaitOne ( timeout ) )
2018-03-31 22:42:28 +02:00
#endif
{
2019-11-26 15:34:16 +01:00
// message received without exception
if ( this . exReceiving = = null ) {
return this . msgReceived ;
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
// receiving thread catched exception
else {
throw this . exReceiving ;
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
} else {
// throw timeout exception
throw new MqttCommunicationException ( ) ;
}
}
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Send a message to the broker and wait answer
/// </summary>
/// <param name="msg">Message</param>
/// <returns>MQTT message response</returns>
private MqttMsgBase SendReceive ( MqttMsgBase msg ) = > this . SendReceive ( msg , MqttSettings . MQTT_DEFAULT_TIMEOUT ) ;
/// <summary>
/// Send a message to the broker and wait answer
/// </summary>
/// <param name="msg">Message</param>
/// <param name="timeout">Timeout for receiving answer</param>
/// <returns>MQTT message response</returns>
private MqttMsgBase SendReceive ( MqttMsgBase msg , Int32 timeout ) {
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "SEND {0}" , msg ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
return this . SendReceive ( msg . GetBytes ( ( Byte ) this . ProtocolVersion ) , timeout ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Enqueue a message into the inflight queue
/// </summary>
/// <param name="msg">Message to enqueue</param>
/// <param name="flow">Message flow (publish, acknowledge)</param>
/// <returns>Message enqueued or not</returns>
private Boolean EnqueueInflight ( MqttMsgBase msg , MqttMsgFlow flow ) {
// enqueue is needed (or not)
Boolean enqueue = true ;
// if it is a PUBLISH message with QoS Level 2
if ( msg . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE & &
msg . QosLevel = = MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE ) {
lock ( this . inflightQueue ) {
// if it is a PUBLISH message already received (it is in the inflight queue), the publisher
// re-sent it because it didn't received the PUBREC. In this case, we have to re-send PUBREC
// NOTE : I need to find on message id and flow because the broker could be publish/received
// to/from client and message id could be the same (one tracked by broker and the other by client)
MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder ( msg . MessageId , MqttMsgFlow . ToAcknowledge ) ;
MqttMsgContext msgCtx = ( MqttMsgContext ) QueueExtension . Get ( this . inflightQueue , msgCtxFinder . Find ) ;
// the PUBLISH message is alredy in the inflight queue, we don't need to re-enqueue but we need
// to change state to re-send PUBREC
if ( msgCtx ! = null ) {
msgCtx . State = MqttMsgState . QueuedQos2 ;
msgCtx . Flow = MqttMsgFlow . ToAcknowledge ;
enqueue = false ;
}
}
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
if ( enqueue ) {
// set a default state
MqttMsgState state = MqttMsgState . QueuedQos0 ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// based on QoS level, the messages flow between broker and client changes
switch ( msg . QosLevel ) {
// QoS Level 0
case MqttMsgBase . QOS_LEVEL_AT_MOST_ONCE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
state = MqttMsgState . QueuedQos0 ;
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// QoS Level 1
case MqttMsgBase . QOS_LEVEL_AT_LEAST_ONCE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
state = MqttMsgState . QueuedQos1 ;
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// QoS Level 2
case MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
state = MqttMsgState . QueuedQos2 ;
break ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// [v3.1.1] SUBSCRIBE and UNSUBSCRIBE aren't "officially" QOS = 1
// so QueuedQos1 state isn't valid for them
if ( msg . Type = = MqttMsgBase . MQTT_MSG_SUBSCRIBE_TYPE ) {
state = MqttMsgState . SendSubscribe ;
} else if ( msg . Type = = MqttMsgBase . MQTT_MSG_UNSUBSCRIBE_TYPE ) {
state = MqttMsgState . SendUnsubscribe ;
}
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// queue message context
MqttMsgContext msgContext = new MqttMsgContext ( ) {
Message = msg ,
State = state ,
Flow = flow ,
Attempt = 0
} ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
lock ( this . inflightQueue ) {
// check number of messages inside inflight queue
enqueue = this . inflightQueue . Count < this . Settings . InflightQueueSize ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
if ( enqueue ) {
// enqueue message and unlock send thread
this . inflightQueue . Enqueue ( msgContext ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "enqueued {0}" , msg ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
// PUBLISH message
if ( msg . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE ) {
// to publish and QoS level 1 or 2
if ( msgContext . Flow = = MqttMsgFlow . ToPublish & &
( msg . QosLevel = = MqttMsgBase . QOS_LEVEL_AT_LEAST_ONCE | |
msg . QosLevel = = MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE ) ) {
if ( this . session ! = null ) {
this . session . InflightMessages . Add ( msgContext . Key , msgContext ) ;
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
}
// to acknowledge and QoS level 2
else if ( msgContext . Flow = = MqttMsgFlow . ToAcknowledge & &
msg . QosLevel = = MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE ) {
if ( this . session ! = null ) {
this . session . InflightMessages . Add ( msgContext . Key , msgContext ) ;
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
}
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
}
}
}
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
this . inflightWaitHandle . Set ( ) ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
return enqueue ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Enqueue a message into the internal queue
/// </summary>
/// <param name="msg">Message to enqueue</param>
private void EnqueueInternal ( MqttMsgBase msg ) {
// enqueue is needed (or not)
Boolean enqueue = true ;
// if it is a PUBREL message (for QoS Level 2)
if ( msg . Type = = MqttMsgBase . MQTT_MSG_PUBREL_TYPE ) {
lock ( this . inflightQueue ) {
// if it is a PUBREL but the corresponding PUBLISH isn't in the inflight queue,
// it means that we processed PUBLISH message and received PUBREL and we sent PUBCOMP
// but publisher didn't receive PUBCOMP so it re-sent PUBREL. We need only to re-send PUBCOMP.
// NOTE : I need to find on message id and flow because the broker could be publish/received
// to/from client and message id could be the same (one tracked by broker and the other by client)
MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder ( msg . MessageId , MqttMsgFlow . ToAcknowledge ) ;
MqttMsgContext msgCtx = ( MqttMsgContext ) QueueExtension . Get ( this . inflightQueue , msgCtxFinder . Find ) ;
// the PUBLISH message isn't in the inflight queue, it was already processed so
// we need to re-send PUBCOMP only
if ( msgCtx = = null ) {
MqttMsgPubcomp pubcomp = new MqttMsgPubcomp {
MessageId = msg . MessageId
} ;
this . Send ( pubcomp ) ;
enqueue = false ;
}
}
}
// if it is a PUBCOMP message (for QoS Level 2)
else if ( msg . Type = = MqttMsgBase . MQTT_MSG_PUBCOMP_TYPE ) {
lock ( this . inflightQueue ) {
// if it is a PUBCOMP but the corresponding PUBLISH isn't in the inflight queue,
// it means that we sent PUBLISH message, sent PUBREL (after receiving PUBREC) and already received PUBCOMP
// but publisher didn't receive PUBREL so it re-sent PUBCOMP. We need only to ignore this PUBCOMP.
// NOTE : I need to find on message id and flow because the broker could be publish/received
// to/from client and message id could be the same (one tracked by broker and the other by client)
MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder ( msg . MessageId , MqttMsgFlow . ToPublish ) ;
MqttMsgContext msgCtx = ( MqttMsgContext ) QueueExtension . Get ( this . inflightQueue , msgCtxFinder . Find ) ;
// the PUBLISH message isn't in the inflight queue, it was already sent so we need to ignore this PUBCOMP
if ( msgCtx = = null ) {
enqueue = false ;
}
}
}
// if it is a PUBREC message (for QoS Level 2)
else if ( msg . Type = = MqttMsgBase . MQTT_MSG_PUBREC_TYPE ) {
lock ( this . inflightQueue ) {
// if it is a PUBREC but the corresponding PUBLISH isn't in the inflight queue,
// it means that we sent PUBLISH message more times (retries) but broker didn't send PUBREC in time
// the publish is failed and we need only to ignore this PUBREC.
// NOTE : I need to find on message id and flow because the broker could be publish/received
// to/from client and message id could be the same (one tracked by broker and the other by client)
MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder ( msg . MessageId , MqttMsgFlow . ToPublish ) ;
MqttMsgContext msgCtx = ( MqttMsgContext ) QueueExtension . Get ( this . inflightQueue , msgCtxFinder . Find ) ;
// the PUBLISH message isn't in the inflight queue, it was already sent so we need to ignore this PUBREC
if ( msgCtx = = null ) {
enqueue = false ;
}
}
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
if ( enqueue ) {
lock ( this . internalQueue ) {
this . internalQueue . Enqueue ( msg ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "enqueued {0}" , msg ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
this . inflightWaitHandle . Set ( ) ;
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
}
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Thread for receiving messages
/// </summary>
private void ReceiveThread ( ) {
Int32 readBytes = 0 ;
Byte [ ] fixedHeaderFirstByte = new Byte [ 1 ] ;
Byte msgType ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
while ( this . isRunning ) {
try {
// read first byte (fixed header)
readBytes = this . channel . Receive ( fixedHeaderFirstByte ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
if ( readBytes > 0 ) {
2018-03-31 22:42:28 +02:00
#if BROKER
// update last message received ticks
this . lastCommTime = Environment . TickCount ;
#endif
2019-11-26 15:34:16 +01:00
// extract message type from received byte
msgType = ( Byte ) ( ( fixedHeaderFirstByte [ 0 ] & MqttMsgBase . MSG_TYPE_MASK ) > > MqttMsgBase . MSG_TYPE_OFFSET ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
switch ( msgType ) {
// CONNECT message received
case MqttMsgBase . MQTT_MSG_CONNECT_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
MqttMsgConnect connect = MqttMsgConnect . Parse ( fixedHeaderFirstByte [ 0 ] , ( byte ) this . ProtocolVersion , this . channel ) ;
#if TRACE
Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , connect ) ;
#endif
2018-03-31 22:42:28 +02:00
// raise message received event
2018-04-01 19:31:18 +02:00
this . OnInternalEvent ( new MsgInternalEvent ( connect ) ) ;
2018-03-31 22:42:28 +02:00
break ;
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// CONNACK message received
case MqttMsgBase . MQTT_MSG_CONNACK_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
#else
2019-11-26 15:34:16 +01:00
this . msgReceived = MqttMsgConnack . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , this . msgReceived ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
this . syncEndReceiving . Set ( ) ;
break ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// PINGREQ message received
case MqttMsgBase . MQTT_MSG_PINGREQ_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
this . msgReceived = MqttMsgPingReq . Parse ( fixedHeaderFirstByte [ 0 ] , ( byte ) this . ProtocolVersion , this . channel ) ;
#if TRACE
Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , this . msgReceived ) ;
#endif
2018-03-31 22:42:28 +02:00
MqttMsgPingResp pingresp = new MqttMsgPingResp ( ) ;
2018-04-01 19:31:18 +02:00
this . Send ( pingresp ) ;
2018-03-31 22:42:28 +02:00
break ;
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// PINGRESP message received
case MqttMsgBase . MQTT_MSG_PINGRESP_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
#else
2019-11-26 15:34:16 +01:00
this . msgReceived = MqttMsgPingResp . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , this . msgReceived ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
this . syncEndReceiving . Set ( ) ;
break ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// SUBSCRIBE message received
case MqttMsgBase . MQTT_MSG_SUBSCRIBE_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
MqttMsgSubscribe subscribe = MqttMsgSubscribe . Parse ( fixedHeaderFirstByte [ 0 ] , ( byte ) this . ProtocolVersion , this . channel ) ;
#if TRACE
Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , subscribe ) ;
#endif
2018-03-31 22:42:28 +02:00
// raise message received event
2018-04-01 19:31:18 +02:00
this . OnInternalEvent ( new MsgInternalEvent ( subscribe ) ) ;
2018-03-31 22:42:28 +02:00
break ;
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// SUBACK message received
case MqttMsgBase . MQTT_MSG_SUBACK_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
#else
2019-11-26 15:34:16 +01:00
// enqueue SUBACK message received (for QoS Level 1) into the internal queue
MqttMsgSuback suback = MqttMsgSuback . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , suback ) ;
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue SUBACK message into the internal queue
this . EnqueueInternal ( suback ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
break ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// PUBLISH message received
case MqttMsgBase . MQTT_MSG_PUBLISH_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
MqttMsgPublish publish = MqttMsgPublish . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , publish ) ;
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue PUBLISH message to acknowledge into the inflight queue
this . EnqueueInflight ( publish , MqttMsgFlow . ToAcknowledge ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBACK message received
case MqttMsgBase . MQTT_MSG_PUBACK_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue PUBACK message received (for QoS Level 1) into the internal queue
MqttMsgPuback puback = MqttMsgPuback . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , puback ) ;
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue PUBACK message into the internal queue
this . EnqueueInternal ( puback ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBREC message received
case MqttMsgBase . MQTT_MSG_PUBREC_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue PUBREC message received (for QoS Level 2) into the internal queue
MqttMsgPubrec pubrec = MqttMsgPubrec . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , pubrec ) ;
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue PUBREC message into the internal queue
this . EnqueueInternal ( pubrec ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBREL message received
case MqttMsgBase . MQTT_MSG_PUBREL_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue PUBREL message received (for QoS Level 2) into the internal queue
MqttMsgPubrel pubrel = MqttMsgPubrel . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , pubrel ) ;
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue PUBREL message into the internal queue
this . EnqueueInternal ( pubrel ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBCOMP message received
case MqttMsgBase . MQTT_MSG_PUBCOMP_TYPE :
// enqueue PUBCOMP message received (for QoS Level 2) into the internal queue
MqttMsgPubcomp pubcomp = MqttMsgPubcomp . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , pubcomp ) ;
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue PUBCOMP message into the internal queue
this . EnqueueInternal ( pubcomp ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// UNSUBSCRIBE message received
case MqttMsgBase . MQTT_MSG_UNSUBSCRIBE_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
MqttMsgUnsubscribe unsubscribe = MqttMsgUnsubscribe . Parse ( fixedHeaderFirstByte [ 0 ] , ( byte ) this . ProtocolVersion , this . channel ) ;
#if TRACE
Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , unsubscribe ) ;
#endif
2018-03-31 22:42:28 +02:00
// raise message received event
2018-04-01 19:31:18 +02:00
this . OnInternalEvent ( new MsgInternalEvent ( unsubscribe ) ) ;
2018-03-31 22:42:28 +02:00
break ;
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// UNSUBACK message received
case MqttMsgBase . MQTT_MSG_UNSUBACK_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
#else
2019-11-26 15:34:16 +01:00
// enqueue UNSUBACK message received (for QoS Level 1) into the internal queue
MqttMsgUnsuback unsuback = MqttMsgUnsuback . Parse ( fixedHeaderFirstByte [ 0 ] , ( Byte ) this . ProtocolVersion , this . channel ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , unsuback ) ;
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// enqueue UNSUBACK message into the internal queue
this . EnqueueInternal ( unsuback ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
break ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// DISCONNECT message received
case MqttMsgDisconnect . MQTT_MSG_DISCONNECT_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
MqttMsgDisconnect disconnect = MqttMsgDisconnect . Parse ( fixedHeaderFirstByte [ 0 ] , ( byte ) this . ProtocolVersion , this . channel ) ;
#if TRACE
Trace . WriteLine ( TraceLevel . Frame , "RECV {0}" , disconnect ) ;
#endif
2018-03-31 22:42:28 +02:00
// raise message received event
2018-04-01 19:31:18 +02:00
this . OnInternalEvent ( new MsgInternalEvent ( disconnect ) ) ;
2018-03-31 22:42:28 +02:00
break ;
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
default :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
this . exReceiving = null ;
}
// zero bytes read, peer gracefully closed socket
else {
// wake up thread that will notify connection is closing
this . OnConnectionClosing ( ) ;
}
} catch ( Exception e ) {
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Error , "Exception occurred: {0}" , e . ToString ( ) ) ;
#endif
this . exReceiving = new MqttCommunicationException ( e ) ;
Boolean close = false ;
if ( e . GetType ( ) = = typeof ( MqttClientException ) ) {
// [v3.1.1] scenarios the receiver MUST close the network connection
MqttClientException ex = e as MqttClientException ;
close = ex . ErrorCode = = MqttClientErrorCode . InvalidFlagBits | |
ex . ErrorCode = = MqttClientErrorCode . InvalidProtocolName | |
ex . ErrorCode = = MqttClientErrorCode . InvalidConnectFlags ;
}
2018-04-01 19:31:18 +02:00
#if ! ( WINDOWS_APP | | WINDOWS_PHONE_APP )
2019-11-26 15:34:16 +01:00
else if ( e . GetType ( ) = = typeof ( IOException ) | | e . GetType ( ) = = typeof ( SocketException ) | |
e . InnerException ! = null & & e . InnerException . GetType ( ) = = typeof ( SocketException ) ) // added for SSL/TLS incoming connection that use SslStream that wraps SocketException
2018-04-01 19:31:18 +02:00
{
2019-11-26 15:34:16 +01:00
close = true ;
}
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
if ( close ) {
// wake up thread that will notify connection is closing
this . OnConnectionClosing ( ) ;
}
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
}
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Thread for handling keep alive message
/// </summary>
private void KeepAliveThread ( ) {
Int32 delta = 0 ;
Int32 wait = this . keepAlivePeriod ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// create event to signal that current thread is end
this . keepAliveEventEnd = new AutoResetEvent ( false ) ;
while ( this . isRunning ) {
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-03-31 22:42:28 +02:00
// waiting...
this . keepAliveEvent . WaitOne ( wait , false ) ;
#else
2019-11-26 15:34:16 +01:00
// waiting...
this . keepAliveEvent . WaitOne ( wait ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
if ( this . isRunning ) {
delta = Environment . TickCount - this . lastCommTime ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// if timeout exceeded ...
if ( delta > = this . keepAlivePeriod ) {
2018-03-31 22:42:28 +02:00
#if BROKER
// client must close connection
2018-04-01 19:31:18 +02:00
this . OnConnectionClosing ( ) ;
2018-03-31 22:42:28 +02:00
#else
2019-11-26 15:34:16 +01:00
// ... send keep alive
2018-04-01 19:31:18 +02:00
this . Ping ( ) ;
wait = this . keepAlivePeriod ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
} else {
// update waiting time
wait = this . keepAlivePeriod - delta ;
}
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// signal thread end
this . keepAliveEventEnd . Set ( ) ;
}
/// <summary>
/// Thread for raising event
/// </summary>
private void DispatchEventThread ( ) {
while ( this . isRunning ) {
2018-04-01 19:31:18 +02:00
#if BROKER
if ( ( this . eventQueue . Count = = 0 ) & & ! this . isConnectionClosing )
{
// broker need to receive the first message (CONNECT)
// within a reasonable amount of time after TCP/IP connection
if ( ! this . IsConnected )
{
// wait on receiving message from client with a connection timeout
if ( ! this . receiveEventWaitHandle . WaitOne ( this . settings . TimeoutOnConnection ) )
{
// client must close connection
this . Close ( ) ;
// client raw disconnection
this . OnConnectionClosed ( ) ;
}
}
else
{
// wait on receiving message from client
this . receiveEventWaitHandle . WaitOne ( ) ;
}
}
#else
2019-11-26 15:34:16 +01:00
if ( this . eventQueue . Count = = 0 & & ! this . isConnectionClosing ) {
// wait on receiving message from client
this . receiveEventWaitHandle . WaitOne ( ) ;
}
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// check if it is running or we are closing client
if ( this . isRunning ) {
// get event from queue
InternalEvent internalEvent = null ;
lock ( this . eventQueue ) {
if ( this . eventQueue . Count > 0 ) {
internalEvent = ( InternalEvent ) this . eventQueue . Dequeue ( ) ;
}
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// it's an event with a message inside
if ( internalEvent ! = null ) {
MqttMsgBase msg = ( ( MsgInternalEvent ) internalEvent ) . Message ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
if ( msg ! = null ) {
switch ( msg . Type ) {
// CONNECT message received
case MqttMsgBase . MQTT_MSG_CONNECT_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
// raise connected client event (CONNECT message received)
this . OnMqttMsgConnected ( ( MqttMsgConnect ) msg ) ;
break ;
2018-03-31 22:42:28 +02:00
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// SUBSCRIBE message received
case MqttMsgBase . MQTT_MSG_SUBSCRIBE_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
MqttMsgSubscribe subscribe = ( MqttMsgSubscribe ) msg ;
// raise subscribe topic event (SUBSCRIBE message received)
this . OnMqttMsgSubscribeReceived ( subscribe . MessageId , subscribe . Topics , subscribe . QoSLevels ) ;
break ;
2018-03-31 22:42:28 +02:00
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// SUBACK message received
case MqttMsgBase . MQTT_MSG_SUBACK_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// raise subscribed topic event (SUBACK message received)
this . OnMqttMsgSubscribed ( ( MqttMsgSuback ) msg ) ;
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBLISH message received
case MqttMsgBase . MQTT_MSG_PUBLISH_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBLISH message received in a published internal event, no publish succeeded
if ( internalEvent . GetType ( ) = = typeof ( MsgPublishedInternalEvent ) ) {
this . OnMqttMsgPublished ( msg . MessageId , false ) ;
} else {
// raise PUBLISH message received event
this . OnMqttMsgPublishReceived ( ( MqttMsgPublish ) msg ) ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBACK message received
case MqttMsgBase . MQTT_MSG_PUBACK_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// raise published message event
// (PUBACK received for QoS Level 1)
this . OnMqttMsgPublished ( msg . MessageId , true ) ;
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBREL message received
case MqttMsgBase . MQTT_MSG_PUBREL_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// raise message received event
// (PUBREL received for QoS Level 2)
this . OnMqttMsgPublishReceived ( ( MqttMsgPublish ) msg ) ;
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// PUBCOMP message received
case MqttMsgBase . MQTT_MSG_PUBCOMP_TYPE :
// raise published message event
// (PUBCOMP received for QoS Level 2)
this . OnMqttMsgPublished ( msg . MessageId , true ) ;
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// UNSUBSCRIBE message received from client
case MqttMsgBase . MQTT_MSG_UNSUBSCRIBE_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
MqttMsgUnsubscribe unsubscribe = ( MqttMsgUnsubscribe ) msg ;
// raise unsubscribe topic event (UNSUBSCRIBE message received)
this . OnMqttMsgUnsubscribeReceived ( unsubscribe . MessageId , unsubscribe . Topics ) ;
break ;
2018-03-31 22:42:28 +02:00
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
// UNSUBACK message received
case MqttMsgBase . MQTT_MSG_UNSUBACK_TYPE :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// raise unsubscribed topic event
this . OnMqttMsgUnsubscribed ( msg . MessageId ) ;
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// DISCONNECT message received from client
case MqttMsgDisconnect . MQTT_MSG_DISCONNECT_TYPE :
2018-03-31 22:42:28 +02:00
#if BROKER
2018-04-01 19:31:18 +02:00
// raise disconnected client event (DISCONNECT message received)
this . OnMqttMsgDisconnected ( ) ;
break ;
2018-03-31 22:42:28 +02:00
#else
2019-11-26 15:34:16 +01:00
throw new MqttClientException ( MqttClientErrorCode . WrongBrokerMessage ) ;
2018-03-31 22:42:28 +02:00
#endif
2019-11-26 15:34:16 +01:00
}
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
}
// all events for received messages dispatched, check if there is closing connection
if ( this . eventQueue . Count = = 0 & & this . isConnectionClosing ) {
// client must close connection
this . Close ( ) ;
// client raw disconnection
this . OnConnectionClosed ( ) ;
}
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
}
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Process inflight messages queue
/// </summary>
private void ProcessInflightThread ( ) {
MqttMsgContext msgContext = null ;
MqttMsgBase msgInflight = null ;
MqttMsgBase msgReceived = null ;
InternalEvent internalEvent = null ;
Boolean acknowledge = false ;
Int32 timeout = Timeout . Infinite ;
Int32 delta ;
Boolean msgReceivedProcessed = false ;
try {
while ( this . isRunning ) {
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-03-31 22:42:28 +02:00
// wait on message queueud to inflight
this . inflightWaitHandle . WaitOne ( timeout , false ) ;
#else
2019-11-26 15:34:16 +01:00
// wait on message queueud to inflight
this . inflightWaitHandle . WaitOne ( timeout ) ;
#endif
// it could be unblocked because Close() method is joining
if ( this . isRunning ) {
lock ( this . inflightQueue ) {
// message received and peeked from internal queue is processed
// NOTE : it has the corresponding message in inflight queue based on messageId
// (ex. a PUBREC for a PUBLISH, a SUBACK for a SUBSCRIBE, ...)
// if it's orphan we need to remove from internal queue
msgReceivedProcessed = false ;
acknowledge = false ;
msgReceived = null ;
// set timeout tu MaxValue instead of Infinte (-1) to perform
// compare with calcultad current msgTimeout
timeout = Int32 . MaxValue ;
// a message inflight could be re-enqueued but we have to
// analyze it only just one time for cycle
Int32 count = this . inflightQueue . Count ;
// process all inflight queued messages
while ( count > 0 ) {
count - - ;
acknowledge = false ;
msgReceived = null ;
// check to be sure that client isn't closing and all queues are now empty !
if ( ! this . isRunning ) {
break ;
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// dequeue message context from queue
msgContext = ( MqttMsgContext ) this . inflightQueue . Dequeue ( ) ;
// get inflight message
msgInflight = ( MqttMsgBase ) msgContext . Message ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
switch ( msgContext . State ) {
case MqttMsgState . QueuedQos0 :
// QoS 0, PUBLISH message to send to broker, no state change, no acknowledge
if ( msgContext . Flow = = MqttMsgFlow . ToPublish ) {
this . Send ( msgInflight ) ;
}
// QoS 0, no need acknowledge
else if ( msgContext . Flow = = MqttMsgFlow . ToAcknowledge ) {
internalEvent = new MsgInternalEvent ( msgInflight ) ;
// notify published message from broker (no need acknowledged)
this . OnInternalEvent ( internalEvent ) ;
}
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "processed {0}" , msgInflight ) ;
#endif
break ;
case MqttMsgState . QueuedQos1 :
// [v3.1.1] SUBSCRIBE and UNSIBSCRIBE aren't "officially" QOS = 1
case MqttMsgState . SendSubscribe :
case MqttMsgState . SendUnsubscribe :
// QoS 1, PUBLISH or SUBSCRIBE/UNSUBSCRIBE message to send to broker, state change to wait PUBACK or SUBACK/UNSUBACK
if ( msgContext . Flow = = MqttMsgFlow . ToPublish ) {
msgContext . Timestamp = Environment . TickCount ;
msgContext . Attempt + + ;
if ( msgInflight . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE ) {
// PUBLISH message to send, wait for PUBACK
msgContext . State = MqttMsgState . WaitForPuback ;
// retry ? set dup flag [v3.1.1] only for PUBLISH message
if ( msgContext . Attempt > 1 ) {
msgInflight . DupFlag = true ;
}
} else if ( msgInflight . Type = = MqttMsgBase . MQTT_MSG_SUBSCRIBE_TYPE ) {
// SUBSCRIBE message to send, wait for SUBACK
msgContext . State = MqttMsgState . WaitForSuback ;
} else if ( msgInflight . Type = = MqttMsgBase . MQTT_MSG_UNSUBSCRIBE_TYPE ) {
// UNSUBSCRIBE message to send, wait for UNSUBACK
msgContext . State = MqttMsgState . WaitForUnsuback ;
}
this . Send ( msgInflight ) ;
// update timeout : minimum between delay (based on current message sent) or current timeout
timeout = ( this . Settings . DelayOnRetry < timeout ) ? this . Settings . DelayOnRetry : timeout ;
// re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
this . inflightQueue . Enqueue ( msgContext ) ;
}
// QoS 1, PUBLISH message received from broker to acknowledge, send PUBACK
else if ( msgContext . Flow = = MqttMsgFlow . ToAcknowledge ) {
MqttMsgPuback puback = new MqttMsgPuback {
MessageId = msgInflight . MessageId
} ;
this . Send ( puback ) ;
internalEvent = new MsgInternalEvent ( msgInflight ) ;
// notify published message from broker and acknowledged
this . OnInternalEvent ( internalEvent ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "processed {0}" , msgInflight ) ;
#endif
}
break ;
case MqttMsgState . QueuedQos2 :
// QoS 2, PUBLISH message to send to broker, state change to wait PUBREC
if ( msgContext . Flow = = MqttMsgFlow . ToPublish ) {
msgContext . Timestamp = Environment . TickCount ;
msgContext . Attempt + + ;
msgContext . State = MqttMsgState . WaitForPubrec ;
// retry ? set dup flag
if ( msgContext . Attempt > 1 ) {
msgInflight . DupFlag = true ;
}
this . Send ( msgInflight ) ;
// update timeout : minimum between delay (based on current message sent) or current timeout
timeout = ( this . Settings . DelayOnRetry < timeout ) ? this . Settings . DelayOnRetry : timeout ;
// re-enqueue message (I have to re-analyze for receiving PUBREC)
this . inflightQueue . Enqueue ( msgContext ) ;
}
// QoS 2, PUBLISH message received from broker to acknowledge, send PUBREC, state change to wait PUBREL
else if ( msgContext . Flow = = MqttMsgFlow . ToAcknowledge ) {
MqttMsgPubrec pubrec = new MqttMsgPubrec {
MessageId = msgInflight . MessageId
} ;
msgContext . State = MqttMsgState . WaitForPubrel ;
this . Send ( pubrec ) ;
// re-enqueue message (I have to re-analyze for receiving PUBREL)
this . inflightQueue . Enqueue ( msgContext ) ;
}
break ;
case MqttMsgState . WaitForPuback :
case MqttMsgState . WaitForSuback :
case MqttMsgState . WaitForUnsuback :
// QoS 1, waiting for PUBACK of a PUBLISH message sent or
// waiting for SUBACK of a SUBSCRIBE message sent or
// waiting for UNSUBACK of a UNSUBSCRIBE message sent or
if ( msgContext . Flow = = MqttMsgFlow . ToPublish ) {
acknowledge = false ;
lock ( this . internalQueue ) {
if ( this . internalQueue . Count > 0 ) {
msgReceived = ( MqttMsgBase ) this . internalQueue . Peek ( ) ;
}
}
// it is a PUBACK message or a SUBACK/UNSUBACK message
if ( msgReceived ! = null ) {
// PUBACK message or SUBACK/UNSUBACK message for the current message
if ( msgReceived . Type = = MqttMsgBase . MQTT_MSG_PUBACK_TYPE & & msgInflight . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE & & msgReceived . MessageId = = msgInflight . MessageId | |
msgReceived . Type = = MqttMsgBase . MQTT_MSG_SUBACK_TYPE & & msgInflight . Type = = MqttMsgBase . MQTT_MSG_SUBSCRIBE_TYPE & & msgReceived . MessageId = = msgInflight . MessageId | |
msgReceived . Type = = MqttMsgBase . MQTT_MSG_UNSUBACK_TYPE & & msgInflight . Type = = MqttMsgBase . MQTT_MSG_UNSUBSCRIBE_TYPE & & msgReceived . MessageId = = msgInflight . MessageId ) {
lock ( this . internalQueue ) {
// received message processed
this . internalQueue . Dequeue ( ) ;
acknowledge = true ;
msgReceivedProcessed = true ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "dequeued {0}" , msgReceived ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// if PUBACK received, confirm published with flag
internalEvent = msgReceived . Type = = MqttMsgBase . MQTT_MSG_PUBACK_TYPE
? new MsgPublishedInternalEvent ( msgReceived , true )
: new MsgInternalEvent ( msgReceived ) ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// notify received acknowledge from broker of a published message or subscribe/unsubscribe message
this . OnInternalEvent ( internalEvent ) ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// PUBACK received for PUBLISH message with QoS Level 1, remove from session state
if ( msgInflight . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE & &
this . session ! = null & &
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-04-01 19:31:18 +02:00
( this . session . InflightMessages . Contains ( msgContext . Key ) ) )
#else
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . ContainsKey ( msgContext . Key ) )
2018-04-01 19:31:18 +02:00
#endif
{
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . Remove ( msgContext . Key ) ;
}
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "processed {0}" , msgInflight ) ;
#endif
}
}
// current message not acknowledged, no PUBACK or SUBACK/UNSUBACK or not equal messageid
if ( ! acknowledge ) {
delta = Environment . TickCount - msgContext . Timestamp ;
// check timeout for receiving PUBACK since PUBLISH was sent or
// for receiving SUBACK since SUBSCRIBE was sent or
// for receiving UNSUBACK since UNSUBSCRIBE was sent
if ( delta > = this . Settings . DelayOnRetry ) {
// max retry not reached, resend
if ( msgContext . Attempt < this . Settings . AttemptsOnRetry ) {
msgContext . State = MqttMsgState . QueuedQos1 ;
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// update timeout (0 -> reanalyze queue immediately)
timeout = 0 ;
} else {
// if PUBACK for a PUBLISH message not received after retries, raise event for not published
if ( msgInflight . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE ) {
// PUBACK not received in time, PUBLISH retries failed, need to remove from session inflight messages too
if ( this . session ! = null & &
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-04-01 19:31:18 +02:00
( this . session . InflightMessages . Contains ( msgContext . Key ) ) )
#else
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . ContainsKey ( msgContext . Key ) )
2018-04-01 19:31:18 +02:00
#endif
{
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . Remove ( msgContext . Key ) ;
}
internalEvent = new MsgPublishedInternalEvent ( msgInflight , false ) ;
// notify not received acknowledge from broker and message not published
this . OnInternalEvent ( internalEvent ) ;
}
// NOTE : not raise events for SUBACK or UNSUBACK not received
// for the user no event raised means subscribe/unsubscribe failed
}
} else {
// re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
this . inflightQueue . Enqueue ( msgContext ) ;
// update timeout
Int32 msgTimeout = this . Settings . DelayOnRetry - delta ;
timeout = ( msgTimeout < timeout ) ? msgTimeout : timeout ;
}
}
}
break ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
case MqttMsgState . WaitForPubrec :
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// QoS 2, waiting for PUBREC of a PUBLISH message sent
if ( msgContext . Flow = = MqttMsgFlow . ToPublish ) {
acknowledge = false ;
lock ( this . internalQueue ) {
if ( this . internalQueue . Count > 0 ) {
msgReceived = ( MqttMsgBase ) this . internalQueue . Peek ( ) ;
}
}
// it is a PUBREC message
if ( msgReceived ! = null & & msgReceived . Type = = MqttMsgBase . MQTT_MSG_PUBREC_TYPE ) {
// PUBREC message for the current PUBLISH message, send PUBREL, wait for PUBCOMP
if ( msgReceived . MessageId = = msgInflight . MessageId ) {
lock ( this . internalQueue ) {
// received message processed
this . internalQueue . Dequeue ( ) ;
acknowledge = true ;
msgReceivedProcessed = true ;
#if TRACE
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "dequeued {0}" , msgReceived ) ;
#endif
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
MqttMsgPubrel pubrel = new MqttMsgPubrel {
MessageId = msgInflight . MessageId
} ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
msgContext . State = MqttMsgState . WaitForPubcomp ;
msgContext . Timestamp = Environment . TickCount ;
msgContext . Attempt = 1 ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
this . Send ( pubrel ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// update timeout : minimum between delay (based on current message sent) or current timeout
timeout = ( this . Settings . DelayOnRetry < timeout ) ? this . Settings . DelayOnRetry : timeout ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
}
}
// current message not acknowledged
if ( ! acknowledge ) {
delta = Environment . TickCount - msgContext . Timestamp ;
// check timeout for receiving PUBREC since PUBLISH was sent
if ( delta > = this . Settings . DelayOnRetry ) {
// max retry not reached, resend
if ( msgContext . Attempt < this . Settings . AttemptsOnRetry ) {
msgContext . State = MqttMsgState . QueuedQos2 ;
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// update timeout (0 -> reanalyze queue immediately)
timeout = 0 ;
} else {
// PUBREC not received in time, PUBLISH retries failed, need to remove from session inflight messages too
if ( this . session ! = null & &
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-04-01 19:31:18 +02:00
( this . session . InflightMessages . Contains ( msgContext . Key ) ) )
#else
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . ContainsKey ( msgContext . Key ) )
2018-04-01 19:31:18 +02:00
#endif
{
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . Remove ( msgContext . Key ) ;
}
// if PUBREC for a PUBLISH message not received after retries, raise event for not published
internalEvent = new MsgPublishedInternalEvent ( msgInflight , false ) ;
// notify not received acknowledge from broker and message not published
this . OnInternalEvent ( internalEvent ) ;
}
} else {
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
// update timeout
Int32 msgTimeout = this . Settings . DelayOnRetry - delta ;
timeout = ( msgTimeout < timeout ) ? msgTimeout : timeout ;
}
}
}
break ;
case MqttMsgState . WaitForPubrel :
// QoS 2, waiting for PUBREL of a PUBREC message sent
if ( msgContext . Flow = = MqttMsgFlow . ToAcknowledge ) {
lock ( this . internalQueue ) {
if ( this . internalQueue . Count > 0 ) {
msgReceived = ( MqttMsgBase ) this . internalQueue . Peek ( ) ;
}
}
// it is a PUBREL message
if ( msgReceived ! = null & & msgReceived . Type = = MqttMsgBase . MQTT_MSG_PUBREL_TYPE ) {
// PUBREL message for the current message, send PUBCOMP
if ( msgReceived . MessageId = = msgInflight . MessageId ) {
lock ( this . internalQueue ) {
// received message processed
this . internalQueue . Dequeue ( ) ;
msgReceivedProcessed = true ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "dequeued {0}" , msgReceived ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
MqttMsgPubcomp pubcomp = new MqttMsgPubcomp {
MessageId = msgInflight . MessageId
} ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
this . Send ( pubcomp ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
internalEvent = new MsgInternalEvent ( msgInflight ) ;
// notify published message from broker and acknowledged
this . OnInternalEvent ( internalEvent ) ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// PUBREL received (and PUBCOMP sent) for PUBLISH message with QoS Level 2, remove from session state
if ( msgInflight . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE & &
this . session ! = null & &
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-04-01 19:31:18 +02:00
( this . session . InflightMessages . Contains ( msgContext . Key ) ) )
#else
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . ContainsKey ( msgContext . Key ) )
2018-04-01 19:31:18 +02:00
#endif
{
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . Remove ( msgContext . Key ) ;
}
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "processed {0}" , msgInflight ) ;
#endif
} else {
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
}
} else {
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
}
}
break ;
case MqttMsgState . WaitForPubcomp :
// QoS 2, waiting for PUBCOMP of a PUBREL message sent
if ( msgContext . Flow = = MqttMsgFlow . ToPublish ) {
acknowledge = false ;
lock ( this . internalQueue ) {
if ( this . internalQueue . Count > 0 ) {
msgReceived = ( MqttMsgBase ) this . internalQueue . Peek ( ) ;
}
}
// it is a PUBCOMP message
if ( msgReceived ! = null & & msgReceived . Type = = MqttMsgBase . MQTT_MSG_PUBCOMP_TYPE ) {
// PUBCOMP message for the current message
if ( msgReceived . MessageId = = msgInflight . MessageId ) {
lock ( this . internalQueue ) {
// received message processed
this . internalQueue . Dequeue ( ) ;
acknowledge = true ;
msgReceivedProcessed = true ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "dequeued {0}" , msgReceived ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
}
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
internalEvent = new MsgPublishedInternalEvent ( msgReceived , true ) ;
// notify received acknowledge from broker of a published message
this . OnInternalEvent ( internalEvent ) ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// PUBCOMP received for PUBLISH message with QoS Level 2, remove from session state
if ( msgInflight . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE & &
this . session ! = null & &
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-04-01 19:31:18 +02:00
( this . session . InflightMessages . Contains ( msgContext . Key ) ) )
#else
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . ContainsKey ( msgContext . Key ) )
2018-04-01 19:31:18 +02:00
#endif
{
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . Remove ( msgContext . Key ) ;
}
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "processed {0}" , msgInflight ) ;
#endif
}
}
// it is a PUBREC message
else if ( msgReceived ! = null & & msgReceived . Type = = MqttMsgBase . MQTT_MSG_PUBREC_TYPE ) {
// another PUBREC message for the current message due to a retransmitted PUBLISH
// I'm in waiting for PUBCOMP, so I can discard this PUBREC
if ( msgReceived . MessageId = = msgInflight . MessageId ) {
lock ( this . internalQueue ) {
// received message processed
this . internalQueue . Dequeue ( ) ;
acknowledge = true ;
msgReceivedProcessed = true ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "dequeued {0}" , msgReceived ) ;
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
}
}
}
// current message not acknowledged
if ( ! acknowledge ) {
delta = Environment . TickCount - msgContext . Timestamp ;
// check timeout for receiving PUBCOMP since PUBREL was sent
if ( delta > = this . Settings . DelayOnRetry ) {
// max retry not reached, resend
if ( msgContext . Attempt < this . Settings . AttemptsOnRetry ) {
msgContext . State = MqttMsgState . SendPubrel ;
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
// update timeout (0 -> reanalyze queue immediately)
timeout = 0 ;
} else {
// PUBCOMP not received, PUBREL retries failed, need to remove from session inflight messages too
if ( this . session ! = null & &
#if MF_FRAMEWORK_VERSION_V4_2 | | MF_FRAMEWORK_VERSION_V4_3 | | COMPACT_FRAMEWORK
2018-04-01 19:31:18 +02:00
( this . session . InflightMessages . Contains ( msgContext . Key ) ) )
#else
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . ContainsKey ( msgContext . Key ) )
2018-04-01 19:31:18 +02:00
#endif
{
2019-11-26 15:34:16 +01:00
this . session . InflightMessages . Remove ( msgContext . Key ) ;
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
// if PUBCOMP for a PUBLISH message not received after retries, raise event for not published
internalEvent = new MsgPublishedInternalEvent ( msgInflight , false ) ;
// notify not received acknowledge from broker and message not published
this . OnInternalEvent ( internalEvent ) ;
}
} else {
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
// update timeout
Int32 msgTimeout = this . Settings . DelayOnRetry - delta ;
timeout = ( msgTimeout < timeout ) ? msgTimeout : timeout ;
}
}
}
break ;
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
case MqttMsgState . SendPubrec :
// TODO : impossible ? --> QueuedQos2 ToAcknowledge
break ;
case MqttMsgState . SendPubrel :
// QoS 2, PUBREL message to send to broker, state change to wait PUBCOMP
if ( msgContext . Flow = = MqttMsgFlow . ToPublish ) {
MqttMsgPubrel pubrel = new MqttMsgPubrel {
MessageId = msgInflight . MessageId
} ;
msgContext . State = MqttMsgState . WaitForPubcomp ;
msgContext . Timestamp = Environment . TickCount ;
msgContext . Attempt + + ;
// retry ? set dup flag [v3.1.1] no needed
if ( this . ProtocolVersion = = MqttProtocolVersion . Version_3_1 ) {
if ( msgContext . Attempt > 1 ) {
pubrel . DupFlag = true ;
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
}
this . Send ( pubrel ) ;
// update timeout : minimum between delay (based on current message sent) or current timeout
timeout = ( this . Settings . DelayOnRetry < timeout ) ? this . Settings . DelayOnRetry : timeout ;
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
break ;
case MqttMsgState . SendPubcomp :
// TODO : impossible ?
break ;
case MqttMsgState . SendPuback :
// TODO : impossible ? --> QueuedQos1 ToAcknowledge
break ;
default :
break ;
2018-03-31 22:42:28 +02:00
}
2019-11-26 15:34:16 +01:00
}
// if calculated timeout is MaxValue, it means that must be Infinite (-1)
if ( timeout = = Int32 . MaxValue ) {
timeout = Timeout . Infinite ;
}
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// if message received is orphan, no corresponding message in inflight queue
// based on messageId, we need to remove from the queue
if ( msgReceived ! = null & & ! msgReceivedProcessed ) {
this . internalQueue . Dequeue ( ) ;
2018-04-01 19:31:18 +02:00
#if TRACE
2019-11-26 15:34:16 +01:00
MqttUtility . Trace . WriteLine ( TraceLevel . Queuing , "dequeued {0} orphan" , msgReceived ) ;
2018-04-01 19:31:18 +02:00
#endif
2019-11-26 15:34:16 +01:00
}
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
}
}
} catch ( MqttCommunicationException e ) {
// possible exception on Send, I need to re-enqueue not sent message
if ( msgContext ! = null ) {
// re-enqueue message
this . inflightQueue . Enqueue ( msgContext ) ;
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
#if TRACE
MqttUtility . Trace . WriteLine ( TraceLevel . Error , "Exception occurred: {0}" , e . ToString ( ) ) ;
#endif
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
// raise disconnection client event
this . OnConnectionClosing ( ) ;
}
}
2018-04-01 19:31:18 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Restore session
/// </summary>
private void RestoreSession ( ) {
// if not clean session
if ( ! this . CleanSession ) {
// there is a previous session
if ( this . session ! = null ) {
lock ( this . inflightQueue ) {
foreach ( MqttMsgContext msgContext in this . session . InflightMessages . Values ) {
this . inflightQueue . Enqueue ( msgContext ) ;
// if it is a PUBLISH message to publish
if ( msgContext . Message . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE & &
msgContext . Flow = = MqttMsgFlow . ToPublish ) {
// it's QoS 1 and we haven't received PUBACK
if ( msgContext . Message . QosLevel = = MqttMsgBase . QOS_LEVEL_AT_LEAST_ONCE & &
msgContext . State = = MqttMsgState . WaitForPuback ) {
// we haven't received PUBACK, we need to resend PUBLISH message
msgContext . State = MqttMsgState . QueuedQos1 ;
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
// it's QoS 2
else if ( msgContext . Message . QosLevel = = MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE ) {
// we haven't received PUBREC, we need to resend PUBLISH message
if ( msgContext . State = = MqttMsgState . WaitForPubrec ) {
msgContext . State = MqttMsgState . QueuedQos2 ;
}
// we haven't received PUBCOMP, we need to resend PUBREL for it
else if ( msgContext . State = = MqttMsgState . WaitForPubcomp ) {
msgContext . State = MqttMsgState . SendPubrel ;
}
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
}
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
}
// unlock process inflight queue
_ = this . inflightWaitHandle . Set ( ) ;
} else {
// create new session
this . session = new MqttClientSession ( this . ClientId ) ;
}
}
// clean any previous session
else {
if ( this . session ! = null ) {
this . session . Clear ( ) ;
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
}
}
2018-04-01 19:31:18 +02:00
#if BROKER
/// <summary>
/// Load a given session
/// </summary>
/// <param name="session">MQTT Client session to load</param>
public void LoadSession ( MqttClientSession session )
{
// if not clean session
if ( ! this . CleanSession )
{
// set the session ...
this . session = session ;
// ... and restore it
this . RestoreSession ( ) ;
2018-03-31 22:42:28 +02:00
}
}
2018-04-01 19:31:18 +02:00
#endif
2018-03-31 22:42:28 +02:00
2019-11-26 15:34:16 +01:00
/// <summary>
/// Generate the next message identifier
/// </summary>
/// <returns>Message identifier</returns>
private UInt16 GetMessageId ( ) {
// if 0 or max UInt16, it becomes 1 (first valid messageId)
this . messageIdCounter = ( this . messageIdCounter % UInt16 . MaxValue ! = 0 ) ? ( UInt16 ) ( this . messageIdCounter + 1 ) : ( UInt16 ) 1 ;
return this . messageIdCounter ;
2018-03-31 22:42:28 +02:00
}
2018-04-01 19:31:18 +02:00
/// <summary>
2019-11-26 15:34:16 +01:00
/// Finder class for PUBLISH message inside a queue
2018-04-01 19:31:18 +02:00
/// </summary>
2019-11-26 15:34:16 +01:00
internal class MqttMsgContextFinder {
// PUBLISH message id
internal UInt16 MessageId { get ; set ; }
// message flow into inflight queue
internal MqttMsgFlow Flow { get ; set ; }
/// <summary>
/// Constructor
/// </summary>
/// <param name="messageId">Message Id</param>
/// <param name="flow">Message flow inside inflight queue</param>
internal MqttMsgContextFinder ( UInt16 messageId , MqttMsgFlow flow ) {
this . MessageId = messageId ;
this . Flow = flow ;
}
internal Boolean Find ( Object item ) {
MqttMsgContext msgCtx = ( MqttMsgContext ) item ;
return msgCtx . Message . Type = = MqttMsgBase . MQTT_MSG_PUBLISH_TYPE & &
msgCtx . Message . MessageId = = this . MessageId & &
msgCtx . Flow = = this . Flow ;
}
2018-04-01 19:31:18 +02:00
}
2019-11-26 15:34:16 +01:00
}
/// <summary>
/// MQTT protocol version
/// </summary>
public enum MqttProtocolVersion {
Version_3_1 = MqttMsgConnect . PROTOCOL_VERSION_V3_1 ,
Version_3_1_1 = MqttMsgConnect . PROTOCOL_VERSION_V3_1_1
}
}