У нас есть приложение Java, которое действует как сервер. Клиентские приложения (написанные на C#) взаимодействуют с ним с помощью ZeroMQ. Мы (в основном) следуем модели Ленивого Пирата.
На сервере есть сокет Router, реализованный следующим образом (с использованием JeroMQ):
ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.bind("tcp://*:5555");
Клиенты подключаются и отправляют сообщения следующим образом:
ZContext context = ZContext.Create();
ZSocket socket = ZSocket.Create(context, ZSocketType.REQ);
socket.Identity = Encoding.UTF8.GetBytes("Some identity");
socket.Connect("tcp://my_host:5555");
socket.Send(new ZFrame("request data"));
Мы столкнулись с потерей сообщений, когда несколько клиентов отправляют сообщения одновременно. С одним клиентом проблем не возникает.
Правильно ли мы реализуем это для установки с несколькими клиентами и одним сервером?
Обновление: пример клиента и сервера, демонстрирующих такое поведение:
Сервер:
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
public class SimpleServer
{
public static void main(String[] args) throws InterruptedException
{
ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.setRouterMandatory(true);
socket.bind("tcp://*:5559");
PollItem pollItem = new PollItem(socket, Poller.POLLIN);
int messagesReceived = 0;
int pollCount = 0;
while ((pollCount = ZMQ.poll(new PollItem[]{pollItem}, 3000)) > -1)
{
messagesReceived += pollCount;
for (int i = 0 ; i < pollCount ; i++)
{
ZMsg msg = ZMsg.recvMsg(socket);
System.out.println(String.format("Received message: %s. Total messages received: %d", msg, messagesReceived));
}
if (pollCount == 0)
{
System.out.println(String.format("No messages on socket. Total messages received: %d", messagesReceived));
}
}
}
}
Клиент:
using NetMQ;
using System;
using System.Text;
namespace SimpleClient
{
class Program
{
static byte[] identity = Encoding.UTF8.GetBytes("id" + DateTime.UtcNow.Ticks);
static void Main(string[] args)
{
for (int i = 0; i < 100; i++)
{
SendMessage();
}
}
private static void SendMessage()
{
using (NetMQContext context = NetMQContext.Create())
{
using (NetMQSocket socket = context.CreateRequestSocket())
{
socket.Options.Identity = identity;
socket.Connect("tcp://localhost:5559");
socket.Send(Encoding.UTF8.GetBytes("hello!"));
}
}
}
}
}
Если я запускаю сервер и один клиент, я вижу, что приходят все мои 100 сообщений. Если я запускаю, скажем, 5 клиентов одновременно, я получаю только около 200 -> 300 сообщений вместо полных 500. Кроме того, кажется, что закрытие сокета в клиенте каким-то образом останавливает сокет маршрутизатора на сервере. получение сообщений ненадолго, хотя это всего лишь теория.