diff --git a/mqtt/M2Mqtt/Messages/MqttMsgPublish.cs b/mqtt/M2Mqtt/Messages/MqttMsgPublish.cs
index 6bbf207..0809af9 100644
--- a/mqtt/M2Mqtt/Messages/MqttMsgPublish.cs
+++ b/mqtt/M2Mqtt/Messages/MqttMsgPublish.cs
@@ -18,260 +18,232 @@ using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt.Exceptions;
-namespace uPLibrary.Networking.M2Mqtt.Messages
-{
+namespace uPLibrary.Networking.M2Mqtt.Messages {
+ ///
+ /// Class for PUBLISH message from client to broker
+ ///
+ public class MqttMsgPublish : MqttMsgBase {
+ #region Properties...
+
///
- /// Class for PUBLISH message from client to broker
+ /// Message topic
///
- public class MqttMsgPublish : MqttMsgBase
- {
- #region Properties...
-
- ///
- /// Message topic
- ///
- public string Topic
- {
- get { return this.topic; }
- set { this.topic = value; }
- }
-
- ///
- /// Message data
- ///
- public byte[] Message
- {
- get { return this.message; }
- set { this.message = value; }
- }
-
- #endregion
-
- // message topic
- private string topic;
- // message data
- private byte[] message;
-
- ///
- /// Constructor
- ///
- public MqttMsgPublish()
- {
- this.type = MQTT_MSG_PUBLISH_TYPE;
- }
-
- ///
- /// Constructor
- ///
- /// Message topic
- /// Message data
- public MqttMsgPublish(string topic, byte[] message) :
- this(topic, message, false, QOS_LEVEL_AT_MOST_ONCE, false)
- {
- }
-
- ///
- /// Constructor
- ///
- /// Message topic
- /// Message data
- /// Duplicate flag
- /// Quality of Service level
- /// Retain flag
- public MqttMsgPublish(string topic,
- byte[] message,
- bool dupFlag,
- byte qosLevel,
- bool retain) : base()
- {
- this.type = MQTT_MSG_PUBLISH_TYPE;
-
- this.topic = topic;
- this.message = message;
- this.dupFlag = dupFlag;
- this.qosLevel = qosLevel;
- this.retain = retain;
- this.messageId = 0;
- }
-
- public override byte[] GetBytes(byte protocolVersion)
- {
- int fixedHeaderSize = 0;
- int varHeaderSize = 0;
- int payloadSize = 0;
- int remainingLength = 0;
- byte[] buffer;
- int index = 0;
-
- // topic can't contain wildcards
- if ((this.topic.IndexOf('#') != -1) || (this.topic.IndexOf('+') != -1))
- throw new MqttClientException(MqttClientErrorCode.TopicWildcard);
-
- // check topic length
- if ((this.topic.Length < MIN_TOPIC_LENGTH) || (this.topic.Length > MAX_TOPIC_LENGTH))
- throw new MqttClientException(MqttClientErrorCode.TopicLength);
-
- // check wrong QoS level (both bits can't be set 1)
- if (this.qosLevel > QOS_LEVEL_EXACTLY_ONCE)
- throw new MqttClientException(MqttClientErrorCode.QosNotAllowed);
-
- byte[] topicUtf8 = Encoding.UTF8.GetBytes(this.topic);
-
- // topic name
- varHeaderSize += topicUtf8.Length + 2;
-
- // message id is valid only with QOS level 1 or QOS level 2
- if ((this.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
- (this.qosLevel == QOS_LEVEL_EXACTLY_ONCE))
- {
- varHeaderSize += MESSAGE_ID_SIZE;
- }
-
- // check on message with zero length
- if (this.message != null)
- // message data
- payloadSize += this.message.Length;
-
- remainingLength += (varHeaderSize + payloadSize);
-
- // first byte of fixed header
- fixedHeaderSize = 1;
-
- int temp = remainingLength;
- // increase fixed header size based on remaining length
- // (each remaining length byte can encode until 128)
- do
- {
- fixedHeaderSize++;
- temp = temp / 128;
- } while (temp > 0);
-
- // allocate buffer for message
- buffer = new byte[fixedHeaderSize + varHeaderSize + payloadSize];
-
- // first fixed header byte
- buffer[index] = (byte)((MQTT_MSG_PUBLISH_TYPE << MSG_TYPE_OFFSET) |
- (this.qosLevel << QOS_LEVEL_OFFSET));
- buffer[index] |= this.dupFlag ? (byte)(1 << DUP_FLAG_OFFSET) : (byte)0x00;
- buffer[index] |= this.retain ? (byte)(1 << RETAIN_FLAG_OFFSET) : (byte)0x00;
- index++;
-
- // encode remaining length
- index = this.encodeRemainingLength(remainingLength, buffer, index);
-
- // topic name
- buffer[index++] = (byte)((topicUtf8.Length >> 8) & 0x00FF); // MSB
- buffer[index++] = (byte)(topicUtf8.Length & 0x00FF); // LSB
- Array.Copy(topicUtf8, 0, buffer, index, topicUtf8.Length);
- index += topicUtf8.Length;
-
- // message id is valid only with QOS level 1 or QOS level 2
- if ((this.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
- (this.qosLevel == QOS_LEVEL_EXACTLY_ONCE))
- {
- // check message identifier assigned
- if (this.messageId == 0)
- throw new MqttClientException(MqttClientErrorCode.WrongMessageId);
- buffer[index++] = (byte)((this.messageId >> 8) & 0x00FF); // MSB
- buffer[index++] = (byte)(this.messageId & 0x00FF); // LSB
- }
-
- // check on message with zero length
- if (this.message != null)
- {
- // message data
- Array.Copy(this.message, 0, buffer, index, this.message.Length);
- index += this.message.Length;
- }
-
- return buffer;
- }
-
- ///
- /// Parse bytes for a PUBLISH message
- ///
- /// First fixed header byte
- /// Protocol Version
- /// Channel connected to the broker
- /// PUBLISH message instance
- public static MqttMsgPublish Parse(byte fixedHeaderFirstByte, byte protocolVersion, IMqttNetworkChannel channel)
- {
- byte[] buffer;
- int index = 0;
- byte[] topicUtf8;
- int topicUtf8Length;
- MqttMsgPublish msg = new MqttMsgPublish();
-
- // get remaining length and allocate buffer
- int remainingLength = MqttMsgBase.decodeRemainingLength(channel);
- buffer = new byte[remainingLength];
-
- // read bytes from socket...
- int received = channel.Receive(buffer);
-
- // topic name
- topicUtf8Length = ((buffer[index++] << 8) & 0xFF00);
- topicUtf8Length |= buffer[index++];
- topicUtf8 = new byte[topicUtf8Length];
- Array.Copy(buffer, index, topicUtf8, 0, topicUtf8Length);
- index += topicUtf8Length;
- msg.topic = new String(Encoding.UTF8.GetChars(topicUtf8));
-
- // read QoS level from fixed header
- msg.qosLevel = (byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
- // check wrong QoS level (both bits can't be set 1)
- if (msg.qosLevel > QOS_LEVEL_EXACTLY_ONCE)
- throw new MqttClientException(MqttClientErrorCode.QosNotAllowed);
- // read DUP flag from fixed header
- msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
- // read retain flag from fixed header
- msg.retain = (((fixedHeaderFirstByte & RETAIN_FLAG_MASK) >> RETAIN_FLAG_OFFSET) == 0x01);
-
- // message id is valid only with QOS level 1 or QOS level 2
- if ((msg.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
- (msg.qosLevel == QOS_LEVEL_EXACTLY_ONCE))
- {
- // message id
- msg.messageId = (ushort)((buffer[index++] << 8) & 0xFF00);
- msg.messageId |= (buffer[index++]);
- }
-
- // get payload with message data
- int messageSize = remainingLength - index;
- int remaining = messageSize;
- int messageOffset = 0;
- msg.message = new byte[messageSize];
-
- // BUG FIX 26/07/2013 : receiving large payload
-
- // copy first part of payload data received
- Array.Copy(buffer, index, msg.message, messageOffset, received - index);
- remaining -= (received - index);
- messageOffset += (received - index);
-
- // if payload isn't finished
- while (remaining > 0)
- {
- // receive other payload data
- received = channel.Receive(buffer);
- Array.Copy(buffer, 0, msg.message, messageOffset, received);
- remaining -= received;
- messageOffset += received;
- }
-
- return msg;
- }
-
- public override string ToString()
- {
+ public String Topic { get; set; }
+
+ ///
+ /// Message data
+ ///
+ public Byte[] Message { get; set; }
+
+ #endregion
+
+ ///
+ /// Constructor
+ ///
+ public MqttMsgPublish() {
+ this.type = MQTT_MSG_PUBLISH_TYPE;
+ }
+
+ ///
+ /// Constructor
+ ///
+ /// Message topic
+ /// Message data
+ public MqttMsgPublish(String topic, Byte[] message) : this(topic, message, false, QOS_LEVEL_AT_MOST_ONCE, false) {
+ }
+
+ ///
+ /// Constructor
+ ///
+ /// Message topic
+ /// Message data
+ /// Duplicate flag
+ /// Quality of Service level
+ /// Retain flag
+ public MqttMsgPublish(String topic, Byte[] message, Boolean dupFlag, Byte qosLevel, Boolean retain) : base() {
+ this.type = MQTT_MSG_PUBLISH_TYPE;
+ this.Topic = topic;
+ this.Message = message;
+ this.dupFlag = dupFlag;
+ this.qosLevel = qosLevel;
+ this.retain = retain;
+ this.messageId = 0;
+ }
+
+ public override Byte[] GetBytes(Byte protocolVersion) {
+ Int32 fixedHeaderSize = 0;
+ Int32 varHeaderSize = 0;
+ Int32 payloadSize = 0;
+ Int32 remainingLength = 0;
+ Byte[] buffer;
+ Int32 index = 0;
+
+ // topic can't contain wildcards
+ if ((this.Topic.IndexOf('#') != -1) || (this.Topic.IndexOf('+') != -1)) {
+ throw new MqttClientException(MqttClientErrorCode.TopicWildcard);
+ }
+
+ // check topic length
+ if ((this.Topic.Length < MIN_TOPIC_LENGTH) || (this.Topic.Length > MAX_TOPIC_LENGTH)) {
+ throw new MqttClientException(MqttClientErrorCode.TopicLength);
+ }
+
+ // check wrong QoS level (both bits can't be set 1)
+ if (this.qosLevel > QOS_LEVEL_EXACTLY_ONCE) {
+ throw new MqttClientException(MqttClientErrorCode.QosNotAllowed);
+ }
+
+ Byte[] topicUtf8 = Encoding.UTF8.GetBytes(this.Topic);
+
+ // topic name
+ varHeaderSize += topicUtf8.Length + 2;
+
+ // message id is valid only with QOS level 1 or QOS level 2
+ if ((this.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) || (this.qosLevel == QOS_LEVEL_EXACTLY_ONCE)) {
+ varHeaderSize += MESSAGE_ID_SIZE;
+ }
+
+ // check on message with zero length
+ if (this.Message != null) {
+ // message data
+ payloadSize += this.Message.Length;
+ }
+
+ remainingLength += (varHeaderSize + payloadSize);
+
+ // first byte of fixed header
+ fixedHeaderSize = 1;
+
+ Int32 temp = remainingLength;
+ // increase fixed header size based on remaining length
+ // (each remaining length byte can encode until 128)
+ do {
+ fixedHeaderSize++;
+ temp = temp / 128;
+ } while (temp > 0);
+
+ // allocate buffer for message
+ buffer = new Byte[fixedHeaderSize + varHeaderSize + payloadSize];
+
+ // first fixed header byte
+ buffer[index] = (Byte)((MQTT_MSG_PUBLISH_TYPE << MSG_TYPE_OFFSET) | (this.qosLevel << QOS_LEVEL_OFFSET));
+ buffer[index] |= this.dupFlag ? (Byte)(1 << DUP_FLAG_OFFSET) : (Byte)0x00;
+ buffer[index] |= this.retain ? (Byte)(1 << RETAIN_FLAG_OFFSET) : (Byte)0x00;
+ index++;
+
+ // encode remaining length
+ index = this.encodeRemainingLength(remainingLength, buffer, index);
+
+ // topic name
+ buffer[index++] = (Byte)((topicUtf8.Length >> 8) & 0x00FF); // MSB
+ buffer[index++] = (Byte)(topicUtf8.Length & 0x00FF); // LSB
+ Array.Copy(topicUtf8, 0, buffer, index, topicUtf8.Length);
+ index += topicUtf8.Length;
+
+ // message id is valid only with QOS level 1 or QOS level 2
+ if ((this.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
+ (this.qosLevel == QOS_LEVEL_EXACTLY_ONCE)) {
+ // check message identifier assigned
+ if (this.messageId == 0) {
+ throw new MqttClientException(MqttClientErrorCode.WrongMessageId);
+ }
+
+ buffer[index++] = (Byte)((this.messageId >> 8) & 0x00FF); // MSB
+ buffer[index++] = (Byte)(this.messageId & 0x00FF); // LSB
+ }
+
+ // check on message with zero length
+ if (this.Message != null) {
+ // message data
+ Array.Copy(this.Message, 0, buffer, index, this.Message.Length);
+ index += this.Message.Length;
+ }
+
+ return buffer;
+ }
+
+ ///
+ /// Parse bytes for a PUBLISH message
+ ///
+ /// First fixed header byte
+ /// Protocol Version
+ /// Channel connected to the broker
+ /// PUBLISH message instance
+ public static MqttMsgPublish Parse(Byte fixedHeaderFirstByte, Byte protocolVersion, IMqttNetworkChannel channel) {
+ Byte[] buffer;
+ Int32 index = 0;
+ Byte[] topicUtf8;
+ Int32 topicUtf8Length;
+ MqttMsgPublish msg = new MqttMsgPublish();
+
+ // get remaining length and allocate buffer
+ Int32 remainingLength = decodeRemainingLength(channel);
+ buffer = new Byte[remainingLength];
+
+ // read bytes from socket...
+ Int32 received = channel.Receive(buffer);
+
+ // topic name
+ topicUtf8Length = ((buffer[index++] << 8) & 0xFF00);
+ topicUtf8Length |= buffer[index++];
+ topicUtf8 = new Byte[topicUtf8Length];
+ Array.Copy(buffer, index, topicUtf8, 0, topicUtf8Length);
+ index += topicUtf8Length;
+ msg.Topic = new String(Encoding.UTF8.GetChars(topicUtf8));
+
+ // read QoS level from fixed header
+ msg.qosLevel = (Byte)((fixedHeaderFirstByte & QOS_LEVEL_MASK) >> QOS_LEVEL_OFFSET);
+ // check wrong QoS level (both bits can't be set 1)
+ if (msg.qosLevel > QOS_LEVEL_EXACTLY_ONCE) {
+ throw new MqttClientException(MqttClientErrorCode.QosNotAllowed);
+ }
+ // read DUP flag from fixed header
+ msg.dupFlag = (((fixedHeaderFirstByte & DUP_FLAG_MASK) >> DUP_FLAG_OFFSET) == 0x01);
+ // read retain flag from fixed header
+ msg.retain = (((fixedHeaderFirstByte & RETAIN_FLAG_MASK) >> RETAIN_FLAG_OFFSET) == 0x01);
+
+ // message id is valid only with QOS level 1 or QOS level 2
+ if ((msg.qosLevel == QOS_LEVEL_AT_LEAST_ONCE) ||
+ (msg.qosLevel == QOS_LEVEL_EXACTLY_ONCE)) {
+ // message id
+ msg.messageId = (UInt16)((buffer[index++] << 8) & 0xFF00);
+ msg.messageId |= (buffer[index++]);
+ }
+
+ // get payload with message data
+ Int32 messageSize = remainingLength - index;
+ Int32 remaining = messageSize;
+ Int32 messageOffset = 0;
+ msg.Message = new Byte[messageSize];
+
+ // BUG FIX 26/07/2013 : receiving large payload
+
+ // copy first part of payload data received
+ Array.Copy(buffer, index, msg.Message, messageOffset, received - index);
+ remaining -= (received - index);
+ messageOffset += (received - index);
+
+ // if payload isn't finished
+ while (remaining > 0) {
+ // receive other payload data
+ received = channel.Receive(buffer);
+ Array.Copy(buffer, 0, msg.Message, messageOffset, received);
+ remaining -= received;
+ messageOffset += received;
+ }
+
+ return msg;
+ }
+
+ public override String ToString() {
#if TRACE
- return this.GetTraceString(
- "PUBLISH",
- new object[] { "messageId", "topic", "message" },
- new object[] { this.messageId, this.topic, this.message });
+ return this.GetTraceString(
+ "PUBLISH",
+ new Object[] { "messageId", "topic", "message" },
+ new Object[] { this.messageId, this.Topic, this.Message });
#else
return base.ToString();
#endif
- }
- }
+ }
+ }
}