.NET вопрос об асинхронных операциях с сокетами и кадрировании сообщений

Я везде искал примеры того, как работать с фреймами TCP-сообщений. Я вижу много примеров, когда NetworkStreams передаются в объект StreamReader или StreamWriter, а затем используют методы ReadLine или WriteLine для сообщений с разделителями '\ n'. Протокол моего приложения содержит сообщения, заканчивающиеся на '\ n', поэтому NetworkStream, похоже, подходит. Однако я не могу найти конкретных примеров того, как правильно обрабатывать все это в сочетании с асинхронными сокетами. Когда ниже вызывается ReceiveCallback (), как мне реализовать классы NetworkStream и StreamReader для обработки фреймов сообщений? Согласно тому, что я прочитал, я могу получить часть одного сообщения в одном приеме, а остальную часть сообщения (включая '\ n') в следующем приеме. Означает ли это, что я могу получить конец одного сообщения и часть следующего? Конечно, должен быть более простой способ справиться с этим.

У меня такой код:

    private void StartRead(Socket socket)
    {
        try
        {
            StateObject state = new StateObject();
            state.AsyncSocket = socket;

            socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            StateObject state = (StateObject)ar.AsyncState;

            int bytes_read = state.AsyncSocket.EndReceive(ar);

            char[] chars = new char[bytes_read + 1];
            System.Text.Decoder decoder = System.Text.Encoding.UTF8.GetDecoder();
            int charLength = decoder.GetChars(state.Buffer, 0, bytes_read, chars, 0);

            String data = new String(chars);

            ParseMessage(data);

            StartRead(state.AsyncSocket);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

person Andrew    schedule 19.05.2011    source источник
comment
К вашему сведению, C # .NET не существует. Язык называется C #.   -  person John Saunders    schedule 19.05.2011


Ответы (3)


По сути, вы создаете буфер, и каждый раз, когда вы получаете данные, вы добавляете эти данные в буфер и определяете, получили ли вы уже одно или несколько полных сообщений.

Между ReceiveCallback и StartRead вы не получите никаких асинхронных сообщений (входящие данные будут автоматически буферизироваться на уровне сокета), поэтому это идеальное место для проверки полных сообщений и удаления их из буфера.

Возможны все варианты, включая получение конца сообщения 1, плюс сообщения 2, плюс начало сообщения 3, все в одном фрагменте.

Я не рекомендую декодировать блок с помощью UTF8, поскольку один символ UTF8 может состоять из двух байтов, и если они разделятся между блоками, ваши данные могут быть повреждены. В этом случае вы можете сохранить байт [] - буфер (MemoryStream?) И разделить сообщения по байту 0x0A.

person C.Evenhuis    schedule 19.05.2011

Приставлять к фрагментам длину лучше, чем использовать символ-разделитель. Вам не нужно иметь дело с каким-либо экранированием, чтобы таким образом отправить данные с новой строкой.

Этот ответ может не иметь отношения к вам сейчас, потому что он использует функции из AsyncCTP , который будет только в следующей версии .net. Однако это делает вещи намного более лаконичными. По сути, вы пишете точно такой же код, как и для синхронного случая, но вставляете операторы await там, где есть асинхронные вызовы.

    public static async Task<Byte[]> ReadChunkAsync(this Stream me) {
        var size = BitConverter.ToUInt32(await me.ReadExactAsync(4), 0);
        checked {
            return await me.ReadExactAsync((int)size);
        }
    }

    public static async Task<Byte[]> ReadExactAsync(this Stream me, int count) {
        var buf = new byte[count];
        var t = 0;
        while (t < count) {
            var n = await me.ReadAsync(buf, t, count - t);
            if (n <= 0) {
                if (t > 0) throw new IOException("End of stream (fragmented)");
                throw new IOException("End of stream");
            }
            t += n;
        }
        return buf;
    }

    public static void WriteChunk(this Stream me, byte[] buffer, int offset, int count) {
        me.Write(BitConverter.GetBytes(count), 0, 4);
        me.Write(buffer, offset, count);
    }
person Craig Gidney    schedule 19.05.2011

Хорошо, вот что я в итоге сделал. Я создал поток чтения, который создает NetworkStream и StreamReader на основе сетевого потока. Затем я использую StreamReader.ReadLine для чтения строк таким образом. Это синхронный вызов, но он находится в отдельном потоке. Кажется, работает намного лучше. Мне пришлось реализовать это, поскольку это наш протокол для приложения (сообщения с разделителями на новую строку). Я знаю, что другие люди будут адски искать ответ, как я, вот соответствующий код чтения в моем классе Client:

public class Client
{
    Socket              m_Socket;

    EventWaitHandle     m_WaitHandle;
    readonly object     m_Locker;
    Queue<IEvent>       m_Tasks;
    Thread              m_Thread;

    Thread              m_ReadThread;

    public Client()
    {
        m_WaitHandle = new AutoResetEvent(false);
        m_Locker = new object();
        m_Tasks = new Queue<IEvent>();

        m_Thread = new Thread(Run);
        m_Thread.IsBackground = true;
        m_Thread.Start();
    }

    public void EnqueueTask(IEvent task)
    {
        lock (m_Locker)
        {
            m_Tasks.Enqueue(task);
        }

        m_WaitHandle.Set();
    }

    private void Run()
    {
        while (true)
        {
            IEvent task = null;

            lock (m_Locker)
            {
                if (m_Tasks.Count > 0)
                {
                    task = m_Tasks.Dequeue();

                    if (task == null)
                    {
                        return;
                    }
                }
            }

            if (task != null)
            {
                task.DoTask(this);
            }
            else
            {
                m_WaitHandle.WaitOne();
            }
        }
    }

    public void Connect(string hostname, int port)
    {
        try
        {
            m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            IPAddress[] IPs = Dns.GetHostAddresses(hostname);

            m_Socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), m_Socket);
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    private void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;

            socket.EndConnect(ar);

            OnConnect(true, "Successfully connected to server.");

            m_ReadThread = new Thread(new ThreadStart(this.ReadThread));
            m_ReadThread.Name = "Read Thread";
            m_ReadThread.IsBackground = true;
            m_ReadThread.Start();
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    void ReadThread()
    {
        NetworkStream networkStream = new NetworkStream(m_Socket);
        StreamReader reader = new StreamReader(networkStream);

        while (true)
        {
            try
            {
                String message = reader.ReadLine();

                // To keep the code thread-safe, enqueue a task in the CLient class thread to parse the message received.
                EnqueueTask(new ServerMessageEvent(message));
            }
            catch (IOException)
            {
                // The code will reach here if the server disconnects from the client. Make sure to cleanly shutdown...
                Disconnect();
                break;
            }
        }
    }

    ... Code for sending/parsing the message in the Client class thread.
}
person Andrew    schedule 19.05.2011