RaspberryIO/Unosquare.Swan/Networking/Connection.cs

867 lines
31 KiB
C#
Raw Normal View History

2019-12-03 18:44:25 +01:00
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Unosquare.Swan.Networking {
/// <summary>
/// Represents a network connection either on the server or on the client. It wraps a TcpClient
/// and its corresponding network streams. It is capable of working in 2 modes. Typically on the server side
/// you will need to enable continuous reading and events. On the client side you may want to disable continuous reading
/// and use the Read methods available. In continuous reading mode Read methods are not available and will throw
/// an invalid operation exceptions if they are used.
/// Continuous Reading Mode: Subscribe to data reception events, it runs a background thread, don't use Read methods
/// Manual Reading Mode: Data reception events are NEVER fired. No background threads are used. Use Read methods to receive data.
/// </summary>
/// <seealso cref="System.IDisposable" />
/// <example>
/// The following code explains how to create a TCP server.
/// <code>
/// using System.Text;
/// using Unosquare.Swan.Networking;
///
/// class Example
/// {
/// static void Main()
/// {
/// // create a new connection listener on a specific port
/// var connectionListener = new ConnectionListener(1337);
///
/// // handle the OnConnectionAccepting event
/// connectionListener.OnConnectionAccepted += (s, e) =>
/// {
/// // create a new connection
/// using (var con = new Connection(e.Client))
/// {
/// con.WriteLineAsync("Hello world!").Wait();
/// }
/// };
///
/// connectionListener.Start();
/// }
/// }
/// </code>
/// The following code describes how to create a TCP client.
/// <code>
/// using System.Net.Sockets;
/// using System.Text;
/// using System.Threading.Tasks;
/// using Unosquare.Swan.Networking;
///
/// class Example
/// {
/// static async Task Main()
/// {
/// // create a new TcpClient object
/// var client = new TcpClient();
///
/// // connect to a specific address and port
/// client.Connect("localhost", 1337);
///
/// //create a new connection with specific encoding,
/// //new line sequence and continuous reading disabled
/// using (var cn = new Connection(client, Encoding.UTF8, "\r\n", true, 0))
/// {
/// var response = await cn.ReadTextAsync();
/// }
/// }
/// }
/// </code>
/// </example>
public sealed class Connection : IDisposable {
// New Line definitions for reading. This applies to both, events and read methods
private readonly String _newLineSequence;
private readonly Byte[] _newLineSequenceBytes;
private readonly Char[] _newLineSequenceChars;
private readonly String[] _newLineSequenceLineSplitter;
private readonly Byte[] _receiveBuffer;
private readonly TimeSpan _continuousReadingInterval = TimeSpan.FromMilliseconds(5);
private readonly Queue<String> _readLineBuffer = new Queue<String>();
private readonly ManualResetEvent _writeDone = new ManualResetEvent(true);
// Disconnect and Dispose
private Boolean _hasDisposed;
private Int32 _disconnectCalls;
// Continuous Reading
private Thread _continuousReadingThread;
private Int32 _receiveBufferPointer;
// Reading and writing
private Task<Int32> _readTask;
/// <summary>
/// Initializes a new instance of the <see cref="Connection"/> class.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="textEncoding">The text encoding.</param>
/// <param name="newLineSequence">The new line sequence used for read and write operations.</param>
/// <param name="disableContinuousReading">if set to <c>true</c> [disable continuous reading].</param>
/// <param name="blockSize">Size of the block. -- set to 0 or less to disable.</param>
public Connection(
TcpClient client,
Encoding textEncoding,
String newLineSequence,
Boolean disableContinuousReading,
Int32 blockSize) {
// Setup basic properties
this.Id = Guid.NewGuid();
this.TextEncoding = textEncoding;
// Setup new line sequence
if(String.IsNullOrEmpty(newLineSequence)) {
throw new ArgumentException("Argument cannot be null", nameof(newLineSequence));
}
this._newLineSequence = newLineSequence;
this._newLineSequenceBytes = this.TextEncoding.GetBytes(this._newLineSequence);
this._newLineSequenceChars = this._newLineSequence.ToCharArray();
this._newLineSequenceLineSplitter = new[] { this._newLineSequence };
// Setup Connection timers
this.ConnectionStartTimeUtc = DateTime.UtcNow;
this.DataReceivedLastTimeUtc = this.ConnectionStartTimeUtc;
this.DataSentLastTimeUtc = this.ConnectionStartTimeUtc;
// Setup connection properties
this.RemoteClient = client;
this.LocalEndPoint = client.Client.LocalEndPoint as IPEndPoint;
this.NetworkStream = this.RemoteClient.GetStream();
this.RemoteEndPoint = this.RemoteClient.Client.RemoteEndPoint as IPEndPoint;
// Setup buffers
this._receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
this.ProtocolBlockSize = blockSize;
this._receiveBufferPointer = 0;
// Setup continuous reading mode if enabled
if(disableContinuousReading) {
return;
}
2019-02-17 14:08:57 +01:00
#if NETSTANDARD1_3
ThreadPool.QueueUserWorkItem(PerformContinuousReading, this);
#else
2019-12-03 18:44:25 +01:00
ThreadPool.GetAvailableThreads(out Int32 availableWorkerThreads, out _);
ThreadPool.GetMaxThreads(out Int32 maxWorkerThreads, out Int32 _);
Int32 activeThreadPoolTreads = maxWorkerThreads - availableWorkerThreads;
if(activeThreadPoolTreads < Environment.ProcessorCount / 4) {
ThreadPool.QueueUserWorkItem(this.PerformContinuousReading, this);
} else {
new Thread(this.PerformContinuousReading) { IsBackground = true }.Start();
}
2019-02-17 14:08:57 +01:00
#endif
2019-12-03 18:44:25 +01:00
}
/// <summary>
/// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
/// It uses UTF8 encoding, CRLF as a new line sequence and disables a protocol block size.
/// </summary>
/// <param name="client">The client.</param>
public Connection(TcpClient client)
: this(client, Encoding.UTF8, "\r\n", false, 0) {
// placeholder
}
/// <summary>
/// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
/// It uses UTF8 encoding, disables line sequences, and uses a protocol block size instead.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="blockSize">Size of the block.</param>
public Connection(TcpClient client, Int32 blockSize)
: this(client, Encoding.UTF8, new String('\n', blockSize + 1), false, blockSize) {
// placeholder
}
#region Events
/// <summary>
/// Occurs when the receive buffer has encounters a new line sequence, the buffer is flushed or the buffer is full.
/// </summary>
public event EventHandler<ConnectionDataReceivedEventArgs> DataReceived = (s, e) => { };
/// <summary>
/// Occurs when an error occurs while upgrading, sending, or receiving data in this client
/// </summary>
public event EventHandler<ConnectionFailureEventArgs> ConnectionFailure = (s, e) => { };
/// <summary>
/// Occurs when a client is disconnected
/// </summary>
public event EventHandler ClientDisconnected = (s, e) => { };
#endregion
#region Properties
/// <summary>
/// Gets the unique identifier of this connection.
/// This field is filled out upon instantiation of this class.
/// </summary>
/// <value>
/// The identifier.
/// </value>
public Guid Id {
get;
}
/// <summary>
/// Gets the active stream. Returns an SSL stream if the connection is secure, otherwise returns
/// the underlying NetworkStream.
/// </summary>
/// <value>
/// The active stream.
/// </value>
public Stream ActiveStream => this.SecureStream ?? this.NetworkStream as Stream;
/// <summary>
/// Gets a value indicating whether the current connection stream is an SSL stream.
/// </summary>
/// <value>
/// <c>true</c> if this instance is active stream secure; otherwise, <c>false</c>.
/// </value>
public Boolean IsActiveStreamSecure => this.SecureStream != null;
/// <summary>
/// Gets the text encoding for send and receive operations.
/// </summary>
/// <value>
/// The text encoding.
/// </value>
public Encoding TextEncoding {
get;
}
/// <summary>
/// Gets the remote end point of this TCP connection.
/// </summary>
/// <value>
/// The remote end point.
/// </value>
public IPEndPoint RemoteEndPoint {
get;
}
/// <summary>
/// Gets the local end point of this TCP connection.
/// </summary>
/// <value>
/// The local end point.
/// </value>
public IPEndPoint LocalEndPoint {
get;
}
/// <summary>
/// Gets the remote client of this TCP connection.
/// </summary>
/// <value>
/// The remote client.
/// </value>
public TcpClient RemoteClient {
get; private set;
}
/// <summary>
/// When in continuous reading mode, and if set to greater than 0,
/// a Data reception event will be fired whenever the amount of bytes
/// determined by this property has been received. Useful for fixed-length message protocols.
/// </summary>
/// <value>
/// The size of the protocol block.
/// </value>
public Int32 ProtocolBlockSize {
get;
}
/// <summary>
/// Gets a value indicating whether this connection is in continuous reading mode.
/// Remark: Whenever a disconnect event occurs, the background thread is terminated
/// and this property will return false whenever the reading thread is not active.
/// Therefore, even if continuous reading was not disabled in the constructor, this property
/// might return false.
/// </summary>
/// <value>
/// <c>true</c> if this instance is continuous reading enabled; otherwise, <c>false</c>.
/// </value>
public Boolean IsContinuousReadingEnabled => this._continuousReadingThread != null;
/// <summary>
/// Gets the start time at which the connection was started in UTC.
/// </summary>
/// <value>
/// The connection start time UTC.
/// </value>
public DateTime ConnectionStartTimeUtc {
get;
}
/// <summary>
/// Gets the start time at which the connection was started in local time.
/// </summary>
/// <value>
/// The connection start time.
/// </value>
public DateTime ConnectionStartTime => this.ConnectionStartTimeUtc.ToLocalTime();
/// <summary>
/// Gets the duration of the connection.
/// </summary>
/// <value>
/// The duration of the connection.
/// </value>
public TimeSpan ConnectionDuration => DateTime.UtcNow.Subtract(this.ConnectionStartTimeUtc);
/// <summary>
/// Gets the last time data was received at in UTC.
/// </summary>
/// <value>
/// The data received last time UTC.
/// </value>
public DateTime DataReceivedLastTimeUtc {
get; private set;
}
/// <summary>
/// Gets how long has elapsed since data was last received.
/// </summary>
public TimeSpan DataReceivedIdleDuration => DateTime.UtcNow.Subtract(this.DataReceivedLastTimeUtc);
/// <summary>
/// Gets the last time at which data was sent in UTC.
/// </summary>
/// <value>
/// The data sent last time UTC.
/// </value>
public DateTime DataSentLastTimeUtc {
get; private set;
}
/// <summary>
/// Gets how long has elapsed since data was last sent.
/// </summary>
/// <value>
/// The duration of the data sent idle.
/// </value>
public TimeSpan DataSentIdleDuration => DateTime.UtcNow.Subtract(this.DataSentLastTimeUtc);
/// <summary>
/// Gets a value indicating whether this connection is connected.
/// Remarks: This property polls the socket internally and checks if it is available to read data from it.
/// If disconnect has been called, then this property will return false.
/// </summary>
/// <value>
/// <c>true</c> if this instance is connected; otherwise, <c>false</c>.
/// </value>
public Boolean IsConnected {
get {
if(this._disconnectCalls > 0) {
return false;
}
try {
Socket socket = this.RemoteClient.Client;
Boolean pollResult = !(socket.Poll(1000, SelectMode.SelectRead)
&& this.NetworkStream.DataAvailable == false || !socket.Connected);
if(pollResult == false) {
this.Disconnect();
}
return pollResult;
} catch {
this.Disconnect();
return false;
}
}
}
private NetworkStream NetworkStream {
get; set;
}
private SslStream SecureStream {
get; set;
}
#endregion
#region Read Methods
/// <summary>
/// Reads data from the remote client asynchronously and with the given timeout.
/// </summary>
/// <param name="timeout">The timeout.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A byte array containing the results of encoding the specified set of characters.</returns>
/// <exception cref="InvalidOperationException">Read methods have been disabled because continuous reading is enabled.</exception>
/// <exception cref="TimeoutException">Reading data from {ActiveStream} timed out in {timeout.TotalMilliseconds} m.</exception>
public async Task<Byte[]> ReadDataAsync(TimeSpan timeout, CancellationToken ct = default) {
if(this.IsContinuousReadingEnabled) {
throw new InvalidOperationException(
"Read methods have been disabled because continuous reading is enabled.");
}
if(this.RemoteClient == null) {
throw new InvalidOperationException("An open connection is required");
}
Byte[] receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
List<Byte> receiveBuilder = new List<Byte>(receiveBuffer.Length);
try {
DateTime startTime = DateTime.UtcNow;
while(receiveBuilder.Count <= 0) {
if(DateTime.UtcNow.Subtract(startTime) >= timeout) {
throw new TimeoutException(
$"Reading data from {this.ActiveStream} timed out in {timeout.TotalMilliseconds} ms");
}
if(this._readTask == null) {
this._readTask = this.ActiveStream.ReadAsync(receiveBuffer, 0, receiveBuffer.Length, ct);
}
if(this._readTask.Wait(this._continuousReadingInterval)) {
Int32 bytesReceivedCount = this._readTask.Result;
if(bytesReceivedCount > 0) {
this.DataReceivedLastTimeUtc = DateTime.UtcNow;
Byte[] buffer = new Byte[bytesReceivedCount];
Array.Copy(receiveBuffer, 0, buffer, 0, bytesReceivedCount);
receiveBuilder.AddRange(buffer);
}
this._readTask = null;
} else {
await Task.Delay(this._continuousReadingInterval, ct).ConfigureAwait(false);
}
}
} catch(Exception ex) {
ex.Error(typeof(Connection).FullName, "Error while reading network stream data asynchronously.");
throw;
}
return receiveBuilder.ToArray();
}
/// <summary>
/// Reads data asynchronously from the remote stream with a 5000 millisecond timeout.
/// </summary>
/// <param name="ct">The cancellation token.</param>
/// <returns>A byte array containing the results the specified sequence of bytes.</returns>
public Task<Byte[]> ReadDataAsync(CancellationToken ct = default)
=> this.ReadDataAsync(TimeSpan.FromSeconds(5), ct);
/// <summary>
/// Asynchronously reads data as text with the given timeout.
/// </summary>
/// <param name="timeout">The timeout.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A <see cref="System.String" /> that contains the results of decoding the specified sequence of bytes.</returns>
public async Task<String> ReadTextAsync(TimeSpan timeout, CancellationToken ct = default) {
Byte[] buffer = await this.ReadDataAsync(timeout, ct).ConfigureAwait(false);
return buffer == null ? null : this.TextEncoding.GetString(buffer);
}
/// <summary>
/// Asynchronously reads data as text with a 5000 millisecond timeout.
/// </summary>
/// <param name="ct">The cancellation token.</param>
/// <returns>When this method completes successfully, it returns the contents of the file as a text string.</returns>
public Task<String> ReadTextAsync(CancellationToken ct = default)
=> this.ReadTextAsync(TimeSpan.FromSeconds(5), ct);
/// <summary>
/// Performs the same task as this method's overload but it defaults to a read timeout of 30 seconds.
/// </summary>
/// <param name="ct">The cancellation token.</param>
/// <returns>
/// A task that represents the asynchronous read operation. The value of the TResult parameter
/// contains the next line from the stream, or is null if all the characters have been read.
/// </returns>
public Task<String> ReadLineAsync(CancellationToken ct = default)
=> this.ReadLineAsync(TimeSpan.FromSeconds(30), ct);
/// <summary>
/// Reads the next available line of text in queue. Return null when no text is read.
/// This method differs from the rest of the read methods because it keeps an internal
/// queue of lines that are read from the stream and only returns the one line next in the queue.
/// It is only recommended to use this method when you are working with text-based protocols
/// and the rest of the read methods are not called.
/// </summary>
/// <param name="timeout">The timeout.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task with a string line from the queue.</returns>
/// <exception cref="InvalidOperationException">Read methods have been disabled because continuous reading is enabled.</exception>
public async Task<String> ReadLineAsync(TimeSpan timeout, CancellationToken ct = default) {
if(this.IsContinuousReadingEnabled) {
throw new InvalidOperationException(
"Read methods have been disabled because continuous reading is enabled.");
}
if(this._readLineBuffer.Count > 0) {
return this._readLineBuffer.Dequeue();
}
StringBuilder builder = new StringBuilder();
while(true) {
String text = await this.ReadTextAsync(timeout, ct).ConfigureAwait(false);
if(text.Length == 0) {
break;
}
_ = builder.Append(text);
if(text.EndsWith(this._newLineSequence) == false) {
continue;
}
String[] lines = builder.ToString().TrimEnd(this._newLineSequenceChars)
.Split(this._newLineSequenceLineSplitter, StringSplitOptions.None);
foreach(String item in lines) {
this._readLineBuffer.Enqueue(item);
}
break;
}
return this._readLineBuffer.Count > 0 ? this._readLineBuffer.Dequeue() : null;
}
#endregion
#region Write Methods
/// <summary>
/// Writes data asynchronously.
/// </summary>
/// <param name="buffer">The buffer.</param>
/// <param name="forceFlush">if set to <c>true</c> [force flush].</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteDataAsync(Byte[] buffer, Boolean forceFlush, CancellationToken ct = default) {
try {
_ = this._writeDone.WaitOne();
_ = this._writeDone.Reset();
await this.ActiveStream.WriteAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);
if(forceFlush) {
await this.ActiveStream.FlushAsync(ct).ConfigureAwait(false);
}
this.DataSentLastTimeUtc = DateTime.UtcNow;
} finally {
_ = this._writeDone.Set();
}
}
/// <summary>
/// Writes text asynchronously.
/// </summary>
/// <param name="text">The text.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public Task WriteTextAsync(String text, CancellationToken ct = default)
=> this.WriteTextAsync(text, this.TextEncoding, ct);
/// <summary>
/// Writes text asynchronously.
/// </summary>
/// <param name="text">The text.</param>
/// <param name="encoding">The encoding.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public Task WriteTextAsync(String text, Encoding encoding, CancellationToken ct = default)
=> this.WriteDataAsync(encoding.GetBytes(text), true, ct);
/// <summary>
/// Writes a line of text asynchronously.
/// The new line sequence is added automatically at the end of the line.
/// </summary>
/// <param name="line">The line.</param>
/// <param name="encoding">The encoding.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public Task WriteLineAsync(String line, Encoding encoding, CancellationToken ct = default)
=> this.WriteDataAsync(encoding.GetBytes($"{line}{this._newLineSequence}"), true, ct);
/// <summary>
/// Writes a line of text asynchronously.
/// The new line sequence is added automatically at the end of the line.
/// </summary>
/// <param name="line">The line.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public Task WriteLineAsync(String line, CancellationToken ct = default)
=> this.WriteLineAsync(line, this.TextEncoding, ct);
#endregion
#region Socket Methods
/// <summary>
/// Upgrades the active stream to an SSL stream if this connection object is hosted in the server.
/// </summary>
/// <param name="serverCertificate">The server certificate.</param>
/// <returns><c>true</c> if the object is hosted in the server; otherwise, <c>false</c>.</returns>
public async Task<Boolean> UpgradeToSecureAsServerAsync(X509Certificate2 serverCertificate) {
if(this.IsActiveStreamSecure) {
return true;
}
_ = this._writeDone.WaitOne();
SslStream secureStream = null;
try {
secureStream = new SslStream(this.NetworkStream, true);
await secureStream.AuthenticateAsServerAsync(serverCertificate).ConfigureAwait(false);
this.SecureStream = secureStream;
return true;
} catch(Exception ex) {
ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
secureStream?.Dispose();
return false;
}
}
/// <summary>
/// Upgrades the active stream to an SSL stream if this connection object is hosted in the client.
/// </summary>
/// <param name="hostname">The hostname.</param>
/// <param name="callback">The callback.</param>
/// <returns>A tasks with <c>true</c> if the upgrade to SSL was successful; otherwise, <c>false</c>.</returns>
public async Task<Boolean> UpgradeToSecureAsClientAsync(
String hostname = null,
RemoteCertificateValidationCallback callback = null) {
if(this.IsActiveStreamSecure) {
return true;
}
SslStream secureStream = callback == null
? new SslStream(this.NetworkStream, true)
: new SslStream(this.NetworkStream, true, callback);
try {
await secureStream.AuthenticateAsClientAsync(hostname ?? Network.HostName.ToLowerInvariant()).ConfigureAwait(false);
this.SecureStream = secureStream;
} catch(Exception ex) {
secureStream.Dispose();
ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
return false;
}
return true;
}
/// <summary>
/// Disconnects this connection.
/// </summary>
public void Disconnect() {
if(this._disconnectCalls > 0) {
return;
}
this._disconnectCalls++;
this._writeDone.WaitOne();
try {
ClientDisconnected(this, EventArgs.Empty);
} catch {
// ignore
}
try {
2019-02-17 14:08:57 +01:00
#if !NET452
RemoteClient.Dispose();
SecureStream?.Dispose();
NetworkStream?.Dispose();
#else
2019-12-03 18:44:25 +01:00
this.RemoteClient.Close();
this.SecureStream?.Close();
this.NetworkStream?.Close();
2019-02-17 14:08:57 +01:00
#endif
2019-12-03 18:44:25 +01:00
} catch {
// ignored
} finally {
this.NetworkStream = null;
this.SecureStream = null;
this.RemoteClient = null;
this._continuousReadingThread = null;
}
}
#endregion
#region Dispose
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose() {
if(this._hasDisposed) {
return;
}
// Release managed resources
this.Disconnect();
this._continuousReadingThread = null;
this._writeDone.Dispose();
this._hasDisposed = true;
}
#endregion
#region Continuous Read Methods
/// <summary>
/// Raises the receive buffer events.
/// </summary>
/// <param name="receivedData">The received data.</param>
/// <exception cref="Exception">Split function failed! This is terribly wrong.</exception>
private void RaiseReceiveBufferEvents(Byte[] receivedData) {
Boolean moreAvailable = this.RemoteClient.Available > 0;
foreach(Byte data in receivedData) {
this.ProcessReceivedBlock(data, moreAvailable);
}
// Check if we are left with some more stuff to handle
if(this._receiveBufferPointer <= 0) {
return;
}
// Extract the segments split by newline terminated bytes
List<Byte[]> sequences = this._receiveBuffer.Skip(0).Take(this._receiveBufferPointer).ToArray()
.Split(0, this._newLineSequenceBytes);
// Something really wrong happened
if(sequences.Count == 0) {
throw new InvalidOperationException("Split function failed! This is terribly wrong!");
}
// We only have one sequence and it is not newline-terminated
// we don't have to do anything.
if(sequences.Count == 1 && sequences[0].EndsWith(this._newLineSequenceBytes) == false) {
return;
}
// Process the events for each sequence
for(Int32 i = 0; i < sequences.Count; i++) {
Byte[] sequenceBytes = sequences[i];
Boolean isNewLineTerminated = sequences[i].EndsWith(this._newLineSequenceBytes);
Boolean isLast = i == sequences.Count - 1;
if(isNewLineTerminated) {
ConnectionDataReceivedEventArgs eventArgs = new ConnectionDataReceivedEventArgs(
sequenceBytes,
ConnectionDataReceivedTrigger.NewLineSequenceEncountered,
isLast == false);
DataReceived(this, eventArgs);
}
// Depending on the last segment determine what to do with the receive buffer
if(!isLast) {
continue;
}
if(isNewLineTerminated) {
// Simply reset the buffer pointer if the last segment was also terminated
this._receiveBufferPointer = 0;
} else {
// If we have not received the termination sequence, then just shift the receive buffer to the left
// and adjust the pointer
Array.Copy(sequenceBytes, this._receiveBuffer, sequenceBytes.Length);
this._receiveBufferPointer = sequenceBytes.Length;
}
}
}
private void ProcessReceivedBlock(Byte data, Boolean moreAvailable) {
this._receiveBuffer[this._receiveBufferPointer] = data;
this._receiveBufferPointer++;
// Block size reached
if(this.ProtocolBlockSize > 0 && this._receiveBufferPointer >= this.ProtocolBlockSize) {
this.SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BlockSizeReached);
return;
}
// The receive buffer is full. Time to flush
if(this._receiveBufferPointer >= this._receiveBuffer.Length) {
this.SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BufferFull);
}
}
private void SendBuffer(Boolean moreAvailable, ConnectionDataReceivedTrigger trigger) {
Byte[] eventBuffer = new Byte[this._receiveBuffer.Length];
Array.Copy(this._receiveBuffer, eventBuffer, eventBuffer.Length);
DataReceived(this,
new ConnectionDataReceivedEventArgs(
eventBuffer,
trigger,
moreAvailable));
this._receiveBufferPointer = 0;
}
private void PerformContinuousReading(Object threadContext) {
this._continuousReadingThread = Thread.CurrentThread;
// Check if the RemoteClient is still there
if(this.RemoteClient == null) {
return;
}
Byte[] receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
while(this.IsConnected && this._disconnectCalls <= 0) {
Boolean doThreadSleep = false;
try {
if(this._readTask == null) {
this._readTask = this.ActiveStream.ReadAsync(receiveBuffer, 0, receiveBuffer.Length);
}
if(this._readTask.Wait(this._continuousReadingInterval)) {
Int32 bytesReceivedCount = this._readTask.Result;
if(bytesReceivedCount > 0) {
this.DataReceivedLastTimeUtc = DateTime.UtcNow;
Byte[] buffer = new Byte[bytesReceivedCount];
Array.Copy(receiveBuffer, 0, buffer, 0, bytesReceivedCount);
this.RaiseReceiveBufferEvents(buffer);
}
this._readTask = null;
} else {
doThreadSleep = this._disconnectCalls <= 0;
}
} catch(Exception ex) {
ex.Log(nameof(Connection), "Continuous Read operation errored");
} finally {
if(doThreadSleep) {
Thread.Sleep(this._continuousReadingInterval);
}
}
}
}
#endregion
}
2019-02-17 14:08:57 +01:00
}