java.lang.ArrayIndexOutOfBoundsException: 256 с версией jeromq 0.3.6

Я использую Jeromq в многопоточной среде, как показано ниже. Ниже приведен мой код, в котором конструктор SocketManager сначала подключается ко всем доступным сокетам, и я помещаю их в карту liveSocketsByDatacenter в методе connectToZMQSockets. После этого я запускаю фоновый поток в том же конструкторе, который запускается каждые 30 секунд, и он вызывает метод updateLiveSockets, чтобы пропинговать все те сокеты, которые уже были на карте liveSocketsByDatacenter, и обновить карту liveSocketsByDatacenter, указав, были ли эти сокеты живыми или нет.

И метод getNextSocket() вызывается несколькими потоками чтения одновременно, чтобы получить следующий доступный сокет, а затем мы используем этот сокет для отправки данных в него. Итак, мой вопрос: правильно ли мы используем Jeromq в многопоточной среде? Потому что мы только что увидели исключение в нашей производственной среде с этой трассировкой стека, когда мы пытались отправить данные в этот активный сокет, поэтому я не уверен, ошибка это или что-то еще?

java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)

Ниже мой код:

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
    private final ZContext ctx = new ZContext();

    private static class Holder {
        private static final SocketManager instance = new SocketManager();
    }

    public static SocketManager getInstance() {
        return Holder.instance;
    }

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
      }
    }

    private List<SocketHolder> connect(List<String> addresses, int socketType) {
        List<SocketHolder> socketList = new ArrayList<>();
        for (String address : addresses) {
          try {
            Socket client = ctx.createSocket(socketType);
            // Set random identity to make tracing easier
            String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
            client.setIdentity(identity.getBytes(ZMQ.CHARSET));
            client.setTCPKeepAlive(1);
            client.setSendTimeOut(7);
            client.setLinger(0);
            client.connect(address);

            SocketHolder zmq = new SocketHolder(client, ctx, address, true);
            socketList.add(zmq);
          } catch (Exception ex) {
            // log error
          }
        }
        return socketList;
    }

    // this method will be called by multiple threads concurrently to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty so we shuffle it an return the first element
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // runs every 30 seconds to ping all the socket to make sure whether they are alive or not
    private void updateLiveSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map<byte[], byte[]> holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          // pinging to see whether a socket is live or not
          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
      }
    }
}

А вот как я использую метод getNextSocket() класса SocketManager одновременно из нескольких потоков чтения:

// this method will be called from multiple threads
public boolean sendAsync(final long addr, final byte[] reco) {
  Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
  return sendAsync(addr, reco, liveSockets.get().getSocket(), false);
}

public boolean sendAsync(final long addr, final byte[] reco, final Socket socket,
    final boolean messageA) {
  ZMsg msg = new ZMsg();
  msg.add(reco);
  boolean sent = msg.send(socket);
  msg.destroy();
  retryHolder.put(addr, reco);
  return sent;
}

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    boolean sent = sendAsync(address, encodedRecords, socket, true);
    // if the record was sent successfully, then only sleep for timeout period
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // ...
    return sent;
  } 

Я не думаю, что это правильно, я считаю. Кажется, getNextSocket() может вернуть 0MQ socket в thread A. Одновременно поток таймера может получить доступ к тому же 0MQ socket, чтобы пропинговать его. В этом случае thread A и поток таймера мутируют один и тот же 0MQ socket, что приведет к проблемам. Итак, каков наилучший и эффективный способ решить эту проблему?

Примечание. SocketHolder — это неизменяемый класс.

Обновление:

Я только что заметил, что такая же проблема возникла на моем другом ящике с тем же ArrayIndexOutOfBoundsException, но на этот раз это номер строки 71 в файле "YQueue". Единственная постоянная вещь - это 256 всегда. Значит, должно быть что-то связанное с 256, а я не могу понять, что это за 256?

java.lang.ArrayIndexOutOfBoundsException: 256
    at zmq.YQueue.backPos(YQueue.java:71)
    at zmq.YPipe.write(YPipe.java:51)
    at zmq.Pipe.write(Pipe.java:232)
    at zmq.LB.send(LB.java:83)
    at zmq.Push.xsend(Push.java:48)
    at zmq.SocketBase.send(SocketBase.java:590)
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
    at org.zeromq.ZFrame.send(ZFrame.java:131)
    at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
    at org.zeromq.ZMsg.send(ZMsg.java:191)
    at org.zeromq.ZMsg.send(ZMsg.java:163)

person john    schedule 02.11.2017    source источник
comment
Какие версии zeromq у вас работают на серверах, которые показывают эти исключения?   -  person everton    schedule 03.11.2017
comment
Это уже есть в заголовке моего вопроса. Я использую 0.3.6 версию Jeromq.   -  person john    schedule 03.11.2017
comment
Я предположил, что 0.3.6 была версией jeromq, а не zeromq (не знаком с ними)   -  person everton    schedule 03.11.2017
comment
Я использую Java Jeromq (0.3.6) для отправки данных в нашу очередь сообщений, которая использует c++ zeromq.   -  person john    schedule 03.11.2017
comment
Кстати, какая версия Java у вас есть в клиентском приложении?   -  person everton    schedule 03.11.2017
comment
Я использую Java 7. Пока не могу использовать Java 8.   -  person john    schedule 03.11.2017


Ответы (1)


Факт № 0: ZeroMQ не является потокобезопасным — по определению

Хотя документация ZeroMQ и отличная книга Питера ХИНТДЖЕНСА "Code Connected. Volume 1" не забывают напоминать об этом факте везде, где это возможно, время от времени появляется идея возврата или даже совместного использования экземпляра сокета ZeroMQ среди потоков. Конечно, методы экземпляров класса могут доставлять это почти «скрытым» внутри их внутренних методов и атрибутов, но надлежащие усилия по проектированию должны предотвращать любые такие побочные эффекты без каких-либо исключений и оправданий.

Совместное использование, если оно разумно подкреплено количественными фактами, может быть способом для общего экземпляра zmq.Context(), но кристально чистая структура распределенной системы может жить на основе действительно многоагентной схемы, где каждый агент управляет своим собственным Context() механизмом, прекрасно -настроен на соответствующее сочетание настроек и производительности.

Итак, каков наилучший и эффективный способ решить эту проблему?

Никогда не делитесь сокетом ZeroMQ. Никогда, правда. Даже если новейшая разработка начала обещать какие-то ближайшие изменения в этом направлении. Плохая привычка загрязнять дизайн любой высокопроизводительной распределенной системы с малой задержкой совместным использованием. Не делиться ничем — лучший принцип дизайна для этого домена.


Да, я вижу, что мы не должны совместно использовать сокеты между потоками, но в моем коде
Как вы думаете, как лучше всего решить эту проблему?

Да, лучший и эффективный способ решить эту проблему — никогда не использовать сокет ZeroMQ совместно.

Это означает, что никогда не возвращайте какой-либо объект, атрибутами которого являются сокеты ZeroMQ (которые вы активно создаете и массово возвращаете из метода класса .connect(){...}. В вашем случае все методы класса кажутся сохранено private, что может решить проблему разрешения "другим потокам" касаться экземпляров сокета класса-private, но тот же принцип должен поддерживаться также на всех уровнях атрибутов, чтобы быть эффективным . Наконец, это "объединение" укорачивается и нарушается
public static SocketManager getInstance(),
который беспорядочно предлагает любому внешнему запрашивающему получить прямой доступ к совместному использованию закрытых экземпляров сокетов ZeroMQ. .

Если какая-то документация прямо предупреждает почти в каждой главе о том, что нельзя делиться вещами, то лучше не следует делиться вещами.

Итак, перепроектируйте методы, чтобы SocketManager получил больше функциональных возможностей, поскольку его методы класса, которые будут выполнять встроенные обязательные функции, чтобы явно предотвращать касание любого потока внешнего мира к не-совместно используемому -able, как описано в публикациях ZeroMQ.

Затем идет инвентаризация ресурсов: кажется, что ваш код перепроверяет каждые 30 секунд состояние мира во всех интересующих центрах данных. Это фактически создает новые объекты List дважды в минуту. Хотя вы можете спекулятивно разрешить сборщику мусора java собирать мусор весь мусор, на который больше нигде не ссылаются, это не очень хорошая идея для объектов, связанных с ZeroMQ, встроенных в List-s из ваших предыдущих повторных проверок. На объекты ZeroMQ по-прежнему ссылаются из Zcontext() — потока (потоков) ввода-вывода ZeroMQ Context()-core-factory, который также можно рассматривать как диспетчер ресурсов инвентаризации сокетов ZeroMQ. Таким образом, все new-созданные экземпляры сокетов получают не только внешний дескриптор со стороны java, но и внутренний дескриптор изнутри (Z)Context(). Все идет нормально. Но чего не видно нигде в коде, так это любого метода, который выводил бы из эксплуатации любые и все сокеты ZeroMQ в экземплярах объектов, которые были деассоциированы со стороны java, но все еще остаются ссылками со стороны (Z)Context(). -сторона. Явный вывод из эксплуатации выделенных ресурсов является справедливой практикой на стороне проектирования, особенно для ресурсов, которые ограничены или ограничены иным образом. Способ, как это сделать, может отличаться для { "дешевых" | "дорогие" }-затраты на обслуживание такой обработки управления ресурсами (экземпляры сокетов ZeroMQ чрезвычайно дороги, чтобы их можно было обрабатывать как некоторые легкие "расходные/одноразовые"... но это уже другая история).

Итак, добавьте также набор правильных методов повторного использования/разборки ресурсов, которые вернут общее количество new созданных сокетов под вашу ответственность (ваш код отвечает за количество обработчиков сокетов внутри < strong>(Z)Context()-domain-of-resources-control может быть создан и должен оставаться под управлением — сознательно или нет).

Кто-то может возразить, что могут быть некоторые «обещания» от автоматического обнаружения и (потенциально хорошо отложенной) сборки мусора, но тем не менее ваш код отвечает за правильное управление ресурсами, и даже ребята из LMAX никогда не получили бы такой смелой производительности, если бы они полагались на "обещания" из стандартного gc. Ваша проблема намного хуже, чем с максимальной производительностью LMAX приходилось бороться. Ваш код (на данный момент опубликованный) вообще ничего не делает для .close() и .term() ресурсов, связанных с ZeroMQ. Это прямо невозможная практика внутри экосистемы с неконтролируемым (распределенным) потреблением. Вы должны защитить свою лодку от перегрузки сверх предела, который, как вы знаете, она может безопасно выдержать, и динамически разгружать каждую коробку, которая не имеет получателя на «противоположном берегу».

Это ответственность капитана (вашего дизайнера кода).

Не сообщая явно матросу, ответственному за управление запасами на самом нижнем уровне ( ZeroMQ Context()-floor ), что некоторые ящики должны быть выгружены, проблема все еще остается вашей. Стандартная gc-командная цепочка не будет делать это "автоматически", какими бы "обещаниями" это ни выглядело, это не так. Так что будьте точны в отношении управления ресурсами ZeroMQ, оценивайте коды возврата при заказе этих шагов и соответствующим образом обрабатывайте любые и все исключения, возникающие при выполнении этих операций управления ресурсами под вашим явным контролем кода.

Более низкая (если вообще не самая низкая достижимая) использование ресурсов-конверты и более высокая (если вообще не самая высокая достижимая) производительность является бонусом от правильного выполнения этой работы. Ребята из LMAX являются хорошим примером того, как они делают это намного лучше стандартных «обещаний» Java, поэтому можно учиться у лучших из лучших.


Объявленные сигнатуры вызовов и используемые, похоже, не совпадают:
хотя я могу ошибаться в этом вопросе, так как большая часть моих усилий по проектированию не относится к java полиморфные интерфейсы вызовов, кажется, что в подписи есть несоответствие, опубликованное как:

private List<SocketHolder> connect( Datacenters  dc,                     // 1-st
                                    List<String> addresses,              // 2-nd
                                    int          socketType              // 3-rd
                                    ) {
        ... /* implementation */
}

и
фактический вызов метода,
вызванный внутри метода connectToZMQSockets() просто:

        List<SocketHolder> addedColoSockets = connect( entry.getValue(), // 1-st
                                                       ZMQ.PUSH          // 2-nd
                                                       );
person user3666197    schedule 03.11.2017
comment
Я только что исправил проблему с подписью, это было по ошибке. Да, я вижу, что мы не должны делиться сокетами между потоками, но в моем коде, как вы думаете, это лучший способ решить эту проблему? - person john; 03.11.2017