namespace Unosquare.Swan.Networking
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Net;
    using System.Net.Security;
    using System.Net.Sockets;
    using System.Security.Cryptography.X509Certificates;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Represents a network connection either on the server or on the client. It wraps a TcpClient
    /// and its corresponding network streams. It is capable of working in 2 modes. Typically on the server side
    /// you will need to enable continuous reading and events. On the client side you may want to disable continuous reading
    /// and use the Read methods available. In continuous reading mode Read methods are not available and will throw
    /// an invalid operation exception if they are used.
    /// Continuous Reading Mode: Subscribe to data reception events, it runs a background thread, don't use Read methods.
    /// Manual Reading Mode: Data reception events are NEVER fired. No background threads are used. Use Read methods to receive data.
    /// </summary>
    /// <example>
    /// The following code explains how to create a TCP server.
    /// <code>
    /// using System.Text;
    /// using Unosquare.Swan.Networking;
    ///
    /// class Example
    /// {
    ///     static void Main()
    ///     {
    ///         // create a new connection listener on a specific port
    ///         var connectionListener = new ConnectionListener(1337);
    ///
    ///         // handle the OnConnectionAccepted event
    ///         connectionListener.OnConnectionAccepted += (s, e) =>
    ///         {
    ///             // create a new connection
    ///             using (var con = new Connection(e.Client))
    ///             {
    ///                 con.WriteLineAsync("Hello world!").Wait();
    ///             }
    ///         };
    ///
    ///         connectionListener.Start();
    ///     }
    /// }
    /// </code>
    /// The following code describes how to create a TCP client.
    /// <code>
    /// using System.Net.Sockets;
    /// using System.Text;
    /// using System.Threading.Tasks;
    /// using Unosquare.Swan.Networking;
    ///
    /// class Example
    /// {
    ///     static async Task Main()
    ///     {
    ///         // create a new TcpClient object
    ///         var client = new TcpClient();
    ///
    ///         // connect to a specific address and port
    ///         client.Connect("localhost", 1337);
    ///
    ///         // create a new connection with specific encoding,
    ///         // new line sequence and continuous reading disabled
    ///         using (var cn = new Connection(client, Encoding.UTF8, "\r\n", true, 0))
    ///         {
    ///             var response = await cn.ReadTextAsync();
    ///         }
    ///     }
    /// }
    /// </code>
    /// </example>
    /// <seealso cref="System.IDisposable" />
    public sealed class Connection : IDisposable
    {
        // New Line definitions for reading. This applies to both, events and read methods
        private readonly string _newLineSequence;

        private readonly byte[] _newLineSequenceBytes;
        private readonly char[] _newLineSequenceChars;
        private readonly string[] _newLineSequenceLineSplitter;
        private readonly byte[] _receiveBuffer;
        private readonly TimeSpan _continuousReadingInterval = TimeSpan.FromMilliseconds(5);
        private readonly Queue<string> _readLineBuffer = new Queue<string>();
        private readonly ManualResetEvent _writeDone = new ManualResetEvent(true);

        // Disconnect and Dispose
        private bool _hasDisposed;

        private int _disconnectCalls;

        // Continuous Reading
        private Thread _continuousReadingThread;

        private int _receiveBufferPointer;

        // Reading and writing
        private Task<int> _readTask;

        // Target buffer of the read tracked by _readTask in manual reading mode.
        // It is a field (not a local) because a read may outlive the ReadDataAsync call that started it.
        private byte[] _manualReadBuffer;

        /// <summary>
        /// Initializes a new instance of the <see cref="Connection"/> class.
        /// </summary>
        /// <param name="client">The client.</param>
        /// <param name="textEncoding">The text encoding.</param>
        /// <param name="newLineSequence">The new line sequence used for read and write operations.</param>
        /// <param name="disableContinuousReading">if set to <c>true</c> [disable continuous reading].</param>
        /// <param name="blockSize">Size of the block. -- set to 0 or less to disable.</param>
        /// <exception cref="ArgumentNullException">client or textEncoding is null.</exception>
        /// <exception cref="ArgumentException">newLineSequence is null or empty.</exception>
        public Connection(
            TcpClient client,
            Encoding textEncoding,
            string newLineSequence,
            bool disableContinuousReading,
            int blockSize)
        {
            if (client == null)
                throw new ArgumentNullException(nameof(client));

            if (textEncoding == null)
                throw new ArgumentNullException(nameof(textEncoding));

            // Setup basic properties
            Id = Guid.NewGuid();
            TextEncoding = textEncoding;

            // Setup new line sequence
            if (string.IsNullOrEmpty(newLineSequence))
                throw new ArgumentException("Argument cannot be null", nameof(newLineSequence));

            _newLineSequence = newLineSequence;
            _newLineSequenceBytes = TextEncoding.GetBytes(_newLineSequence);
            _newLineSequenceChars = _newLineSequence.ToCharArray();
            _newLineSequenceLineSplitter = new[] { _newLineSequence };

            // Setup Connection timers
            ConnectionStartTimeUtc = DateTime.UtcNow;
            DataReceivedLastTimeUtc = ConnectionStartTimeUtc;
            DataSentLastTimeUtc = ConnectionStartTimeUtc;

            // Setup connection properties
            RemoteClient = client;
            LocalEndPoint = client.Client.LocalEndPoint as IPEndPoint;
            NetworkStream = RemoteClient.GetStream();
            RemoteEndPoint = RemoteClient.Client.RemoteEndPoint as IPEndPoint;

            // Setup buffers
            _receiveBuffer = new byte[RemoteClient.ReceiveBufferSize * 2];
            ProtocolBlockSize = blockSize;
            _receiveBufferPointer = 0;

            // Setup continuous reading mode if enabled
            if (disableContinuousReading) return;

#if NETSTANDARD1_3
            ThreadPool.QueueUserWorkItem(PerformContinuousReading, this);
#else
            ThreadPool.GetAvailableThreads(out var availableWorkerThreads, out _);
            ThreadPool.GetMaxThreads(out var maxWorkerThreads, out _);

            var activeThreadPoolThreads = maxWorkerThreads - availableWorkerThreads;

            // Use the thread pool only while it is lightly loaded; otherwise
            // run the reading loop on a dedicated background thread.
            if (activeThreadPoolThreads < Environment.ProcessorCount / 4)
            {
                ThreadPool.QueueUserWorkItem(PerformContinuousReading, this);
            }
            else
            {
                new Thread(PerformContinuousReading) { IsBackground = true }.Start();
            }
#endif
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
        /// It uses UTF8 encoding, CRLF as a new line sequence and disables a protocol block size.
        /// </summary>
        /// <param name="client">The client.</param>
        public Connection(TcpClient client)
            : this(client, Encoding.UTF8, "\r\n", false, 0)
        {
            // placeholder
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="Connection"/> class in continuous reading mode.
        /// It uses UTF8 encoding, disables line sequences, and uses a protocol block size instead.
        /// </summary>
        /// <param name="client">The client.</param>
        /// <param name="blockSize">Size of the block.</param>
        public Connection(TcpClient client, int blockSize)
            : this(client, Encoding.UTF8, new string('\n', blockSize + 1), false, blockSize)
        {
            // placeholder
        }

        #region Events

        /// <summary>
        /// Occurs when the receive buffer encounters a new line sequence, the block size is reached or the buffer is full.
        /// </summary>
        public event EventHandler<ConnectionDataReceivedEventArgs> DataReceived = (s, e) => { };

        /// <summary>
        /// Occurs when an error occurs while upgrading, sending, or receiving data in this client.
        /// </summary>
        public event EventHandler<ConnectionFailureEventArgs> ConnectionFailure = (s, e) => { };

        /// <summary>
        /// Occurs when a client is disconnected.
        /// </summary>
        public event EventHandler<EventArgs> ClientDisconnected = (s, e) => { };

        #endregion

        #region Properties

        /// <summary>
        /// Gets the unique identifier of this connection.
        /// This field is filled out upon instantiation of this class.
        /// </summary>
        /// <value>
        /// The identifier.
        /// </value>
        public Guid Id { get; }

        /// <summary>
        /// Gets the active stream. Returns an SSL stream if the connection is secure, otherwise returns
        /// the underlying NetworkStream.
        /// </summary>
        /// <value>
        /// The active stream.
        /// </value>
        public Stream ActiveStream => SecureStream ?? NetworkStream as Stream;

        /// <summary>
        /// Gets a value indicating whether the current connection stream is an SSL stream.
        /// </summary>
        /// <value>
        /// <c>true</c> if this instance is active stream secure; otherwise, <c>false</c>.
        /// </value>
        public bool IsActiveStreamSecure => SecureStream != null;

        /// <summary>
        /// Gets the text encoding for send and receive operations.
        /// </summary>
        /// <value>
        /// The text encoding.
        /// </value>
        public Encoding TextEncoding { get; }

        /// <summary>
        /// Gets the remote end point of this TCP connection.
        /// </summary>
        /// <value>
        /// The remote end point.
        /// </value>
        public IPEndPoint RemoteEndPoint { get; }

        /// <summary>
        /// Gets the local end point of this TCP connection.
        /// </summary>
        /// <value>
        /// The local end point.
        /// </value>
        public IPEndPoint LocalEndPoint { get; }

        /// <summary>
        /// Gets the remote client of this TCP connection.
        /// </summary>
        /// <value>
        /// The remote client.
        /// </value>
        public TcpClient RemoteClient { get; private set; }

        /// <summary>
        /// When in continuous reading mode, and if set to greater than 0,
        /// a Data reception event will be fired whenever the amount of bytes
        /// determined by this property has been received. Useful for fixed-length message protocols.
        /// </summary>
        /// <value>
        /// The size of the protocol block.
        /// </value>
        public int ProtocolBlockSize { get; }

        /// <summary>
        /// Gets a value indicating whether this connection is in continuous reading mode.
        /// Remark: Whenever a disconnect event occurs, the background thread is terminated
        /// and this property will return false whenever the reading thread is not active.
        /// Therefore, even if continuous reading was not disabled in the constructor, this property
        /// might return false.
        /// </summary>
        /// <value>
        /// <c>true</c> if this instance is continuous reading enabled; otherwise, <c>false</c>.
        /// </value>
        public bool IsContinuousReadingEnabled => _continuousReadingThread != null;

        /// <summary>
        /// Gets the start time at which the connection was started in UTC.
        /// </summary>
        /// <value>
        /// The connection start time UTC.
        /// </value>
        public DateTime ConnectionStartTimeUtc { get; }

        /// <summary>
        /// Gets the start time at which the connection was started in local time.
        /// </summary>
        /// <value>
        /// The connection start time.
        /// </value>
        public DateTime ConnectionStartTime => ConnectionStartTimeUtc.ToLocalTime();

        /// <summary>
        /// Gets the duration of the connection.
        /// </summary>
        /// <value>
        /// The duration of the connection.
        /// </value>
        public TimeSpan ConnectionDuration => DateTime.UtcNow.Subtract(ConnectionStartTimeUtc);

        /// <summary>
        /// Gets the last time data was received at in UTC.
        /// </summary>
        /// <value>
        /// The data received last time UTC.
        /// </value>
        public DateTime DataReceivedLastTimeUtc { get; private set; }

        /// <summary>
        /// Gets how long has elapsed since data was last received.
        /// </summary>
        public TimeSpan DataReceivedIdleDuration => DateTime.UtcNow.Subtract(DataReceivedLastTimeUtc);

        /// <summary>
        /// Gets the last time at which data was sent in UTC.
        /// </summary>
        /// <value>
        /// The data sent last time UTC.
        /// </value>
        public DateTime DataSentLastTimeUtc { get; private set; }

        /// <summary>
        /// Gets how long has elapsed since data was last sent.
        /// </summary>
        /// <value>
        /// The duration of the data sent idle.
        /// </value>
        public TimeSpan DataSentIdleDuration => DateTime.UtcNow.Subtract(DataSentLastTimeUtc);

        /// <summary>
        /// Gets a value indicating whether this connection is connected.
        /// Remarks: This property polls the socket internally and checks if it is available to read data from it.
        /// If disconnect has been called, then this property will return false.
        /// A failed poll (or any exception while polling) triggers <see cref="Disconnect"/>.
        /// </summary>
        /// <value>
        /// <c>true</c> if this instance is connected; otherwise, <c>false</c>.
        /// </value>
        public bool IsConnected
        {
            get
            {
                if (_disconnectCalls > 0)
                    return false;

                try
                {
                    var socket = RemoteClient.Client;

                    // Readable-but-no-data-available is treated as a closed connection.
                    var pollResult = !((socket.Poll(1000, SelectMode.SelectRead)
                                        && (NetworkStream.DataAvailable == false)) || !socket.Connected);

                    if (pollResult == false)
                        Disconnect();

                    return pollResult;
                }
                catch
                {
                    Disconnect();
                    return false;
                }
            }
        }

        private NetworkStream NetworkStream { get; set; }

        private SslStream SecureStream { get; set; }

        #endregion

        #region Read Methods

        /// <summary>
        /// Reads data from the remote client asynchronously and with the given timeout.
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A byte array containing the bytes received by a single read of the active stream.</returns>
        /// <exception cref="InvalidOperationException">
        /// Read methods have been disabled because continuous reading is enabled, or the connection is no longer open.
        /// </exception>
        /// <exception cref="TimeoutException">No data was received within the given timeout.</exception>
        public async Task<byte[]> ReadDataAsync(TimeSpan timeout, CancellationToken ct = default)
        {
            if (IsContinuousReadingEnabled)
            {
                throw new InvalidOperationException(
                    "Read methods have been disabled because continuous reading is enabled.");
            }

            if (RemoteClient == null)
            {
                throw new InvalidOperationException("An open connection is required");
            }

            var bufferSize = RemoteClient.ReceiveBufferSize * 2;
            var receiveBuilder = new List<byte>(bufferSize);

            try
            {
                var startTime = DateTime.UtcNow;

                while (receiveBuilder.Count <= 0)
                {
                    if (DateTime.UtcNow.Subtract(startTime) >= timeout)
                    {
                        throw new TimeoutException(
                            $"Reading data from {ActiveStream} timed out in {timeout.TotalMilliseconds} ms");
                    }

                    if (_readTask == null)
                    {
                        // The buffer is kept in a field so that a read left pending by a timed-out call
                        // is later consumed from the buffer it actually wrote to.
                        if (_manualReadBuffer == null)
                            _manualReadBuffer = new byte[bufferSize];

                        _readTask = ActiveStream.ReadAsync(_manualReadBuffer, 0, _manualReadBuffer.Length, ct);
                    }

                    if (_readTask.Wait(_continuousReadingInterval))
                    {
                        var bytesReceivedCount = _readTask.Result;
                        if (bytesReceivedCount > 0)
                        {
                            DataReceivedLastTimeUtc = DateTime.UtcNow;
                            var buffer = new byte[bytesReceivedCount];
                            Array.Copy(_manualReadBuffer, 0, buffer, 0, bytesReceivedCount);
                            receiveBuilder.AddRange(buffer);
                        }

                        _readTask = null;
                    }
                    else
                    {
                        await Task.Delay(_continuousReadingInterval, ct).ConfigureAwait(false);
                    }
                }
            }
            catch (Exception ex)
            {
                // A faulted or cancelled read can never yield data; drop it so the next call starts a new
                // read instead of rethrowing this stale failure. A read that is still pending (timeout
                // case) is deliberately kept so its data is not lost.
                if (_readTask != null && (_readTask.IsFaulted || _readTask.IsCanceled))
                    _readTask = null;

                ex.Error(typeof(Connection).FullName, "Error while reading network stream data asynchronously.");
                throw;
            }

            return receiveBuilder.ToArray();
        }

        /// <summary>
        /// Reads data asynchronously from the remote stream with a 5000 millisecond timeout.
        /// </summary>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A byte array containing the bytes received.</returns>
        public Task<byte[]> ReadDataAsync(CancellationToken ct = default)
            => ReadDataAsync(TimeSpan.FromSeconds(5), ct);

        /// <summary>
        /// Asynchronously reads data as text with the given timeout.
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>
        /// A <see cref="string"/> with the received bytes decoded using <see cref="TextEncoding"/>.
        /// </returns>
        public async Task<string> ReadTextAsync(TimeSpan timeout, CancellationToken ct = default)
        {
            var buffer = await ReadDataAsync(timeout, ct).ConfigureAwait(false);
            return buffer == null ? null : TextEncoding.GetString(buffer);
        }

        /// <summary>
        /// Asynchronously reads data as text with a 5000 millisecond timeout.
        /// </summary>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>
        /// A <see cref="string"/> with the received bytes decoded using <see cref="TextEncoding"/>.
        /// </returns>
        public Task<string> ReadTextAsync(CancellationToken ct = default)
            => ReadTextAsync(TimeSpan.FromSeconds(5), ct);

        /// <summary>
        /// Performs the same task as this method's overload but it defaults to a read timeout of 30 seconds.
        /// </summary>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>
        /// A task that represents the asynchronous read operation. The value of the TResult parameter
        /// contains the next line from the stream, or is null if no line could be read.
        /// </returns>
        public Task<string> ReadLineAsync(CancellationToken ct = default)
            => ReadLineAsync(TimeSpan.FromSeconds(30), ct);

        /// <summary>
        /// Reads the next available line of text in queue. Return null when no text is read.
        /// This method differs from the rest of the read methods because it keeps an internal
        /// queue of lines that are read from the stream and only returns the one line next in the queue.
        /// It is only recommended to use this method when you are working with text-based protocols
        /// and the rest of the read methods are not called.
        /// </summary>
        /// <param name="timeout">The timeout applied to each underlying read.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task with a string line from the queue.</returns>
        /// <exception cref="InvalidOperationException">
        /// Read methods have been disabled because continuous reading is enabled.
        /// </exception>
        public async Task<string> ReadLineAsync(TimeSpan timeout, CancellationToken ct = default)
        {
            if (IsContinuousReadingEnabled)
            {
                throw new InvalidOperationException(
                    "Read methods have been disabled because continuous reading is enabled.");
            }

            if (_readLineBuffer.Count > 0)
                return _readLineBuffer.Dequeue();

            var builder = new StringBuilder();

            while (true)
            {
                var text = await ReadTextAsync(timeout, ct).ConfigureAwait(false);

                if (text.Length == 0)
                    break;

                builder.Append(text);

                // Protocol delimiters are compared ordinally; a culture-sensitive comparison
                // may ignore or mis-match control characters such as CR/LF.
                if (text.EndsWith(_newLineSequence, StringComparison.Ordinal) == false)
                    continue;

                var lines = builder.ToString().TrimEnd(_newLineSequenceChars)
                    .Split(_newLineSequenceLineSplitter, StringSplitOptions.None);

                foreach (var item in lines)
                    _readLineBuffer.Enqueue(item);

                break;
            }

            return _readLineBuffer.Count > 0 ? _readLineBuffer.Dequeue() : null;
        }

        #endregion

        #region Write Methods

        /// <summary>
        /// Writes data asynchronously.
        /// </summary>
        /// <param name="buffer">The buffer.</param>
        /// <param name="forceFlush">if set to <c>true</c> [force flush].</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        /// <exception cref="ArgumentNullException">buffer is null.</exception>
        public async Task WriteDataAsync(byte[] buffer, bool forceFlush, CancellationToken ct = default)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));

            try
            {
                // Wait for any write in progress, then mark a write as in progress.
                _writeDone.WaitOne();
                _writeDone.Reset();

                await ActiveStream.WriteAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);

                if (forceFlush)
                    await ActiveStream.FlushAsync(ct).ConfigureAwait(false);

                DataSentLastTimeUtc = DateTime.UtcNow;
            }
            finally
            {
                _writeDone.Set();
            }
        }

        /// <summary>
        /// Writes text asynchronously using <see cref="TextEncoding"/>.
        /// </summary>
        /// <param name="text">The text.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        public Task WriteTextAsync(string text, CancellationToken ct = default)
            => WriteTextAsync(text, TextEncoding, ct);

        /// <summary>
        /// Writes text asynchronously.
        /// </summary>
        /// <param name="text">The text.</param>
        /// <param name="encoding">The encoding.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        public Task WriteTextAsync(string text, Encoding encoding, CancellationToken ct = default)
            => WriteDataAsync(encoding.GetBytes(text), true, ct);

        /// <summary>
        /// Writes a line of text asynchronously.
        /// The new line sequence is added automatically at the end of the line.
        /// </summary>
        /// <param name="line">The line.</param>
        /// <param name="encoding">The encoding.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        public Task WriteLineAsync(string line, Encoding encoding, CancellationToken ct = default)
            => WriteDataAsync(encoding.GetBytes($"{line}{_newLineSequence}"), true, ct);

        /// <summary>
        /// Writes a line of text asynchronously using <see cref="TextEncoding"/>.
        /// The new line sequence is added automatically at the end of the line.
        /// </summary>
        /// <param name="line">The line.</param>
        /// <param name="ct">The cancellation token.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        public Task WriteLineAsync(string line, CancellationToken ct = default)
            => WriteLineAsync(line, TextEncoding, ct);

        #endregion

        #region Socket Methods

        /// <summary>
        /// Upgrades the active stream to an SSL stream if this connection object is hosted in the server.
        /// On failure the <see cref="ConnectionFailure"/> event is raised.
        /// </summary>
        /// <param name="serverCertificate">The server certificate.</param>
        /// <returns>
        /// A task with <c>true</c> if the stream is (or already was) secure; otherwise, <c>false</c>.
        /// </returns>
        public async Task<bool> UpgradeToSecureAsServerAsync(X509Certificate2 serverCertificate)
        {
            if (IsActiveStreamSecure)
                return true;

            // Let any write in progress finish before switching streams.
            _writeDone.WaitOne();

            SslStream secureStream = null;

            try
            {
                secureStream = new SslStream(NetworkStream, true);
                await secureStream.AuthenticateAsServerAsync(serverCertificate).ConfigureAwait(false);
                SecureStream = secureStream;
                return true;
            }
            catch (Exception ex)
            {
                ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
                secureStream?.Dispose();

                return false;
            }
        }

        /// <summary>
        /// Upgrades the active stream to an SSL stream if this connection object is hosted in the client.
        /// On failure the <see cref="ConnectionFailure"/> event is raised.
        /// </summary>
        /// <param name="hostname">The hostname. When null, the lower-cased local host name is used.</param>
        /// <param name="callback">The optional remote certificate validation callback.</param>
        /// <returns>A task with <c>true</c> if the upgrade to SSL was successful; otherwise, <c>false</c>.</returns>
        public async Task<bool> UpgradeToSecureAsClientAsync(
            string hostname = null,
            RemoteCertificateValidationCallback callback = null)
        {
            if (IsActiveStreamSecure)
                return true;

            var secureStream = callback == null
                ? new SslStream(NetworkStream, true)
                : new SslStream(NetworkStream, true, callback);

            try
            {
                await secureStream.AuthenticateAsClientAsync(hostname ?? Network.HostName.ToLowerInvariant())
                    .ConfigureAwait(false);

                SecureStream = secureStream;
            }
            catch (Exception ex)
            {
                secureStream.Dispose();
                ConnectionFailure(this, new ConnectionFailureEventArgs(ex));
                return false;
            }

            return true;
        }

        /// <summary>
        /// Disconnects this connection. Only the first call has any effect: it raises
        /// <see cref="ClientDisconnected"/> and then releases the client and its streams.
        /// </summary>
        public void Disconnect()
        {
            // Atomic first-caller-wins guard: this method is reachable from both the
            // continuous reading loop (through IsConnected) and the caller's own thread.
            if (Interlocked.CompareExchange(ref _disconnectCalls, 1, 0) != 0)
                return;

            _writeDone.WaitOne();

            try
            {
                ClientDisconnected(this, EventArgs.Empty);
            }
            catch
            {
                // ignore: a failing subscriber must not prevent the resources from being released
            }

            try
            {
#if !NET452
                RemoteClient.Dispose();
                SecureStream?.Dispose();
                NetworkStream?.Dispose();
#else
                RemoteClient.Close();
                SecureStream?.Close();
                NetworkStream?.Close();
#endif
            }
            catch
            {
                // ignored: best-effort teardown
            }
            finally
            {
                NetworkStream = null;
                SecureStream = null;
                RemoteClient = null;
                _continuousReadingThread = null;
            }
        }

        #endregion

        #region Dispose

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            if (_hasDisposed)
                return;

            // Release managed resources
            Disconnect();
            _continuousReadingThread = null;
            _writeDone.Dispose();

            _hasDisposed = true;
        }

        #endregion

        #region Continuous Read Methods

        /// <summary>
        /// Feeds the received bytes into the receive buffer and raises the data reception events
        /// for every new-line-terminated sequence found in it.
        /// </summary>
        /// <param name="receivedData">The received data.</param>
        /// <exception cref="InvalidOperationException">Split function failed! This is terribly wrong.</exception>
        private void RaiseReceiveBufferEvents(byte[] receivedData)
        {
            var moreAvailable = RemoteClient.Available > 0;

            foreach (var data in receivedData)
            {
                ProcessReceivedBlock(data, moreAvailable);
            }

            // Check if we are left with some more stuff to handle
            if (_receiveBufferPointer <= 0)
                return;

            // Extract the segments split by newline terminated bytes
            var sequences = _receiveBuffer.Take(_receiveBufferPointer).ToArray()
                .Split(0, _newLineSequenceBytes);

            // Something really wrong happened
            if (sequences.Count == 0)
                throw new InvalidOperationException("Split function failed! This is terribly wrong!");

            // We only have one sequence and it is not newline-terminated
            // we don't have to do anything.
            if (sequences.Count == 1 && sequences[0].EndsWith(_newLineSequenceBytes) == false)
                return;

            // Process the events for each sequence
            for (var i = 0; i < sequences.Count; i++)
            {
                var sequenceBytes = sequences[i];
                var isNewLineTerminated = sequences[i].EndsWith(_newLineSequenceBytes);
                var isLast = i == sequences.Count - 1;

                if (isNewLineTerminated)
                {
                    var eventArgs = new ConnectionDataReceivedEventArgs(
                        sequenceBytes,
                        ConnectionDataReceivedTrigger.NewLineSequenceEncountered,
                        isLast == false);
                    DataReceived(this, eventArgs);
                }

                // Depending on the last segment determine what to do with the receive buffer
                if (!isLast) continue;

                if (isNewLineTerminated)
                {
                    // Simply reset the buffer pointer if the last segment was also terminated
                    _receiveBufferPointer = 0;
                }
                else
                {
                    // If we have not received the termination sequence, then just shift the receive buffer to the left
                    // and adjust the pointer
                    Array.Copy(sequenceBytes, _receiveBuffer, sequenceBytes.Length);
                    _receiveBufferPointer = sequenceBytes.Length;
                }
            }
        }

        /// <summary>
        /// Appends a single byte to the receive buffer and flushes the buffer through
        /// <see cref="SendBuffer"/> when the protocol block size is reached or the buffer is full.
        /// </summary>
        /// <param name="data">The received byte.</param>
        /// <param name="moreAvailable">Whether more data was available on the client when the batch was received.</param>
        private void ProcessReceivedBlock(byte data, bool moreAvailable)
        {
            _receiveBuffer[_receiveBufferPointer] = data;
            _receiveBufferPointer++;

            // Block size reached
            if (ProtocolBlockSize > 0 && _receiveBufferPointer >= ProtocolBlockSize)
            {
                SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BlockSizeReached);
                return;
            }

            // The receive buffer is full. Time to flush
            if (_receiveBufferPointer >= _receiveBuffer.Length)
            {
                SendBuffer(moreAvailable, ConnectionDataReceivedTrigger.BufferFull);
            }
        }

        /// <summary>
        /// Raises <see cref="DataReceived"/> with a copy of the bytes currently held in the receive buffer
        /// and then resets the buffer pointer.
        /// </summary>
        /// <param name="moreAvailable">Whether more data was available on the client.</param>
        /// <param name="trigger">The reason the buffer is being flushed.</param>
        private void SendBuffer(bool moreAvailable, ConnectionDataReceivedTrigger trigger)
        {
            // Copy only the bytes actually received. Copying the whole receive buffer would hand
            // subscribers stale trailing bytes whenever a block is smaller than the buffer.
            var eventBuffer = new byte[_receiveBufferPointer];
            Array.Copy(_receiveBuffer, eventBuffer, eventBuffer.Length);

            DataReceived(
                this,
                new ConnectionDataReceivedEventArgs(
                    eventBuffer,
                    trigger,
                    moreAvailable));
            _receiveBufferPointer = 0;
        }

        /// <summary>
        /// Body of the continuous reading loop. Runs until the connection is no longer connected,
        /// reading from the active stream and raising data reception events.
        /// </summary>
        /// <param name="threadContext">The thread context (unused).</param>
        private void PerformContinuousReading(object threadContext)
        {
            _continuousReadingThread = Thread.CurrentThread;

            // Check if the RemoteClient is still there
            if (RemoteClient == null) return;

            var receiveBuffer = new byte[RemoteClient.ReceiveBufferSize * 2];

            while (IsConnected && _disconnectCalls <= 0)
            {
                var doThreadSleep = false;

                try
                {
                    if (_readTask == null)
                        _readTask = ActiveStream.ReadAsync(receiveBuffer, 0, receiveBuffer.Length);

                    if (_readTask.Wait(_continuousReadingInterval))
                    {
                        var bytesReceivedCount = _readTask.Result;
                        if (bytesReceivedCount > 0)
                        {
                            DataReceivedLastTimeUtc = DateTime.UtcNow;
                            var buffer = new byte[bytesReceivedCount];
                            Array.Copy(receiveBuffer, 0, buffer, 0, bytesReceivedCount);
                            RaiseReceiveBufferEvents(buffer);
                        }

                        _readTask = null;
                    }
                    else
                    {
                        doThreadSleep = _disconnectCalls <= 0;
                    }
                }
                catch (Exception ex)
                {
                    // Any exception here means the tracked read has already completed (faulted, cancelled,
                    // or its data was being dispatched). Forget it so the next pass starts a fresh read
                    // instead of rethrowing the same failure or re-processing the same bytes.
                    _readTask = null;

                    ex.Log(nameof(Connection), "Continuous Read operation errored");
                }
                finally
                {
                    if (doThreadSleep)
                        Thread.Sleep(_continuousReadingInterval);
                }
            }
        }

        #endregion
    }
}