Неправильное использование boost::asio и boost::thread

Я использую boost::asio и boost::thread для реализации службы сообщений, которая принимает сообщения и отправляет их асинхронно, если сообщения нет. обрабатывается или помещает в очередь сообщение, если оно обрабатывается.

Скорость сообщений, на мой взгляд, высокая, около 2.000 сообщений в секунду. С таким количеством сообщений я сталкиваюсь с поврежденным сообщением, хотя очень редко. Из 2000 сообщений около 4-8 повреждены. Я считаю, что проблема связана с неправильным использованием библиотеки boost::asio и/или boost::thread.

Код, который я реализовал, в основном основан на этом учебник по продвижению. Я не могу найти ошибку, и, поскольку основные сообщения работают, мне трудно сузить проблему.

Может быть, у кого-то еще есть идея, что здесь происходит не так?

В основном этот класс используется следующим образом:

(1) Конструктор вызывается в начале моей программы для запуска потока, таким образом, служба для приема и передачи сообщений

(2) Всякий раз, когда я хочу передать сообщение, я вызываю MessageService::transmitMessage(), который делегирует задачу с async_write потоку, обрабатывающему очередь сообщений.

using namespace google::protobuf::io;
using boost::asio::ip::tcp;

MessageService::MessageService(std::string ip, std::string port) :
    work(io_service), resolver(io_service), socket(io_service) {

    messageQueue = new std::deque<AgentMessage>;
    tcp::resolver::query query(ip, port);
    endpoint_iterator = resolver.resolve(query);

    tcp::endpoint endpoint = *endpoint_iterator;

    socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
            this, boost::asio::placeholders::error, ++endpoint_iterator));

    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}

void MessageService::await() {

    while (!messageQueue->empty()) {

        signal(SIGINT, exit);

        int messagesLeft = messageQueue->size();
        sleep(3);
        std::cout << "Pending Profiler Agents Messages: "
                << messageQueue->size() << std::endl;
        if (messagesLeft == messageQueue->size()) {
            std::cout << "Connection Error" << std::endl;
            break;
        }
    }

    std::cout << i << std::endl;
}

void MessageService::write(AgentMessage agentMessage, long systemTime,
        int JVM_ID) {
    agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
    agentMessage.set_jvm_id(JVM_ID);
    agentMessage.set_systemtime(systemTime);
    io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}

void MessageService::do_close() {
    socket.close();
}

void MessageService::transmitMessage(AgentMessage agentMessage) {

    ++i;

    boost::asio::streambuf b;
    std::ostream os(&b);

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

    coded_output->WriteVarint32(agentMessage.ByteSize());
    agentMessage.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;

    boost::system::error_code ignored_error;

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error));
}

void MessageService::do_write(AgentMessage agentMessage) {

    bool write_in_progress = !messageQueue->empty();
    messageQueue->push_back(agentMessage);

    if (!write_in_progress) {
        transmitMessage(agentMessage);
    }
}

void MessageService::handle_write(const boost::system::error_code &error) {

    if (!error) {
        messageQueue->pop_front();
        if (!messageQueue->empty()) {
            transmitMessage(messageQueue->front());
        }
    } else {
        std::cout << error << std::endl;
        do_close();
    }
}

void MessageService::handle_connect(const boost::system::error_code &error,
        tcp::resolver::iterator endpoint_iterator) {
    // can be used to receive commands from the Java profiler interface
}

MessageService::~MessageService() {
    // TODO Auto-generated destructor stub
}

Заголовочный файл:

    using boost::asio::ip::tcp;

class MessageService {
public:
    MessageService(std::string ip, std::string port);
    virtual ~MessageService();
    void write(AgentMessage agentMessage, long systemTime, int JVM_ID);
    void await();

private:
    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    tcp::resolver resolver;
    tcp::resolver::iterator endpoint_iterator;
    tcp::socket socket;
    std::deque<AgentMessage> *messageQueue;

    void do_write(AgentMessage agentMessage);

    void do_close();

    void handle_write(const boost::system::error_code &error);

    void handle_connect(const boost::system::error_code &error,
            tcp::resolver::iterator endpoint_iterator);

    void transmitMessage(AgentMessage agentMessage);
};

person Konrad Reiche    schedule 05.07.2011    source источник
comment
Поскольку вы упомянули boost::thread, я могу только предположить, что вы используете более одного потока в программе, но у вас нет блокировки, обеспечивающей безопасность доступа к очереди сообщений (я предполагаю, что это единственный общий ресурс) - я должен признаться, что я не прошел код, так что, возможно, доступ к очереди является однопоточным ... вы мне скажите   -  person David Rodríguez - dribeas    schedule 05.07.2011
comment
Я думал, что есть только один поток, работающий с сообщениями и, следовательно, с очередью. Поскольку единственная функция, по крайней мере, я думаю, что это так, которая вызывается несколькими потоками, - это передать сообщение и передать делегаты сообщения с помощью: io_service.post(boost::bind(&MessageService::do_write, this, agentMessage)); Что, как я думал, выполняется только одним потоком, запущенным в конструкторе. Но, может быть, я ошибаюсь.   -  person Konrad Reiche    schedule 05.07.2011
comment
transmitMessage не вызывается несколькими потоками, у вас есть только один поток, отправляющий службу io. Я не вижу ничего плохого в приведенном выше коде (пока вы когда-либо вызываете write только из другого потока - скажем, из основного потока) - я бы проверил код получателя.   -  person Nim    schedule 05.07.2011
comment
О, я имел в виду write да, эта функция вызывается из основного потока и, возможно, из других потоков.   -  person Konrad Reiche    schedule 05.07.2011
comment
в этом случае все в порядке, потому что вы post превращаете обработчик службы ввода-вывода в фактически push_back. Рискну предположить, что проблема кроется в коде приемника.   -  person Nim    schedule 05.07.2011
comment
@Nim: код приемника написан на Java и очень прост, по крайней мере, на мой взгляд, но да - это все еще возможно.   -  person Konrad Reiche    schedule 05.07.2011
comment
Вы всегда можете захватить сетевой трафик и проанализировать его, чтобы определить, являются ли пакеты в сети правильными или нет.   -  person David Rodríguez - dribeas    schedule 05.07.2011
comment
Я думаю, что std::ostream os; созданный в стеке не правильно! Вы должны создать его в куче. См. stackoverflow.com/questions/5344809/ и comments.gmane.org/gmane.comp.lib.boost .asio.user/2161 Сделайте поток переменной-членом и, пожалуйста, дайте мне знать, решит ли это вашу проблему.   -  person O.C.    schedule 05.07.2011
comment
Думаю, я был неправ. Создание boost::asio::streambuf в стеке мне кажется некорректным!.   -  person O.C.    schedule 05.07.2011
comment
Что заставляет вас полагать, что это проблема с boost::thread?   -  person Sam Miller    schedule 06.07.2011


Ответы (1)


этот метод кажется мне сомнительным

void MessageService::transmitMessage(AgentMessage agentMessage) {
    ++i;

    boost::asio::streambuf b;
    std::ostream os(&b);

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

    coded_output->WriteVarint32(agentMessage.ByteSize());
    agentMessage.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;

    boost::system::error_code ignored_error;

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error));
}

Похоже, вы сериализуете AgentMessage (которое должно быть передано через ссылку const между прочим) в streambuf. Однако существование этих сериализованных данных не гарантируется до тех пор, пока не будет вызван обработчик завершения async_write, который явно описан в async_write документация

буферы

Один или несколько буферов, содержащих данные для записи. Хотя объект буферов может быть скопирован по мере необходимости, право собственности на нижележащие блоки памяти сохраняется за вызывающей стороной, которая должна гарантировать, что они останутся действительными до тех пор, пока не будет вызван обработчик.

Чтобы решить эту проблему, убедитесь, что буфер остается в области видимости, пока не будет вызван обработчик завершения. Один из способов сделать это — передать буфер в качестве аргумента обработчику ограниченного завершения:

boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error,
            coded_output
            // ^^^ buffer goes here
            ));

затем удалите его из обработчика завершения. Я бы посоветовал вам также взглянуть на использование shared_ptr вместо голых указателей.

person Sam Miller    schedule 05.07.2011
comment
Таким образом, поскольку async_write возвращается немедленно, но неизвестно, когда завершится async_write и останется передаваемое сообщение, может быть неясно, что происходит с выделенными буферами? Что бы вы предложили для противодействия этой проблеме? - person Konrad Reiche; 06.07.2011
comment
@platzhirsch время, когда async_write завершается, хорошо известно, когда вызывается ваш обработчик завершения. Я обновил свой ответ одним возможным решением. - person Sam Miller; 06.07.2011