JeroMQ время от времени выдает IOException. слишком много файлов открыто

У меня есть приложение tomcat7, которое взаимодействует с другим моим приложением, используя конечную точку IPC с JeroMQ в Java. Существует схема клиент-сервер, и клиент некоторое время ждет ответа от сервера, и если он не получает ответ, происходит сбой в первый раз без повторной попытки.

Код ниже

@Override
public List<Result> call() throws Exception {
    final List<Result> results = new LinkedList<>();
    try {
        for (DTO dto : messages) {
            Message m = MessageHelper.MessageMapper(dto);

            Thread.sleep(dto.getDelayBeforeSend());
            final Result mtresult = send(dto);
            results.add(result);
        }
    } catch (RuntimeException e) {
        LOGGER.error("Flow => Uncaught Exception: {}", e.getMessage());
        LOGGER.debug("Flow => Uncaught Exception: ", e);
        Thread t = Thread.currentThread();
        t.getUncaughtExceptionHandler().uncaughtException(t, e);
    }
    return results;
}

private Result send(Message m) {
    ZMQ.Socket client = MQSocketFactory.getMQSocket(serverEndpoint).createRequester();
    try {
        final byte[] DTO = Helper.serializeMessage(m);
        int retriesLeft = 1;
        Result result = new Result(MessageConstants.MESSAGE_FAIL);

        while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {

            client.send(DTO, 0);
            int expect_reply = 1;

            while (expect_reply > 0) {

                ZMQ.PollItem items[] = { new ZMQ.PollItem(client, Poller.POLLIN) };
                int rc = ZMQ.poll(items, 3000);
                if (rc == -1) break; // Interrupted

                if (items[0].isReadable()) {
                    final byte[] reply = client.recv(0);
                    if (reply == null) break;
                    result = new Result(new String(reply));
                    if (result.isSuccessful()) {
                        LOGGER.trace("Server replied OK. Result: [{}]", result);
                        retriesLeft = 0;
                        expect_reply = 0;
                    } else LOGGER.error("Malformed reply from server: [{}]", result);

                } else if (--retriesLeft == 0) {
                    LOGGER.error("Server:[{}] seems to be offline, abandoning sending message [{}]!", serverEndpoint, m);
                    break;
                } else {
                    LOGGER.warn("No response from server, retrying...");
                    client = MQSocketFactory.getMQSocket(serverEndpoint).resetRequester(client);
                    client.send(DTO, 0);
                }
            }
        }
        return result;
    } finally {
        MQSocketFactory.getMQSocket(serverEndpoint).destroyRequester(client);

    }
}

Теперь класс MQSocketFactory выглядит следующим образом:

public final class MQSocketFactory {

private static final Map<String, MQSocket> store = new HashMap<String, MQSocket>();

private static final Logger LOGGER = LoggerFactory.getLogger(MQSocketFactory.class);

public static MQSocket getMQSocket(String endpointName) {
    synchronized (store) {
        MQSocket result = store.get(endpointName);
        if (result == null) {
            result = new MQSocket(endpointName);
            store.put(endpointName, result);
        }
        return result;
    }
}

public static final class MQSocket {

    private final String endpoint;
    private final ZMQ.Context ctx;

    private MQSocket(String endpointName) {
        this.endpoint = endpointName;
        this.ctx = ZMQ.context(1);
    }

    public ZMQ.Socket createRequester() {
        ZMQ.Socket client = null;
        try {
            client = ctx.socket(ZMQ.REQ);
            assert (client != null);
            client.connect(endpoint);
        } catch (Exception e) {
            LOGGER.error("Error: {}", e.getMessage());
            LOGGER.error("Error: {}", e);
        }
        return client;
    }

    public ZMQ.Socket resetRequester(ZMQ.Socket socket) {
        destroyRequester(socket);
        return createRequester();
    }

    public void destroyRequester(ZMQ.Socket socket) {
        if (socket != null) {
            try {
                socket.close();
            } catch (Exception e) {
                LOGGER.error("Error: {}", e.getMessage());
                LOGGER.debug("Error: {}", e);
            }
        }
    }

    public ZMQ.Context getContext() {
        return ctx;
    }

    // Responder Unit
    private ZMQ.Socket responder;

    public ZMQ.Socket createResponder() {
        if (responder == null) {
            this.responder = ctx.socket(ZMQ.REP);
            responder.bind(endpoint);
        }
        return responder;
    }

    public ZMQ.Socket resetResponder() {
        destroyResponder();
        return createResponder();
    }

    public void destroyResponder() {
        try {
            responder.close();
        } catch (Exception e) {
            LOGGER.error("Error: {}", e.getMessage());
            LOGGER.debug("Error: {}", e);
        }
    }

}

}

Я сделал это специально, чтобы все сокеты закрывались после выполнения запроса, чтобы избежать этой конкретной проблемы с IOExcpetion Too Many file Open. Однако из очень редко я получаю эту проблему, и я не могу понять, почему. Приложение может работать несколько дней при примерно одинаковой нагрузке, и все в порядке, но в некоторые моменты оно начинает выдавать исключение, и я не знаю, почему.

Также есть ли способ увеличить ulimit в tomcat7? Сейчас 1024.


person idipous    schedule 18.08.2015    source источник
comment
Одной из возможностей является использование ThreadLocal для хранения PollSelectors: Tomcat повторно использует потоки, поэтому они могут не закрываться и продолжать расти. Но без стресс-теста, который воспроизводит ошибку (с кодом JeroMQ, работающим автономно, а не с запущенным в Tomcat), невозможно узнать наверняка. Для теста вы можете уменьшить максимальное количество открытых файлов (это то, что вы устанавливаете до запуска Tomcat, см. этот вопрос).   -  person vanOekel    schedule 19.08.2015
comment
Мне интересно, есть ли что-то явно не так с кодом выше. Я попробую то, что вы предложили, хотя   -  person idipous    schedule 19.08.2015