using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Net.NetworkInformation;

/* An Asker instance is used to send requests to remote servers.
   Each instance has a set of re-useable Sockets ( derived from Sock class ).
   The messages are labelled with ID values, which are carefully managed.
   Responses are reported by calling SimpleQuery.ProcessResponse.

   NOTE(review): the generic type arguments on Map, Set, Queue and IEquatable were
   reconstructed from how the values are used in this file - confirm against the
   declarations of Map and Set.
*/

// Common interface of the three transports ( TCP, UDP, QRP ).
abstract class Asker
{
  // Send the question Sq to the name server NS.
  public abstract void Ask( SimpleQuery Sq, NameServer NS );

  public abstract void CleanUp(); // Free inactive sockets
}

// A socket associated with one name server, plus helpers to build end points and sockets for it.
class Sock
{
  public NameServer NS;
  public long LastActive; // Based on Cache.Clock
  public Socket Socket;

  // Hash is derived from the server IP only ( the derived classes compare on NS.IP as well ).
  public override int GetHashCode(){ return Data.GetHashCode( NS.IP ); }

  // An address longer than 4 bytes is treated as IPv6.
  public bool IPv6 { get { return NS.IP.Length > 4; } }

  // End point of the name server for the given port.
  public EndPoint GetEndPoint( int Port )
  {
    IPAddress Address = IPv6 ? new IPAddress( NS.IP, NS.Scope ) : new IPAddress( NS.IP );
    return new IPEndPoint( Address, Port );
  }

  // Wildcard local end point of the matching address family.
  // RandomPort selects a port in the range 10000 + Random.Number(50000), otherwise port 0 is used.
  public EndPoint GetAnyEndPoint( bool RandomPort )
  {
    int Port = RandomPort ? 10000 + Random.Number(50000) : 0;
    return new IPEndPoint( IPv6 ? IPAddress.IPv6Any : IPAddress.Any, Port);
  }

  // New stream ( Tcp ) or datagram socket of the matching address family.
  public Socket GetSocket( bool Tcp )
  {
    return new Socket
    (
      IPv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork,
      Tcp ? SocketType.Stream : SocketType.Dgram,
      Tcp ? ProtocolType.Tcp : ProtocolType.Udp
    );
  }

  // Address bytes of the local end point of Socket.
  public byte[] GetLocalIP { get{ return((IPEndPoint)Socket.LocalEndPoint).Address.GetAddressBytes(); } }
}

//////////////////////////////////////////////////////////////////////////////////////

// Sock which labels outstanding questions with 16 bit IDs.
class Sock16 : Sock
{
  // Outstanding questions, indexed by the ID that was sent with them.
  protected Map<UInt16,SimpleQuery> Active = new Map<UInt16,SimpleQuery>();

  // Allocate a random ID which is not currently in use and associate it with Question.
  public int GetID( SimpleQuery Question )
  {
    while (true)
    {
      UInt16 ID = Random.Number16();
      if ( Active[ID] == null )
      {
        Active[ID] = Question;
        return ID;
      }
    }
  }
}

//////////////////////////////////////////////////////////////////////////////////////

sealed class TCP_Asker : Asker
{
  public static TCP_Asker Instance = new TCP_Asker();

  Set<TCP_Sock> Sockets = new Set<TCP_Sock>();

  public override void Ask( SimpleQuery Question, NameServer NS )
  {
    TCP_Sock Socket = new TCP_Sock( NS );
    // NOTE(review): the Set indexer presumably returns the registered socket for this
    // server, registering the new one if there is none - confirm against Set.
    Socket = Sockets[Socket];
    Socket.Queue( Question );
  }

  public override void CleanUp()
  {
    // Collect first, then remove, so the result does not depend on Set tolerating
    // removal while it is being enumerated.
    List<TCP_Sock> Expired = new List<TCP_Sock>();
    foreach( TCP_Sock S in Sockets )
    {
      if ( Cache.Clock - S.LastActive > Config.TcpClientTimeout ) Expired.Add( S );
    }
    foreach( TCP_Sock S in Expired )
    {
      Sockets.Remove(S); // Deregister
      S.Close();
    }
  }
}

//////////////////////////////////////////////////////////////////////////////////////

// TCP connection to one name server. Questions are queued and sent one at a time.
// Each message on the wire is preceded by a 2 byte length.
sealed class TCP_Sock : Sock16, IEquatable<TCP_Sock>
{
  Queue<SimpleQuery> Q = new Queue<SimpleQuery>(); // Questions waiting to be sent
  int PacketLength; // Not used by any code in this class
  DnsRx Rx;         // Receive state for the message currently being read
  bool Connected;   // Set once EndConnect has succeeded
  bool Sending;     // Set while a BeginSend is outstanding

  public bool Equals( TCP_Sock x ){ return Data.BytesEqual( NS.IP, x.NS.IP ); }

  // Queue a question, starting the connection if necessary.
  public void Queue( SimpleQuery Question )
  {
    Q.Enqueue( Question );
    #if (Trace)
    // Cache.Log( "Enqueued Question=" + Question.Name + " Sending=" + Sending + " Q.Count=" + Q.Count + " Connected=" + Connected );
    #endif
    Init();
    SendNext();
  }

  // Create the socket and start connecting, if there is no socket.
  private void Init()
  {
    if ( Socket == null )
    {
      LastActive = Cache.Clock;
      Socket = GetSocket( true );
      Socket.BeginConnect( GetEndPoint(ServerPort.TCP), new AsyncCallback(ConnectCallback), this );
    }
  }

  private static void ConnectCallback( IAsyncResult ar )
  {
    TCP_Sock P = (TCP_Sock)ar.AsyncState;
    lock( Cache.Workers ) try
    {
      P.Socket.EndConnect( ar );
      P.Connected = true;
      P.Listen(); // Allocates the receive state
      P.SendNext();
    }
    catch ( Exception e )
    {
      #if (Trace)
      Cache.Log( "ConnectCallback Exception " + e );
      #endif
      P.Close();
    }
  }

  // Send the next queued question, unless not yet connected or a send is already in progress.
  private void SendNext()
  {
    if ( !Connected || Sending || Q.Count == 0 ) return;
    SimpleQuery Question = Q.Dequeue();
    #if (Trace)
    // Cache.Log( "SendNext Question=" + Question.Name );
    #endif
    int ID = GetID( Question );
    DnsTx Tx = new DnsTx( 4, Question ); // 4 = length prefix + ID
    int N = Tx.Buffer.Length;
    Tx.Put16( N - 2 ); // Length excludes the 2 byte prefix itself
    Tx.Put16( ID );
    byte [] Query = Tx.Buffer;
    Sending = true;
    LastActive = Cache.Clock;
    Socket.BeginSend( Query, 0, Query.Length, 0, new AsyncCallback(SendCallback), this );
  }

  private static void SendCallback( IAsyncResult ar )
  {
    TCP_Sock P = (TCP_Sock)ar.AsyncState;
    lock( Cache.Workers ) try
    {
      int N = P.Socket.EndSend( ar ); // ToDo : Check N is as expected?
      P.Sending = false;
      #if (Trace)
      // Cache.Log( "Question Sent Sending=" + P.Sending );
      #endif
      P.SendNext();
    }
    catch ( Exception e )
    {
      #if (Trace)
      Cache.Log( "SendCallback Exception " + e );
      #endif
      P.Close();
    }
  }

  // Start receiving the next message. A fresh DnsRx is used for every message so that
  // the one already handed to ProcessResponse is never overwritten.
  private void Listen()
  {
    Rx = new DnsRx(2);
    Rx.Ix = 0;
    Rx.N = 0;
    ListenPrefix();
  }

  // Receive the part of the 2 byte length prefix which has not arrived yet.
  private void ListenPrefix()
  {
    Socket.BeginReceive( Rx.Buffer, Rx.N, 2 - Rx.N, 0/* Socket flags */, new AsyncCallback(ReceiveCallback1), this );
  }

  // Receive the part of the message body which has not arrived yet.
  private void Listen2()
  {
    Socket.BeginReceive( Rx.Buffer, Rx.N, Rx.Buffer.Length-Rx.N, 0/* Socket flags */, new AsyncCallback(ReceiveCallback2), this );
  }

  // Completion of a length prefix receive.
  private static void ReceiveCallback1( IAsyncResult ar )
  {
    TCP_Sock P = (TCP_Sock)ar.AsyncState;
    lock( Cache.Workers ) try
    {
      if ( P.Socket == null ) return; // Closed while the receive was pending
      int Got = P.Socket.EndReceive( ar );
      if ( Got == 0 ) { P.Close(); return; } // Zero bytes : the server has closed the connection
      P.Rx.N += Got;
      if ( P.Rx.N < 2 ) { P.ListenPrefix(); return; } // Only half of the prefix so far
      P.Rx.Ix = 0;
      int PacketLength = P.Rx.Read16();
      if ( PacketLength < 2 ) { P.Close(); return; } // Too short to hold even the ID
      P.Rx.Buffer = new byte[ PacketLength ];
      P.Rx.Ix = 0;
      P.Rx.N = 0;
      P.Listen2();
    }
    catch ( Exception e )
    {
      #if (Trace)
      Cache.Log( "ReceiveCallback Exception " + e );
      #endif
      P.Close();
    }
  }

  // Completion of a message body receive.
  private static void ReceiveCallback2( IAsyncResult ar )
  {
    TCP_Sock S = (TCP_Sock)ar.AsyncState;
    lock( Cache.Workers ) try
    {
      if ( S.Socket == null ) return; // Closed while the receive was pending
      int Got = S.Socket.EndReceive( ar );
      if ( Got == 0 ) { S.Close(); return; } // Zero bytes : closed mid-message, do not re-arm for ever
      S.Rx.N += Got;
      if ( S.Rx.N == S.Rx.Buffer.Length )
      {
        UInt16 ID = S.Rx.Read16();
        SimpleQuery Q = S.Active[ID];
        if ( Q != null )
        {
          S.Active.Remove( ID ); // Answered : release the ID so it can be allocated again
          Q.ProcessResponse( 0, S.Rx );
        }
        S.Listen();
      }
      else
      {
        S.Listen2();
      }
    }
    catch ( Exception e )
    {
      #if (Trace)
      Cache.Log( "ReceiveCallback2 Exception " + e );
      #endif
      S.Close();
    }
  }

  // Close the socket ( if any ) and reset the connection state.
  public void Close()
  {
    if ( Socket != null )
    {
      #if (Trace)
      Cache.Log( "Closing TCP Socket to " + NS.IP );
      #endif
      Socket.Close();
      Socket = null;
      Connected = false;
      Sending = false;
    }
  }

  public TCP_Sock( NameServer _NS )
  {
    NS = _NS;
  }
}

//////////////////////////////////////////////////////////////////////////////////////

sealed class UDP_Asker : Asker
{
  public static UDP_Asker Instance = new UDP_Asker();

  Set<UDP_Sock> Sockets = new Set<UDP_Sock>();

  public override void Ask( SimpleQuery Question, NameServer NS )
  {
    UDP_Sock Socket = new UDP_Sock( NS );
    // NOTE(review): the Set indexer presumably returns the registered socket for this
    // server, registering the new one if there is none - confirm against Set.
    Socket = Sockets[Socket];
    int ID = Socket.GetID( Question );
    DnsTx Tx = new DnsTx( 2, Question ); // 2 = ID
    Tx.Put16( ID );
    Socket.Send( Tx.Buffer );
  }

  public override void CleanUp()
  {
    // Collect first, then remove, so the result does not depend on Set tolerating
    // removal while it is being enumerated.
    List<UDP_Sock> Expired = new List<UDP_Sock>();
    foreach( UDP_Sock S in Sockets )
    {
      if ( Cache.Clock - S.LastActive > 5 ) Expired.Add( S );
    }
    foreach( UDP_Sock S in Expired )
    {
      Sockets.Remove(S); // Deregister
      if ( S.Socket != null )
      {
        S.Socket.Close();
        S.Socket = null;
      }
    }
  }
}

//////////////////////////////////////////////////////////////////////////////////////

// UDP socket used to talk to one name server.
sealed class UDP_Sock : Sock16, IEquatable<UDP_Sock>
{
  DnsRx Rx; // Buffer for the receive which is currently outstanding

  public bool Equals( UDP_Sock x ){ return Data.BytesEqual( NS.IP, x.NS.IP ); }

  // Send a datagram to the name server, creating and binding the socket if necessary.
  public void Send( byte[] Query )
  {
    EndPoint EP = GetEndPoint(ServerPort.UDP);
    Init();
    Socket.SendTo( Query, EP );
  }

  // Start an asynchronous receive into a new buffer.
  private void Listen()
  {
    Rx = new DnsRx(4096);
    EndPoint RemoteEP = GetAnyEndPoint( false );
    Socket.BeginReceiveFrom( Rx.Buffer, 0, Rx.Buffer.Length, 0/* Socket flags */, ref RemoteEP, new AsyncCallback(ReceiveCallback), this );
  }

  private static void ReceiveCallback( IAsyncResult ar )
  {
    lock( Cache.Workers ) try
    {
      UDP_Sock S = (UDP_Sock)ar.AsyncState;
      if ( S.Socket == null ) return; // Closed by CleanUp
      S.LastActive = Cache.Clock;
      DnsRx Rx = S.Rx;
      // A BeginReceiveFrom has to be completed by EndReceiveFrom ( not EndReceive ).
      EndPoint Sender = S.GetAnyEndPoint( false );
      Rx.N = S.Socket.EndReceiveFrom( ar, ref Sender );
      S.Listen(); // Re-arm with a new buffer before processing this one
      UInt16 ID = Rx.Read16();
      SimpleQuery Q = S.Active[ID];
      if ( Q != null )
      {
        S.Active.Remove( ID ); // Answered : release the ID so it can be allocated again
        Q.ProcessResponse( 0, Rx );
      }
      else
      {
        Cache.BadID.Add();
      }
    }
    catch ( Exception e )
    {
      #if (Trace)
      Cache.Log( "ReceiveCallback Exception " + e );
      #endif
    }
  }

  // Create, bind and start listening on the socket, if there is no socket.
  private void Init()
  {
    if ( Socket == null )
    {
      Socket = GetSocket( false );
      try
      {
        EndPoint Local = GetAnyEndPoint( Config.RandomSourcePort );
        Socket.Bind(Local);
        Listen();
      }
      catch
      {
        // Do not keep a socket which is not being listened on : the next Send would
        // use it and the responses would never be received.
        Socket.Close();
        Socket = null;
        throw;
      }
    }
  }

  public UDP_Sock( NameServer _NS )
  {
    LastActive = Cache.Clock;
    NS = _NS;
  }
}

//////////////////////////////////////////////////////////////////////////////////////

// Round trip time and bandwidth statistics for a QRP link. All times are in ticks.
sealed class QRP_LinkInfo
{
  public int ServerToken;

  long SessionStart;     // Time session started
  byte Packets;          // Number of packets received ( stops at 255 )
  byte Sessions;         // Number of sessions ( wraps around )
  byte SessionsLarge;    // Number of sessions with large packets ( stops at 255 )
  float SessionBytes;    // Bytes received this session ( large packets only )

  float RTT_Recent;      // Smoothed RTT
  float RTT_New;         // Minimum RTT over last 8 sessions, used to update RTT
  float RTT;             // Conservative (low) estimate of minimum RTT, used for calculating INFLIGHT
  float RTT_Session_Min; // Minimum RTT this session
  float RTT_Session_Max; // Maximum RTT this session ( but not less than RTT )
  float BandWidth;       // Smoothed bandwidth estimate

  const long ms = 10000L; // Ticks per millisecond

  public byte GetMaxInFlight() // Key function : limits the number of packets in the network
  {
    double PT = Config.MTU / GetBandWidth(); // Time to transmit one full size packet

    // RTT_Recent is zero until a packet with a measurable trip time has been recorded.
    // Dividing by it would give NaN, and converting NaN to byte has an unspecified result.
    double Load = RTT_Recent > 0 ? RTT / RTT_Recent : 1.0;

    double X = 4 + 3 * ( RTT / PT ) * Load; // Determines how heavily network is loaded
    if ( double.IsNaN( X ) || X < 4 ) X = 4;

    byte Result = X > 255 ? (byte)255 : (byte)X;
    if ( Packets < 4 && Result > 4 ) Result = 4; // Slightly slow start
    #if (Trace)
    if (Config.TraceQRP) Cache.Log( "QRP GetMaxInFlight"
      + " PT=" + (float)( PT / ms ) + "ms"
      + " RTT=" + (float)( RTT / ms ) + "ms"
      + " RRT_Recent=" + (float)( RTT_Recent / ms ) + "ms"
      + " X=" + (float)( X )
      + " Packets=" + ( Packets == 255 ? ".." : Packets.ToString() )
      + " Result=" + Result );
    #endif
    return Result;
  }

  // Moving average of A with the new value V. The weight of V is 1/N for N < 10, otherwise 0.1.
  private static float Smooth( double V, double A, byte N )
  {
    double Alpha = N < 10 ? 1.0 / N : 0.1;
    return (float)( Alpha * V + ( 1 - Alpha ) * A );
  }

  // Bandwidth estimate in bytes per tick.
  private double GetBandWidth()
  {
    if ( SessionBytes > 0 )
    {
      // The clock may not have advanced since the session started. Dividing by zero
      // would give an infinite bandwidth which smoothing can never recover from.
      long Elapsed = System.DateTime.Now.Ticks - SessionStart;
      if ( Elapsed > 0 ) return Smooth( SessionBytes / Elapsed, BandWidth, SessionsLarge );
    }
    if ( BandWidth == 0 ) return Config.MTU / ( 10.0 * ms ); // ToDo : Proper estimate
    return BandWidth;
  }

  // Timeout in ticks, not less than 80 ms. Now is not used.
  public long GetTimeout( long Now, int InFlightHigh )
  {
    double PT = Config.MTU / GetBandWidth();
    long Timeout = (long)( InFlightHigh*PT + 2*RTT_Session_Max );
    if ( Timeout < 80 * ms ) Timeout = 80 * ms; // Minimum timeout
    #if (Trace)
    if (Config.TraceQRP) Cache.Log( "QRP GetTimeout"
      + " InFlightHigh=" + InFlightHigh
      + " PT=" + (float)( PT / ms ) + "ms"
      + " RTT=" + ( RTT / ms ) + "ms"
      + " Timeout=" + ( Timeout / ms ) + "ms" );
    #endif
    return Timeout;
  }

  // Reset the per-session statistics.
  public void StartSession()
  {
    SessionStart = System.DateTime.Now.Ticks;
    SessionBytes = 0;
    RTT_Session_Min = 0;
    RTT_Session_Max = RTT;
    Sessions += 1;
    #if (Trace)
    if (Config.TraceQRP) Cache.Log( "QRP StartSession" );
    #endif
  }

  // Update the statistics for a received packet of Size bytes which answers Q.
  public void RecordPacket( QRP_Query Q, int Size )
  {
    float Trip = System.DateTime.Now.Ticks - Q.StartTicks;
    if ( Trip > RTT_Session_Max ) RTT_Session_Max = Trip;
    if ( RTT_Session_Min == 0 || Trip < RTT_Session_Min ) RTT_Session_Min = Trip;
    if ( Packets == 0 || Trip < RTT ) RTT = Trip;
    if ( Packets < 255 ) Packets += 1;
    RTT_Recent = Smooth( Trip, RTT_Recent, Packets );
    if ( Size >= 600 ) // Only large packets count towards the bandwidth estimate
    {
      if ( SessionBytes == 0 && SessionsLarge < 255 ) SessionsLarge += 1;
      SessionBytes += Size;
    }
    #if (Trace)
    if (Config.TraceQRP) Cache.Log( "QRP RecordPacket"
      + " Trip=" + (Trip/ms) + "ms Size=" + Size
      + " RTT_Recent=" + (RTT_Recent / ms) + "ms" );
    #endif
  }

  // Fold the session statistics into the long term estimates.
  public void EndSession()
  {
    BandWidth = (float)GetBandWidth();
    if ( RTT_New == 0 || RTT_Session_Min < RTT_New ) RTT_New = RTT_Session_Min;
    if ( Sessions % 8 == 0 )
    {
      RTT = RTT_New;
      RTT_New = RTT_Session_Min;
    }
    #if (Trace)
    // if (Config.TraceQRP)
    {
      double PT = Config.MTU / BandWidth;
      Cache.Log( "QRP EndSession"
        + " RTT=" + (RTT / ms) + "ms"
        + " RTT_Recent=" + (RTT_Recent / ms) + "ms"
        + " RTT_Session_Min=" + (RTT_Session_Min / ms) + "ms"
        + " RTT_Session_Max=" + (RTT_Session_Max / ms) + "ms"
        + " RTT_New=" + (RTT_New / ms) + "ms"
        + " PT=" + (float)( PT / ms ) + "ms"
        + " BandWidth=" + (float)( BandWidth * ms ) + "Kbyte/sec" );
    }
    #endif
  }
}

//////////////////////////////////////////////////////////////////////////////////////

sealed class QRP_Asker : Asker
{
  public static QRP_Asker Instance = new QRP_Asker();

  private Set<QRP_Sock> Sockets = new Set<QRP_Sock>();

  public override void Ask( SimpleQuery Question, NameServer NS )
  {
    QRP_Sock Socket = new QRP_Sock( NS );
    // NOTE(review): the Set indexer presumably returns the registered socket for this
    // server, registering the new one if there is none - confirm against Set.
    Socket = Sockets[Socket];
    Socket.StartSession();
    Socket.Send( new QRP_Query( Question, 0, 0 ), false ); // StartSession has already called Connect
  }

  // Discard old outstanding queries and close inactive sockets ( they stay registered ).
  public override void CleanUp()
  {
    QRP_Sock.CleanUp();
    foreach( QRP_Sock S in Sockets )
    {
      if ( Cache.Clock - S.LastActive > 5 ) S.Close();
    }
  }
}

//////////////////////////////////////////////////////////////////////////////////////

// A request sent over QRP : either the initial request for a question, or a follow-up
// request for Requested pages starting at FirstPage.
sealed class QRP_Query
{
  public SimpleQuery Sq;
  public long StartTicks; // Time the request was created
  public int FirstPage;
  public byte Requested;
  private int NotDone;    // Number of packets still expected for this request

  public void ProcessResponse( RC ErrorCode, DnsRx Rx )
  {
    Sq.ProcessResponse( ErrorCode, Rx );
  }

  // Process one page of a multi-page response.
  // Returns true if no more packets are expected for this request.
  public bool ProcessPartialResponse( DnsRx Rx, QRP_Sock Sock )
  {
    if ( Sq.GotResponse ) return true;

    int Save = Rx.Ix;
    Rx.Ix = Rx.N - 18; // Header fields are after data
    int Total = Rx.Read32();
    UInt64 Cookie = Rx.Read64();
    byte Sent = Rx.Read8();
    int PageNum = Rx.Read24();
    int PageSize = Rx.Read16();
    Rx.Ix = Save;
    Rx.N -= 18; // Exclude the header fields from the data
    #if (Trace)
    if (Config.TraceQRP) Sq.Mr.Log( "ProcessPartialResponse PageNum=" + PageNum + " Rx.N=" + Rx.N + " PageSize=" + PageSize + " Total=" + Total + " Sent=" + Sent );
    #endif
    Sq.Mr.RecvCount += 1;

    if ( Total > Config.MaxResponseSize )
    {
      ProcessResponse( RC.ServerFail, Rx );
      return true;
    }

    if ( Sq.AB == null )
    {
      Sq.AB = new AssemblyBuffer( Requested, PageSize, Total, Cookie, Sock );
    }
    else
    {
      // ToDo : check that PageSize,Total and Cookie match existing AB
    }

    if ( NotDone == 0 )
    {
      // First packet for this request : Sent-1 more are expected
      NotDone = (byte)(Sent-1);
      Sq.AB.NoteSent( FirstPage, Sent, Requested, System.DateTime.Now.Ticks - Sq.StartTicks );
    }
    else
    {
      NotDone -= 1;
    }

    if ( Sq.AB.Add( PageNum, Rx ) )
    {
      Sock.EndSession();
      Sq.ProcessResponse( RC.Ok, Rx );
      return true;
    }

    Sq.AB.RequestMore( Sq );
    return NotDone == 0;
  }

  public QRP_Query ( SimpleQuery _Sq, int _FirstPage, byte Count )
  {
    Sq = _Sq;
    FirstPage = _FirstPage;
    Requested = Count;
    StartTicks = System.DateTime.Now.Ticks;
  }
}

//////////////////////////////////////////////////////////////////////////////////////

// Connected UDP socket used to talk QRP to one name server.
sealed class QRP_Sock : Sock, IEquatable<QRP_Sock>
{
  private DnsRx SRx;          // Buffer for the receive which is currently outstanding
  private int SessionActive;  // Number of sessions in progress
  private byte [] LocalIP;    // Local address when the socket was connected
  public QRP_LinkInfo LinkInfo; // Established by calling Connect
  private Map<byte[],QRP_LinkInfo> LinkInfoMap = new Map<byte[],QRP_LinkInfo>( ByteComparer.Instance ); // Indexed by local IP
  public static Map<byte[],QRP_Query> Active = new Map<byte[],QRP_Query>(ByteComparer.Instance); // Outstanding requests ( all sockets ), indexed by ID
  private bool GotToken;      // Set when a Setup response has been received

  public bool Equals( QRP_Sock x ){ return Data.BytesEqual( NS.IP, x.NS.IP ); }

  public void StartSession()
  {
    Connect();
    SessionActive += 1;
    if ( SessionActive == 1 ) LinkInfo.StartSession();
  }

  public void EndSession()
  {
    SessionActive -= 1;
    if ( SessionActive == 0 ) LinkInfo.EndSession();
  }

  // Allocate a random ID which is not currently in use and associate it with Question.
  // The ID is 12 bytes if the server has a public key, otherwise 8 bytes.
  public byte[] GetID( QRP_Query Question, NameServer NS )
  {
    while (true)
    {
      byte [] ID = Random.Bytes( NS.PublicKey != null ? 12 : 8 );
      if ( Active[ID] == null )
      {
        Active[ID] = Question;
        return ID;
      }
    }
  }

  // Discard outstanding requests which are more than 120 seconds old.
  public static void CleanUp()
  {
    long Now = System.DateTime.Now.Ticks;

    // Collect first, then remove, so the result does not depend on the set tolerating
    // removal while it is being enumerated.
    List<Map<byte[],QRP_Query>.Pair> Expired = new List<Map<byte[],QRP_Query>.Pair>();
    foreach ( Map<byte[],QRP_Query>.Pair P in Active.S )
    {
      if ( Now - P.V.StartTicks > 120 * 1000 * 10000L ) // 120 seconds
      {
        Expired.Add( P );
      }
    }
    foreach ( Map<byte[],QRP_Query>.Pair P in Expired )
    {
      Active.S.Remove(P);
    }
  }

  // Send the initial request for a question. A Setup request is sent instead if no
  // server token has been received yet ( the question is then sent when the token arrives ).
  public void Send( QRP_Query Question, bool CheckConnect )
  {
    if ( CheckConnect) Connect();
    byte [] ID = GetID( Question, NS );
    int PID = NS.PublicKey != null ? 0 : 8; // Number of plain ID bytes
    DnsTx Tx;
    if ( !GotToken )
    {
      Tx = new DnsTx( 2 + PID );
      Tx.Put16( (int)QRM.Setup );
      if ( PID != 0 ) Tx.PutBytes( ID );
    }
    else
    {
      if ( Question.Requested == 0 ) Question.Requested = LinkInfo.GetMaxInFlight();
      Tx = new DnsTx( 10 + PID, Question.Sq );
      Tx.Put16( (int)QRM.Single );
      if ( PID !=0 ) Tx.PutBytes( ID );
      Tx.Put32( LinkInfo.ServerToken );
      Tx.Put8( Question.Requested );
      Tx.Put8( 0 ); // Reserved
      Tx.Put16( Config.MTU );
      #if (Trace)
      if (Config.TraceQRP) Question.Sq.Mr.Log( "Send" + " Count=" + Question.Requested );
      #endif
    }
    Send( Tx.Buffer, ID );
  }

  // Request Count further pages of the response to Sq, starting at FirstPage.
  public void FollowUp( SimpleQuery Sq, int FirstPage, byte Count )
  {
    #if (Trace)
    if (Config.TraceQRP) Sq.Mr.Log( "Follow-up FirstPage=" + FirstPage + " Count=" + Count );
    #endif
    QRP_Query Question = new QRP_Query( Sq, FirstPage, Count );
    Connect(); // Ensures that ServerToken is current
    byte [] ID = GetID( Question, NS );
    int PID = NS.PublicKey != null ? 0 : 8; // Number of plain ID bytes
    DnsTx Tx = new DnsTx( 20 + PID, Question.Sq );
    Tx.Put16( (int)QRM.Multi );
    if ( PID !=0 ) Tx.PutBytes( ID );
    Tx.Put32( LinkInfo.ServerToken );
    Tx.Put64( Question.Sq.AB.Cookie );
    Tx.Put8( Count );
    Tx.Put24( FirstPage );
    Tx.Put16( Question.Sq.AB.PageSize );
    Send( Tx.Buffer, ID );
  }

  private static void ReceiveCallback( IAsyncResult ar )
  {
    if ( Cache.Stopping ) return;
    QRP_Sock S = (QRP_Sock)ar.AsyncState;
    lock( Cache.Workers ) try
    {
      if ( S.Socket == null ) return; // Closed while the receive was pending
      DnsRx Rx = S.SRx;
      Rx.N = S.Socket.EndReceive( ar );
      S.Listen(); // Re-arm with a new buffer before processing this one
      QRM Mode = (QRM)Rx.Read16();

      byte [] ID;
      QRP_Query Q;

      if ( Mode == QRM.Encrypt )
      {
        ID = Rx.ReadBytes( 12 );
        Q = Active[ID];
        if ( Q == null ) return;
        byte [] ServerNonce = Rx.ReadBytes( 12 );
        byte [] ServerData = Rx.ReadBytes( Rx.N - Rx.Pos() );
        if ( !Crypt.DecodeResponse( Rx, ID, ServerNonce, ServerData, S.NS.PublicKey ) ) return;
        Mode = (QRM)Rx.Read16(); // The real mode is inside the decoded data
      }
      else
      {
        ID = Rx.ReadBytes( 8 );
        Q = Active[ID];
      }

      if ( Q != null )
      {
        S.LinkInfo.RecordPacket( Q, Rx.N ); // Adjust RTT and Bandwidth statistics
        if ( Mode == QRM.Setup )
        {
          Active.Remove(ID);
          S.GotToken = true;
          S.LinkInfo.ServerToken = Rx.Read32();
          #if (Trace)
          // Cache.Log( "Got server token " + S.ServerToken + " LocalIP = " + Data.BytesToString(S.LocalIP) );
          #endif
          byte Status = Rx.Read8();
          if ( Status < 10 )
          {
            Q.Sq.AB = null; // Discard assembly buffer
            S.Send( Q, true ); // Retry
          }
          else
          {
            #if (Trace)
            Cache.Log( "Unexpected Status Code " + ((QRS)Status) );
            #endif
          }
        }
        else if ( Mode == QRM.Single )
        {
          S.EndSession();
          Active.Remove(ID);
          Q.ProcessResponse( 0, Rx );
        }
        else if ( Mode == QRM.Multi )
        {
          if ( Q.ProcessPartialResponse( Rx, S ) )
          {
            Active.Remove(ID);
          }
        }
      }
    }
    catch (Exception e)
    {
      #if (Trace)
      Cache.Log( "ReceiveCallback Exception " + e );
      #endif
    }
  }

  // Send a request, encoding it first if the server has a public key.
  private void Send( byte[] Query, byte[] ID )
  {
    if ( NS.PublicKey != null )
    {
      Query = Crypt.EncodeQuery( NS.PublicKey, Query, ID );
    }
    Socket.Send( Query );
    LastActive = Cache.Clock;
  }

  // Start an asynchronous receive into a new buffer.
  private void Listen()
  {
    LastActive = Cache.Clock;
    SRx = new DnsRx(Config.MTU);
    Socket.BeginReceive( SRx.Buffer, 0, SRx.Buffer.Length, 0/* Socket flags */, new AsyncCallback(ReceiveCallback), this );
  }

  private void Connect()
  {
    // If Socket was previously connected and local IP address has changed, close the socket and
    // re-connect, so the local IP address is well established and the correct ServerToken is sent.

    if ( Socket != null && !Data.BytesEqual(GetLocalIP,LocalIP) )
    {
      if ( SessionActive > 0 ) LinkInfo.EndSession();
      Close();
    }

    if ( Socket == null )
    {
      Socket = GetSocket( false );
      try
      {
        Socket.Bind( GetAnyEndPoint(false) );
        EndPoint EP = GetEndPoint(ServerPort.QRP);
        Socket.Connect( EP );
        Listen();
      }
      catch
      {
        // Do not keep a socket which is not connected and not being listened on.
        Close();
        throw;
      }
      LocalIP = GetLocalIP;

      // The statistics and the server token are kept per local IP address.
      int OldServerToken = GotToken ? LinkInfo.ServerToken : 0;
      LinkInfo = LinkInfoMap[ LocalIP ];
      if ( LinkInfo == null )
      {
        LinkInfo = new QRP_LinkInfo();
        LinkInfo.ServerToken = OldServerToken;
        LinkInfoMap[ LocalIP ] = LinkInfo;
      }
      if ( SessionActive > 0 ) LinkInfo.StartSession();
    }
    #if (Trace)
    if (Config.TraceQRP) Cache.Log( "Connected to " + Socket.RemoteEndPoint + " Local End Point=" + Socket.LocalEndPoint );
    #endif
  }

  public void Close()
  {
    if ( Socket != null )
    {
      #if (Trace)
      // Cache.Log( "Closing Socket to " + NS.IP );
      #endif
      Socket.Close();
      Socket = null;
    }
  }

  public QRP_Sock( NameServer _NS )
  {
    NS = _NS;
  }
}