Я хочу создать DAG из задач на Java, где задачи могут зависеть от результатов других задач. Если между двумя задачами нет направленного пути, они могут выполняться параллельно. Задания могут быть отменены. Если какая-либо задача вызывает исключение, все задачи отменяются.
Я хотел использовать для этого CompleteableFuture
, но, несмотря на реализацию интерфейса Future
(включая Future.cancel(boolean)
, CompletableFuture
не поддерживает отмену -- CompletableFuture.cancel(true)
просто игнорируется (кто-нибудь знает, почему?)
Поэтому я прибегаю к созданию собственной DAG задач с использованием Future
. Это много шаблонов, и их сложно понять правильно. Есть ли лучший метод, чем этот?
Вот пример:
- I want to call
Process process = Runtime.getRuntime().exec(cmd)
to start a commandline process, creating aFuture<Process>
. Then I want to launch (fan out to) three subtasks:- One task that consumes input from
process.getInputStream()
- Одна задача, использующая входные данные от
process.getErrorStream()
- Одна задача, которая вызывает
process.waitFor()
, а затем ждет результата.
- One task that consumes input from
- Затем я хочу дождаться завершения всех трех запущенных подзадач (т.е. веера/барьер завершения). Это должно быть собрано в конечном
Future<Integer> exitCode
, который собирает код выхода, возвращаемый задачейprocess.waitFor()
. Две входные задачи-потребители просто возвращаютVoid
, поэтому их вывод можно игнорировать, но барьер выполнения все равно должен ждать их завершения.
Я хочу, чтобы сбой в любой из запущенных подзадач привел к отмене всех подзадач и уничтожению основного процесса.
Обратите внимание, что Process process = Runtime.getRuntime().exec(cmd)
на первом шаге может вызвать исключение, которое должно привести к каскадному сбою вплоть до exitCode
.
@FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
public void accept(T val) throws IOException;
}
public static Future<Integer> exec(
ConsumerThrowingIOException<InputStream> stdoutConsumer,
ConsumerThrowingIOException<InputStream> stderrConsumer,
String... cmd) {
Future<Process> processFuture = executor.submit(
() -> Runtime.getRuntime().exec(cmd));
AtomicReference<Future<Void>> stdoutProcessorFuture = //
new AtomicReference<>();
AtomicReference<Future<Void>> stderrProcessorFuture = //
new AtomicReference<>();
AtomicReference<Future<Integer>> exitCodeFuture = //
new AtomicReference<>();
Runnable cancel = () -> {
try {
processFuture.get().destroy();
} catch (Exception e) {
// Ignore (exitCodeFuture.get() will still detect exceptions)
}
if (stdoutProcessorFuture.get() != null) {
stdoutProcessorFuture.get().cancel(true);
}
if (stderrProcessorFuture.get() != null) {
stderrProcessorFuture.get().cancel(true);
}
if (exitCodeFuture.get() != null) {
stderrProcessorFuture.get().cancel(true);
}
};
if (stdoutConsumer != null) {
stdoutProcessorFuture.set(executor.submit(() -> {
try {
InputStream inputStream = processFuture.get()
.getInputStream();
stdoutConsumer.accept(inputStream != null
? inputStream
: new ByteArrayInputStream(new byte[0]));
return null;
} catch (Exception e) {
cancel.run();
throw e;
}
}));
}
if (stderrConsumer != null) {
stderrProcessorFuture.set(executor.submit(() -> {
try {
InputStream errorStream = processFuture.get()
.getErrorStream();
stderrConsumer.accept(errorStream != null
? errorStream
: new ByteArrayInputStream(new byte[0]));
return null;
} catch (Exception e) {
cancel.run();
throw e;
}
}));
}
exitCodeFuture.set(executor.submit(() -> {
try {
return processFuture.get().waitFor();
} catch (Exception e) {
cancel.run();
throw e;
}
}));
// Async completion barrier -- wait for process to exit,
// and for output processors to complete
return executor.submit(() -> {
Exception exception = null;
int exitCode = 1;
try {
exitCode = exitCodeFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
exception = e;
}
if (stderrProcessorFuture.get() != null) {
try {
stderrProcessorFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
if (exception == null) {
exception = e;
} else if (e instanceof ExecutionException) {
exception.addSuppressed(e);
}
}
}
if (stdoutProcessorFuture.get() != null) {
try {
stdoutProcessorFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
if (exception == null) {
exception = e;
} else if (e instanceof ExecutionException) {
exception.addSuppressed(e);
}
}
}
if (exception != null) {
throw exception;
} else {
return exitCode;
}
});
}
Примечание. Я понимаю, что Runtime.getRuntime().exec(cmd)
должен быть неблокирующим, поэтому не требует собственного Future
, но я все равно написал код, используя его, чтобы подчеркнуть важность построения DAG.
flatMap
? - person Eugene   schedule 30.11.2020CompletableFuture.cancel(true)
? Да, конечно, потоки должны реагировать на прерывание, чтобы это работало, но для примера, который я привел, все три подзадачи блокируются, поэтому в любом случае нужно будет перехватыватьInterruptedException
внутри и повторно вызывать его какCompletionException
. И я не думаю, что разветвление — это простоflatMap
, потому что для того, чтобы разветвление работало, три подзадачи должны иметь возможность запускаться из родительской задачи — или по завершении родительской задачи — - что добавляет сложности. - person Luke Hutchison   schedule 30.11.2020Future
, а неCompletableFuture
- спасибо за предупреждение. - person Luke Hutchison   schedule 30.11.2020