using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Unosquare.Swan.Networking {
    /// <summary>
    /// Represents a network connection either on the server or on the client. It wraps a TcpClient
    /// and its corresponding network streams. It is capable of working in 2 modes. Typically on the server side
    /// you will need to enable continuous reading and events. On the client side you may want to disable continuous reading
    /// and use the Read methods available. In continuous reading mode Read methods are not available and will throw
    /// an invalid operation exception if they are used.
    /// Continuous Reading Mode: Subscribe to data reception events, it runs a background thread, don't use Read methods.
    /// Manual Reading Mode: Data reception events are NEVER fired. No background threads are used. Use Read methods to receive data.
    /// </summary>
    /// <example>
    /// The following code explains how to create a TCP server.
    /// <code>
    /// using System.Text;
    /// using Unosquare.Swan.Networking;
    ///
    /// class Example
    /// {
    ///     static void Main()
    ///     {
    ///         // create a new connection listener on a specific port
    ///         var connectionListener = new ConnectionListener(1337);
    ///
    ///         // handle the OnConnectionAccepted event
    ///         connectionListener.OnConnectionAccepted += (s, e) =>
    ///         {
    ///             // create a new connection
    ///             using (var con = new Connection(e.Client))
    ///             {
    ///                 con.WriteLineAsync("Hello world!").Wait();
    ///             }
    ///         };
    ///
    ///         connectionListener.Start();
    ///     }
    /// }
    /// </code>
    /// The following code describes how to create a TCP client.
    /// <code>
    /// using System.Net.Sockets;
    /// using System.Text;
    /// using System.Threading.Tasks;
    /// using Unosquare.Swan.Networking;
    ///
    /// class Example
    /// {
    ///     static async Task Main()
    ///     {
    ///         // create a new TcpClient object
    ///         var client = new TcpClient();
    ///
    ///         // connect to a specific address and port
    ///         client.Connect("localhost", 1337);
    ///
    ///         // create a new connection with specific encoding,
    ///         // new line sequence and continuous reading disabled
    ///         using (var cn = new Connection(client, Encoding.UTF8, "\r\n", true, 0))
    ///         {
    ///             var response = await cn.ReadTextAsync();
    ///         }
    ///     }
    /// }
    /// </code>
    /// </example>
    public sealed class Connection : IDisposable {
        // New Line definitions for reading. This applies to both, events and read methods
        private readonly String _newLineSequence;
        private readonly Byte[] _newLineSequenceBytes;
        private readonly Char[] _newLineSequenceChars;
        private readonly String[] _newLineSequenceLineSplitter;

        // Accumulation buffer used by continuous reading mode
        private readonly Byte[] _receiveBuffer;
        private readonly TimeSpan _continuousReadingInterval = TimeSpan.FromMilliseconds(5);

        // Lines already split out by ReadLineAsync but not yet handed to the caller
        private readonly Queue<String> _readLineBuffer = new Queue<String>();
        private readonly ManualResetEvent _writeDone = new ManualResetEvent(true);

        // Disconnect and Dispose
        private Boolean _hasDisposed;
        private Int32 _disconnectCalls;

        // Continuous Reading
        private Thread _continuousReadingThread;
        private Int32 _receiveBufferPointer;

        // Reading and writing
        private Task<Int32> _readTask;

        // Buffer that the read tracked by _readTask in manual mode fills. It is kept in a field because
        // a read left pending by a timeout is picked up again by the next ReadDataAsync call.
        private Byte[] _readTaskBuffer;

        /// <summary>
        /// Initializes a new instance of the <see cref="Connection"/> class.
        /// </summary>
        /// <param name="client">The client.</param>
        /// <param name="textEncoding">The text encoding.</param>
        /// <param name="newLineSequence">The new line sequence used for read and write operations.</param>
        /// <param name="disableContinuousReading">if set to <c>true</c> [disable continuous reading].</param>
        /// <param name="blockSize">Size of the block. -- set to 0 or less to disable.</param>
        /// <exception cref="ArgumentNullException"><paramref name="client"/> or <paramref name="textEncoding"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="newLineSequence"/> is null or empty.</exception>
        public Connection(
            TcpClient client,
            Encoding textEncoding,
            String newLineSequence,
            Boolean disableContinuousReading,
            Int32 blockSize) {
            // Fail with a descriptive exception instead of a NullReferenceException further down
            if(client == null) {
                throw new ArgumentNullException(nameof(client));
            }

            if(textEncoding == null) {
                throw new ArgumentNullException(nameof(textEncoding));
            }

            // Setup basic properties
            this.Id = Guid.NewGuid();
            this.TextEncoding = textEncoding;

            // Setup new line sequence
            if(String.IsNullOrEmpty(newLineSequence)) {
                throw new ArgumentException("Argument cannot be null", nameof(newLineSequence));
            }

            this._newLineSequence = newLineSequence;
            this._newLineSequenceBytes = this.TextEncoding.GetBytes(this._newLineSequence);
            this._newLineSequenceChars = this._newLineSequence.ToCharArray();
            this._newLineSequenceLineSplitter = new[] { this._newLineSequence };

            // Setup Connection timers
            this.ConnectionStartTimeUtc = DateTime.UtcNow;
            this.DataReceivedLastTimeUtc = this.ConnectionStartTimeUtc;
            this.DataSentLastTimeUtc = this.ConnectionStartTimeUtc;

            // Setup connection properties
            this.RemoteClient = client;
            this.LocalEndPoint = client.Client.LocalEndPoint as IPEndPoint;
            this.NetworkStream = this.RemoteClient.GetStream();
            this.RemoteEndPoint = this.RemoteClient.Client.RemoteEndPoint as IPEndPoint;

            // Setup buffers
            this._receiveBuffer = new Byte[this.RemoteClient.ReceiveBufferSize * 2];
            this.ProtocolBlockSize = blockSize;
            this._receiveBufferPointer = 0;

            // Setup continuous reading mode if enabled
            if(disableContinuousReading) {
                return;
            }

#if NETSTANDARD1_3
            ThreadPool.QueueUserWorkItem(PerformContinuousReading, this);
#else
            ThreadPool.GetAvailableThreads(out Int32 availableWorkerThreads, out _);
            ThreadPool.GetMaxThreads(out Int32 maxWorkerThreads, out Int32 _);

            Int32 activeThreadPoolTreads = maxWorkerThreads - availableWorkerThreads;

            // Use the thread pool only while few of its workers are busy; otherwise run the
            // reading loop on a dedicated background thread.
            if(activeThreadPoolTreads < Environment.ProcessorCount / 4) {
                ThreadPool.QueueUserWorkItem(this.PerformContinuousReading, this);
            } else {
                new Thread(this.PerformContinuousReading) { IsBackground = true }.Start();
            }
#endif
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
        /// It uses UTF8 encoding, CRLF as a new line sequence and disables a protocol block size.
        /// </summary>
        /// <param name="client">The client.</param>
        public Connection(TcpClient client)
            : this(client, Encoding.UTF8, "\r\n", false, 0) {
            // placeholder
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
        /// It uses UTF8 encoding, disables line sequences, and uses a protocol block size instead.
        /// </summary>
        /// <param name="client">The client.</param>
        /// <param name="blockSize">Size of the block.</param>
        public Connection(TcpClient client, Int32 blockSize)
            : this(client, Encoding.UTF8, new String('\n', blockSize + 1), false, blockSize) {
            // placeholder
        }

        #region Events

        /// <summary>
        /// Occurs when the receive buffer encounters a new line sequence, the buffer is flushed or the buffer is full.
        /// </summary>
        public event EventHandler<ConnectionDataReceivedEventArgs> DataReceived = (s, e) => { };

        /// <summary>
        /// Occurs when an error occurs while upgrading, sending, or receiving data in this client.
        /// </summary>
        public event EventHandler<ConnectionFailureEventArgs> ConnectionFailure = (s, e) => { };

        /// <summary>
        /// Occurs when a client is disconnected.
        /// </summary>
        public event EventHandler ClientDisconnected = (s, e) => { };

        #endregion

        #region Properties

        /// <summary>
        /// Gets the unique identifier of this connection.
        /// This field is filled out upon instantiation of this class.
        /// </summary>
        /// <value>
        /// The identifier.
        /// </value>
        public Guid Id { get; }

        /// <summary>
        /// Gets the active stream. Returns an SSL stream if the connection is secure, otherwise returns
        /// the underlying NetworkStream.
        /// </summary>
        /// <value>
        /// The active stream.
        /// </value>
        public Stream ActiveStream => this.SecureStream ?? (this.NetworkStream as Stream);

        /// <summary>
        /// Gets a value indicating whether the current connection stream is an SSL stream.
        /// </summary>
        /// <value>
        /// <c>true</c> if this instance is active stream secure; otherwise, <c>false</c>.
        /// </value>
        public Boolean IsActiveStreamSecure => this.SecureStream != null;

        /// <summary>
        /// Gets the text encoding for send and receive operations.
        /// </summary>
        /// <value>
        /// The text encoding.
        /// </value>
        public Encoding TextEncoding { get; }

        /// <summary>
        /// Gets the remote end point of this TCP connection.
        /// </summary>
        /// <value>
        /// The remote end point.
        /// </value>
        public IPEndPoint RemoteEndPoint { get; }

        /// <summary>
        /// Gets the local end point of this TCP connection.
        /// </summary>
        /// <value>
        /// The local end point.
        /// </value>
        public IPEndPoint LocalEndPoint { get; }

        /// <summary>
        /// Gets the remote client of this TCP connection.
        /// </summary>
        /// <value>
        /// The remote client.
        /// </value>
        public TcpClient RemoteClient { get; private set; }

        /// <summary>
        /// When in continuous reading mode, and if set to greater than 0,
        /// a Data reception event will be fired whenever the amount of bytes
        /// determined by this property has been received. Useful for fixed-length message protocols.
        /// </summary>
        /// <value>
        /// The size of the protocol block.
        /// </value>
        public Int32 ProtocolBlockSize { get; }

        /// <summary>
        /// Gets a value indicating whether this connection is in continuous reading mode.
        /// Remark: Whenever a disconnect event occurs, the background thread is terminated
        /// and this property will return false whenever the reading thread is not active.
        /// Therefore, even if continuous reading was not disabled in the constructor, this property
        /// might return false.
        /// </summary>
        /// <value>
        /// <c>true</c> if this instance is continuous reading enabled; otherwise, <c>false</c>.
        /// </value>
        public Boolean IsContinuousReadingEnabled => this._continuousReadingThread != null;

        /// <summary>
        /// Gets the start time at which the connection was started in UTC.
        /// </summary>
        /// <value>
        /// The connection start time UTC.
        /// </value>
        public DateTime ConnectionStartTimeUtc { get; }

        /// <summary>
        /// Gets the start time at which the connection was started in local time.
        /// </summary>
        /// <value>
        /// The connection start time.
        /// </value>
        public DateTime ConnectionStartTime => this.ConnectionStartTimeUtc.ToLocalTime();

        /// <summary>
        /// Gets the duration of the connection.
        /// </summary>
        /// <value>
        /// The duration of the connection.
        /// </value>
        public TimeSpan ConnectionDuration => DateTime.UtcNow.Subtract(this.ConnectionStartTimeUtc);

        /// <summary>
        /// Gets the last time data was received at in UTC.
        /// </summary>
        /// <value>
        /// The data received last time UTC.
        /// </value>
        public DateTime DataReceivedLastTimeUtc { get; private set; }

        /// <summary>
        /// Gets how long has elapsed since data was last received.
        /// </summary>
        public TimeSpan DataReceivedIdleDuration => DateTime.UtcNow.Subtract(this.DataReceivedLastTimeUtc);

        /// <summary>
        /// Gets the last time at which data was sent in UTC.
        /// </summary>
        /// <value>
        /// The data sent last time UTC.
        /// </value>
        public DateTime DataSentLastTimeUtc { get; private set; }

        /// <summary>
        /// Gets how long has elapsed since data was last sent.
        /// </summary>
        /// <value>
        /// The duration of the data sent idle.
        /// </value>
        public TimeSpan DataSentIdleDuration => DateTime.UtcNow.Subtract(this.DataSentLastTimeUtc);

        /// <summary>
        /// Gets a value indicating whether this connection is connected.
        /// Remarks: This property polls the socket internally and checks if it is available to read data from it.
        /// If disconnect has been called, then this property will return false.
        /// As a side effect, <see cref="Disconnect"/> is invoked when the poll reports a dead connection
        /// or when polling throws.
        /// </summary>
        /// <value>
        /// <c>true</c> if this instance is connected; otherwise, <c>false</c>.
        /// </value>
        public Boolean IsConnected {
            get {
                if(this._disconnectCalls > 0) {
                    return false;
                }

                try {
                    Socket socket = this.RemoteClient.Client;

                    // Readable with nothing to read means the peer closed the connection
                    Boolean pollResult = !((socket.Poll(1000, SelectMode.SelectRead)
                        && this.NetworkStream.DataAvailable == false) || !socket.Connected);

                    if(pollResult == false) {
                        this.Disconnect();
                    }

                    return pollResult;
                } catch {
                    this.Disconnect();
                    return false;
                }
            }
        }

        private NetworkStream NetworkStream { get; set; }

        private SslStream SecureStream { get; set; }

        #endregion

        #region Read Methods

        /// <summary>
        /// Reads data from the remote client asynchronously and with the given timeout.
        /// A read that is still pending when the timeout elapses is kept and resumed by the next call.
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A byte array containing the bytes received from the remote client.</returns>
        /// <exception cref="InvalidOperationException">
        /// Read methods have been disabled because continuous reading is enabled,
        /// or the connection is no longer open.
        /// </exception>
        /// <exception cref="TimeoutException">Reading data from {ActiveStream} timed out in {timeout.TotalMilliseconds} ms.</exception>
        public async Task<Byte[]> ReadDataAsync(TimeSpan timeout, CancellationToken ct = default) {
            if(this.IsContinuousReadingEnabled) {
                throw new InvalidOperationException(
                    "Read methods have been disabled because continuous reading is enabled.");
            }

            if(this.RemoteClient == null) {
                throw new InvalidOperationException("An open connection is required");
            }

            Int32 bufferSize = this.RemoteClient.ReceiveBufferSize * 2;
            Byte[] received = null;

            try {
                DateTime startTime = DateTime.UtcNow;

                while(received == null) {
                    if(DateTime.UtcNow.Subtract(startTime) >= timeout) {
                        throw new TimeoutException(
                            $"Reading data from {this.ActiveStream} timed out in {timeout.TotalMilliseconds} ms");
                    }

                    if(this._readTask == null) {
                        // The buffer must outlive this call: the read may complete during a later call
                        this._readTaskBuffer = new Byte[bufferSize];
                        this._readTask = this.ActiveStream.ReadAsync(
                            this._readTaskBuffer, 0, this._readTaskBuffer.Length, ct);
                    }

                    if(this._readTask.Wait(this._continuousReadingInterval)) {
                        Int32 bytesReceivedCount = this._readTask.Result;
                        Byte[] filledBuffer = this._readTaskBuffer;

                        this._readTask = null;
                        this._readTaskBuffer = null;

                        if(bytesReceivedCount > 0) {
                            this.DataReceivedLastTimeUtc = DateTime.UtcNow;
                            received = new Byte[bytesReceivedCount];
                            Array.Copy(filledBuffer, 0, received, 0, bytesReceivedCount);
                        }
                    } else {
                        await Task.Delay(this._continuousReadingInterval, ct).ConfigureAwait(false);
                    }
                }
            } catch(Exception ex) {
                // A faulted or cancelled read can never yield data; drop it so the next call starts a
                // fresh read instead of rethrowing the same stale failure. A read that is merely still
                // pending (timeout) is deliberately kept.
                Task<Int32> pendingRead = this._readTask;
                if(pendingRead != null && (pendingRead.IsFaulted || pendingRead.IsCanceled)) {
                    this._readTask = null;
                    this._readTaskBuffer = null;
                }

                ex.Error(typeof(Connection).FullName, "Error while reading network stream data asynchronously.");
                throw;
            }

            return received;
        }

        /// <summary>
        /// Reads data asynchronously from the remote stream with a 5000 millisecond timeout.
        /// </summary>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A byte array containing the bytes received from the remote client.</returns>
        public Task<Byte[]> ReadDataAsync(CancellationToken ct = default)
            => this.ReadDataAsync(TimeSpan.FromSeconds(5), ct);

        /// <summary>
        /// Asynchronously reads data as text with the given timeout.
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>
        /// A <see cref="String"/> that contains the received bytes decoded with <see cref="TextEncoding"/>.
        /// </returns>
        public async Task<String> ReadTextAsync(TimeSpan timeout, CancellationToken ct = default) {
            Byte[] buffer = await this.ReadDataAsync(timeout, ct).ConfigureAwait(false);
            return buffer == null ? null : this.TextEncoding.GetString(buffer);
        }

        /// <summary>
        /// Asynchronously reads data as text with a 5000 millisecond timeout.
        /// </summary>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>
        /// A <see cref="String"/> that contains the received bytes decoded with <see cref="TextEncoding"/>.
        /// </returns>
        public Task<String> ReadTextAsync(CancellationToken ct = default)
            => this.ReadTextAsync(TimeSpan.FromSeconds(5), ct);

        /// <summary>
        /// Performs the same task as this method's overload but it defaults to a read timeout of 30 seconds.
        /// </summary>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>
        /// A task that represents the asynchronous read operation. The value of the TResult parameter
        /// contains the next line from the stream, or is null if no line could be read.
        /// </returns>
        public Task<String> ReadLineAsync(CancellationToken ct = default)
            => this.ReadLineAsync(TimeSpan.FromSeconds(30), ct);

        /// <summary>
        /// Reads the next available line of text in queue. Return null when no text is read.
        /// This method differs from the rest of the read methods because it keeps an internal
        /// queue of lines that are read from the stream and only returns the one line next in the queue.
        /// It is only recommended to use this method when you are working with text-based protocols
        /// and the rest of the read methods are not called.
        /// </summary>
        /// <param name="timeout">The timeout applied to each underlying read.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task with a string line from the queue.</returns>
        /// <exception cref="InvalidOperationException">Read methods have been disabled because continuous reading is enabled.</exception>
        public async Task<String> ReadLineAsync(TimeSpan timeout, CancellationToken ct = default) {
            if(this.IsContinuousReadingEnabled) {
                throw new InvalidOperationException(
                    "Read methods have been disabled because continuous reading is enabled.");
            }

            if(this._readLineBuffer.Count > 0) {
                return this._readLineBuffer.Dequeue();
            }

            StringBuilder builder = new StringBuilder();

            while(true) {
                String text = await this.ReadTextAsync(timeout, ct).ConfigureAwait(false);

                if(String.IsNullOrEmpty(text)) {
                    break;
                }

                _ = builder.Append(text);

                // Test the accumulated text (not just the last chunk) so a new line sequence that was
                // split across two reads is still detected. The comparison is ordinal because a
                // culture-sensitive EndsWith may not match "\n" when it is preceded by "\r".
                String accumulated = builder.ToString();
                if(accumulated.EndsWith(this._newLineSequence, StringComparison.Ordinal) == false) {
                    continue;
                }

                String[] lines = accumulated.TrimEnd(this._newLineSequenceChars)
                    .Split(this._newLineSequenceLineSplitter, StringSplitOptions.None);

                foreach(String item in lines) {
                    this._readLineBuffer.Enqueue(item);
                }

                break;
            }

            return this._readLineBuffer.Count > 0 ? this._readLineBuffer.Dequeue() : null;
        }

        #endregion

        #region Write Methods

        /// <summary>
        /// Writes data asynchronously.
        /// </summary>
        /// <param name="buffer">The buffer.</param>
        /// <param name="forceFlush">if set to <c>true</c> [force flush].</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public async Task WriteDataAsync(Byte[] buffer, Boolean forceFlush, CancellationToken ct = default) {
            if(buffer == null) {
                throw new ArgumentNullException(nameof(buffer));
            }

            try {
                // NOTE(review): WaitOne followed by Reset is not an atomic acquire, so two concurrent
                // writers can both pass the wait. Consider a SemaphoreSlim if concurrent writes are expected.
                _ = this._writeDone.WaitOne();
                _ = this._writeDone.Reset();

                await this.ActiveStream.WriteAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);

                if(forceFlush) {
                    await this.ActiveStream.FlushAsync(ct).ConfigureAwait(false);
                }

                this.DataSentLastTimeUtc = DateTime.UtcNow;
            } finally {
                _ = this._writeDone.Set();
            }
        }

        /// <summary>
        /// Writes text asynchronously using <see cref="TextEncoding"/>.
        /// </summary>
        /// <param name="text">The text.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        public Task WriteTextAsync(String text, CancellationToken ct = default)
            => this.WriteTextAsync(text, this.TextEncoding, ct);

        /// <summary>
        /// Writes text asynchronously.
        /// </summary>
        /// <param name="text">The text.</param>
        /// <param name="encoding">The encoding.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        public Task WriteTextAsync(String text, Encoding encoding, CancellationToken ct = default)
            => this.WriteDataAsync(encoding.GetBytes(text), true, ct);

        /// <summary>
        /// Writes a line of text asynchronously.
        /// The new line sequence is added automatically at the end of the line.
        /// </summary>
        /// <param name="line">The line.</param>
        /// <param name="encoding">The encoding.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        public Task WriteLineAsync(String line, Encoding encoding, CancellationToken ct = default)
            => this.WriteDataAsync(encoding.GetBytes($"{line}{this._newLineSequence}"), true, ct);

        /// <summary>
        /// Writes a line of text asynchronously using <see cref="TextEncoding"/>.
        /// The new line sequence is added automatically at the end of the line.
        /// </summary>
        /// <param name="line">The line.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        public Task WriteLineAsync(String line, CancellationToken ct = default)
            => this.WriteLineAsync(line, this.TextEncoding, ct);

        #endregion

        #region Socket Methods

        /// <summary>
        /// Upgrades the active stream to an SSL stream if this connection object is hosted in the server.
        /// On failure the <see cref="ConnectionFailure"/> event is raised.
        /// </summary>
        /// <param name="serverCertificate">The server certificate.</param>
        /// <returns>
        /// A task with <c>true</c> if the stream is (or already was) secure; otherwise, <c>false</c>.
        /// </returns>
        public async Task<Boolean> UpgradeToSecureAsServerAsync(X509Certificate2 serverCertificate) {
            if(this.IsActiveStreamSecure) {
                return true;
            }

            // Let any write that is in flight finish before switching streams
            _ = this._writeDone.WaitOne();

            SslStream secureStream = null;

            try {
                secureStream = new SslStream(this.NetworkStream, true);
                await secureStream.AuthenticateAsServerAsync(serverCertificate).ConfigureAwait(false);
                this.SecureStream = secureStream;
                return true;
            } catch(Exception ex) {
                ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
                secureStream?.Dispose();

                return false;
            }
        }

        /// <summary>
        /// Upgrades the active stream to an SSL stream if this connection object is hosted in the client.
        /// On failure the <see cref="ConnectionFailure"/> event is raised.
        /// </summary>
        /// <param name="hostname">The hostname. When null, the lower-cased local host name is used.</param>
        /// <param name="callback">The callback.</param>
        /// <returns>A task with <c>true</c> if the upgrade to SSL was successful; otherwise, <c>false</c>.</returns>
        public async Task<Boolean> UpgradeToSecureAsClientAsync(
            String hostname = null,
            RemoteCertificateValidationCallback callback = null) {
            if(this.IsActiveStreamSecure) {
                return true;
            }

            SslStream secureStream = callback == null
                ? new SslStream(this.NetworkStream, true)
                : new SslStream(this.NetworkStream, true, callback);

            try {
                await secureStream.AuthenticateAsClientAsync(hostname ?? Network.HostName.ToLowerInvariant())
                    .ConfigureAwait(false);
                this.SecureStream = secureStream;
            } catch(Exception ex) {
                secureStream.Dispose();
                ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
                return false;
            }

            return true;
        }

        /// <summary>
        /// Disconnects this connection. Only the first call has any effect.
        /// Raises <see cref="ClientDisconnected"/> and then releases the client and its streams.
        /// </summary>
        public void Disconnect() {
            // Atomic test-and-set: the reading thread (through IsConnected) and a caller thread may
            // race here, and only one of them may run the teardown.
            if(Interlocked.CompareExchange(ref this._disconnectCalls, 1, 0) != 0) {
                return;
            }

            _ = this._writeDone.WaitOne();

            try {
                ClientDisconnected(this, EventArgs.Empty);
            } catch {
                // ignore
            }

            try {
#if !NET452
                this.RemoteClient.Dispose();
                this.SecureStream?.Dispose();
                this.NetworkStream?.Dispose();
#else
                this.RemoteClient.Close();
                this.SecureStream?.Close();
                this.NetworkStream?.Close();
#endif
            } catch {
                // ignored
            } finally {
                this.NetworkStream = null;
                this.SecureStream = null;
                this.RemoteClient = null;
                this._continuousReadingThread = null;
            }
        }

        #endregion

        #region Dispose

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose() {
            if(this._hasDisposed) {
                return;
            }

            // Release managed resources
            this.Disconnect();
            this._continuousReadingThread = null;
            this._writeDone.Dispose();

            this._hasDisposed = true;
        }

        #endregion

        #region Continuous Read Methods

        /// <summary>
        /// Feeds the received bytes into the receive buffer and raises the data reception events
        /// for every complete, new-line-terminated sequence found in it.
        /// </summary>
        /// <param name="receivedData">The received data.</param>
        /// <exception cref="InvalidOperationException">Split function failed! This is terribly wrong.</exception>
        private void RaiseReceiveBufferEvents(Byte[] receivedData) {
            Boolean moreAvailable = this.RemoteClient.Available > 0;

            foreach(Byte data in receivedData) {
                this.ProcessReceivedBlock(data, moreAvailable);
            }

            // Check if we are left with some more stuff to handle
            if(this._receiveBufferPointer <= 0) {
                return;
            }

            // Extract the segments split by newline terminated bytes
            List<Byte[]> sequences = this._receiveBuffer.Take(this._receiveBufferPointer).ToArray()
                .Split(0, this._newLineSequenceBytes);

            // Something really wrong happened
            if(sequences.Count == 0) {
                throw new InvalidOperationException("Split function failed! This is terribly wrong!");
            }

            // We only have one sequence and it is not newline-terminated
            // we don't have to do anything.
            if(sequences.Count == 1 && sequences[0].EndsWith(this._newLineSequenceBytes) == false) {
                return;
            }

            // Process the events for each sequence
            for(Int32 i = 0; i < sequences.Count; i++) {
                Byte[] sequenceBytes = sequences[i];
                Boolean isNewLineTerminated = sequenceBytes.EndsWith(this._newLineSequenceBytes);
                Boolean isLast = i == sequences.Count - 1;

                if(isNewLineTerminated) {
                    ConnectionDataReceivedEventArgs eventArgs = new ConnectionDataReceivedEventArgs(
                        sequenceBytes,
                        ConnectionDataReceivedTrigger.NewLineSequenceEncountered,
                        isLast == false);
                    DataReceived(this, eventArgs);
                }

                // Depending on the last segment determine what to do with the receive buffer
                if(!isLast) {
                    continue;
                }

                if(isNewLineTerminated) {
                    // Simply reset the buffer pointer if the last segment was also terminated
                    this._receiveBufferPointer = 0;
                } else {
                    // If we have not received the termination sequence, then just shift the receive buffer to the left
                    // and adjust the pointer
                    Array.Copy(sequenceBytes, this._receiveBuffer, sequenceBytes.Length);
                    this._receiveBufferPointer = sequenceBytes.Length;
                }
            }
        }

        /// <summary>
        /// Appends a single byte to the receive buffer and flushes the buffer through
        /// <see cref="DataReceived"/> when the protocol block size is reached or the buffer is full.
        /// </summary>
        /// <param name="data">The byte to append.</param>
        /// <param name="moreAvailable">Whether more data is waiting on the socket; forwarded to the event arguments.</param>
        private void ProcessReceivedBlock(Byte data, Boolean moreAvailable) {
            this._receiveBuffer[this._receiveBufferPointer] = data;
            this._receiveBufferPointer++;

            // Block size reached
            if(this.ProtocolBlockSize > 0 && this._receiveBufferPointer >= this.ProtocolBlockSize) {
                this.SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BlockSizeReached);
                return;
            }

            // The receive buffer is full. Time to flush
            if(this._receiveBufferPointer >= this._receiveBuffer.Length) {
                this.SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BufferFull);
            }
        }

        /// <summary>
        /// Raises <see cref="DataReceived"/> with a copy of the bytes accumulated so far
        /// and resets the receive buffer pointer.
        /// </summary>
        /// <param name="moreAvailable">Whether more data is waiting on the socket; forwarded to the event arguments.</param>
        /// <param name="trigger">The reason the buffer is being flushed.</param>
        private void SendBuffer(Boolean moreAvailable, ConnectionDataReceivedTrigger trigger) {
            // Copy only the bytes written so far. Copying the whole backing array would hand
            // subscribers stale bytes past the pointer whenever a block is smaller than the buffer.
            Byte[] eventBuffer = new Byte[this._receiveBufferPointer];
            Array.Copy(this._receiveBuffer, eventBuffer, eventBuffer.Length);

            DataReceived(this,
                new ConnectionDataReceivedEventArgs(
                    eventBuffer,
                    trigger,
                    moreAvailable));
            this._receiveBufferPointer = 0;
        }

        /// <summary>
        /// Body of the continuous reading loop. Runs until the connection is disconnected, reading from the
        /// active stream and dispatching the received bytes through <see cref="RaiseReceiveBufferEvents"/>.
        /// </summary>
        /// <param name="threadContext">Unused; present to match the thread and thread-pool callback signatures.</param>
        private void PerformContinuousReading(Object threadContext) {
            // Check if the RemoteClient is still there. This is done before the thread is recorded so a
            // connection that was disconnected before the loop started does not report continuous reading.
            TcpClient client = this.RemoteClient;
            if(client == null) {
                return;
            }

            this._continuousReadingThread = Thread.CurrentThread;

            Byte[] receiveBuffer = new Byte[client.ReceiveBufferSize * 2];

            while(this.IsConnected && this._disconnectCalls <= 0) {
                Boolean doThreadSleep = false;

                try {
                    if(this._readTask == null) {
                        this._readTask = this.ActiveStream.ReadAsync(receiveBuffer, 0, receiveBuffer.Length);
                    }

                    if(this._readTask.Wait(this._continuousReadingInterval)) {
                        Int32 bytesReceivedCount = this._readTask.Result;
                        if(bytesReceivedCount > 0) {
                            this.DataReceivedLastTimeUtc = DateTime.UtcNow;
                            Byte[] buffer = new Byte[bytesReceivedCount];
                            Array.Copy(receiveBuffer, 0, buffer, 0, bytesReceivedCount);
                            this.RaiseReceiveBufferEvents(buffer);
                        }

                        this._readTask = null;
                    } else {
                        doThreadSleep = this._disconnectCalls <= 0;
                    }
                } catch(Exception ex) {
                    // Drop a faulted or cancelled read so the loop does not keep rethrowing the same
                    // failure, and back off briefly instead of spinning while the error persists.
                    Task<Int32> pendingRead = this._readTask;
                    if(pendingRead != null && (pendingRead.IsFaulted || pendingRead.IsCanceled)) {
                        this._readTask = null;
                    }

                    doThreadSleep = this._disconnectCalls <= 0;
                    ex.Log(nameof(Connection), "Continuous Read operation errored");
                } finally {
                    if(doThreadSleep) {
                        Thread.Sleep(this._continuousReadingInterval);
                    }
                }
            }
        }

        #endregion
    }
}