libevent kqueue не работает с fd, возвращенным из zmq_getsockopt()

ИСХОДНОЕ СООБЩЕНИЕ: я пишу сервис на языке C, используя libevent и zmq. Msg передается из кода Python в службу C с использованием шаблона PUSH-PULL.

fd, полученный из сокета zmq:

void *receiver = zmq_socket (base.zmq_ctx, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
int fd=0;
size_t fd_len = sizeof(fd);
zmq_getsockopt (receiver, ZMQ_FD, &fd, &fd_len);

Используя Libevent, событие, зарегистрированное с помощью fd для постоянного чтения

struct event *read_data_on_zmq =event_new(base.evbase, fd, EV_READ | EV_PERSIST , read_data_on_zmq_cb,receiver);
event_add(read_data_on_zmq,NULL);
event_base_dispatch(base.evbase);

В методе обратного вызова я делаю неблокирующий прием

void read_data_on_zmq_cb(evutil_socket_t fd, short what, void *arg)
{
    char *msg = calloc(1024,sizeof(char));
    int size = zmq_recv (receiver, msg, 255, ZMQ_DONTWAIT);
    if (size != -1) 
    {
        puts ("is size is not -1");
        printf("msg = %s\n",msg); 
    }
}

В коде Python я постоянно отправляю сообщение в сокет.

import zmq
import time

c=zmq.Context()
s=c.socket(zmq.PUSH)
s.bind('tcp://127.0.0.1:5557')
while(True):
    s.send("abc")
    time.sleep(2)

Проблема в том, что я могу получить сообщение только один раз, после чего обратный вызов события никогда не срабатывает. Если я делаю zmq_connect внутри read_data_on_zmq_cb после zmq_recv, то он работает нормально, но я думаю, что это избыточно и не является правильным способом сделать это. В чем проблема?

EDIT1: В дополнение к проверке ZMQ_EVENTS после выполнения zmq_recv() вам необходимо получить все сообщения, поскольку zmq запускается EDGE. Отличное объяснение триггерного уведомления EDGE находится здесь http://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/, поэтому в конечном итоге мой обратный вызов события будет выглядеть так:

void read_data_on_zmq_cb(evutil_socket_t fd, short what, void *arg)
{
    unsigned int     zmq_events;
    size_t           zmq_events_size  = sizeof(zmq_events);
    char *msg=NULL;

    zmq_getsockopt (receiver, ZMQ_EVENTS, &zmq_events, &zmq_events_size);

    while(zmq_events & ZMQ_POLLIN)
    {
        msg = calloc(1024,sizeof(char));
        int size = zmq_recv (receiver, msg, 255, ZMQ_DONTWAIT);
        if (size != -1) {
        #ifdef DEBUG    
            printf("msg = %s\n",msg);
        #endif
            //return msg;
        }
        zmq_getsockopt (receiver, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
    }
}

person Pritesh Acharya    schedule 03.05.2013    source источник
comment
Установлен ли базовый fd в неблокирующий режим?   -  person ldx    schedule 07.05.2013
comment
Я нашел в чем проблема и решил ее. Пожалуйста, проверьте мой ответ ниже.   -  person Pritesh Acharya    schedule 08.05.2013


Ответы (1)


ПОЖАЛУЙСТА, прочитайте мой EDIT1 для полного ответа с кодом. Проблема заключалась в следующем: я не перепроверил ZMQ_EVENTS после выполнения zmq_recv(), так как в это время изменилось состояние сокета.

Так зовет

zmq_getsockopt (receiver, ZMQ_EVENTS, &fd, &fd_size);

после того, как zmq_recv() решил мою проблему.

person Pritesh Acharya    schedule 08.05.2013