Регистратор MDC с CompletableFuture

Я использую MDC Logger, который отлично работает для меня, за исключением одного случая. Где бы в коде мы ни использовали CompletableFuture, для созданного потока данные MDC не передаются следующему потоку, из-за чего журналы не работают. Например, в коде, который я использовал ниже, фрагмент для создания нового потока.

CompletableFuture.runAsync(() -> getAcountDetails(user));

И результат журналов, как показано ниже

2019-04-29 11:44:13,690 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] RestServiceExecutor:  service: 
2019-04-29 11:44:13,690 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] RestServiceExecutor: 
2019-04-29 11:44:13,779 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] UserDetailsRepoImpl: 
2019-04-29 11:44:13,950 INFO   [ForkJoinPool.commonPool-worker-3] RestServiceExecutor:  header: 
2019-04-29 11:44:13,950 INFO   [ForkJoinPool.commonPool-worker-3] RestServiceExecutor:  service: 
2019-04-29 11:44:14,012 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,028 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieved Config Data details : 1
2019-04-29 11:44:14,028 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,033 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieved Config Data details : 1
2019-04-29 11:44:14,147 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] SecondaryCacheServiceImpl: Fetching from secondary cache
2019-04-29 11:44:14,715 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,749 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5]

Ниже приведены мои данные MDC, которые не передаются с потоком [ForkJoinPool.commonPool-worker-3].

| /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |

Ниже приведена моя конфигурация logback.xml, где sessionID — это данные MDC.

<configuration scan="true">
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <charset>utf-8</charset>
            <Pattern>%d %-5level %X{sessionID} [%thread] %logger{0}: %msg%n</Pattern>
        </encoder>
    </appender>
</configuration>

Я пробовал ниже Ссылка

http://shengwangi.blogspot.com/2015/09/using-log-mdc-in-multi-thread-helloworld-example.html?_sm_au_=iVVrZDSwwf0vP6MR

Которые прекрасно работают для TaskExecutor. Но я не нашел решения для CompletableFuture.


person Mayur    schedule 29.04.2019    source источник
comment
проверьте это   -  person Michał Krzywański    schedule 29.04.2019


Ответы (2)


Создать метод-оболочку

static CompletableFuture<Void> myMethod(Runnable runnable) {
    Map<String, String> previous = MDC.getCopyOfContextMap();
    return CompletableFuture.runAsync(() -> {
        MDC.setContextMap(previous);
        try {
            runnable.run();
        } finally {
            MDC.clear();
        }
    });
}

и используйте его вместо CompletableFuture.runAsync.

person talex    schedule 29.04.2019
comment
Но не называйте это myMethod... runAsyncWithMDC или как-то так. - person Thilo; 29.04.2019
comment
Спасибо, Talex, это должно работать, но проблема в том, что мы используем completablefuture в нескольких местах, и нам может потребоваться обновление в каждом месте. Правильно ? - person Mayur; 29.04.2019
comment
Поскольку вы используете статический метод для запуска асинхронных вызовов, его невозможно перехватить. Таким образом, вы должны изменить каждый вызов. - person talex; 29.04.2019

Моей темой решения было бы (это будет работать с JDK 9+, так как с этой версии выставлено несколько переопределяемых методов)

Ознакомьте всю экосистему с MDC

А для этого нам нужно рассмотреть следующие сценарии:

  • Когда все мы получим новые экземпляры CompletableFuture из этого класса? → Нам нужно скорее вернуть ту же версию, поддерживающую MDC.
  • Когда все мы получим новые экземпляры CompletableFuture из-за пределов этого класса? → Нам нужно вернуть версию того же самого с поддержкой MDC.
  • Какой исполнитель используется в классе CompletableFuture? → При любых обстоятельствах мы должны убедиться, что все исполнители поддерживают MDC.

Для этого давайте создадим класс версии CompletableFuture с поддержкой MDC, расширив его. Моя версия этого будет выглядеть ниже

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {

    public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();

    @Override
    public CompletableFuture newIncompleteFuture() {
        return new MDCAwareCompletableFuture();
    }

    @Override
    public Executor defaultExecutor() {
        return MDC_AWARE_ASYNC_POOL;
    }

    public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
        return new MDCAwareCompletableFuture<>()
                .completeAsync(() -> null)
                .thenCombineAsync(future, (aVoid, value) -> value);
    }

    public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
                                                                Function<Throwable, T> throwableFunction) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return getMDCAwareCompletionStage(future)
                .handle((value, throwable) -> {
                    setMDCContext(contextMap);
                    if (throwable != null) {
                        return throwableFunction.apply(throwable);
                    }
                    return value;
                });
    }
}

Класс MDCAwareForkJoinPool будет выглядеть так (для простоты пропущены методы с параметрами ForkJoinTask)

public class MDCAwareForkJoinPool extends ForkJoinPool {
    //Override constructors which you need

    @Override
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        return super.submit(wrapWithMdcContext(task), result);
    }

    @Override
    public ForkJoinTask<?> submit(Runnable task) {
        return super.submit(wrapWithMdcContext(task));
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrapWithMdcContext(task));
    }
}

Утилиты для переноса будут такими, как

public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.call();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static Runnable wrapWithMdcContext(Runnable task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            task.run();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static void setMDCContext(Map<String, String> contextMap) {
   MDC.clear();
   if (contextMap != null) {
       MDC.setContextMap(contextMap);
    }
}

Ниже приведены некоторые рекомендации по использованию:

  • Используйте класс MDCAwareCompletableFuture, а не класс CompletableFuture.
  • Несколько методов в классе CompletableFuture создают экземпляр собственной версии, такой как new CompletableFuture.... Для таких методов (большинство общедоступных статических методов) используйте альтернативный метод для получения экземпляра MDCAwareCompletableFuture. Примером использования альтернативы может быть вместо использования CompletableFuture.supplyAsync(...), вы можете выбрать new MDCAwareCompletableFuture<>().completeAsync(...)
  • Преобразуйте экземпляр CompletableFuture в MDCAwareCompletableFuture с помощью метода getMDCAwareCompletionStage, когда вы застряли с одним, скажем, из-за какой-то внешней библиотеки, которая возвращает вам экземпляр CompletableFuture. Очевидно, что вы не можете сохранить контекст в этой библиотеке, но этот метод все равно сохранит контекст после того, как ваш код попадет в код приложения.
  • При указании исполнителя в качестве параметра убедитесь, что он поддерживает MDC, например MDCAwareForkJoinPool. Вы также можете создать MDCAwareThreadPoolExecutor, переопределив метод execute для вашего варианта использования. Вы поняли идею!

Вы можете найти подробное объяснение всего вышеперечисленного здесь, в post о том же.

При этом ваш код может выглядеть так

new MDCAwareCompletableFuture<>().completeAsync(() -> {
            getAcountDetails(user);
            return null;
        });
person Laks    schedule 30.12.2019