867 lines
31 KiB
C#
867 lines
31 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Net;
|
|
using System.Net.Security;
|
|
using System.Net.Sockets;
|
|
using System.Security.Cryptography.X509Certificates;
|
|
using System.Text;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace Unosquare.Swan.Networking {
|
|
/// <summary>
|
|
/// Represents a network connection either on the server or on the client. It wraps a TcpClient
|
|
/// and its corresponding network streams. It is capable of working in 2 modes. Typically on the server side
|
|
/// you will need to enable continuous reading and events. On the client side you may want to disable continuous reading
|
|
/// and use the Read methods available. In continuous reading mode Read methods are not available and will throw
|
|
/// an invalid operation exceptions if they are used.
|
|
/// Continuous Reading Mode: Subscribe to data reception events, it runs a background thread, don't use Read methods
|
|
/// Manual Reading Mode: Data reception events are NEVER fired. No background threads are used. Use Read methods to receive data.
|
|
/// </summary>
|
|
/// <seealso cref="System.IDisposable" />
|
|
/// <example>
|
|
/// The following code explains how to create a TCP server.
|
|
/// <code>
|
|
/// using System.Text;
|
|
/// using Unosquare.Swan.Networking;
|
|
///
|
|
/// class Example
|
|
/// {
|
|
/// static void Main()
|
|
/// {
|
|
/// // create a new connection listener on a specific port
|
|
/// var connectionListener = new ConnectionListener(1337);
|
|
///
|
|
/// // handle the OnConnectionAccepting event
|
|
/// connectionListener.OnConnectionAccepted += (s, e) =>
|
|
/// {
|
|
/// // create a new connection
|
|
/// using (var con = new Connection(e.Client))
|
|
/// {
|
|
/// con.WriteLineAsync("Hello world!").Wait();
|
|
/// }
|
|
/// };
|
|
///
|
|
/// connectionListener.Start();
|
|
/// }
|
|
/// }
|
|
/// </code>
|
|
/// The following code describes how to create a TCP client.
|
|
/// <code>
|
|
/// using System.Net.Sockets;
|
|
/// using System.Text;
|
|
/// using System.Threading.Tasks;
|
|
/// using Unosquare.Swan.Networking;
|
|
///
|
|
/// class Example
|
|
/// {
|
|
/// static async Task Main()
|
|
/// {
|
|
/// // create a new TcpClient object
|
|
/// var client = new TcpClient();
|
|
///
|
|
/// // connect to a specific address and port
|
|
/// client.Connect("localhost", 1337);
|
|
///
|
|
/// //create a new connection with specific encoding,
|
|
/// //new line sequence and continuous reading disabled
|
|
/// using (var cn = new Connection(client, Encoding.UTF8, "\r\n", true, 0))
|
|
/// {
|
|
/// var response = await cn.ReadTextAsync();
|
|
/// }
|
|
/// }
|
|
/// }
|
|
/// </code>
|
|
/// </example>
|
|
public sealed class Connection : IDisposable {
|
|
// New Line definitions for reading. This applies to both, events and read methods
|
|
private readonly String _newLineSequence;
|
|
|
|
private readonly Byte[] _newLineSequenceBytes;
|
|
private readonly Char[] _newLineSequenceChars;
|
|
private readonly String[] _newLineSequenceLineSplitter;
|
|
private readonly Byte[] _receiveBuffer;
|
|
private readonly TimeSpan _continuousReadingInterval = TimeSpan.FromMilliseconds(5);
|
|
private readonly Queue<String> _readLineBuffer = new Queue<String>();
|
|
private readonly ManualResetEvent _writeDone = new ManualResetEvent(true);
|
|
|
|
// Disconnect and Dispose
|
|
private Boolean _hasDisposed;
|
|
|
|
private Int32 _disconnectCalls;
|
|
|
|
// Continuous Reading
|
|
private Thread _continuousReadingThread;
|
|
|
|
private Int32 _receiveBufferPointer;
|
|
|
|
// Reading and writing
|
|
private Task<Int32> _readTask;
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="Connection"/> class.
|
|
/// </summary>
|
|
/// <param name="client">The client.</param>
|
|
/// <param name="textEncoding">The text encoding.</param>
|
|
/// <param name="newLineSequence">The new line sequence used for read and write operations.</param>
|
|
/// <param name="disableContinuousReading">if set to <c>true</c> [disable continuous reading].</param>
|
|
/// <param name="blockSize">Size of the block. -- set to 0 or less to disable.</param>
|
|
public Connection(
|
|
TcpClient client,
|
|
Encoding textEncoding,
|
|
String newLineSequence,
|
|
Boolean disableContinuousReading,
|
|
Int32 blockSize) {
|
|
// Setup basic properties
|
|
this.Id = Guid.NewGuid();
|
|
this.TextEncoding = textEncoding;
|
|
|
|
// Setup new line sequence
|
|
if(String.IsNullOrEmpty(newLineSequence)) {
|
|
throw new ArgumentException("Argument cannot be null", nameof(newLineSequence));
|
|
}
|
|
|
|
this._newLineSequence = newLineSequence;
|
|
this._newLineSequenceBytes = this.TextEncoding.GetBytes(this._newLineSequence);
|
|
this._newLineSequenceChars = this._newLineSequence.ToCharArray();
|
|
this._newLineSequenceLineSplitter = new[] { this._newLineSequence };
|
|
|
|
// Setup Connection timers
|
|
this.ConnectionStartTimeUtc = DateTime.UtcNow;
|
|
this.DataReceivedLastTimeUtc = this.ConnectionStartTimeUtc;
|
|
this.DataSentLastTimeUtc = this.ConnectionStartTimeUtc;
|
|
|
|
// Setup connection properties
|
|
this.RemoteClient = client;
|
|
this.LocalEndPoint = client.Client.LocalEndPoint as IPEndPoint;
|
|
this.NetworkStream = this.RemoteClient.GetStream();
|
|
this.RemoteEndPoint = this.RemoteClient.Client.RemoteEndPoint as IPEndPoint;
|
|
|
|
// Setup buffers
|
|
this._receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
|
|
this.ProtocolBlockSize = blockSize;
|
|
this._receiveBufferPointer = 0;
|
|
|
|
// Setup continuous reading mode if enabled
|
|
if(disableContinuousReading) {
|
|
return;
|
|
}
|
|
|
|
#if NETSTANDARD1_3
|
|
ThreadPool.QueueUserWorkItem(PerformContinuousReading, this);
|
|
#else
|
|
ThreadPool.GetAvailableThreads(out Int32 availableWorkerThreads, out _);
|
|
ThreadPool.GetMaxThreads(out Int32 maxWorkerThreads, out Int32 _);
|
|
|
|
Int32 activeThreadPoolTreads = maxWorkerThreads - availableWorkerThreads;
|
|
|
|
if(activeThreadPoolTreads < Environment.ProcessorCount / 4) {
|
|
ThreadPool.QueueUserWorkItem(this.PerformContinuousReading, this);
|
|
} else {
|
|
new Thread(this.PerformContinuousReading) { IsBackground = true }.Start();
|
|
}
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
|
|
/// It uses UTF8 encoding, CRLF as a new line sequence and disables a protocol block size.
|
|
/// </summary>
|
|
/// <param name="client">The client.</param>
|
|
public Connection(TcpClient client)
|
|
: this(client, Encoding.UTF8, "\r\n", false, 0) {
|
|
// placeholder
|
|
}
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
|
|
/// It uses UTF8 encoding, disables line sequences, and uses a protocol block size instead.
|
|
/// </summary>
|
|
/// <param name="client">The client.</param>
|
|
/// <param name="blockSize">Size of the block.</param>
|
|
public Connection(TcpClient client, Int32 blockSize)
|
|
: this(client, Encoding.UTF8, new String('\n', blockSize + 1), false, blockSize) {
|
|
// placeholder
|
|
}
|
|
|
|
#region Events
|
|
|
|
/// <summary>
|
|
/// Occurs when the receive buffer has encounters a new line sequence, the buffer is flushed or the buffer is full.
|
|
/// </summary>
|
|
public event EventHandler<ConnectionDataReceivedEventArgs> DataReceived = (s, e) => { };
|
|
|
|
/// <summary>
|
|
/// Occurs when an error occurs while upgrading, sending, or receiving data in this client
|
|
/// </summary>
|
|
public event EventHandler<ConnectionFailureEventArgs> ConnectionFailure = (s, e) => { };
|
|
|
|
/// <summary>
|
|
/// Occurs when a client is disconnected
|
|
/// </summary>
|
|
public event EventHandler ClientDisconnected = (s, e) => { };
|
|
|
|
#endregion
|
|
|
|
#region Properties
|
|
|
|
/// <summary>
|
|
/// Gets the unique identifier of this connection.
|
|
/// This field is filled out upon instantiation of this class.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The identifier.
|
|
/// </value>
|
|
public Guid Id {
|
|
get;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the active stream. Returns an SSL stream if the connection is secure, otherwise returns
|
|
/// the underlying NetworkStream.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The active stream.
|
|
/// </value>
|
|
public Stream ActiveStream => this.SecureStream ?? this.NetworkStream as Stream;
|
|
|
|
/// <summary>
|
|
/// Gets a value indicating whether the current connection stream is an SSL stream.
|
|
/// </summary>
|
|
/// <value>
|
|
/// <c>true</c> if this instance is active stream secure; otherwise, <c>false</c>.
|
|
/// </value>
|
|
public Boolean IsActiveStreamSecure => this.SecureStream != null;
|
|
|
|
/// <summary>
|
|
/// Gets the text encoding for send and receive operations.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The text encoding.
|
|
/// </value>
|
|
public Encoding TextEncoding {
|
|
get;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the remote end point of this TCP connection.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The remote end point.
|
|
/// </value>
|
|
public IPEndPoint RemoteEndPoint {
|
|
get;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the local end point of this TCP connection.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The local end point.
|
|
/// </value>
|
|
public IPEndPoint LocalEndPoint {
|
|
get;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the remote client of this TCP connection.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The remote client.
|
|
/// </value>
|
|
public TcpClient RemoteClient {
|
|
get; private set;
|
|
}
|
|
|
|
/// <summary>
|
|
/// When in continuous reading mode, and if set to greater than 0,
|
|
/// a Data reception event will be fired whenever the amount of bytes
|
|
/// determined by this property has been received. Useful for fixed-length message protocols.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The size of the protocol block.
|
|
/// </value>
|
|
public Int32 ProtocolBlockSize {
|
|
get;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets a value indicating whether this connection is in continuous reading mode.
|
|
/// Remark: Whenever a disconnect event occurs, the background thread is terminated
|
|
/// and this property will return false whenever the reading thread is not active.
|
|
/// Therefore, even if continuous reading was not disabled in the constructor, this property
|
|
/// might return false.
|
|
/// </summary>
|
|
/// <value>
|
|
/// <c>true</c> if this instance is continuous reading enabled; otherwise, <c>false</c>.
|
|
/// </value>
|
|
public Boolean IsContinuousReadingEnabled => this._continuousReadingThread != null;
|
|
|
|
/// <summary>
|
|
/// Gets the start time at which the connection was started in UTC.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The connection start time UTC.
|
|
/// </value>
|
|
public DateTime ConnectionStartTimeUtc {
|
|
get;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the start time at which the connection was started in local time.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The connection start time.
|
|
/// </value>
|
|
public DateTime ConnectionStartTime => this.ConnectionStartTimeUtc.ToLocalTime();
|
|
|
|
/// <summary>
|
|
/// Gets the duration of the connection.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The duration of the connection.
|
|
/// </value>
|
|
public TimeSpan ConnectionDuration => DateTime.UtcNow.Subtract(this.ConnectionStartTimeUtc);
|
|
|
|
/// <summary>
|
|
/// Gets the last time data was received at in UTC.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The data received last time UTC.
|
|
/// </value>
|
|
public DateTime DataReceivedLastTimeUtc {
|
|
get; private set;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets how long has elapsed since data was last received.
|
|
/// </summary>
|
|
public TimeSpan DataReceivedIdleDuration => DateTime.UtcNow.Subtract(this.DataReceivedLastTimeUtc);
|
|
|
|
/// <summary>
|
|
/// Gets the last time at which data was sent in UTC.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The data sent last time UTC.
|
|
/// </value>
|
|
public DateTime DataSentLastTimeUtc {
|
|
get; private set;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets how long has elapsed since data was last sent.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The duration of the data sent idle.
|
|
/// </value>
|
|
public TimeSpan DataSentIdleDuration => DateTime.UtcNow.Subtract(this.DataSentLastTimeUtc);
|
|
|
|
/// <summary>
|
|
/// Gets a value indicating whether this connection is connected.
|
|
/// Remarks: This property polls the socket internally and checks if it is available to read data from it.
|
|
/// If disconnect has been called, then this property will return false.
|
|
/// </summary>
|
|
/// <value>
|
|
/// <c>true</c> if this instance is connected; otherwise, <c>false</c>.
|
|
/// </value>
|
|
public Boolean IsConnected {
|
|
get {
|
|
if(this._disconnectCalls > 0) {
|
|
return false;
|
|
}
|
|
|
|
try {
|
|
Socket socket = this.RemoteClient.Client;
|
|
Boolean pollResult = !(socket.Poll(1000, SelectMode.SelectRead)
|
|
&& this.NetworkStream.DataAvailable == false || !socket.Connected);
|
|
|
|
if(pollResult == false) {
|
|
this.Disconnect();
|
|
}
|
|
|
|
return pollResult;
|
|
} catch {
|
|
this.Disconnect();
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
private NetworkStream NetworkStream {
|
|
get; set;
|
|
}
|
|
|
|
private SslStream SecureStream {
|
|
get; set;
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Read Methods
|
|
|
|
/// <summary>
|
|
/// Reads data from the remote client asynchronously and with the given timeout.
|
|
/// </summary>
|
|
/// <param name="timeout">The timeout.</param>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A byte array containing the results of encoding the specified set of characters.</returns>
|
|
/// <exception cref="InvalidOperationException">Read methods have been disabled because continuous reading is enabled.</exception>
|
|
/// <exception cref="TimeoutException">Reading data from {ActiveStream} timed out in {timeout.TotalMilliseconds} m.</exception>
|
|
public async Task<Byte[]> ReadDataAsync(TimeSpan timeout, CancellationToken ct = default) {
|
|
if(this.IsContinuousReadingEnabled) {
|
|
throw new InvalidOperationException(
|
|
"Read methods have been disabled because continuous reading is enabled.");
|
|
}
|
|
|
|
if(this.RemoteClient == null) {
|
|
throw new InvalidOperationException("An open connection is required");
|
|
}
|
|
|
|
Byte[] receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
|
|
List<Byte> receiveBuilder = new List<Byte>(receiveBuffer.Length);
|
|
|
|
try {
|
|
DateTime startTime = DateTime.UtcNow;
|
|
|
|
while(receiveBuilder.Count <= 0) {
|
|
if(DateTime.UtcNow.Subtract(startTime) >= timeout) {
|
|
throw new TimeoutException(
|
|
$"Reading data from {this.ActiveStream} timed out in {timeout.TotalMilliseconds} ms");
|
|
}
|
|
|
|
if(this._readTask == null) {
|
|
this._readTask = this.ActiveStream.ReadAsync(receiveBuffer, 0, receiveBuffer.Length, ct);
|
|
}
|
|
|
|
if(this._readTask.Wait(this._continuousReadingInterval)) {
|
|
Int32 bytesReceivedCount = this._readTask.Result;
|
|
if(bytesReceivedCount > 0) {
|
|
this.DataReceivedLastTimeUtc = DateTime.UtcNow;
|
|
Byte[] buffer = new Byte[bytesReceivedCount];
|
|
Array.Copy(receiveBuffer, 0, buffer, 0, bytesReceivedCount);
|
|
receiveBuilder.AddRange(buffer);
|
|
}
|
|
|
|
this._readTask = null;
|
|
} else {
|
|
await Task.Delay(this._continuousReadingInterval, ct).ConfigureAwait(false);
|
|
}
|
|
}
|
|
} catch(Exception ex) {
|
|
ex.Error(typeof(Connection).FullName, "Error while reading network stream data asynchronously.");
|
|
throw;
|
|
}
|
|
|
|
return receiveBuilder.ToArray();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Reads data asynchronously from the remote stream with a 5000 millisecond timeout.
|
|
/// </summary>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A byte array containing the results the specified sequence of bytes.</returns>
|
|
public Task<Byte[]> ReadDataAsync(CancellationToken ct = default)
|
|
=> this.ReadDataAsync(TimeSpan.FromSeconds(5), ct);
|
|
|
|
/// <summary>
|
|
/// Asynchronously reads data as text with the given timeout.
|
|
/// </summary>
|
|
/// <param name="timeout">The timeout.</param>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A <see cref="System.String" /> that contains the results of decoding the specified sequence of bytes.</returns>
|
|
public async Task<String> ReadTextAsync(TimeSpan timeout, CancellationToken ct = default) {
|
|
Byte[] buffer = await this.ReadDataAsync(timeout, ct).ConfigureAwait(false);
|
|
return buffer == null ? null : this.TextEncoding.GetString(buffer);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Asynchronously reads data as text with a 5000 millisecond timeout.
|
|
/// </summary>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>When this method completes successfully, it returns the contents of the file as a text string.</returns>
|
|
public Task<String> ReadTextAsync(CancellationToken ct = default)
|
|
=> this.ReadTextAsync(TimeSpan.FromSeconds(5), ct);
|
|
|
|
/// <summary>
|
|
/// Performs the same task as this method's overload but it defaults to a read timeout of 30 seconds.
|
|
/// </summary>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>
|
|
/// A task that represents the asynchronous read operation. The value of the TResult parameter
|
|
/// contains the next line from the stream, or is null if all the characters have been read.
|
|
/// </returns>
|
|
public Task<String> ReadLineAsync(CancellationToken ct = default)
|
|
=> this.ReadLineAsync(TimeSpan.FromSeconds(30), ct);
|
|
|
|
/// <summary>
|
|
/// Reads the next available line of text in queue. Return null when no text is read.
|
|
/// This method differs from the rest of the read methods because it keeps an internal
|
|
/// queue of lines that are read from the stream and only returns the one line next in the queue.
|
|
/// It is only recommended to use this method when you are working with text-based protocols
|
|
/// and the rest of the read methods are not called.
|
|
/// </summary>
|
|
/// <param name="timeout">The timeout.</param>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A task with a string line from the queue.</returns>
|
|
/// <exception cref="InvalidOperationException">Read methods have been disabled because continuous reading is enabled.</exception>
|
|
public async Task<String> ReadLineAsync(TimeSpan timeout, CancellationToken ct = default) {
|
|
if(this.IsContinuousReadingEnabled) {
|
|
throw new InvalidOperationException(
|
|
"Read methods have been disabled because continuous reading is enabled.");
|
|
}
|
|
|
|
if(this._readLineBuffer.Count > 0) {
|
|
return this._readLineBuffer.Dequeue();
|
|
}
|
|
|
|
StringBuilder builder = new StringBuilder();
|
|
|
|
while(true) {
|
|
String text = await this.ReadTextAsync(timeout, ct).ConfigureAwait(false);
|
|
if(text.Length == 0) {
|
|
break;
|
|
}
|
|
|
|
_ = builder.Append(text);
|
|
|
|
if(text.EndsWith(this._newLineSequence) == false) {
|
|
continue;
|
|
}
|
|
|
|
String[] lines = builder.ToString().TrimEnd(this._newLineSequenceChars)
|
|
.Split(this._newLineSequenceLineSplitter, StringSplitOptions.None);
|
|
foreach(String item in lines) {
|
|
this._readLineBuffer.Enqueue(item);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
return this._readLineBuffer.Count > 0 ? this._readLineBuffer.Dequeue() : null;
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Write Methods
|
|
|
|
/// <summary>
|
|
/// Writes data asynchronously.
|
|
/// </summary>
|
|
/// <param name="buffer">The buffer.</param>
|
|
/// <param name="forceFlush">if set to <c>true</c> [force flush].</param>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A task that represents the asynchronous write operation.</returns>
|
|
public async Task WriteDataAsync(Byte[] buffer, Boolean forceFlush, CancellationToken ct = default) {
|
|
try {
|
|
_ = this._writeDone.WaitOne();
|
|
_ = this._writeDone.Reset();
|
|
await this.ActiveStream.WriteAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);
|
|
if(forceFlush) {
|
|
await this.ActiveStream.FlushAsync(ct).ConfigureAwait(false);
|
|
}
|
|
|
|
this.DataSentLastTimeUtc = DateTime.UtcNow;
|
|
} finally {
|
|
_ = this._writeDone.Set();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Writes text asynchronously.
|
|
/// </summary>
|
|
/// <param name="text">The text.</param>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A task that represents the asynchronous write operation.</returns>
|
|
public Task WriteTextAsync(String text, CancellationToken ct = default)
|
|
=> this.WriteTextAsync(text, this.TextEncoding, ct);
|
|
|
|
/// <summary>
|
|
/// Writes text asynchronously.
|
|
/// </summary>
|
|
/// <param name="text">The text.</param>
|
|
/// <param name="encoding">The encoding.</param>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A task that represents the asynchronous write operation.</returns>
|
|
public Task WriteTextAsync(String text, Encoding encoding, CancellationToken ct = default)
|
|
=> this.WriteDataAsync(encoding.GetBytes(text), true, ct);
|
|
|
|
/// <summary>
|
|
/// Writes a line of text asynchronously.
|
|
/// The new line sequence is added automatically at the end of the line.
|
|
/// </summary>
|
|
/// <param name="line">The line.</param>
|
|
/// <param name="encoding">The encoding.</param>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A task that represents the asynchronous write operation.</returns>
|
|
public Task WriteLineAsync(String line, Encoding encoding, CancellationToken ct = default)
|
|
=> this.WriteDataAsync(encoding.GetBytes($"{line}{this._newLineSequence}"), true, ct);
|
|
|
|
/// <summary>
|
|
/// Writes a line of text asynchronously.
|
|
/// The new line sequence is added automatically at the end of the line.
|
|
/// </summary>
|
|
/// <param name="line">The line.</param>
|
|
/// <param name="ct">The cancellation token.</param>
|
|
/// <returns>A task that represents the asynchronous write operation.</returns>
|
|
public Task WriteLineAsync(String line, CancellationToken ct = default)
|
|
=> this.WriteLineAsync(line, this.TextEncoding, ct);
|
|
|
|
#endregion
|
|
|
|
#region Socket Methods
|
|
|
|
/// <summary>
|
|
/// Upgrades the active stream to an SSL stream if this connection object is hosted in the server.
|
|
/// </summary>
|
|
/// <param name="serverCertificate">The server certificate.</param>
|
|
/// <returns><c>true</c> if the object is hosted in the server; otherwise, <c>false</c>.</returns>
|
|
public async Task<Boolean> UpgradeToSecureAsServerAsync(X509Certificate2 serverCertificate) {
|
|
if(this.IsActiveStreamSecure) {
|
|
return true;
|
|
}
|
|
|
|
_ = this._writeDone.WaitOne();
|
|
|
|
SslStream secureStream = null;
|
|
|
|
try {
|
|
secureStream = new SslStream(this.NetworkStream, true);
|
|
await secureStream.AuthenticateAsServerAsync(serverCertificate).ConfigureAwait(false);
|
|
this.SecureStream = secureStream;
|
|
return true;
|
|
} catch(Exception ex) {
|
|
ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
|
|
secureStream?.Dispose();
|
|
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Upgrades the active stream to an SSL stream if this connection object is hosted in the client.
|
|
/// </summary>
|
|
/// <param name="hostname">The hostname.</param>
|
|
/// <param name="callback">The callback.</param>
|
|
/// <returns>A tasks with <c>true</c> if the upgrade to SSL was successful; otherwise, <c>false</c>.</returns>
|
|
public async Task<Boolean> UpgradeToSecureAsClientAsync(
|
|
String hostname = null,
|
|
RemoteCertificateValidationCallback callback = null) {
|
|
if(this.IsActiveStreamSecure) {
|
|
return true;
|
|
}
|
|
|
|
SslStream secureStream = callback == null
|
|
? new SslStream(this.NetworkStream, true)
|
|
: new SslStream(this.NetworkStream, true, callback);
|
|
|
|
try {
|
|
await secureStream.AuthenticateAsClientAsync(hostname ?? Network.HostName.ToLowerInvariant()).ConfigureAwait(false);
|
|
this.SecureStream = secureStream;
|
|
} catch(Exception ex) {
|
|
secureStream.Dispose();
|
|
ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Disconnects this connection.
|
|
/// </summary>
|
|
public void Disconnect() {
|
|
if(this._disconnectCalls > 0) {
|
|
return;
|
|
}
|
|
|
|
this._disconnectCalls++;
|
|
this._writeDone.WaitOne();
|
|
|
|
try {
|
|
ClientDisconnected(this, EventArgs.Empty);
|
|
} catch {
|
|
// ignore
|
|
}
|
|
|
|
try {
|
|
#if !NET452
|
|
RemoteClient.Dispose();
|
|
SecureStream?.Dispose();
|
|
NetworkStream?.Dispose();
|
|
#else
|
|
this.RemoteClient.Close();
|
|
this.SecureStream?.Close();
|
|
this.NetworkStream?.Close();
|
|
#endif
|
|
} catch {
|
|
// ignored
|
|
} finally {
|
|
this.NetworkStream = null;
|
|
this.SecureStream = null;
|
|
this.RemoteClient = null;
|
|
this._continuousReadingThread = null;
|
|
}
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Dispose
|
|
|
|
/// <summary>
|
|
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
|
|
/// </summary>
|
|
public void Dispose() {
|
|
if(this._hasDisposed) {
|
|
return;
|
|
}
|
|
|
|
// Release managed resources
|
|
this.Disconnect();
|
|
this._continuousReadingThread = null;
|
|
this._writeDone.Dispose();
|
|
|
|
this._hasDisposed = true;
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Continuous Read Methods
|
|
|
|
/// <summary>
|
|
/// Raises the receive buffer events.
|
|
/// </summary>
|
|
/// <param name="receivedData">The received data.</param>
|
|
/// <exception cref="Exception">Split function failed! This is terribly wrong.</exception>
|
|
private void RaiseReceiveBufferEvents(Byte[] receivedData) {
|
|
Boolean moreAvailable = this.RemoteClient.Available > 0;
|
|
|
|
foreach(Byte data in receivedData) {
|
|
this.ProcessReceivedBlock(data, moreAvailable);
|
|
}
|
|
|
|
// Check if we are left with some more stuff to handle
|
|
if(this._receiveBufferPointer <= 0) {
|
|
return;
|
|
}
|
|
|
|
// Extract the segments split by newline terminated bytes
|
|
List<Byte[]> sequences = this._receiveBuffer.Skip(0).Take(this._receiveBufferPointer).ToArray()
|
|
.Split(0, this._newLineSequenceBytes);
|
|
|
|
// Something really wrong happened
|
|
if(sequences.Count == 0) {
|
|
throw new InvalidOperationException("Split function failed! This is terribly wrong!");
|
|
}
|
|
|
|
// We only have one sequence and it is not newline-terminated
|
|
// we don't have to do anything.
|
|
if(sequences.Count == 1 && sequences[0].EndsWith(this._newLineSequenceBytes) == false) {
|
|
return;
|
|
}
|
|
|
|
// Process the events for each sequence
|
|
for(Int32 i = 0; i < sequences.Count; i++) {
|
|
Byte[] sequenceBytes = sequences[i];
|
|
Boolean isNewLineTerminated = sequences[i].EndsWith(this._newLineSequenceBytes);
|
|
Boolean isLast = i == sequences.Count - 1;
|
|
|
|
if(isNewLineTerminated) {
|
|
ConnectionDataReceivedEventArgs eventArgs = new ConnectionDataReceivedEventArgs(
|
|
sequenceBytes,
|
|
ConnectionDataReceivedTrigger.NewLineSequenceEncountered,
|
|
isLast == false);
|
|
DataReceived(this, eventArgs);
|
|
}
|
|
|
|
// Depending on the last segment determine what to do with the receive buffer
|
|
if(!isLast) {
|
|
continue;
|
|
}
|
|
|
|
if(isNewLineTerminated) {
|
|
// Simply reset the buffer pointer if the last segment was also terminated
|
|
this._receiveBufferPointer = 0;
|
|
} else {
|
|
// If we have not received the termination sequence, then just shift the receive buffer to the left
|
|
// and adjust the pointer
|
|
Array.Copy(sequenceBytes, this._receiveBuffer, sequenceBytes.Length);
|
|
this._receiveBufferPointer = sequenceBytes.Length;
|
|
}
|
|
}
|
|
}
|
|
|
|
private void ProcessReceivedBlock(Byte data, Boolean moreAvailable) {
|
|
this._receiveBuffer[this._receiveBufferPointer] = data;
|
|
this._receiveBufferPointer++;
|
|
|
|
// Block size reached
|
|
if(this.ProtocolBlockSize > 0 && this._receiveBufferPointer >= this.ProtocolBlockSize) {
|
|
this.SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BlockSizeReached);
|
|
return;
|
|
}
|
|
|
|
// The receive buffer is full. Time to flush
|
|
if(this._receiveBufferPointer >= this._receiveBuffer.Length) {
|
|
this.SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BufferFull);
|
|
}
|
|
}
|
|
|
|
private void SendBuffer(Boolean moreAvailable, ConnectionDataReceivedTrigger trigger) {
|
|
Byte[] eventBuffer = new Byte[this._receiveBuffer.Length];
|
|
Array.Copy(this._receiveBuffer, eventBuffer, eventBuffer.Length);
|
|
|
|
DataReceived(this,
|
|
new ConnectionDataReceivedEventArgs(
|
|
eventBuffer,
|
|
trigger,
|
|
moreAvailable));
|
|
this._receiveBufferPointer = 0;
|
|
}
|
|
|
|
private void PerformContinuousReading(Object threadContext) {
|
|
this._continuousReadingThread = Thread.CurrentThread;
|
|
|
|
// Check if the RemoteClient is still there
|
|
if(this.RemoteClient == null) {
|
|
return;
|
|
}
|
|
|
|
Byte[] receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
|
|
|
|
while(this.IsConnected && this._disconnectCalls <= 0) {
|
|
Boolean doThreadSleep = false;
|
|
|
|
try {
|
|
if(this._readTask == null) {
|
|
this._readTask = this.ActiveStream.ReadAsync(receiveBuffer, 0, receiveBuffer.Length);
|
|
}
|
|
|
|
if(this._readTask.Wait(this._continuousReadingInterval)) {
|
|
Int32 bytesReceivedCount = this._readTask.Result;
|
|
if(bytesReceivedCount > 0) {
|
|
this.DataReceivedLastTimeUtc = DateTime.UtcNow;
|
|
Byte[] buffer = new Byte[bytesReceivedCount];
|
|
Array.Copy(receiveBuffer, 0, buffer, 0, bytesReceivedCount);
|
|
this.RaiseReceiveBufferEvents(buffer);
|
|
}
|
|
|
|
this._readTask = null;
|
|
} else {
|
|
doThreadSleep = this._disconnectCalls <= 0;
|
|
}
|
|
} catch(Exception ex) {
|
|
ex.Log(nameof(Connection), "Continuous Read operation errored");
|
|
} finally {
|
|
if(doThreadSleep) {
|
|
Thread.Sleep(this._continuousReadingInterval);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#endregion
|
|
}
|
|
}
|