Объединение нескольких CompletableFuture

У меня есть следующий компонент:

private JobInfo aggregateJobInfo() {
    final JobsResult jobsResult = restClient().getJobs();
    final List<String> jobIds = extractJobIds(jobsResult);

    //fetch details, exceptions and config for each job
    final List<JobDetails> jobDetails = jobIds.stream().map(jobId -> {
        final JobDetailResult jobDetailResult = restClient().getJobDetails(jobId);
        final JobExceptionsResult jobExceptionsResult = restClient().getJobExceptions(jobId);
        final JobConfigResult jobConfigResult = restClient().getJobConfig(jobId);
        return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult);
    }).collect(Collectors.toList());
    return new JobInfo(jobsResult, jobDetails);
}

private static List<String> extractJobIds(final JobsResult jobsResult) {
    final ArrayList<String> jobIds = new ArrayList<>();
    jobIds.addAll(jobsResult.getRunning());
    jobIds.addAll(jobsResult.getFinished());
    jobIds.addAll(jobsResult.getCanceled());
    jobIds.addAll(jobsResult.getFailed());
    return jobIds;
}

Он просто вызывает некоторые КОНЕЧНЫЕ ТОЧКИ и увеличивает некоторые данные. Теперь я пытаюсь сделать это неблокирующим, используя CompletableFutures, который я раньше не использовал.

private CompletableFuture<JobInfo> aggregateJobInfo() {
    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs();
    final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds);

     //fetch details, exceptions and config for each job
    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture = jobIdsFuture.thenApply(jobIds -> {
        return jobIds.stream().map(jobId -> {
            final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId);
            final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId);
            final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId);
            return jobDetailsResultFuture.thenCompose(jobDetailResult -> {
                return jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> {
                    return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult);
                });
            });

        }).collect(Collectors.toList());
    });
    return null;

Моя проблема заключается в том, как создать CompletableFuture здесь, когда JobInfo является «новой JobInfo (jobsResult, jobDetails)?!

Как я уже сказал, я новичок в этом, может быть, мой подход плохой и есть решения получше?

Любые идеи оценены, спасибо в

Первая рабочая версия:

private CompletableFuture<JobInfo> aggregateJobInfo() {

    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs();
    final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds);

    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFutureListFuture =
            jobIdsFuture.thenApply(jobIds -> jobIds.stream().map(jobId -> {
                final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId);
                final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId);
                final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId);
                return jobDetailsResultFuture.thenCompose(jobDetailResult ->
                        jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) ->
                                new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult)));
            }).collect(Collectors.toList()));

    return jobDetailsFutureListFuture.thenCompose(jobDetailsFutures ->
            CompletableFuture.allOf(jobDetailsFutures.toArray(
                    new CompletableFuture[jobDetailsFutures.size()])).thenApply(aVoid ->
                    jobDetailsFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList())))
            .thenApply(jobDetails -> jobsResultFuture.thenApply(jobsResult ->
                    new JobInfo(jobsResult, jobDetails)))
            .join();
}

person Markus Lamm    schedule 26.06.2016    source источник
comment
Не похоже, что большая часть кода, который вы вставили, не имеет отношения к вопросу. Можете ли вы сократить его до минимального примера того, что вам нужно?   -  person the8472    schedule 26.06.2016
comment
Вопрос заключался в том, как сопоставить будущие данные выше, чтобы CompletableFuture‹JobInfo› возвращался.   -  person Markus Lamm    schedule 26.06.2016


Ответы (1)


У тебя есть:

  • CompletableFuture<JobsResult> jobsResultFuture
  • CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture
  • JobInfo(JobsResult a, List<JobDetails> b)

Вы хотите

CompletableFuture<JobInfo>

дополнительное наблюдение: jobDetailsFuture может быть выполнено только после завершения jobsResultFuture.

Итак, вы можете реализовать следующее:

  1. List<CompletableFuture<JobDetails>> -> Void через allOf в thenCompose
  2. Void + List<CompletableFuture<JobDetails>> (как захваченная переменная) -> List<JobDetails> через thenApply
  3. List<JobDetails> + CompletableFuture<JobsResult> (как захваченная переменная) -> JobInfo через thenApply

Вы можете просто развернуть фьючерсы через get() внутри этих функций сопоставления, потому что фьючерсы гарантированно завершатся в этот момент из-за зависимостей их фьючерсов-предков в этот момент.

Возможны и другие подходы, использующие thenCombine и сокращение потока, но они более подробные и создают больше промежуточных вариантов будущего.

person the8472    schedule 26.06.2016
comment
Извините, я не понимаю... Что значит «awaitAll» и «Пустота»? - person Markus Lamm; 27.06.2016
comment
означало все. Он возвращает CompletableFuture<Void>, следовательно, следующий шаг для возврата данных - person the8472; 27.06.2016
comment
Все еще не понимаю. Пробовал jobDetailsListFuture.thenCompose(jobDetailsFutures -> { return CompletableFuture.allOf(jobDetailsFutures.toArray( new CompletableFuture[jobDetailsFutures.size()])) .thenApply(aVoid -> { ??? }); }); но не знаю, что делать с пустотой. - person Markus Lamm; 27.06.2016
comment
ничего с ним не делаешь, важна вторая часть - person the8472; 27.06.2016
comment
Думайте о завершаемом будущем как о двух отдельных потоках информации: о завершении/прогрессе вычислений и переносимых объектах данных. Иногда вас интересует только завершение, а не данные, поэтому вы можете знать, когда вы можете безопасно вызывать get() без блокировки или исключений, возможно, на другом фьючерсе, чем тот, который завершается. - person the8472; 27.06.2016
comment
Эй, спасибо большое!! Я, наконец, заставил это работать, см. выше, хотя я должен признать, что я еще не совсем понимаю, но мне все еще нужно глубже изучить cp-api. И я думаю, что есть много возможностей для улучшений, но пока я в порядке.. - person Markus Lamm; 27.06.2016