ZeroMQ: исчезающие сообщения

У нас есть приложение Java, которое действует как сервер. Клиентские приложения (написанные на C#) взаимодействуют с ним с помощью ZeroMQ. Мы (в основном) следуем модели Ленивого Пирата.

На сервере есть сокет Router, реализованный следующим образом (с использованием JeroMQ):

ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.bind("tcp://*:5555");

Клиенты подключаются и отправляют сообщения следующим образом:

ZContext context = ZContext.Create();
ZSocket socket = ZSocket.Create(context, ZSocketType.REQ);
socket.Identity = Encoding.UTF8.GetBytes("Some identity");
socket.Connect("tcp://my_host:5555");
socket.Send(new ZFrame("request data"));

Мы столкнулись с потерей сообщений, когда несколько клиентов отправляют сообщения одновременно. С одним клиентом проблем не возникает.

Правильно ли мы реализуем это для установки с несколькими клиентами и одним сервером?

Обновление: пример клиента и сервера, демонстрирующих такое поведение:

Сервер:

import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class SimpleServer
{
    public static void main(String[] args) throws InterruptedException
    {
        ZContext context = new ZContext();
        Socket socket = context.createSocket(ZMQ.ROUTER);
        socket.setRouterMandatory(true);
        socket.bind("tcp://*:5559");

        PollItem pollItem = new PollItem(socket, Poller.POLLIN);

        int messagesReceived = 0;
        int pollCount = 0;

        while ((pollCount = ZMQ.poll(new PollItem[]{pollItem}, 3000)) > -1)
        {
            messagesReceived += pollCount;

            for (int i = 0 ; i < pollCount ; i++)
            {
                ZMsg msg = ZMsg.recvMsg(socket);
                System.out.println(String.format("Received message: %s. Total messages received: %d", msg, messagesReceived));
            }

            if (pollCount == 0)
            {
                System.out.println(String.format("No messages on socket. Total messages received: %d", messagesReceived));
            }
        }
    }
}

Клиент:

using NetMQ;
using System;
using System.Text;

namespace SimpleClient
{
    class Program
    {
        static byte[] identity = Encoding.UTF8.GetBytes("id" + DateTime.UtcNow.Ticks);

        static void Main(string[] args)
        {
            for (int i = 0; i < 100; i++)
            {
                SendMessage();
            }
        }

        private static void SendMessage()
        {
            using (NetMQContext context = NetMQContext.Create())
            {
                using (NetMQSocket socket = context.CreateRequestSocket())
                {
                    socket.Options.Identity = identity;
                    socket.Connect("tcp://localhost:5559");
                    socket.Send(Encoding.UTF8.GetBytes("hello!"));
                }
            }
        }
    }
}

Если я запускаю сервер и один клиент, я вижу, что приходят все мои 100 сообщений. Если я запускаю, скажем, 5 клиентов одновременно, я получаю только около 200 -> 300 сообщений вместо полных 500. Кроме того, кажется, что закрытие сокета в клиенте каким-то образом останавливает сокет маршрутизатора на сервере. получение сообщений ненадолго, хотя это всего лишь теория.


person Jamie Poole    schedule 14.04.2015    source источник
comment
Не могли бы вы предоставить больше кода? Как ваш сервер обрабатывает запросы? Пробовали ли вы реализовать простой сервер «hello world» с вашей архитектурой? Где ваш запросчик слушает свой ответ? Если вы используете REQ, вы должны выполнить синхронное получение ответа.   -  person antiduh    schedule 14.04.2015
comment
@antiduh - Спасибо, я обновил вопрос, указав простой клиент/сервер, который ведет себя так. На самом деле я еще не приступил к обработке ответов, это всего лишь случай проверки того, что все сообщения доставляются на сервер.   -  person Jamie Poole    schedule 14.04.2015
comment
Пробовали ли вы раскручивать один контекст/сокет, который используется на протяжении всего жизненного цикла клиента, вместо того, чтобы раскручивать их и разбивать на каждое сообщение? Это было бы моим первым предложением по отладке. Параметр ZMQ_LINGER по умолчанию должен предотвратить любые проблемы, связанные с тем, что вам нужно. делаю, но кто знает   -  person Jason    schedule 14.04.2015
comment
@ Джейсон - я думал об этом, хотя на данный момент это не вариант для меня. Мне нужно попытаться добраться до сути проблемы с этой моделью соединения на запрос.   -  person Jamie Poole    schedule 16.04.2015
comment
Я спрашиваю, если вы внесете это изменение в свой пример кода, он начнет работать или все еще испытывает ту же проблему? Если он действительно начинает себя вести, это указывает на то, что существует определенная проблема с такой большой турбулентностью в вашем контексте ZMQ.   -  person Jason    schedule 16.04.2015


Ответы (3)


Часть 1. Опрос может возвращать более одного события

ZMQ.poll() возвращает количество найденных событий:

int rc = ZMQ.poll(new PollItem[]{pollItem}, 3000);

В настоящее время вы предполагаете, что одно возвращение из poll является одним событием. Вместо этого вы должны перебрать ZMsg msg = ZMsg.recvMsg(socket); для количества событий, указанных возвратом ZMQ.Poll().

Из источника JeroMQ:

/**
 * Polling on items. This has very poor performance.
 * Try to use zmq_poll with selector
 * CAUTION: This could be affected by jdk epoll bug
 *
 * @param items
 * @param timeout
 * @return number of events
 */
public static int zmq_poll(PollItem[] items, long timeout)
{
    return zmq_poll(items, items.length, timeout);
}

Часть 2. ZMsg.receive() может возвращать несколько кадров

Когда вы получаете ZMsg от ZMsg msg = ZMsg.recvMsg(socket);, ZMsg может содержать несколько ZFrame, каждый из которых содержит данные клиента.

Из комментариев класса ZMsg в JeroMQ источник:

 * // Receive message from ZMQSocket "input" socket object and iterate over frames
 * ZMsg receivedMessage = ZMsg.recvMsg(input);
 * for (ZFrame f : receivedMessage) {
 *     // Do something with frame f (of type ZFrame)
 * }

Часть 3 — сообщения можно разделить на несколько ZFrames

Из исходного кода ZFrame в JeroMQ :

 * The ZFrame class provides methods to send and receive single message
 * frames across 0MQ sockets. A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code.
 * When you read a frame from a socket, the more() method indicates if the frame is part of an
 * unfinished multipart message.

Если я правильно понимаю, то для каждого события вы можете получить несколько фреймов, а одно клиентское сообщение может отображать 1..N фреймов (если сообщение большое?).

Итак, подведем итог:

  • Один результат опроса может указывать на несколько событий.
  • Одно событие и, следовательно, одно ZMsg.receive() может содержать несколько кадров.
  • Один фрейм может содержать одно полное клиентское сообщение или только часть клиентского сообщения; одно клиентское сообщение сопоставляется с 1..N кадрами.
person antiduh    schedule 14.04.2015
comment
Спасибо, да, я думал, что это может быть так. Я обновил код выше, он все еще ведет себя так же. На самом деле, я никогда не видел, чтобы сокет моего маршрутизатора возвращал что-либо, кроме 0 или 1 в Poll(). - person Jamie Poole; 16.04.2015
comment
Я определенно думаю о том же, что и вы! Весь день я экспериментировал с различными подходами, и, распечатывая сообщения для консоли и изучая размеры сообщений, я уверен, что каждое полученное сообщение — это одно сообщение, которое было отправлено, без дополнительных кадров для сообщений, объединенных вместе или разделенных. . Главное открытие сегодняшнего дневного тестирования заключается в том, что отброшенные сообщения возникают чаще всего, когда ЦП сервера очень высок (не уверен, является ли это причиной или следствием) ... на самом деле это может быть «ошибка jdk epoll», упомянутая в Javadoc опрос () выше? - person Jamie Poole; 17.04.2015
comment
@JamiePoole Хм, я начинаю думать, что в этом есть что-то большее. В коде C# важно понимать, что вы вызываете socket.Send(), за которым сразу же следует socket.Dispose() из-за блока using. Мне интересно, имеет ли это какое-то отношение к этому. Кроме этого, я мог бы начать считать, что это ошибка. Не забудьте обновить свой вопрос своими выводами по мере работы над этим, чтобы другие люди могли извлечь пользу из вашего исследования. - person antiduh; 17.04.2015

К сожалению, мы не смогли решить эту конкретную проблему и отказались от использования ZeroMQ для этого интерфейса. В случае, если это поможет кому-то еще, единственное, что мы точно выяснили, это то, что быстрое открытие/закрытие сокетов запросов вызывало нежелательное поведение (потеря сообщений) на стороне сокета маршрутизатора. Проблема усугублялась низкой производительностью процессора сервера и вообще не появлялась, когда сервер работал на быстрой многоядерной машине.

person Jamie Poole    schedule 22.04.2015
comment
Вы должны собрать наименьший из возможных тестовый пример, который воспроизводит потерю пакетов, и отправить его как отчет об ошибке в ZeroMQ, похоже, вы обнаружили ошибку/состояние потери сообщения. - person antiduh; 28.04.2015

К сожалению, я даже близко не работал с ZMQ, когда этот вопрос был активен. Но сегодня у меня была такая же проблема, и я нашел эту страницу. И ваш ответ (без использования ZMQ) меня не удовлетворил. Так что я искал немного больше и, наконец, узнал, что делать.

Напоминаем: это работает с "POLLER" в ZMQ [1].

Если вы используете соединение "PAIR", вы точно НЕ потеряете файлы, НО отправка/получение занимает прибл. в то же время. Таким образом, вы не можете ускориться и не было решением для меня.

Решение:

  • в zmq_setsockopt (python: zmq.setsockopt) вы можете установить для ZMQ_HWM (zmq.SNDHWM, zmq.RCVHWM) значение '0' [2]

    • в python: sock.setsockopt(zmq.SNDHWM, 0) соотв. sock.setsockopt(zmq.RCVHWM, 0) для отправителя, соответственно. приемник

    • примечание: я думаю, что обозначения изменились с HWM на SNDWHM/RCVHWM

    • HWM = 0 означает, что на количество сообщений НЕТ ограничения (так что будьте осторожны, возможно, установите (очень высокий) лимит)

  • есть также ZMQ_SNDBUF/ ZMQ_RCVBUF (python: zmq.SNDBUF/zmq.RCVBUF), которые вы также можете указать, т.е. sock.setsockopt(zmq.RCVBUF, 0) соотв. ..... [2]

    • так что это установит операционную систему «SO_RCVBUF» по умолчанию (здесь мои знания заканчиваются)

    • установка этого параметра или нет НЕ повлияла на мой случай, но я думаю, что это может

Производительность:

Таким образом, я мог «отправить» 100 000 файлов размером 98 КБ за ~ 8 с (~ 10 ГБ): это заполнит вашу оперативную память (если она будет заполнена, я думаю, ваша программа замедлится), см. также рисунок

тем временем я «получил» и сохранил файлы примерно через ~введите здесь описание изображения118 с и снова освобождение оперативной памяти

Кроме того, с этим я NERVER потерял файл до сих пор. (вы могли бы, если бы вы достигли пределов вашего ПК)

потеря данных — это "ХОРОШО":

  • если вам действительно НУЖНЫ все данные, вы должны использовать этот метод

  • если вы можете считать, что некоторые потери в порядке (например, живые графики: пока ваш FPS> ~ 50, вы будете плавно видеть графики, и вам все равно, потеряете ли вы что-то)

  • --> Вы можете сэкономить оперативную память и избежать блокировки всего компьютера!

Надеюсь, этот пост поможет следующему человеку, который придет...

[1]: https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/multisocket/zmqpoller.htm

[2]: http://api.zeromq.org/2-1:zmq-setsockopt

Вы найдете изображение оперативной памяти: RAM загружается примерно через 8 секунд. Далее диск сохраняет файлы из буфера

person Andres Forrer    schedule 22.08.2019