Создание DAG из отменяемых задач Java

Я хочу создать DAG из задач на Java, где задачи могут зависеть от результатов других задач. Если между двумя задачами нет направленного пути, они могут выполняться параллельно. Задания могут быть отменены. Если какая-либо задача вызывает исключение, все задачи отменяются.

Я хотел использовать для этого CompleteableFuture, но, несмотря на реализацию интерфейса Future (включая Future.cancel(boolean), CompletableFuture не поддерживает отмену -- CompletableFuture.cancel(true) просто игнорируется (кто-нибудь знает, почему?)

Поэтому я прибегаю к созданию собственной DAG задач с использованием Future. Это много шаблонов, и их сложно понять правильно. Есть ли лучший метод, чем этот?

Вот пример:

  1. I want to call Process process = Runtime.getRuntime().exec(cmd) to start a commandline process, creating a Future<Process>. Then I want to launch (fan out to) three subtasks:
    • One task that consumes input from process.getInputStream()
    • Одна задача, использующая входные данные от process.getErrorStream()
    • Одна задача, которая вызывает process.waitFor(), а затем ждет результата.
  2. Затем я хочу дождаться завершения всех трех запущенных подзадач (т.е. веера/барьер завершения). Это должно быть собрано в конечном Future<Integer> exitCode, который собирает код выхода, возвращаемый задачей process.waitFor(). Две входные задачи-потребители просто возвращают Void, поэтому их вывод можно игнорировать, но барьер выполнения все равно должен ждать их завершения.

Я хочу, чтобы сбой в любой из запущенных подзадач привел к отмене всех подзадач и уничтожению основного процесса.

Обратите внимание, что Process process = Runtime.getRuntime().exec(cmd) на первом шаге может вызвать исключение, которое должно привести к каскадному сбою вплоть до exitCode.

@FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
    public void accept(T val) throws IOException;
}

public static Future<Integer> exec(
        ConsumerThrowingIOException<InputStream> stdoutConsumer,
        ConsumerThrowingIOException<InputStream> stderrConsumer,
        String... cmd) {

    Future<Process> processFuture = executor.submit(
            () -> Runtime.getRuntime().exec(cmd));

    AtomicReference<Future<Void>> stdoutProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Void>> stderrProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Integer>> exitCodeFuture = //
            new AtomicReference<>();

    Runnable cancel = () -> {
        try {
            processFuture.get().destroy();
        } catch (Exception e) {
            // Ignore (exitCodeFuture.get() will still detect exceptions)
        }
        if (stdoutProcessorFuture.get() != null) {
            stdoutProcessorFuture.get().cancel(true);
        }
        if (stderrProcessorFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
        if (exitCodeFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
    };

    if (stdoutConsumer != null) {
        stdoutProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream inputStream = processFuture.get()
                        .getInputStream();
                stdoutConsumer.accept(inputStream != null
                        ? inputStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    if (stderrConsumer != null) {
        stderrProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream errorStream = processFuture.get()
                        .getErrorStream();
                stderrConsumer.accept(errorStream != null
                        ? errorStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    exitCodeFuture.set(executor.submit(() -> {
        try {
            return processFuture.get().waitFor();
        } catch (Exception e) {
            cancel.run();
            throw e;
        }
    }));

    // Async completion barrier -- wait for process to exit,
    // and for output processors to complete
    return executor.submit(() -> {
        Exception exception = null;
        int exitCode = 1;
        try {
            exitCode = exitCodeFuture.get().get();
        } catch (InterruptedException | CancellationException
                | ExecutionException e) {
            cancel.run();
            exception = e;
        }
        if (stderrProcessorFuture.get() != null) {
            try {
                stderrProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (stdoutProcessorFuture.get() != null) {
            try {
                stdoutProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (exception != null) {
            throw exception;
        } else {
            return exitCode;
        }
    });
}

Примечание. Я понимаю, что Runtime.getRuntime().exec(cmd) должен быть неблокирующим, поэтому не требует собственного Future, но я все равно написал код, используя его, чтобы подчеркнуть важность построения DAG.


person Luke Hutchison    schedule 30.11.2020    source источник
comment
CompletableFuture не поддерживает отмену, если только вы не отключите базовый пул и ваши потоки не реагируют на прерывания. А разветвление это просто flatMap?   -  person Eugene    schedule 30.11.2020
comment
@ Юджин, тогда что делает CompletableFuture.cancel(true)? Да, конечно, потоки должны реагировать на прерывание, чтобы это работало, но для примера, который я привел, все три подзадачи блокируются, поэтому в любом случае нужно будет перехватывать InterruptedException внутри и повторно вызывать его как CompletionException. И я не думаю, что разветвление — это просто flatMap, потому что для того, чтобы разветвление работало, три подзадачи должны иметь возможность запускаться из родительской задачи — или по завершении родительской задачи — - что добавляет сложности.   -  person Luke Hutchison    schedule 30.11.2020
comment
отмена не прерывает, читайте документацию. Это буквально говорит, что   -  person Eugene    schedule 30.11.2020
comment
@Eugene Я обновил вопрос, чтобы использовать Future, а не CompletableFuture - спасибо за предупреждение.   -  person Luke Hutchison    schedule 30.11.2020


Ответы (1)


Ни за что. Процесс не имеет асинхронного интерфейса (кроме Process.onExit()). Таким образом, вы должны использовать потоки для ожидания создания процесса и чтения из InputStreams. Другими компонентами вашей DAG могут быть асинхронные задачи (CompletableFutures).

Это не большая проблема. Единственное преимущество асинхронных задач перед потоками — меньшее потребление памяти. Ваш Process все равно потребляет много памяти, так что особого смысла экономить память здесь нет.

person Alexei Kaigorodov    schedule 30.11.2020
comment
Я думаю, вы упустили весь смысл моего вопроса. Process запускает другой поток, который работает параллельно, хотя waitFor() будет блокироваться при выходе из этого потока. Вся причина, по которой я пытаюсь использовать CompletableFuture, заключается в том, чтобы сделать код выхода из waitFor() и stdout/stderr из процесса доступными асинхронно. - person Luke Hutchison; 30.11.2020
comment
stdout/stderr имеют только блокирующий интерфейс, они недоступны асинхронно. Конечно, вы всегда можете преобразовать синхронный интерфейс в асинхронный (но не наоборот), но это будет пустая трата ресурсов. - person Alexei Kaigorodov; 30.11.2020
comment
waitFor уже возвращает код выхода, промежуточный CompletableFuture не требуется. - person Alexei Kaigorodov; 30.11.2020