CompletableFuture с информацией об истекшем времени

Мне нужно иметь доступ к информации о времени выполнения асинхронного метода. Итак, я пытаюсь расширить CompletableFuture функциональность. Вот моя реализация с использованием шаблона декоратора:

import java.util.concurrent.*;
import java.util.function.*;
import static lombok.AccessLevel.PRIVATE;
import lombok.AllArgsConstructor;
import lombok.experimental.Delegate;

@AllArgsConstructor(access = PRIVATE)
public class ContinuousCompletableFuture<T> extends CompletableFuture<T> {

    @Delegate
    private final CompletableFuture<T> baseFuture;

    private final long creationTime;

    public static <U> ContinuousCompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return new ContinuousCompletableFuture<>(CompletableFuture.supplyAsync(supplier));
    }

    private ContinuousCompletableFuture(CompletableFuture<T> baseFuture) {
        this.baseFuture = baseFuture;
        this.creationTime = System.nanoTime();
    }

    public Long getElapsedTime() {
        return (System.nanoTime() - creationTime) / 1000_000L;
    }

    public ContinuousCompletableFuture<Void> thenAcceptAsync(BiConsumer<? super T, Long> action) {
        CompletionStage<Long> elapsedTime = CompletableFuture.completedFuture(getElapsedTime());
        return new ContinuousCompletableFuture<>(baseFuture.thenAcceptBothAsync(elapsedTime, action), creationTime);
    }
}

Первый тест shouldReturnElapsedTime с извлеченной переменной ContinuousCompletableFuture работает нормально, но другой shouldOperateWithOwnExecutionTime не работает. Между тем, я предпочитаю, чтобы в моем будущем коде не извлекалась переменная ContinuousCompletableFuture.

import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.junit.*;
import static org.junit.Assert.*;

@Slf4j
public class ContinuousCompletableFutureTest {

    private static final int DELAY = 1000;

    AtomicLong flag = new AtomicLong();

    ContinuousCompletableFuture<String> future;

    @Before
    public void before() {
        future = ContinuousCompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(DELAY);
            } catch (InterruptedException ex) {
                log.error("Error during ContinuousCompletableFuture execution", ex);
            }
            return "successfully completed";
        });
    }

    @Test
    public void shouldReturnElapsedTime() {
        future.thenAcceptAsync(s -> {
            long t = future.getElapsedTime();
            log.info("Elapsed {} ms to receive message \"{}\"", t, s);
            flag.set(t);
        });

        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            log.error("Error awaiting Test completion", ex);
        }

        assertTrue("Future completion should be delayed", flag.get() >= 0.75 * DELAY);
    }

    @Test
    public void shouldOperateWithOwnExecutionTime() {
        future.thenAcceptAsync((s, t) -> {
            log.info("Elapsed {} ms to receive message \"{}\"", t, s);
            flag.set(t);
        });

        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            log.error("Error awaiting Test completion", ex);
        }

        assertTrue("Future completion should be delayed", flag.get() >= 0.75 * DELAY);
    }
}

Я предполагаю, что моя проблема заключается в неправильном использовании метода thenAcceptBothAsync.

Какие-либо предложения?


person ytterrr    schedule 12.10.2015    source источник
comment
Я провел оба ваших теста, и ошибок не было.   -  person Tunaki    schedule 12.10.2015
comment
@Tunaki, это странно для меня - вот мой результат обоих тестов: Elapsed 1 ms to receive message "successfully completed" и Elapsed 993 ms to receive message "successfully completed". В shouldOperateWithOwnExecutionTime было рассчитано прошедшее время, так как оно было выполнено без каких-либо задержек.   -  person ytterrr    schedule 12.10.2015
comment
Обратите внимание, что мне пришлось исправить опубликованный вами код, чтобы он скомпилировался: я добавил конструктор, принимающий 2 параметра, в ContinuousCompletableFuture.   -  person Tunaki    schedule 12.10.2015
comment
Вы вызываете getElapsedTime() прямо в своем методе thenAcceptAsync. Конечно, на тот момент почти время уже истекло. И это длинное значение после этого не меняется…   -  person Holger    schedule 12.10.2015
comment
@Holger Должен ли я изменить тип возвращаемого значения getElapsedTime() с типа Long на тип Supplier<Long> или есть решения получше?   -  person ytterrr    schedule 12.10.2015
comment
Я не уверен в вашей цели, но мне кажется, что вам это вообще не нужно. То, что вы на самом деле хотите, кажется return new ContinuousCompletableFuture<>(baseFuture.thenAcceptAsync(t -> action.accept(t, getElapsedTime())), creationTime);   -  person Holger    schedule 12.10.2015
comment
@Holger Ваше решение отличное, теперь все работает как задумано. Спасибо. Я приму ваш ответ, если вы опубликуете его. Я много думал о том, как перевести BiConsumer -> Consumer, но не нашел красивого t -> action.accept(t, getElapsedTime()) решения.   -  person ytterrr    schedule 12.10.2015


Ответы (1)


В вашем методе

public ContinuousCompletableFuture<Void> thenAcceptAsync(
                                         BiConsumer<? super T, Long> action) {
    CompletionStage<Long> elapsedTime=CompletableFuture.completedFuture(getElapsedTime());
    return new ContinuousCompletableFuture<>(
        baseFuture.thenAcceptBothAsync(elapsedTime, action), creationTime);
}

вы оцениваете getElapsedTime() немедленно и передаете результат BiConsumer без изменений, независимо от фактического времени завершения.

Вы можете исправить это, запросив прошедшее время прямо в потребителе:

public ContinuousCompletableFuture<Void> thenAcceptAsync(
                                         BiConsumer<? super T, Long> action) {
    return new ContinuousCompletableFuture<>(
        baseFuture.thenAcceptAsync(t -> action.accept(t, getElapsedTime())), creationTime);
}
person Holger    schedule 12.10.2015