Я использую boost::asio и boost::thread для реализации службы сообщений, которая принимает сообщения и отправляет их асинхронно, если сообщения нет. обрабатывается или помещает в очередь сообщение, если оно обрабатывается.
Скорость сообщений, на мой взгляд, высокая, около 2.000 сообщений в секунду. С таким количеством сообщений я сталкиваюсь с поврежденным сообщением, хотя очень редко. Из 2000 сообщений около 4-8 повреждены. Я считаю, что проблема связана с неправильным использованием библиотеки boost::asio и/или boost::thread.
Код, который я реализовал, в основном основан на этом учебник по продвижению. Я не могу найти ошибку, и, поскольку основные сообщения работают, мне трудно сузить проблему.
Может быть, у кого-то еще есть идея, что здесь происходит не так?
В основном этот класс используется следующим образом:
(1) Конструктор вызывается в начале моей программы для запуска потока, таким образом, служба для приема и передачи сообщений
(2) Всякий раз, когда я хочу передать сообщение, я вызываю MessageService::transmitMessage()
, который делегирует задачу с async_write
потоку, обрабатывающему очередь сообщений.
using namespace google::protobuf::io;
using boost::asio::ip::tcp;
MessageService::MessageService(std::string ip, std::string port) :
work(io_service), resolver(io_service), socket(io_service) {
messageQueue = new std::deque<AgentMessage>;
tcp::resolver::query query(ip, port);
endpoint_iterator = resolver.resolve(query);
tcp::endpoint endpoint = *endpoint_iterator;
socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
this, boost::asio::placeholders::error, ++endpoint_iterator));
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}
void MessageService::await() {
while (!messageQueue->empty()) {
signal(SIGINT, exit);
int messagesLeft = messageQueue->size();
sleep(3);
std::cout << "Pending Profiler Agents Messages: "
<< messageQueue->size() << std::endl;
if (messagesLeft == messageQueue->size()) {
std::cout << "Connection Error" << std::endl;
break;
}
}
std::cout << i << std::endl;
}
void MessageService::write(AgentMessage agentMessage, long systemTime,
int JVM_ID) {
agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
agentMessage.set_jvm_id(JVM_ID);
agentMessage.set_systemtime(systemTime);
io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}
void MessageService::do_close() {
socket.close();
}
void MessageService::transmitMessage(AgentMessage agentMessage) {
++i;
boost::asio::streambuf b;
std::ostream os(&b);
ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);
coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);
delete coded_output;
delete raw_output;
boost::system::error_code ignored_error;
boost::asio::async_write(socket, b.data(), boost::bind(
&MessageService::handle_write, this,
boost::asio::placeholders::error));
}
void MessageService::do_write(AgentMessage agentMessage) {
bool write_in_progress = !messageQueue->empty();
messageQueue->push_back(agentMessage);
if (!write_in_progress) {
transmitMessage(agentMessage);
}
}
void MessageService::handle_write(const boost::system::error_code &error) {
if (!error) {
messageQueue->pop_front();
if (!messageQueue->empty()) {
transmitMessage(messageQueue->front());
}
} else {
std::cout << error << std::endl;
do_close();
}
}
void MessageService::handle_connect(const boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator) {
// can be used to receive commands from the Java profiler interface
}
MessageService::~MessageService() {
// TODO Auto-generated destructor stub
}
Заголовочный файл:
using boost::asio::ip::tcp;
class MessageService {
public:
MessageService(std::string ip, std::string port);
virtual ~MessageService();
void write(AgentMessage agentMessage, long systemTime, int JVM_ID);
void await();
private:
boost::asio::io_service io_service;
boost::asio::io_service::work work;
tcp::resolver resolver;
tcp::resolver::iterator endpoint_iterator;
tcp::socket socket;
std::deque<AgentMessage> *messageQueue;
void do_write(AgentMessage agentMessage);
void do_close();
void handle_write(const boost::system::error_code &error);
void handle_connect(const boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator);
void transmitMessage(AgentMessage agentMessage);
};
boost::thread
, я могу только предположить, что вы используете более одного потока в программе, но у вас нет блокировки, обеспечивающей безопасность доступа к очереди сообщений (я предполагаю, что это единственный общий ресурс) - я должен признаться, что я не прошел код, так что, возможно, доступ к очереди является однопоточным ... вы мне скажите - person David Rodríguez - dribeas   schedule 05.07.2011transmitMessage
не вызывается несколькими потоками, у вас есть только один поток, отправляющий службу io. Я не вижу ничего плохого в приведенном выше коде (пока вы когда-либо вызываетеwrite
только из другого потока - скажем, из основного потока) - я бы проверил код получателя. - person Nim   schedule 05.07.2011write
да, эта функция вызывается из основного потока и, возможно, из других потоков. - person Konrad Reiche   schedule 05.07.2011post
превращаете обработчик службы ввода-вывода в фактическиpush_back
. Рискну предположить, что проблема кроется в коде приемника. - person Nim   schedule 05.07.2011boost::thread
? - person Sam Miller   schedule 06.07.2011