Как структурировать логику рабочего потока для IOCP

Я создаю клиентскую программу, которая взаимодействует с устройством, подключенным к моему ПК через локальную сеть.

Типичная связь между моей программой и устройством выглядит следующим образом:

Program -> Device   1616000D  08 02 00 00 00 21 11 A1 00 01 22 08 00 // Sender sends data (a specific command to the device) to Receiver
Program <- Device   16160002  80 00 // Receiver sends ACK to sender
Program <- Device   16160005  08 20 00 00 00 // Receiver sends command response to sender
Program -> Device   16160002  80 00 // Sender sends ACK to receiver

Последнее шестнадцатеричное число первой последовательности байтов указывает размер данных, которые следуют (D = 13 байтов).

Моя процедура отправки выглядит так:

bool TcpConnection::SendCommand(const Command& rCommand, const std::vector<BYTE>& rvecCommandOptions)
{
    std::vector<BYTE> vecCommandData;
    m_commandBuilder.BuildCommand(rCommand, rvecCommandOptions, vecCommandData);

    if (vecCommandData.empty())
        return false;

    PerIoData *pPerIoData = new PerIoData;
    if (!pPerIoData)
        return false;

    SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED));

    pPerIoData->m_socket = m_socket.Get();
    pPerIoData->m_overlapped.hEvent = WSACreateEvent();
    pPerIoData->m_vecBuffer.assign(vecCommandData.begin(), vecCommandData.end());
    pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
    pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
    pPerIoData->m_dwFlags = 0;
    pPerIoData->m_dwNumberOfBytesSent = 0;
    pPerIoData->m_dwNumberOfBytesToSend = pPerIoData->m_wsaBuf.len;
    pPerIoData->m_operationType = OP_TYPE_SEND;

    if (!m_socket.Send(pPerIoData))
        return false;

    return true;
}

И моя процедура рабочего потока выглядит так:

DWORD WINAPI TcpConnection::WorkerThread(LPVOID lpParameter)
{
    HANDLE hCompletionPort = (HANDLE)lpParameter;
    DWORD dwNumberOfBytesTransferred;
    ULONG ulCompletionKey;
    PerIoData *pPerIoData;

    DWORD dwNumberOfBytesReceived;
    DWORD dwNumberOfBytesSent;
    DWORD dwFlags;

    while (GetQueuedCompletionStatus(hCompletionPort, &dwNumberOfBytesTransferred, &ulCompletionKey, (LPOVERLAPPED*)&pPerIoData, INFINITE)) 
    {
        if (!pPerIoData)
            continue;

        if ((dwNumberOfBytesTransferred == 0) && ((pPerIoData->m_operationType  == OP_TYPE_SEND) || (pPerIoData->m_operationType  == OP_TYPE_RECEIVE)))
        {
            closesocket(pPerIoData->m_socket);
            delete pPerIoData;
            continue;
        }

        if (pPerIoData->m_operationType == OP_TYPE_SEND)
        {
            pPerIoData->m_dwNumberOfBytesSent += dwNumberOfBytesTransferred;
            if (pPerIoData->m_dwNumberOfBytesSent < pPerIoData->m_dwNumberOfBytesToSend)
            {
                pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[pPerIoData->m_dwNumberOfBytesSent]));
                pPerIoData->m_wsaBuf.len = (pPerIoData->m_dwNumberOfBytesToSend - pPerIoData->m_dwNumberOfBytesSent);

                if (WSASend(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesTransferred, 0, &(pPerIoData->m_overlapped), NULL) == 0)
                    continue;

                if (WSAGetLastError() == WSA_IO_PENDING)
                    continue;
            }
            else if (pPerIoData->m_dwNumberOfBytesSent == pPerIoData->m_dwNumberOfBytesToSend)
            {
                delete pPerIoData;
            }

            // Q1. Do I create a new instance of PerIoData here before calling WSARecv() or reuse pPerIoData?

            // QA. If I did do "PerIoData pPerIoData = new PerIoData" here, how do I handle if this momory allocation request has failed?  Should I simply "continue" or "return -1"?

            // QB. Or is this a wrong place to do this memory allocation to achive the typical communication between my program and the device?

            SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED));

            pPerIoData->m_overlapped.hEvent = WSACreateEvent();
            pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
            pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
            pPerIoData->m_operationType = OP_TYPE_RECEIVE;

            if (WSARecv(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesReceived, &(pPerIoData->m_dwFlags), &(pPerIoData->m_overlapped), NULL) == 0)
                continue;

            if (WSAGetLastError() == WSA_IO_PENDING)
                continue;
        }
        else if (pPerIoData->m_operationType == OP_TYPE_RECEIVE)
        {
            if ((pPerIoData->m_vecBuffer[0] == 0x16) && (pPerIoData->m_vecBuffer[1] == 0x16))
            {
                // Q2. Do I need to do SecureZeroMemory(&(pPerIoData->m_overlapped), sizeof(WSAOVERLAPPED)); here?

                // Q3. Or do I new PerIoData?

                pPerIoData->m_wsaBuf.buf = (CHAR*)(&(pPerIoData->m_vecBuffer[0]));
                pPerIoData->m_wsaBuf.len = pPerIoData->m_vecBuffer.size();
                pPerIoData->m_operationType = OP_TYPE_RECEIVE;

                // QC. At this point two syn bytes (0x16) are received.  I now need to receive two more bytes of data (000D = 13 bytes) to find out the size of the actual command response data.
                // If I clear my m_vecBuffer here and try to resize its size to two, I get this debug assertion: "vector iterators incompatible" at runtime.  Do you know how I can fix this problem?

                if (WSARecv(pPerIoData->m_socket, &(pPerIoData->m_wsaBuf), 1, &dwNumberOfBytesReceived, &(pPerIoData->m_dwFlags), &(pPerIoData->m_overlapped), NULL) == 0)
                    continue;

                if (WSAGetLastError() == WSA_IO_PENDING)
                    continue;
            }

            // QD. I'm not sure how to structure this if clause for when m_operationType is OP_TYPE_RECEIVE.  I mean how do I distinguish one receive operation for getting two syn bytes from another for getting data size?
            // One way I can think of doing is to create more receive operation types such as OP_TYPE_RECEIVE_DATA_SIZE or OP_TYPE_RECEIVE_DATA?  So you can have something like below.
            // Is this how you would do it?
        }
        //else if (pPerIoData->m_operationType == OP_TYPE_RECEIVE_DATA_SIZE)
        //{
            // Call WSARecv() again to get command response data
        //}
    }

    return 0;
}

Пожалуйста, смотрите мои вопросы в коде выше.

Большое спасибо


person jpen    schedule 10.07.2012    source источник


Ответы (1)


  1. Как следует из названия вашего типа PerIoData, вам нужна одна структура данных на неполный запрос ввода-вывода. Структура PerIoData должна сохраняться с момента запуска асинхронного ввода-вывода с помощью WSASend или WSARecv до момента, когда вы получаете пакет завершения этого запроса из порта завершения ввода-вывода с помощью GetQueuedCompletionStatus.
  2. Вы всегда должны повторно инициализировать свои OVERLAPPED структуры, когда собираетесь начать новый запрос.
  3. Вы можете повторно использовать структуру PerIoData до тех пор, пока запрос ввода/вывода завершен. Учитывая, что вы получили pPerIoData из порта завершения ввода-вывода, вы можете повторно использовать его для последующих запросов. Просто убедитесь, что вы сбросили все применимые поля в этой структуре, чтобы она находилась в состоянии, подходящем для нового запроса ввода-вывода.

РЕДАКТИРОВАТЬ, чтобы ответить на дополнительные вопросы:

О. Я бы continue потому что вы хотите продолжить обработку событий ввода-вывода, даже если вы не можете инициировать дополнительный запрос. Если вы этого не сделаете continue, вы больше не сможете обрабатывать завершение операций ввода-вывода. Перед тем, как continue вы, возможно, захотите вызвать какой-нибудь обработчик ошибок.

B. Я не думаю, что обязательно есть «правильное» или «неправильное» место для выделения, но имейте в виду, что когда вы выделяете свой PerIoData там, то, что вы фактически делаете, это повторяющиеся выделения и удаления одной и той же структуры данных по и так по кругу. Когда я пишу код с использованием портов завершения ввода-вывода, я заранее выделяю пул моего эквивалента PerIoData и повторно использую их.

C. У меня недостаточно контекста, чтобы знать ответ. Покажите свой код, который делает это, и строку, в которую попадает утверждение, и я смогу помочь.

D. Вы можете разбить свой тип операции на более мелкие компоненты, как вы предложили, например операцию OP_TYPE_RECEIVE_DATA_SIZE. Предупреждаю, чтение пары байтов при каждом вызове WSARecv не будет работать так хорошо, как хотелось бы. Вызовы Winsock стоят дорого; сделать запрос на пару байтов - много накладных расходов. Я бы посоветовал вам прочитать больший блок данных в буфер PerIoData за один WSARecv. Затем извлеките информацию о размерах из этого буфера, а затем начните копировать данные из этого буфера. Если поступает больше данных, чем может поместиться в буфер, вы можете сделать дополнительные вызовы WSARecv, пока не прочитаете остальные.

person Aaron Klotz    schedule 10.07.2012
comment
@ Аарон, могу ли я задать еще несколько вопросов, пожалуйста (QA, QB, QC и QD в моей процедуре рабочего потока)? Я был бы очень признателен за ваш ответ. Большое спасибо. - person jpen; 11.07.2012
comment
@ Аарон, спасибо за дальнейшие советы. RE: Когда я пишу код с использованием портов завершения ввода-вывода, я заранее выделяю пул своего эквивалента PerIoData и повторно использую их. Интересно, не могли бы вы показать мне, как это выглядит в коде, пожалуйста? - person jpen; 25.07.2012
comment
Если бы я использовал этот метод, я полагаю, что мой класс TcpConnection установил бы и закрыл этот пул. Я бы поместил каждую структуру PerIoData в вектор, но что определяет, сколько структур PerIoData нужно выделить заранее? - person jpen; 25.07.2012
comment
В моих реализациях я сначала выделяю небольшое (но разумное) число, но я вызываю WSAEventSelect для событий FD_ACCEPT. Когда это событие срабатывает, Windows сообщает мне, что мне нужно выделить больше подключений. Вот когда я выделяю дополнительные PerIoData. См. msdn.microsoft.com/en-us/magazine/cc302334.aspx - person Aaron Klotz; 27.07.2012