
I am using MDC logging, which works well for me except in one case. Wherever the code uses CompletableFuture, the MDC data is not passed on to the thread that runs the task, so the log lines written from that thread lose their context. For example, I use the snippet below to run a task asynchronously.

CompletableFuture.runAsync(() -> getAcountDetails(user));

The resulting logs are below:

2019-04-29 11:44:13,690 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] RestServiceExecutor:  service: 
2019-04-29 11:44:13,690 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] RestServiceExecutor: 
2019-04-29 11:44:13,779 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] UserDetailsRepoImpl: 
2019-04-29 11:44:13,950 INFO   [ForkJoinPool.commonPool-worker-3] RestServiceExecutor:  header: 
2019-04-29 11:44:13,950 INFO   [ForkJoinPool.commonPool-worker-3] RestServiceExecutor:  service: 
2019-04-29 11:44:14,012 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,028 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieved Config Data details : 1
2019-04-29 11:44:14,028 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,033 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieved Config Data details : 1
2019-04-29 11:44:14,147 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] SecondaryCacheServiceImpl: Fetching from secondary cache
2019-04-29 11:44:14,715 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,749 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5]

Below is my MDC data, which is not passed to the thread [ForkJoinPool.commonPool-worker-3]:

| /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |
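The missing fields are consistent with MDC data being stored per thread: a worker thread in the common pool starts without the request thread's values. Below is a minimal sketch of that behavior. It uses a plain ThreadLocal as a stand-in for MDC so that it runs without any logging library; the class name ThreadLocalContextDemo and the value "abc-123" are illustrative only.

```java
import java.util.concurrent.CompletableFuture;

final class ThreadLocalContextDemo {
    // Stand-in for MDC (an assumption for this sketch): a value bound to the current thread only.
    static final ThreadLocal<String> SESSION_ID = new ThreadLocal<>();

    // Returns what a task started with supplyAsync sees for SESSION_ID.
    static String valueSeenByAsyncTask() {
        return CompletableFuture.supplyAsync(SESSION_ID::get).join();
    }

    public static void main(String[] args) {
        SESSION_ID.set("abc-123");
        System.out.println("caller sees: " + SESSION_ID.get());
        System.out.println("async task sees: " + valueSeenByAsyncTask());
    }
}
```

The async task runs on a different thread from the caller, so it reads that thread's own (unset) slot rather than the caller's value.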

Below is my logback.xml configuration, where sessionID is the MDC key:

<configuration scan="true">
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <charset>utf-8</charset>
            <Pattern>%d %-5level %X{sessionID} [%thread] %logger{0}: %msg%n</Pattern>
        </encoder>
    </appender>
</configuration>

I tried the approach from the link below:

http://shengwangi.blogspot.com/2015/09/using-log-mdc-in-multi-thread-helloworld-example.html?_sm_au_=iVVrZDSwwf0vP6MR

This works perfectly for TaskExecutor, but I have not found a solution for CompletableFuture.

Jess Chen
Mayur

2 Answers


Create a wrapper method:

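static CompletableFuture<Void> myMethod(Runnable runnable) {
    // Capture the caller's MDC; this can be null if nothing has been set
    Map<String, String> previous = MDC.getCopyOfContextMap();
    return CompletableFuture.runAsync(() -> {
        // Some MDC adapters reject a null map, so guard before restoring
        if (previous != null) {
            MDC.setContextMap(previous);
        }
        try {
            runnable.run();
        } finally {
            // Pool threads are reused; do not leak context into the next task
            MDC.clear();
        }
    });
}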

and use it instead of CompletableFuture.runAsync.
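The same capture-and-restore pattern extends to tasks that return a value. The sketch below is an illustration under stated assumptions, not part of the answer's code: it uses a ThreadLocal map as a stand-in for MDC so that it runs without slf4j on the classpath, and ContextPropagation and supplyAsyncWithContext are hypothetical names. With real MDC, the snapshot, restore, and cleanup steps would correspond to MDC.getCopyOfContextMap(), MDC.setContextMap(...), and MDC.clear().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ContextPropagation {
    // Hypothetical stand-in for MDC: one map per thread.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Value-returning counterpart of the runAsync wrapper.
    static <T> CompletableFuture<T> supplyAsyncWithContext(Supplier<T> supplier) {
        // Snapshot taken on the calling thread, before the task is handed off.
        Map<String, String> snapshot = new HashMap<>(CONTEXT.get());
        return CompletableFuture.supplyAsync(() -> {
            CONTEXT.set(new HashMap<>(snapshot));
            try {
                return supplier.get();
            } finally {
                // Pool threads are reused, so drop the context afterwards.
                CONTEXT.remove();
            }
        });
    }

    public static void main(String[] args) {
        CONTEXT.get().put("sessionID", "abc-123");
        String seen = supplyAsyncWithContext(() -> CONTEXT.get().get("sessionID")).join();
        System.out.println("async task sees: " + seen);
    }
}
```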

talex
  • But don't call it `myMethod`... `runAsyncWithMDC` or such – Thilo Apr 29 '19 at 08:43
  • Thanks Talex, it should work, but the problem is that we use CompletableFuture in multiple places, so we would need to update every one of them, right? – Mayur Apr 29 '19 at 08:57
  • Since you are using a static method to run your async calls, there is no way to intercept it, so you have to amend each call. – talex Apr 29 '19 at 09:05

The theme of my solution is to make the complete ecosystem aware of MDC. (It works with JDK 9+, because a couple of overridable methods are exposed as of that version.)

And for that, we need to address the following scenarios:

  • When do we get new instances of CompletableFuture from within this class? → We need to return an MDC-aware version instead.
  • When do we get new instances of CompletableFuture from outside this class? → We need to return an MDC-aware version instead.
  • Which executor is used, and when, in the CompletableFuture class? → In all circumstances, we need to make sure that every executor is MDC-aware.

For that, let's create an MDC-aware version of CompletableFuture by extending it. My version looks like this:

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {

    public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        // Dependent stages are created through this hook, so they stay MDC-aware
        return new MDCAwareCompletableFuture<>();
    }

    @Override
    public Executor defaultExecutor() {
        return MDC_AWARE_ASYNC_POOL;
    }

    public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
        return new MDCAwareCompletableFuture<>()
                .completeAsync(() -> null)
                .thenCombineAsync(future, (aVoid, value) -> value);
    }

    public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
                                                                Function<Throwable, T> throwableFunction) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return getMDCAwareCompletionStage(future)
                .handle((value, throwable) -> {
                    MDCUtility.setMDCContext(contextMap);
                    if (throwable != null) {
                        return throwableFunction.apply(throwable);
                    }
                    return value;
                });
    }
}
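To see what the two overrides buy you, here is a small sketch that needs only the JDK (9 or later) and no MDC. TaggedFuture and the thread name "tagged-pool" are hypothetical names used for illustration. It checks that a dependent stage is an instance of the subclass and that an *Async method called without an executor runs on the pool returned by defaultExecutor().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TaggedFuture<T> extends CompletableFuture<T> {
    // Single named daemon thread so it is easy to see where stages run.
    static final ExecutorService POOL = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "tagged-pool");
        t.setDaemon(true);
        return t;
    });

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        // Dependent stages (thenApply, thenCompose, ...) are created through this hook.
        return new TaggedFuture<>();
    }

    @Override
    public Executor defaultExecutor() {
        // Used by the *Async methods that take no explicit executor.
        return POOL;
    }

    public static void main(String[] args) {
        CompletableFuture<String> stage = new TaggedFuture<String>()
                .completeAsync(() -> "value")
                .thenApplyAsync(v -> Thread.currentThread().getName());
        System.out.println(stage.getClass().getSimpleName());
        System.out.println(stage.join());
    }
}
```

Because every derived stage inherits both overrides, swapping the pool for an MDC-aware one is enough to cover a whole chain that starts from the subclass.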

The MDCAwareForkJoinPool class would look like this (I have skipped the methods with ForkJoinTask parameters for simplicity):

public class MDCAwareForkJoinPool extends ForkJoinPool {
    //Override constructors which you need

    @Override
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        return super.submit(MDCUtility.wrapWithMdcContext(task), result);
    }

    @Override
    public ForkJoinTask<?> submit(Runnable task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public void execute(Runnable task) {
        super.execute(MDCUtility.wrapWithMdcContext(task));
    }
}

The wrapping utility methods (in the MDCUtility class used above) would look like this:

public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.call();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static Runnable wrapWithMdcContext(Runnable task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            task.run();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static void setMDCContext(Map<String, String> contextMap) {
    MDC.clear();
    if (contextMap != null) {
        MDC.setContextMap(contextMap);
    }
}
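One detail of these wrappers is easy to miss: the context is captured when the wrap method is called (on the submitting thread), not when the task later runs. The sketch below illustrates this with a ThreadLocal map standing in for MDC so that it runs without slf4j; WrapTimingDemo, wrapWithContext, and run are hypothetical names. It submits a wrapped task and then an unwrapped one to the same single worker thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class WrapTimingDemo {
    // Hypothetical stand-in for MDC: one map per thread.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static <T> Callable<T> wrapWithContext(Callable<T> task) {
        // Captured here, on the thread that calls wrapWithContext.
        Map<String, String> snapshot = new HashMap<>(CONTEXT.get());
        return () -> {
            CONTEXT.set(new HashMap<>(snapshot));
            try {
                return task.call();
            } finally {
                // Leave the worker thread clean for whatever runs next.
                CONTEXT.remove();
            }
        };
    }

    // Runs two tasks on one worker thread: a wrapped one, then an unwrapped one.
    static String[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.get().put("sessionID", "abc-123");
            Callable<String> read = () -> CONTEXT.get().get("sessionID");
            String wrapped = pool.submit(wrapWithContext(read)).get();
            String unwrapped = pool.submit(read).get();
            return new String[] {wrapped, unwrapped};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("wrapped task sees: " + seen[0]);
        System.out.println("later unwrapped task sees: " + seen[1]);
    }
}
```

The wrapped task sees the caller's value, while the later unwrapped task on the same worker sees nothing, which shows that the cleanup in the finally block prevents context from leaking between tasks.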

Below are some guidelines for usage:

  • Use the class MDCAwareCompletableFuture rather than the class CompletableFuture.
  • Several methods in the class CompletableFuture instantiate the plain class directly, as in new CompletableFuture.... For such methods (most of the public static ones), use an alternative that gives you an instance of MDCAwareCompletableFuture. For example, rather than CompletableFuture.supplyAsync(...), you can use new MDCAwareCompletableFuture<>().completeAsync(...).
  • When you are stuck with a plain CompletableFuture, say because an external library returns one, convert it with the method getMDCAwareCompletionStage. Obviously, you can't retain the context within that library, but this method still retains the context once execution returns to your application code.
  • When supplying an executor as a parameter, make sure that it is MDC-aware, such as MDCAwareForkJoinPool. You could also create an MDCAwareThreadPoolExecutor by overriding the execute method to serve your use case. You get the idea!
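As a rough sketch of the last guideline, a ThreadPoolExecutor subclass can capture the context in execute, which runs on the submitting thread. The example below is an assumption-laden illustration rather than the answer's own class: it uses a ThreadLocal map as a stand-in for MDC so that it runs without slf4j, and ContextAwareThreadPoolExecutor is a hypothetical name. With real MDC, the body of execute would delegate to a wrapper such as wrapWithMdcContext.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ContextAwareThreadPoolExecutor extends ThreadPoolExecutor {
    // Hypothetical stand-in for MDC: one map per thread.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    ContextAwareThreadPoolExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public void execute(Runnable task) {
        // execute() is invoked on the submitting thread, so this is the caller's context.
        Map<String, String> snapshot = new HashMap<>(CONTEXT.get());
        super.execute(() -> {
            CONTEXT.set(new HashMap<>(snapshot));
            try {
                task.run();
            } finally {
                // Worker threads are reused, so drop the context afterwards.
                CONTEXT.remove();
            }
        });
    }

    public static void main(String[] args) {
        ContextAwareThreadPoolExecutor pool = new ContextAwareThreadPoolExecutor(2);
        try {
            CONTEXT.get().put("sessionID", "abc-123");
            String seen = CompletableFuture
                    .supplyAsync(() -> CONTEXT.get().get("sessionID"), pool)
                    .join();
            System.out.println("async task sees: " + seen);
        } finally {
            pool.shutdown();
        }
    }
}
```

Overriding execute alone is enough here because the submit methods inherited from AbstractExecutorService delegate to execute, and CompletableFuture hands its tasks to a supplied executor through execute as well.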

You can find a detailed explanation of all of the above here in a post about the same.

With that, your code might look like this:

new MDCAwareCompletableFuture<>().completeAsync(() -> {
    getAcountDetails(user);
    return null;
});
Victor
Laks