From 021332f0166d4bcf6b2572e39db6d704c9d62c1f Mon Sep 17 00:00:00 2001 From: ChiefDesigner Date: Wed, 14 Oct 2015 19:00:05 +0200 Subject: [PATCH] Reworked network communication due to performance issues and buffer overflow if debug TorEvent is enabled. --- Shem/BaseController.cs | 233 ++++++++++++++++++++-------------- Shem/Replies/Reply.cs | 111 +++++----------- Shem/Sockets/ControlSocket.cs | 93 ++++++++++++-- Shem/TorController.cs | 24 ++-- 4 files changed, 257 insertions(+), 204 deletions(-) diff --git a/Shem/BaseController.cs b/Shem/BaseController.cs index 80d0c60..bd825e7 100644 --- a/Shem/BaseController.cs +++ b/Shem/BaseController.cs @@ -1,12 +1,11 @@ using System; using System.Collections.Generic; using System.Threading; -using System.Threading.Tasks; using Shem.AsyncEvents; using Shem.Commands; using Shem.Replies; using Shem.Sockets; -using Shem.Utils; +using Shem.Exceptions; namespace Shem { @@ -15,38 +14,46 @@ namespace Shem /// If you wanna do things in the right way use the 'TorController' /// class instead. /// - public abstract class BaseController + public abstract class BaseController : IDisposable { + /// + /// Contains type of reply which is currently received. + /// + private bool _receiveData; - protected ControlSocket controlSocket; - - protected int sleep = 10; - - protected uint responseTimeout = 1000; + /// + /// Reply which is currently constrocted from received data. + /// + private Reply _currentReply; - protected Task asyncEventsListener; + /// + /// Socket which receives data from TOR. + /// + private ControlSocket _controlSocket; - protected bool asyncEventsListenerStop = false; + /// + /// FIFO buffer for received regular replies. + /// + private Queue _replyBuffer; - protected bool canListen = false; + /// + /// Eventhandle which signals the receiving of a regular reply. + /// + private EventWaitHandle _replyReceivedHdl; /// - /// The time the library should wait for a reply. + /// The time (in milliseconds) the library should wait for a reply. /// - protected uint ResponseTimeout - { - get { return responseTimeout; } - set { responseTimeout = value; } - } + protected uint ResponseTimeout { get; set; } + /// /// Is the Controller connected to the server. /// protected bool Connected { - get { return controlSocket != null ? controlSocket.Connected : false; } // Null reference exception sucks balls. + get { return _controlSocket != null ? _controlSocket.Connected : false; } // Null reference exception sucks balls. } - /// /// Construct a new TorController, used to control TOR /// @@ -55,119 +62,139 @@ protected bool Connected /// If the controller should connect just after the initialization protected BaseController(string address = "127.0.0.1", uint port = 9051, bool connect = true) { - controlSocket = new ControlSocket(address, port, connect); + _receiveData = false; + ResponseTimeout = 1000; + + _controlSocket = new ControlSocket(address, port, connect); + _controlSocket.OnLineReceived += ControlSocket_OnLineReceived; - canListen = true; + _replyReceivedHdl = new EventWaitHandle(false, EventResetMode.AutoReset); - asyncEventsListener = Task.Run(() => { ListenForAsyncEvents(); }); + _replyBuffer = new Queue(); } - private async void ListenForAsyncEvents() + /// + /// Handle a single line which was received by the ControlSocket. + /// + /// Received line (ending with CRLF) + private void ControlSocket_OnLineReceived(string line) { - while (!asyncEventsListenerStop) + if(_receiveData) { - if (canListen) + if (line.Length > 0) { - lock (controlSocket) + if (line[0] == '.') { - if (controlSocket.ResponseAvailable) + if (line == ".\r\n") + { + // terminating sequence received + // RFC2821: If the line is composed of a single period, it is + // treated as the end of mail indicator. + _receiveData = false; + HandleFinishedReply(); + return; + } + else { - string rawReply = controlSocket.Receive(); - List asyncEvents = new List(); - List replies = Reply.Parse(rawReply); - foreach (var r in replies) - { - try - { - asyncEvents.Add(TorEvent.Parse(r)); - } - catch (Exception ex) - { - Logger.LogError(ex.Message); - } - } - AsyncEventDispatcher(asyncEvents); + // escaped line received + // RFC2821: If the first character is a period and there are + // other characters on the line, the first character + // is deleted + line = line.Substring(1); } } } - await Task.Delay(250); + + _currentReply.RawString += line; + } + else + { + _currentReply = new Reply(line); + + if ((line[3] == ' ') || // EndReply + (line[3] == '-')) // MidReply + { + HandleFinishedReply(); + } + else if (line[3] == '+') // DataReply + { + _receiveData = true; + } + else + { + throw new NullReplyCodeException(line, 3); + } } } /// - /// Send a command and returns the reply as a raw string + /// Handle a completely received reply. /// - /// The command to be sent - /// Returns the raw string replied by the server - public virtual string SendRawCommand(TCCommand command) + private void HandleFinishedReply() { - string rawReply = ""; - - canListen = false; - - lock (controlSocket) + if (((int)_currentReply.Code >= 600) && ((int)_currentReply.Code < 700)) { - //Send the command - controlSocket.Send(command.Raw()); - - //Wait for response - int timeout = (int)ResponseTimeout / sleep; - int i = 1; - while (!controlSocket.ResponseAvailable && i < timeout) + // reply is async if first number in reply code is 6 + AsyncEventDispatcher(TorEvent.Parse(_currentReply)); + } + else + { + // all other replies are defined as synchronous + lock (_replyBuffer) { - Thread.Sleep(sleep); - i++; + _replyBuffer.Enqueue(_currentReply); } - //Read Response - rawReply = controlSocket.Receive(); + _replyReceivedHdl.Set(); } - - canListen = true; - - return rawReply; } /// - /// + /// Send a command and returns all replies /// - /// - /// + /// The command to be sent + /// List of all received replies public virtual List SendCommand(TCCommand command) { - List replies = Reply.Parse(SendRawCommand(command)); - List asyncEventsReply = new List(); - List asyncEvents = new List(); + _controlSocket.Send(command.Raw()); + + List replies = new List(); - foreach (var r in replies) + // receive replies until we get an end reply + for (;;) { - if (r.Code == ReplyCodes.ASYNC_EVENT_NOTIFICATION) + lock (_replyBuffer) { - asyncEventsReply.Add(r); + while (_replyBuffer.Count > 0) + { + Reply reply = _replyBuffer.Dequeue(); + replies.Add(reply); + if (reply.RawString[3] == ' ') + { + // end reply received --> finished + return replies; + } + } + } +#if DEBUG + if (!_replyReceivedHdl.WaitOne()) +#else + if(!_replyReceivedHdl.WaitOne((int)ResponseTimeout)) +#endif + { + throw new TimeoutException("Timeout while receiving replies"); } } - - foreach (var e in asyncEventsReply) - { - replies.Remove(e); - asyncEvents.Add(TorEvent.Parse(e)); - } - - AsyncEventDispatcher(asyncEvents); - - return replies; } protected abstract void AsyncEventDispatcher(TorEvent asyncEvent); - protected abstract void AsyncEventDispatcher(List asyncEvents); - /// /// Connect to the control port. /// protected void Connect() { - controlSocket.Connect(); + _controlSocket.Connect(); } /// @@ -178,20 +205,32 @@ public virtual void Close() { if (Connected) { + SendCommand(new Quit()); + _controlSocket.Close(); + } + } - SendRawCommand(new Quit()); - asyncEventsListenerStop = true; - if (asyncEventsListener.Exception == null) - { - asyncEventsListener.Wait(); - } - controlSocket.Close(); + public void Dispose(bool disposing) + { + Close(); + + if (disposing) + { + _controlSocket.Dispose(); + _controlSocket = null; + _replyReceivedHdl.Dispose(); + _replyReceivedHdl = null; } } + public void Dispose() + { + Dispose(true); + } + ~BaseController() { - Close(); + Dispose(false); } } } \ No newline at end of file diff --git a/Shem/Replies/Reply.cs b/Shem/Replies/Reply.cs index e69513b..6adc6c6 100644 --- a/Shem/Replies/Reply.cs +++ b/Shem/Replies/Reply.cs @@ -1,6 +1,4 @@ using System; -using System.Collections.Generic; -using System.Text; using Shem.Exceptions; using Shem.Utils; @@ -31,95 +29,50 @@ protected set /// /// The actual reply from the server /// - public virtual string ReplyLine { get; protected set; } + public virtual string ReplyLine + { + get + { + if(RawString.Length >= 6) + { + // remove 4 for chars (reply code) + return RawString.Substring(4, RawString.Length - 6); + } + return ""; + } + } /// /// Returns the raw string replied by TOR /// - /// The raw string replied by TOR - public virtual string RawString { get; protected set; } - - protected Reply(ReplyCodes code, string replyline, string raw) - { - this.ReplyLine = replyline; - this.RawString = raw; - this.Code = code; - } - - protected Reply(Reply reply) - { - this.ReplyLine = reply.ReplyLine; - this.RawString = reply.RawString; - this.Code = reply.Code; - } + public virtual string RawString { get; internal set; } /// - /// Parse a reply from the tor deamon + /// Create a reply from the raw string replied by TOR /// - /// The string the server replied - /// A list of replies (IT COULD BE EMPTY) - public static List Parse(string rawstring) - { - List replies = new List(); - - rparse(rawstring, 0, ref replies); - - return replies; - } - - // i is the position, named i only cause is shorter to write. - private static void rparse(string rawstring, int i, ref List current) + /// Thrown when raw string is to short or does + /// not contain a reply code at the beginning. + /// Raw string replied by TOR + internal Reply(string rawstring) { int code; - int j; - bool multiline; - StringBuilder replyline; - - j = i; // used for substring the rawstring - replyline = new StringBuilder(); - - if ((rawstring.Length < i + 3) || // the relative string is < 3 chars - (!int.TryParse(rawstring.Substring(i, 3), out code))) // or the first 3 chars are NOT a 3 digit number - { - throw new NullReplyCodeException(rawstring, i); - } - - multiline = rawstring[i + 3] == '+'; // it is a multiline (multiline ends with ".\r\n"); - i += 4; // we computed the code and the multiline, go ahead BOY! - - while (i < rawstring.Length) // while we are on the right way + if ((rawstring.Length < 4) || // 4 chars need for replycode + reply type + (!int.TryParse(rawstring.Substring(0, 3), out code))) // or the first 3 chars are NOT a 3 digit number { - bool end; - - end = (multiline && rawstring.Length > i+4 && rawstring.Substring(i, 5) == "\r\n.\r\n") || // we are at the end of a multiline reply - (!multiline && rawstring.Length > i+1 && rawstring.Substring(i,2) == "\r\n"); // we are at the end of a singleline reply - - if (end) - { - i += multiline ? 5 : 2; - - // Add the reply to the return collection - current.Add(new Reply((ReplyCodes)code, - replyline.ToString(), - rawstring.Substring(j, i - j))); - - if (i == rawstring.Length) - { - return; // we are at the end of the string (if it is a multi response one) - } - else - { - rparse(rawstring, i, ref current); // We are not at the end of the string (if it is a multi response one) - return; - } - } - - replyline.Append(rawstring[i]); - i++; + throw new NullReplyCodeException(rawstring, 0); } + this.Code = (ReplyCodes)code; + this.RawString = rawstring; + } - // THIS SHOULD NEVER EVER EVER HAPPEN - Logger.LogWarn(String.Format("Reply not parsed well, reached the end of the recursive funciton: \"{0}\".", rawstring)); + /// + /// Create a reply as copy of another. + /// + /// Reply to copy from. + protected Reply(Reply reply) + { + this.RawString = reply.RawString; + this.Code = reply.Code; } } } diff --git a/Shem/Sockets/ControlSocket.cs b/Shem/Sockets/ControlSocket.cs index 5a7b523..b82a0f6 100644 --- a/Shem/Sockets/ControlSocket.cs +++ b/Shem/Sockets/ControlSocket.cs @@ -1,4 +1,5 @@ -using System.Net; +using System; +using System.Net; using System.Net.Sockets; using System.Text; using Shem.Utils; @@ -9,9 +10,13 @@ namespace Shem.Sockets /// /// This class manage to communicate with TOR, low level way. /// - public class ControlSocket + public class ControlSocket : IDisposable { private Socket _socket; + private bool _disposed = false; + private NetworkStream _ns; + private byte[] _rcvBuffer = new byte[8192]; + public event Action OnLineReceived; public IPAddress Address { get; private set; } public uint Port { get; private set; } public bool Connected @@ -37,7 +42,9 @@ public ControlSocket(string address = "127.0.0.1", uint port = 9051, bool connec { IPAddress[] tmp = Dns.GetHostAddresses(address); if (tmp.Length < 1) + { throw new ServerNotFoundException(); + } _addr = tmp[0]; } this.Address = _addr; @@ -45,7 +52,9 @@ public ControlSocket(string address = "127.0.0.1", uint port = 9051, bool connec _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); if (connect) + { this.Connect(); + } } /// @@ -55,6 +64,8 @@ public void Connect() { Logger.LogDebug(string.Format("Connecting to the server \"{0}:{1}\".", this.Address, this.Port)); _socket.Connect(Address, (int)Port); + _ns = new NetworkStream(_socket); + _ns.BeginRead(_rcvBuffer, 0, _rcvBuffer.Length, AsyncRcvCallback, ""); } /// @@ -65,6 +76,8 @@ public void Close() if (Connected) { Logger.LogDebug(string.Format("Closing the connection to \"{0}:{1}\".", this.Address, this.Port)); + _ns.Dispose(); + _ns = null; _socket.Close(); } } @@ -76,17 +89,74 @@ public void Close() public void Send(string message) { Logger.LogDebug(string.Format("Sent message to the server: \"{0}\".", message.Replace("\r\n", "\\r\\n"))); - _socket.Send(Encoding.ASCII.GetBytes(message)); + byte[] asciiData = Encoding.ASCII.GetBytes(message); + _ns.Write(asciiData, 0, asciiData.Length); + } + + /// + /// Callback for asynchronos read from networkstream. + /// + /// Async handle + private void AsyncRcvCallback(IAsyncResult ar) + { + try + { + int bytes = _ns.EndRead(ar); + string rcvBufferStr = ar.AsyncState as string; + + if(_disposed) + { + return; + } + + if (bytes > 0) + { + rcvBufferStr += Encoding.ASCII.GetString(_rcvBuffer, 0, bytes); + + int offset = 0; + for (int start = rcvBufferStr.IndexOf("\r\n", offset); start > 0; start = rcvBufferStr.IndexOf("\r\n", offset)) + { + string line = rcvBufferStr.Substring(offset, start - offset + 2); + if (OnLineReceived != null) + { + OnLineReceived.Invoke(line); + } + offset = start + 2; // +2 for CRLF + } + rcvBufferStr = rcvBufferStr.Substring(offset); + + _ns.BeginRead(_rcvBuffer, 0, _rcvBuffer.Length, AsyncRcvCallback, rcvBufferStr); + } + } + catch (ObjectDisposedException) + { + return; + } } - public string Receive() + public void Dispose(bool disposing) { - byte[] buffer = new byte[_socket.Available]; - string reply; - _socket.Receive(buffer); - reply = Encoding.ASCII.GetString(buffer); - Logger.LogDebug(string.Format("Received a reply from the server: \"{0}\".", reply.Replace("\r\n", "\\r\\n"))); - return reply; + _disposed = true; + Close(); + + if (disposing) + { + if(_ns != null) + { + _ns.Dispose(); + _ns = null; + } + if(_socket != null) + { + _socket.Dispose(); + _socket = null; + } + } + } + + public void Dispose() + { + Dispose(true); } //private string Format(string message) @@ -112,8 +182,7 @@ public string Receive() ~ControlSocket() { - if (Connected) - this.Close(); + Dispose(false); } } } diff --git a/Shem/TorController.cs b/Shem/TorController.cs index c6aa895..2b7528b 100644 --- a/Shem/TorController.cs +++ b/Shem/TorController.cs @@ -52,7 +52,7 @@ public TorController(string address = "127.0.0.1", uint port = 9051, bool connec /// True in case of successful authentication, false in every other case. public bool Authenticate(string password = "") { - if (!this.controlSocket.Connected) + if (!this.Connected) throw new SocketException(); AuthenticateReply reply; @@ -132,29 +132,21 @@ private void Resolve_Event(TorEvent obj) // TODO: wait tykonket to implement the ADDRMAP event. } - protected override void AsyncEventDispatcher(List asyncEvents) + protected override void AsyncEventDispatcher(TorEvent asyncEvent) { Task.Run(() => { - foreach (var e in asyncEvents) + if (OnAsyncEvents != null) + { + OnAsyncEvents.Invoke(asyncEvent); + } + if (OnAsyncEvent.ContainsKey(asyncEvent.Event)) { - if (OnAsyncEvents != null) - { - OnAsyncEvents.Invoke(e); - } - if (OnAsyncEvent.ContainsKey(e.Event)) - { - OnAsyncEvent[e.Event].Dispatch(e); - } + OnAsyncEvent[asyncEvent.Event].Dispatch(asyncEvent); } }); } - protected override void AsyncEventDispatcher(TorEvent asyncEvent) - { - AsyncEventDispatcher(new List() { asyncEvent }); - } - /// /// Close the connection with the control port. ///