неожиданное поведение сокетов push/pull в netmq (zeromq)

Я пытаюсь использовать netmq (порт zeroMQ). Вот проблема, которую я нашел. Вот код:

class Program
{
    private static NetMQContext context;

    static void Main(string[] args)
    {
        context = NetMQContext.Create();
        using (var puller = context.CreatePullSocket())
        {
            puller.Bind("tcp://127.0.0.1:5651");
            Thread.Sleep(500);
            for (int i = 0; i < 5; i++)
            {
                Task.Run(new Action(PusheThread));
            }
            Console.WriteLine("any key to start receive");
            Console.ReadKey();
            Msg msg = new Msg();
            msg.InitEmpty();
            for (; puller.TryReceive(ref msg, new TimeSpan(0, 0, 5)); msg = new Msg(), msg.InitEmpty())
            {
                var s = Encoding.UTF8.GetString(msg.Data);
                Console.WriteLine(s);
            }
        }
        Console.WriteLine("any");
        Console.ReadKey();
    }

    static void PusheThread()
    {
        var guid = Guid.NewGuid();
        Console.WriteLine("started: " + guid);
        using (var pusher = context.CreatePushSocket())
        {
            pusher.Connect("tcp://127.0.0.1:5651");
            for (int i = 0; i < 5; i++)
            {
                pusher.Send("helo! " + guid);
            }
        }
    }
}

Если мы запустим этот код и посмотрим в консоли, мы увидим, что некоторые сообщения были потеряны. Нравиться:

any key to start receive
started: 8aeca8e5-ed41-4055-ab72-750a0e61a680
started: 4211d77a-ad9f-40f1-9382-121156325128
started: bd735e75-2692-4abe-b8b1-fbddbe21e546
started: 6749d3bb-6b2b-4caa-b22e-755dba4d932d
started: 281ff59e-4430-4fc6-9435-4dc2c5e6015e
helo! 8aeca8e5-ed41-4055-ab72-750a0e61a680
helo! 6749d3bb-6b2b-4caa-b22e-755dba4d932d
helo! 8aeca8e5-ed41-4055-ab72-750a0e61a680
helo! 6749d3bb-6b2b-4caa-b22e-755dba4d932d
helo! 8aeca8e5-ed41-4055-ab72-750a0e61a680
helo! 6749d3bb-6b2b-4caa-b22e-755dba4d932d
helo! 8aeca8e5-ed41-4055-ab72-750a0e61a680
helo! 6749d3bb-6b2b-4caa-b22e-755dba4d932d
helo! 8aeca8e5-ed41-4055-ab72-750a0e61a680
helo! 6749d3bb-6b2b-4caa-b22e-755dba4d932d
any

Как видим ни где нет сообщений от 4211d77a-ad9f-40f1-9382-121156325128, bd735e75-2692-4abe-b8b1-fbddbe21e546 и еще каких-то. Проблема в многопоточности? Или я что-то не так делаю? Спасибо.


person Alexey Kulikov    schedule 03.06.2015    source источник
comment
Проверьте наличие ошибок при подключении и отправке потоков пушеров, посмотрите, не появится ли что-нибудь.   -  person Jason    schedule 03.06.2015
comment
Я обнаружил, что эта проблема существует только в netmq. Если я использую родную оболочку zeromq .net, все работает нормально   -  person Alexey Kulikov    schedule 03.06.2015


Ответы (1)


Проблема в PusheThread, где вы просто быстро убиваете вновь созданный PushSocket.

static void PusheThread()
{
    var guid = Guid.NewGuid();
    Console.WriteLine("started: " + guid);
    using (var pusher = context.CreatePushSocket())
    {
        pusher.Connect("tcp://127.0.0.1:5651");
        for (int i = 0; i < 5; i++)
        {
            pusher.Send("helo! " + guid);
        }
        Thread.Sleep(5000);
    }
}

Несмотря на то, что это не надежное решение, оно должно доказать, что проблема заключается в том, что PushSocket удаляется оператором using, прежде чем можно будет отправлять сообщения.

То, что некоторые сообщения даже через просто показывают, насколько быстры NetMQ и ZeroMQ :)

person Emil Ingerslev    schedule 17.07.2015