#nullable enable
using Swan.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Swan.Net {
/// <summary>
/// Represents a network connection either on the server or on the client. It wraps a TcpClient
/// and its corresponding network streams. It is capable of working in 2 modes. Typically on the server side
/// you will need to enable continuous reading and events. On the client side you may want to disable continuous reading
/// and use the Read methods available. In continuous reading mode the Read methods are not available and will throw
/// an <see cref="InvalidOperationException"/> if they are used.
/// Continuous Reading Mode: Subscribe to data reception events; it runs a background thread, so don't use the Read methods.
/// Manual Reading Mode: Data reception events are NEVER fired. No background threads are used. Use the Read methods to receive data.
/// </summary>
///
///
/// The following code explains how to create a TCP server.
///
/// using System.Text;
/// using Swan.Net;
///
/// class Example
/// {
/// static void Main()
/// {
/// // create a new connection listener on a specific port
/// var connectionListener = new ConnectionListener(1337);
///
/// // handle the OnConnectionAccepted event
/// connectionListener.OnConnectionAccepted += async (s, e) =>
/// {
/// // create a new connection
/// using (var con = new Connection(e.Client))
/// {
/// await con.WriteLineAsync("Hello world!");
/// }
/// };
///
/// connectionListener.Start();
/// Console.ReadLine();
/// }
/// }
///
/// The following code describes how to create a TCP client.
///
/// using System.Net.Sockets;
/// using System.Text;
/// using System.Threading.Tasks;
/// using Swan.Net;
///
/// class Example
/// {
/// static async Task Main()
/// {
/// // create a new TcpClient object
/// var client = new TcpClient();
///
/// // connect to a specific address and port
/// client.Connect("localhost", 1337);
///
/// //create a new connection with specific encoding,
/// //new line sequence and continuous reading disabled
/// using (var cn = new Connection(client, Encoding.UTF8, "\r\n", true, 0))
/// {
/// var response = await cn.ReadTextAsync();
/// }
/// }
/// }
///
///
public sealed class Connection : IDisposable {
// New line definitions for reading. These apply to both the events and the read methods.
private readonly String _newLineSequence;
// The new line sequence encoded with TextEncoding.
private readonly Byte[] _newLineSequenceBytes;
private readonly Char[] _newLineSequenceChars;
// Single-element separator array handed to String.Split.
private readonly String[] _newLineSequenceLineSplitter;
// Receive buffer, allocated in the constructor at twice the client's ReceiveBufferSize.
private readonly Byte[] _receiveBuffer;
// Polling interval used while waiting for a pending read to complete.
private readonly TimeSpan _continuousReadingInterval = TimeSpan.FromMilliseconds(5);
// Lines that ReadLineAsync has already split off but not yet handed out.
private readonly Queue<String> _readLineBuffer = new Queue<String>();
// Signaled while no write is in progress; WriteDataAsync resets it for the duration of a write.
private readonly ManualResetEvent _writeDone = new ManualResetEvent(true);
// Disconnect and Dispose
private Boolean _hasDisposed;
// Greater than zero once Disconnect has been entered.
private Int32 _disconnectCalls;
// Continuous Reading
private Thread? _continuousReadingThread;
// Number of bytes currently held in _receiveBuffer.
private Int32 _receiveBufferPointer;
// Reading and writing
// The read currently in flight; its result is the number of bytes received.
private Task<Int32>? _readTask;
/// <summary>
/// Initializes a new instance of the <see cref="Connection"/> class.
/// </summary>
/// <param name="client">The connected TCP client to wrap.</param>
/// <param name="textEncoding">The text encoding used for send and receive operations.</param>
/// <param name="newLineSequence">The new line sequence used for read and write operations.</param>
/// <param name="disableContinuousReading">If set to <c>true</c>, no background read loop is started and the Read methods must be used.</param>
/// <param name="blockSize">Size of the protocol block; set to 0 or less to disable.</param>
/// <exception cref="ArgumentNullException"><paramref name="client"/> or <paramref name="textEncoding"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="newLineSequence"/> is null or empty.</exception>
public Connection(TcpClient client, Encoding textEncoding, String newLineSequence, Boolean disableContinuousReading, Int32 blockSize) {
  // Validate everything up front so that a bad argument is reported by name
  // instead of surfacing later as a NullReferenceException.
  if(client == null) {
    throw new ArgumentNullException(nameof(client));
  }
  if(textEncoding == null) {
    throw new ArgumentNullException(nameof(textEncoding));
  }
  if(String.IsNullOrEmpty(newLineSequence)) {
    throw new ArgumentException("Argument cannot be null", nameof(newLineSequence));
  }
  // Setup basic properties
  this.Id = Guid.NewGuid();
  this.TextEncoding = textEncoding;
  // Setup new line sequence
  this._newLineSequence = newLineSequence;
  this._newLineSequenceBytes = this.TextEncoding.GetBytes(this._newLineSequence);
  this._newLineSequenceChars = this._newLineSequence.ToCharArray();
  this._newLineSequenceLineSplitter = new[] { this._newLineSequence };
  // Setup connection timers: all three timestamps start at the moment of construction
  this.ConnectionStartTimeUtc = DateTime.UtcNow;
  this.DataReceivedLastTimeUtc = this.ConnectionStartTimeUtc;
  this.DataSentLastTimeUtc = this.ConnectionStartTimeUtc;
  // Setup connection properties
  this.RemoteClient = client;
  this.LocalEndPoint = client.Client.LocalEndPoint as IPEndPoint;
  this.NetworkStream = this.RemoteClient.GetStream();
  this.RemoteEndPoint = this.RemoteClient.Client.RemoteEndPoint as IPEndPoint;
  // Setup buffers
  this._receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
  this.ProtocolBlockSize = blockSize;
  this._receiveBufferPointer = 0;
  // Setup continuous reading mode if enabled
  if(disableContinuousReading) {
    return;
  }
  // Run the read loop on the thread pool only while few pool workers are busy
  // (fewer than a quarter of the processor count); otherwise use a dedicated background thread.
  ThreadPool.GetAvailableThreads(out Int32 availableWorkerThreads, out _);
  ThreadPool.GetMaxThreads(out Int32 maxWorkerThreads, out _);
  Int32 activeThreadPoolThreads = maxWorkerThreads - availableWorkerThreads;
  if(activeThreadPoolThreads < Environment.ProcessorCount / 4) {
    _ = ThreadPool.QueueUserWorkItem(this.PerformContinuousReading!, this);
  } else {
    new Thread(this.PerformContinuousReading!) { IsBackground = true }.Start();
  }
}
/// <summary>
/// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
/// It uses UTF8 encoding, CRLF as the new line sequence and no protocol block size.
/// </summary>
/// <param name="client">The client.</param>
public Connection(TcpClient client)
  : this(client, Encoding.UTF8, "\r\n", disableContinuousReading: false, blockSize: 0) {
}
/// <summary>
/// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
/// It uses UTF8 encoding and a protocol block size instead of line sequences.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="blockSize">Size of the block.</param>
// The new line sequence passed on is a run of blockSize + 1 line feeds.
// NOTE(review): presumably chosen so it can never fit inside a single block - confirm.
public Connection(TcpClient client, Int32 blockSize)
  : this(client, Encoding.UTF8, new String('\n', blockSize + 1), disableContinuousReading: false, blockSize: blockSize) {
}
#region Events
// Each event starts with a no-op subscriber so it can be raised without a null check.
// NOTE(review): DataReceived and ConnectionFailure are raised with ConnectionDataReceivedEventArgs and
// ConnectionFailureEventArgs; presumably these were meant to be EventHandler<T> of those types - confirm.
/// <summary>
/// Occurs when the receive buffer encounters a new line sequence, the buffer is flushed or the buffer is full.
/// </summary>
public event EventHandler DataReceived = delegate { };
/// <summary>
/// Occurs when an error occurs while upgrading, sending, or receiving data in this client.
/// </summary>
public event EventHandler ConnectionFailure = delegate { };
/// <summary>
/// Occurs when a client is disconnected.
/// </summary>
public event EventHandler ClientDisconnected = delegate { };
#endregion
#region Properties
/// <summary>
/// Gets the unique identifier of this connection.
/// It is assigned when the instance is constructed.
/// </summary>
/// <value>
/// The identifier.
/// </value>
public Guid Id { get; }
/// <summary>
/// Gets the active stream: the SSL stream if the connection is secure, otherwise
/// the underlying NetworkStream.
/// </summary>
/// <value>
/// The active stream.
/// </value>
public Stream? ActiveStream => (Stream?)this.SecureStream ?? this.NetworkStream;
/// <summary>
/// Gets a value indicating whether the current connection stream is an SSL stream.
/// </summary>
/// <value>
/// <c>true</c> if the active stream is secure; otherwise, <c>false</c>.
/// </value>
public Boolean IsActiveStreamSecure => !(this.SecureStream is null);
/// <summary>
/// Gets the text encoding for send and receive operations.
/// </summary>
/// <value>
/// The text encoding.
/// </value>
public Encoding TextEncoding { get; }
/// <summary>
/// Gets the remote end point of this TCP connection.
/// </summary>
/// <value>
/// The remote end point.
/// </value>
public IPEndPoint? RemoteEndPoint { get; }
/// <summary>
/// Gets the local end point of this TCP connection.
/// </summary>
/// <value>
/// The local end point.
/// </value>
public IPEndPoint? LocalEndPoint { get; }
/// <summary>
/// Gets the remote client of this TCP connection. It becomes null once the connection is disconnected.
/// </summary>
/// <value>
/// The remote client.
/// </value>
public TcpClient? RemoteClient { get; private set; }
/// <summary>
/// When in continuous reading mode, and if set to greater than 0,
/// a data reception event is fired whenever the number of bytes
/// determined by this property has been received. Useful for fixed-length message protocols.
/// </summary>
/// <value>
/// The size of the protocol block.
/// </value>
public Int32 ProtocolBlockSize { get; }
/// <summary>
/// Gets a value indicating whether this connection is in continuous reading mode.
/// Remark: Whenever a disconnect event occurs, the background thread is terminated
/// and this property returns false whenever the reading thread is not active.
/// Therefore, even if continuous reading was not disabled in the constructor, this property
/// might return false.
/// </summary>
/// <value>
/// <c>true</c> if continuous reading is enabled; otherwise, <c>false</c>.
/// </value>
public Boolean IsContinuousReadingEnabled => !(this._continuousReadingThread is null);
/// <summary>
/// Gets the time at which the connection was started, in UTC.
/// </summary>
/// <value>
/// The connection start time UTC.
/// </value>
public DateTime ConnectionStartTimeUtc { get; }
/// <summary>
/// Gets the time at which the connection was started, in local time.
/// </summary>
/// <value>
/// The connection start time.
/// </value>
public DateTime ConnectionStartTime => this.ConnectionStartTimeUtc.ToLocalTime();
/// <summary>
/// Gets the duration of the connection, measured from the start time up to now.
/// </summary>
/// <value>
/// The duration of the connection.
/// </value>
public TimeSpan ConnectionDuration => DateTime.UtcNow - this.ConnectionStartTimeUtc;
/// <summary>
/// Gets the last time at which data was received, in UTC.
/// </summary>
/// <value>
/// The data received last time UTC.
/// </value>
public DateTime DataReceivedLastTimeUtc { get; private set; }
/// <summary>
/// Gets how long has elapsed since data was last received.
/// </summary>
public TimeSpan DataReceivedIdleDuration => DateTime.UtcNow - this.DataReceivedLastTimeUtc;
/// <summary>
/// Gets the last time at which data was sent, in UTC.
/// </summary>
/// <value>
/// The data sent last time UTC.
/// </value>
public DateTime DataSentLastTimeUtc { get; private set; }
/// <summary>
/// Gets how long has elapsed since data was last sent.
/// </summary>
/// <value>
/// The duration of the data sent idle.
/// </value>
public TimeSpan DataSentIdleDuration => DateTime.UtcNow - this.DataSentLastTimeUtc;
/// <summary>
/// Gets a value indicating whether this connection is connected.
/// Remarks: This property polls the socket and checks whether there is data available to read from it.
/// When the check fails (or throws) the connection is disconnected as a side effect.
/// If disconnect has already been called, this property returns false.
/// </summary>
/// <value>
/// <c>true</c> if this instance is connected; otherwise, <c>false</c>.
/// </value>
public Boolean IsConnected {
  get {
    if(this._disconnectCalls > 0) {
      return false;
    }
    Boolean alive;
    try {
      Socket? socket = this.RemoteClient?.Client;
      if(socket == null || this.NetworkStream == null) {
        return false;
      }
      // A socket that polls as readable while the stream has nothing buffered is treated as closed.
      Boolean readableButEmpty = socket.Poll(1000, SelectMode.SelectRead) && !this.NetworkStream.DataAvailable;
      alive = !readableButEmpty && socket.Connected;
      if(!alive) {
        this.Disconnect();
      }
    } catch {
      // Any failure while probing the socket counts as a lost connection.
      this.Disconnect();
      return false;
    }
    return alive;
  }
}
// The plain stream obtained from the TcpClient; set to null on disconnect.
private NetworkStream? NetworkStream { get; set; }
// The SSL stream layered over NetworkStream; null until an upgrade succeeds and again after disconnect.
private SslStream? SecureStream { get; set; }
#endregion
#region Read Methods
/// <summary>
/// Reads data from the remote client asynchronously and with the given timeout.
/// </summary>
/// <param name="timeout">The maximum time to wait for data to arrive.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A byte array with the bytes delivered by the first non-empty read.</returns>
/// <exception cref="InvalidOperationException">Continuous reading is enabled, or the connection is no longer open.</exception>
/// <exception cref="TimeoutException">No data arrived within <paramref name="timeout"/>.</exception>
public async Task<Byte[]> ReadDataAsync(TimeSpan timeout, CancellationToken cancellationToken = default) {
  if(this.IsContinuousReadingEnabled) {
    throw new InvalidOperationException("Read methods have been disabled because continuous reading is enabled.");
  }
  if(this.RemoteClient == null) {
    throw new InvalidOperationException("An open connection is required");
  }
  // A read that is still pending when this method times out stays in _readTask and is picked up by the
  // next call. It must therefore fill a buffer that outlives the call, otherwise the next call would copy
  // from a fresh, empty buffer. The instance receive buffer is otherwise only used by the continuous
  // reading loop, which cannot be running when execution reaches this point.
  Byte[] receiveBuffer = this._receiveBuffer;
  try {
    // A leftover read that was cancelled through an earlier call's token consumed nothing; start over.
    if(this._readTask != null && this._readTask.IsCanceled) {
      this._readTask = null;
    }
    DateTime startTime = DateTime.UtcNow;
    while(true) {
      if(DateTime.UtcNow.Subtract(startTime) >= timeout) {
        throw new TimeoutException($"Reading data from {this.ActiveStream} timed out in {timeout.TotalMilliseconds} ms");
      }
      if(this._readTask == null) {
        this._readTask = this.ActiveStream?.ReadAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken);
      }
      Task pause = Task.Delay(this._continuousReadingInterval, cancellationToken);
      Task<Int32>? pending = this._readTask;
      if(pending == null) {
        // No stream to read from right now; wait one interval and check again until the timeout expires.
        await pause.ConfigureAwait(false);
        continue;
      }
      // Wait for the read without blocking the calling thread, waking up at least once per interval
      // so that the timeout above is honoured.
      Task finished = await Task.WhenAny(pending, pause).ConfigureAwait(false);
      if(finished != pending) {
        // Awaiting the delay surfaces a cancellation request as TaskCanceledException.
        await pause.ConfigureAwait(false);
        continue;
      }
      // The task has completed, so Result does not block; a failed read surfaces as AggregateException.
      Int32 bytesReceivedCount = pending.Result;
      this._readTask = null;
      if(bytesReceivedCount > 0) {
        this.DataReceivedLastTimeUtc = DateTime.UtcNow;
        Byte[] data = new Byte[bytesReceivedCount];
        Array.Copy(receiveBuffer, 0, data, 0, bytesReceivedCount);
        return data;
      }
      // A zero-length read completes immediately; pause so the retry loop does not spin until the timeout.
      await pause.ConfigureAwait(false);
    }
  } catch(Exception ex) {
    // Never keep a failed read around: every later call would rethrow the same failure.
    if(this._readTask != null && (this._readTask.IsFaulted || this._readTask.IsCanceled)) {
      this._readTask = null;
    }
    ex.Error(typeof(Connection).FullName, "Error while reading network stream data asynchronously.");
    throw;
  }
}
/// <summary>
/// Reads data asynchronously from the remote stream with a 5000 millisecond timeout.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>
/// A byte array containing the bytes that were received.
/// </returns>
public Task<Byte[]> ReadDataAsync(CancellationToken cancellationToken = default) => this.ReadDataAsync(TimeSpan.FromSeconds(5), cancellationToken);
/// <summary>
/// Asynchronously reads data as text with the given timeout.
/// </summary>
/// <param name="timeout">The timeout.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>
/// A <see cref="String"/> holding the received bytes decoded with <see cref="TextEncoding"/>.
/// </returns>
public async Task<String?> ReadTextAsync(TimeSpan timeout, CancellationToken cancellationToken = default) {
  Byte[] buffer = await this.ReadDataAsync(timeout, cancellationToken).ConfigureAwait(false);
  // Defensive: tolerate a null buffer rather than failing inside the decoder.
  return buffer == null ? null : this.TextEncoding.GetString(buffer);
}
/// <summary>
/// Asynchronously reads data as text with a 5000 millisecond timeout.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>
/// A task whose result is the received data decoded as text.
/// </returns>
public Task<String?> ReadTextAsync(CancellationToken cancellationToken = default) => this.ReadTextAsync(TimeSpan.FromSeconds(5), cancellationToken);
/// <summary>
/// Performs the same task as this method's overload but it defaults to a read timeout of 30 seconds.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>
/// A task that represents the asynchronous read operation. Its result
/// is the next line from the stream, or null if no text was read.
/// </returns>
public Task<String?> ReadLineAsync(CancellationToken cancellationToken = default) => this.ReadLineAsync(TimeSpan.FromSeconds(30), cancellationToken);
/// <summary>
/// Reads the next available line of text in the queue. Returns null when no text is read.
/// This method differs from the rest of the read methods because it keeps an internal
/// queue of lines that are read from the stream and only returns the one line next in the queue.
/// It is only recommended to use this method when you are working with text-based protocols
/// and the rest of the read methods are not called.
/// </summary>
/// <param name="timeout">The timeout applied to each underlying read.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task with a string line from the queue.</returns>
/// <exception cref="InvalidOperationException">Read methods have been disabled because continuous reading is enabled.</exception>
public async Task<String?> ReadLineAsync(TimeSpan timeout, CancellationToken cancellationToken = default) {
  if(this.IsContinuousReadingEnabled) {
    throw new InvalidOperationException("Read methods have been disabled because continuous reading is enabled.");
  }
  if(this._readLineBuffer.Count > 0) {
    return this._readLineBuffer.Dequeue();
  }
  StringBuilder builder = new StringBuilder();
  while(true) {
    String? text = await this.ReadTextAsync(timeout, cancellationToken).ConfigureAwait(false);
    if(String.IsNullOrEmpty(text)) {
      break;
    }
    _ = builder.Append(text);
    // Test everything accumulated so far, not just the last chunk: the terminator itself
    // may arrive split across two reads.
    if(!this.EndsWithNewLineSequence(builder)) {
      continue;
    }
    // Drop exactly one trailing terminator. Trimming by character set would also swallow
    // trailing empty lines and any payload characters that happen to be part of the sequence.
    String content = builder.ToString(0, builder.Length - this._newLineSequence.Length);
    foreach(String line in content.Split(this._newLineSequenceLineSplitter, StringSplitOptions.None)) {
      this._readLineBuffer.Enqueue(line);
    }
    break;
  }
  return this._readLineBuffer.Count > 0 ? this._readLineBuffer.Dequeue() : null;
}
// Returns true when the builder's last characters equal the new line sequence (ordinal comparison).
private Boolean EndsWithNewLineSequence(StringBuilder builder) {
  String terminator = this._newLineSequence;
  Int32 offset = builder.Length - terminator.Length;
  if(offset < 0) {
    return false;
  }
  for(Int32 i = 0; i < terminator.Length; i++) {
    if(builder[offset + i] != terminator[i]) {
      return false;
    }
  }
  return true;
}
#endregion
#region Write Methods
/// <summary>
/// Writes data asynchronously.
/// </summary>
/// <param name="buffer">The bytes to send.</param>
/// <param name="forceFlush">If set to <c>true</c>, the stream is flushed after the write.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteDataAsync(Byte[] buffer, Boolean forceFlush, CancellationToken cancellationToken = default) {
  try {
    // Wait for any write in progress, then mark this one as in progress.
    _ = this._writeDone.WaitOne();
    _ = this._writeDone.Reset();
    // Without an active stream there is nothing to write to; the call is a no-op.
    if(this.ActiveStream != null) {
      await this.ActiveStream.WriteAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
      if(forceFlush) {
        await this.ActiveStream.FlushAsync(cancellationToken).ConfigureAwait(false);
      }
      this.DataSentLastTimeUtc = DateTime.UtcNow;
    }
  } finally {
    // Always signal completion, whether the write succeeded, failed or was skipped.
    _ = this._writeDone.Set();
  }
}
/// <summary>
/// Writes text asynchronously using this connection's <see cref="TextEncoding"/>.
/// </summary>
/// <param name="text">The text.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public Task WriteTextAsync(String text, CancellationToken cancellationToken = default) {
  return this.WriteTextAsync(text, this.TextEncoding, cancellationToken);
}
/// <summary>
/// Writes text asynchronously. The encoded bytes are written and the stream is flushed.
/// </summary>
/// <param name="text">The text.</param>
/// <param name="encoding">The encoding used to turn the text into bytes.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public Task WriteTextAsync(String text, Encoding encoding, CancellationToken cancellationToken = default) {
  Byte[] payload = encoding.GetBytes(text);
  return this.WriteDataAsync(payload, true, cancellationToken);
}
/// <summary>
/// Writes a line of text asynchronously.
/// The new line sequence is appended automatically and the stream is flushed.
/// </summary>
/// <param name="line">The line.</param>
/// <param name="encoding">The encoding used to turn the line into bytes.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public Task WriteLineAsync(String line, Encoding encoding, CancellationToken cancellationToken = default) {
  Byte[] payload = encoding.GetBytes(String.Concat(line, this._newLineSequence));
  return this.WriteDataAsync(payload, true, cancellationToken);
}
/// <summary>
/// Writes a line of text asynchronously using this connection's <see cref="TextEncoding"/>.
/// The new line sequence is appended automatically.
/// </summary>
/// <param name="line">The line.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public Task WriteLineAsync(String line, CancellationToken cancellationToken = default) {
  return this.WriteLineAsync(line, this.TextEncoding, cancellationToken);
}
#endregion
#region Socket Methods
/// <summary>
/// Upgrades the active stream to an SSL stream if this connection object is hosted in the server.
/// </summary>
/// <param name="serverCertificate">The server certificate.</param>
/// <returns>
/// A task with <c>true</c> if the stream is secure when the call returns; <c>false</c> if the upgrade failed,
/// in which case <see cref="ConnectionFailure"/> has been raised with the error.
/// </returns>
public async Task<Boolean> UpgradeToSecureAsServerAsync(X509Certificate2 serverCertificate) {
  if(this.IsActiveStreamSecure) {
    return true;
  }
  // Let any write in progress finish before the handshake takes over the stream.
  _ = this._writeDone.WaitOne();
  SslStream? secureStream = null;
  try {
    secureStream = new SslStream(this.NetworkStream, true);
    await secureStream.AuthenticateAsServerAsync(serverCertificate).ConfigureAwait(false);
    this.SecureStream = secureStream;
    return true;
  } catch(Exception ex) {
    // Release the half-initialised stream first so a throwing event handler cannot leak it.
    secureStream?.Dispose();
    ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
    return false;
  }
}
/// <summary>
/// Upgrades the active stream to an SSL stream if this connection object is hosted in the client.
/// </summary>
/// <param name="hostname">The target host name for the handshake; when null, <c>Network.HostName</c> in lower case is used.</param>
/// <param name="callback">An optional certificate validation callback.</param>
/// <returns>
/// A task with <c>true</c> if the upgrade to SSL was successful; otherwise <c>false</c>,
/// in which case <see cref="ConnectionFailure"/> has been raised with the error.
/// </returns>
public async Task<Boolean> UpgradeToSecureAsClientAsync(String? hostname = null, RemoteCertificateValidationCallback? callback = null) {
  if(this.IsActiveStreamSecure) {
    return true;
  }
  SslStream? secureStream = null;
  try {
    // Construct inside the try block: on a closed connection the constructor itself throws, and that
    // must be reported through ConnectionFailure and a false result like any other upgrade error.
    secureStream = callback == null ? new SslStream(this.NetworkStream, true) : new SslStream(this.NetworkStream, true, callback);
    await secureStream.AuthenticateAsClientAsync(hostname ?? Network.HostName.ToLowerInvariant()).ConfigureAwait(false);
    this.SecureStream = secureStream;
    return true;
  } catch(Exception ex) {
    secureStream?.Dispose();
    ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
    return false;
  }
}
/// <summary>
/// Disconnects this connection: waits for a write in progress, raises <see cref="ClientDisconnected"/>
/// and disposes the client and its streams. Only the first call has any effect.
/// </summary>
public void Disconnect() {
  // Claim the teardown atomically. A separate check followed by an increment would let two threads
  // (for example the read loop and a caller polling IsConnected) both run the code below.
  if(Interlocked.CompareExchange(ref this._disconnectCalls, 1, 0) != 0) {
    return;
  }
  // Let a write in progress finish before the streams go away.
  _ = this._writeDone.WaitOne();
  try {
    ClientDisconnected(this, EventArgs.Empty);
  } catch {
    // ignore: a failing subscriber must not prevent the teardown
  }
  try {
    this.RemoteClient?.Dispose();
    this.SecureStream?.Dispose();
    this.NetworkStream?.Dispose();
  } finally {
    // Clear the references even if disposing one of them threw.
    this.NetworkStream = null;
    this.SecureStream = null;
    this.RemoteClient = null;
    this._continuousReadingThread = null;
  }
}
#endregion
#region Dispose
/// <inheritdoc />
public void Dispose() {
  if(this._hasDisposed) {
    return;
  }
  try {
    // Release managed resources
    this.Disconnect();
  } finally {
    // Run even if Disconnect throws, so the wait handle is always released
    // and a second Dispose call stays a no-op.
    this._continuousReadingThread = null;
    this._writeDone.Dispose();
    this._hasDisposed = true;
  }
}
#endregion
#region Continuous Read Methods
// Feeds newly received bytes into the receive buffer, then raises DataReceived for every
// new-line-terminated sequence found in it. An unterminated tail stays in the buffer for the next call.
private void RaiseReceiveBufferEvents(IEnumerable<Byte> receivedData) {
  if(this.RemoteClient == null) {
    return;
  }
  Boolean moreAvailable = this.RemoteClient.Available > 0;
  // Byte-by-byte so the block-size and buffer-full triggers fire at the exact boundary.
  foreach(Byte data in receivedData) {
    this.ProcessReceivedBlock(data, moreAvailable);
  }
  // Check if we are left with some more stuff to handle
  if(this._receiveBufferPointer <= 0) {
    return;
  }
  // Extract the segments split by newline terminated bytes
  List<Byte[]> sequences = this._receiveBuffer.Take(this._receiveBufferPointer).ToArray().Split(0, this._newLineSequenceBytes);
  // Something really wrong happened
  if(sequences.Count == 0) {
    throw new InvalidOperationException("Split function failed! This is terribly wrong!");
  }
  // We only have one sequence and it is not newline-terminated
  // we don't have to do anything.
  if(sequences.Count == 1 && sequences[0].EndsWith(this._newLineSequenceBytes) == false) {
    return;
  }
  // Process the events for each sequence
  for(Int32 i = 0; i < sequences.Count; i++) {
    Byte[] sequenceBytes = sequences[i];
    Boolean isNewLineTerminated = sequences[i].EndsWith(this._newLineSequenceBytes);
    Boolean isLast = i == sequences.Count - 1;
    if(isNewLineTerminated) {
      // The last argument is true while further sequences from this batch are still to come.
      ConnectionDataReceivedEventArgs eventArgs = new ConnectionDataReceivedEventArgs(sequenceBytes, ConnectionDataReceivedTrigger.NewLineSequenceEncountered, isLast == false);
      DataReceived(this, eventArgs);
    }
    // Depending on the last segment determine what to do with the receive buffer
    if(!isLast) {
      continue;
    }
    if(isNewLineTerminated) {
      // Simply reset the buffer pointer if the last segment was also terminated
      this._receiveBufferPointer = 0;
    } else {
      // If we have not received the termination sequence, then just shift the receive buffer to the left
      // and adjust the pointer
      Array.Copy(sequenceBytes, this._receiveBuffer, sequenceBytes.Length);
      this._receiveBufferPointer = sequenceBytes.Length;
    }
  }
}
// Appends one byte to the receive buffer and flushes the buffer through SendBuffer when either
// the protocol block size or the buffer capacity has been reached (block size takes precedence).
private void ProcessReceivedBlock(Byte data, Boolean moreAvailable) {
  this._receiveBuffer[this._receiveBufferPointer] = data;
  this._receiveBufferPointer++;
  if(this.ProtocolBlockSize > 0 && this._receiveBufferPointer >= this.ProtocolBlockSize) {
    // A complete protocol block has been collected
    this.SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BlockSizeReached);
  } else if(this._receiveBufferPointer >= this._receiveBuffer.Length) {
    // No room left in the receive buffer: flush it
    this.SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BufferFull);
  }
}
// Raises DataReceived with a copy of the bytes collected so far and resets the receive buffer.
private void SendBuffer(Boolean moreAvailable, ConnectionDataReceivedTrigger trigger) {
  // Copy only the filled part of the buffer. When a protocol block size triggers the flush, the
  // buffer is usually far from full and everything past the pointer is stale data.
  Byte[] eventBuffer = new Byte[this._receiveBufferPointer];
  Array.Copy(this._receiveBuffer, eventBuffer, eventBuffer.Length);
  DataReceived(this, new ConnectionDataReceivedEventArgs(eventBuffer, trigger, moreAvailable));
  this._receiveBufferPointer = 0;
}
// Body of the continuous reading loop. Runs on a thread-pool or dedicated thread started by the
// constructor and keeps reading from the active stream, raising receive events, until the
// connection is no longer connected. The threadContext argument is not used.
private void PerformContinuousReading(Object threadContext) {
  this._continuousReadingThread = Thread.CurrentThread;
  try {
    // Check if the RemoteClient is still there
    if(this.RemoteClient == null) {
      return;
    }
    Byte[] receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
    while(this.IsConnected && this._disconnectCalls <= 0) {
      Boolean doThreadSleep = false;
      try {
        if(this._readTask == null) {
          this._readTask = this.ActiveStream?.ReadAsync(receiveBuffer, 0, receiveBuffer.Length);
        }
        if(this._readTask != null && this._readTask.Wait(this._continuousReadingInterval)) {
          Int32 bytesReceivedCount = this._readTask.Result;
          if(bytesReceivedCount > 0) {
            this.DataReceivedLastTimeUtc = DateTime.UtcNow;
            Byte[] buffer = new Byte[bytesReceivedCount];
            Array.Copy(receiveBuffer, 0, buffer, 0, bytesReceivedCount);
            this.RaiseReceiveBufferEvents(buffer);
          }
          this._readTask = null;
        } else {
          doThreadSleep = this._disconnectCalls <= 0;
        }
      } catch(Exception ex) {
        ex.Log(nameof(PerformContinuousReading), "Continuous Read operation errored");
        // A finished read must not be looked at again: a failed one would rethrow on every pass,
        // and a successful one whose handler threw would deliver the same bytes a second time.
        if(this._readTask != null && this._readTask.IsCompleted) {
          this._readTask = null;
        }
        // Back off for one interval instead of retrying in a tight loop.
        doThreadSleep = this._disconnectCalls <= 0;
      } finally {
        if(doThreadSleep) {
          Thread.Sleep(this._continuousReadingInterval);
        }
      }
    }
  } finally {
    // The loop is over: IsContinuousReadingEnabled must report false from here on.
    this._continuousReadingThread = null;
  }
}
#endregion
}
}