Повторное использование потоков на UDP-сервере

Я пишу UDP-сервер и хочу уменьшить количество распределений, которые происходят для входящих и исходящих пакетов датаграмм. Одна из вещей, которую я хотел бы сделать, - это повторно использовать потоки, которые я выделяю для чтения и записи моих пакетов датаграмм.

У меня есть этот метод, который использует экземпляр BinaryReader при получении пакетов для чтения буфера на объект-экземпляр.

// Simplified version of the receive data method.
private void ReceiveClientData(IAsyncResult result)
{
    int receivedData = socket.EndReceiveFrom(result, ref endPoint);
    if (receivedData == 0)
    {
        this.ListenForData(socket);
        return;
    }

    // Read the header in from the buffer first so we know what kind of message and how to route.
    this.binaryReader.BaseStream.SetLength(receivedData);
    this.binaryReader.BaseStream.Seek (0, SeekOrigin.Begin);

    // Can't reset the MemoryStream buffer to the new packet buffer.
    // How does this get consumed by the stream without allocating a new MemoryStream(buffer)?
    byte[] buffer = (byte[])result.AsyncState;

    // Deserialize the bytes into the header and datagram.
    IClientHeader header = new ClientHeader();
    header.GetMessageFromBytes(this.binaryReader);
    IClientDatagram datagram = this.datagramFactory.CreateDatagramFromClientHeader(header);
    datagram.GetMessageFromBytes(this.binaryReader);

    this.ListenForData(socket);
}

Безопасно ли повторно использовать тот же BinaryReader и лежащий в основе MemoryStream, возвращая I Seek к началу потока? После некоторого чтения может показаться, что Socket не будет читать и писать пакеты одновременно, что потребовало бы от меня использования Stream на вызов EndReceiveFrom. Похоже, вы можете выполнять каждую операцию одновременно, например, чтение + запись, но не одновременное чтение или одновременную запись. Из-за этого я бы подумал, что смогу повторно использовать ридер, но я не уверен, что смогу. Для этого мне пришлось бы предоставить MemoryStream новый буфер для чтения. Можете ли вы это сделать, или мне нужно создавать новый MemoryBuffer для каждого нового пакета, полученного из сокета?

Я хотел бы применить тот же подход к отправке пакетов обратно клиенту. Для этого я попытался установить длину моего MemoryStream на 0 и вернуться к началу. Похоже, это не очищает буфер, как описано в другом ответе здесь. Я все еще вижу данные старого буфера в t ниже.

public void SendMessage(IServerDatagram message)
{
    this.binaryStream.Seek(0, SeekOrigin.Begin);
    this.binaryStream.SetLength (0);

    // Contains old buffer data.
    var t = this.binaryStream.GetBuffer ();
    message.Serialize (this.binaryWriter);

    // Contains a mixture of new bytes + old bytes.
    byte[] data = this.binaryStream.GetBuffer();
    this.binaryWriter.Flush ();

    // Send the datagram packet.
    this.udpServerSocket.SendTo(data, this.connectedClients.FirstOrDefault().Key);
}

Я не знаю заранее, насколько большим должен быть буфер, поэтому я не могу SetLength соответствовать размеру моего нового буфера. Нет ли другого способа повторно использовать Stream без необходимости создавать новый для каждого сообщения? Если я отправляю тысячи сообщений в секунду, это может вызвать некоторую нехватку памяти, не так ли?


person Johnathon Sullinger    schedule 13.03.2016    source источник
comment
С MemoryStream, если вы хотите сохранить его, вы захотите повторно использовать базовый массив байтов. Если вы не укажете фиксированную длину MemoryStream (передав массив байтов в ctor), он может перераспределить его буфер ... так что в этой воде есть опасность.   -  person Clay    schedule 12.05.2016


Ответы (1)


У меня возникли связанные проблемы с использованием потоков, программ чтения и сетевого оборудования - и я решил использовать несколько иной подход. Я изучил реализации BinaryReader, BinaryWriter и BitConverter и написал методы расширения для чтения и записи данных непосредственно в нижележащий буфер byte []. Мягкая PITA, но у меня больше нет стримов, читателей или писателей.

Вот, например, метод расширения для записи int:

[System.Security.SecuritySafeCritical]
public unsafe static int Write( this byte[ ] array, int value, int offset = 0 )
{
  if ( offset + sizeof( int ) > array.Length ) throw new IndexOutOfRangeException( );
  fixed ( byte* b = array )
  {
    *( ( int* ) ( b + offset ) ) = value;
  }
  return sizeof( int );
}

Результат может показаться глупым, но все методы расширения возвращают количество байтов, которое я «переместил» в массив байтов ... и поэтому я могу постоянно знать, где я нахожусь; псевдопоток, если хотите. Кроме того, пишущий код не всегда знает, что он пишет. Существуют идентичные переопределения для каждого простого типа: одно для строк и одно для byte [].

Моя большая проблема была на стороне клиента, где у меня более ограниченная операционная среда ... и я поддерживаю долговечный буфер чтения. Метод UdpClient.Receive создает новые байтовые массивы при каждом чтении, и это меня просто убило. GC сошел с ума ... что, конечно, очень мешает потоковой передаче UDP :-) Итак, я нашел реализации Receive и ReceiveAsync и обнаружил, что могу контролировать то, что должно быть подано в базовый сокет, и снова сделал свой собственный RecieveBroadcastToBuffer метод расширения:

const int MaxUdpSize = 0x0000ffff;
const int AnyPort = 0;
static EndPoint anyV4Endpoint = new IPEndPoint( IPAddress.Any, AnyPort );
static EndPoint anyV6Endpoint = new IPEndPoint( IPAddress.IPv6Any, AnyPort );

/// <summary>Receives a UDP datagram into the specified buffer at the specified offset</summary>
/// <returns>The length of the received data</returns>
public static int ReceiveBroadcastToBuffer( this UdpClient client, byte[ ] buffer, int offset = 0 )
{
  int received;
  var socket = client.Client;
  if ( socket.AddressFamily == AddressFamily.InterNetwork )
  {
    received = socket.ReceiveFrom( buffer, offset, MaxUdpSize, SocketFlags.None, ref anyV4Endpoint );
  }
  else
  {
    received = socket.ReceiveFrom( buffer, offset, MaxUdpSize, SocketFlags.None, ref anyV6Endpoint );
  }
  return received;
}

Это почти именно то, что делает UdpClient.Receive ... за исключением того, что я управляю буфером. Я обнаружил, что UdpClient использует один буфер для чтения, а я просто использую простой ol 'Socket на стороне отправки.

person Clay    schedule 12.05.2016