
We have a high-load Apache Camel application that utilizes logback/MDC for logging information. We are finding that some of the MDC info is stale on threads as forewarned in logback's documentation. I found this SO question that addresses this concern:

How to use MDC with thread pools?

How should we apply this to our Camel application to avoid stale info? Is there a simple way to globally change the default ThreadPoolExecutor to a custom variation, as suggested in the linked question? I see you can do it for the pools themselves, but I didn't see any examples for the executor. Keep in mind that our application is quite large and services a high volume of orders on a daily basis; I'd like as little impact on the existing application as possible.
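The staleness described above can be reproduced without Camel or logback. In this hypothetical sketch (Java 8+), a plain `ThreadLocal` map stands in for SLF4J's MDC, which logback likewise keeps per thread. A single-thread pool reuses its worker, so a task that never sets an orderId of its own still sees the one the previous task left behind.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class StaleContextDemo {
    // Hypothetical stand-in for SLF4J's MDC: one context map per thread.
    static final ThreadLocal<Map<String, String>> CTX = ThreadLocal.withInitial(HashMap::new);

    /** Returns the orderId seen by a task that never set one itself. */
    static String orderIdSeenBySecondTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // one worker, reused for every task
        try {
            // Task for order A-1 puts its id into the worker's context and never clears it.
            pool.submit(() -> { CTX.get().put("orderId", "A-1"); }).get();
            // A later task for a different order only reads the context on the same worker.
            Callable<String> read = () -> CTX.get().get("orderId");
            Future<String> seen = pool.submit(read);
            return seen.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling `orderIdSeenBySecondTask()` returns `"A-1"`: the second task logs with the first order's id, which is exactly the stale MDC symptom.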

Dakota Brown

1 Answer

I figured it out and wanted to post what I did in case it benefits someone else. Please note that I'm using JDK 6 and Camel 2.13.2.

  • Camel has a DefaultExecutorServiceManager that uses a DefaultThreadPoolFactory. I extended the default factory into an MdcThreadPoolFactory.

  • The DefaultThreadPoolFactory has methods that create RejectableThreadPoolExecutors and RejectableScheduledThreadPoolExecutors. I extended both into Mdc* versions that override the execute() method to wrap the Runnable and hand off the MDC info between threads (as described in the question linked from my original post).

  • I created a bean instance of the MdcThreadPoolFactory in my application configuration; Camel picks it up automatically and uses it in the ExecutorServiceManager.
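The hand-off in the second step can be tried in isolation with only the JDK. This is a hypothetical sketch (Java 8+): a `ThreadLocal` map stands in for MDC and a plain `ThreadPoolExecutor` stands in for Camel's `RejectableThreadPoolExecutor`. The context is captured in `execute()` on the submitting thread, installed on the worker, and the worker's own context is restored afterwards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ContextPropagationDemo {
    // Hypothetical stand-in for SLF4J's MDC; null means "no context", like an empty MDC.
    static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<>();

    static Map<String, String> copyOfContext() {
        Map<String, String> current = CTX.get();
        return current == null ? null : new HashMap<>(current);
    }

    /** Capture on the submitting thread, install on the worker, restore the worker's context afterwards. */
    static class ContextThreadPoolExecutor extends ThreadPoolExecutor {
        ContextThreadPoolExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        public void execute(Runnable command) {
            final Map<String, String> captured = copyOfContext(); // still on the submitting thread here
            super.execute(() -> {
                Map<String, String> previous = copyOfContext();
                CTX.set(captured); // null acts like MDC.clear()
                try {
                    command.run();
                } finally {
                    CTX.set(previous);
                }
            });
        }
    }

    /** Submits one task with an orderId in context and one without; returns what each task saw. */
    static String[] run() {
        ExecutorService pool = new ContextThreadPoolExecutor(1); // a single reused worker thread
        Callable<String> read = () -> {
            Map<String, String> m = CTX.get();
            return m == null ? "none" : m.get("orderId");
        };
        try {
            CTX.set(new HashMap<>());
            CTX.get().put("orderId", "A-1");
            Future<String> first = pool.submit(read); // submit() goes through execute()
            CTX.remove(); // the second caller has no context at all
            Future<String> second = pool.submit(read);
            return new String[] {first.get(), second.get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CTX.remove();
        }
    }
}
```

`run()` returns `{"A-1", "none"}`: the worker thread is shared, but each task sees only its submitter's context. This mirrors the structure of MdcThreadPoolExecutor's `execute()` and `wrap()`, minus the Camel base class.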

MdcThreadPoolExecutor:

package com.mypackage.concurrent;

import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * An SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {

    // MDC values are Strings: Map<String, String> matches SLF4J 1.7+ (raw Map on 1.6, hence the suppression)
    @SuppressWarnings("unchecked")
    private Map<String, String> getContextForTask() {
        // called from execute(), i.e. on the submitting thread, so this captures the caller's MDC
        return MDC.getCopyOfContextMap();
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    @SuppressWarnings("unchecked")
    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return new Runnable() {
            @Override
            public void run() {
                // save this thread's own MDC; with CallerRunsPolicy this is the caller's context
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
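Restoring `previous` in the `finally` block, rather than simply clearing the MDC, matters because the factory defaults to `CallerRunsPolicy`: when the pool is saturated, the wrapped task runs on the submitting thread itself. The JDK-only sketch below (Java 8+, with a hypothetical `ThreadLocal` map standing in for MDC) forces that rejection with a one-thread pool and no queue, lets the task wipe its context, and checks that the caller's context survives.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsRestoreDemo {
    // Hypothetical stand-in for SLF4J's MDC; null means "no context".
    static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<>();

    static Map<String, String> copyOfContext() {
        Map<String, String> current = CTX.get();
        return current == null ? null : new HashMap<>(current);
    }

    // Same shape as MdcThreadPoolExecutor.wrap(): install the captured context, then restore the previous one.
    static Runnable wrap(Runnable task, Map<String, String> context) {
        return () -> {
            Map<String, String> previous = copyOfContext();
            CTX.set(context);
            try {
                task.run();
            } finally {
                CTX.set(previous);
            }
        };
    }

    /** Returns the caller's orderId after one of its tasks was run on the caller thread itself. */
    static String callerContextAfterCallerRuns() {
        CountDownLatch release = new CountDownLatch(1);
        // One thread and a SynchronousQueue: while the worker is busy, further tasks are rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            CTX.set(new HashMap<>());
            CTX.get().put("orderId", "B-7");
            pool.execute(wrap(() -> awaitQuietly(release), copyOfContext())); // occupies the only worker
            // Rejected, so CallerRunsPolicy runs it right here; the task wipes its context.
            pool.execute(wrap(() -> CTX.set(null), copyOfContext()));
            Map<String, String> after = CTX.get();
            return after == null ? null : after.get("orderId");
        } finally {
            release.countDown();
            pool.shutdown();
            CTX.remove();
        }
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

`callerContextAfterCallerRuns()` returns `"B-7"`. A wrapper that called `clear()` in `finally` instead would leave the calling thread with an empty context for the rest of its own processing.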

MdcScheduledThreadPoolExecutor:

package com.mypackage.concurrent;

import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * An SLF4J MDC-compatible {@link ScheduledThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ScheduledThreadPoolExecutor} that sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {

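    // MDC values are Strings: Map<String, String> matches SLF4J 1.7+ (raw Map on 1.6, hence the suppression)
    @SuppressWarnings("unchecked")
    private Map<String, String> getContextForTask() {
        // called from execute(), i.e. on the submitting thread, so this captures the caller's MDC
        return MDC.getCopyOfContextMap();
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
        super(corePoolSize, handler);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
    }

    /**
     * Tasks passed to {@code execute()} will have MDC injected. Unlike a plain {@link ThreadPoolExecutor}, a
     * {@link ScheduledThreadPoolExecutor} implements both {@code execute(Runnable)} and {@code submit(Runnable)} by
     * calling {@code schedule(Runnable, long, TimeUnit)}, so {@code submit()} and the {@code schedule*()} methods
     * bypass this override and their tasks are NOT wrapped.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    @SuppressWarnings("unchecked")
    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return new Runnable() {
            @Override
            public void run() {
                // save this thread's own MDC; with CallerRunsPolicy this is the caller's context
                Map<String, String> previous = MDC.getCopyOfContextMap();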
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
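The scheduled variant has a gap that the comments under this answer run into: in the JDK's `ScheduledThreadPoolExecutor`, `execute(Runnable)` and `submit(Runnable)` both delegate to `schedule(Runnable, long, TimeUnit)`, so `submit()` never reaches an `execute()` override. This JDK-only sketch (Java 8+; the class names are hypothetical) counts the calls to show it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledEntryPointsDemo {
    /** Counts how often each overridable entry point is reached. */
    static class CountingScheduledExecutor extends ScheduledThreadPoolExecutor {
        final AtomicInteger executeCalls = new AtomicInteger();
        final AtomicInteger scheduleCalls = new AtomicInteger();

        CountingScheduledExecutor() {
            super(1);
        }

        @Override
        public void execute(Runnable command) {
            executeCalls.incrementAndGet(); // where MdcScheduledThreadPoolExecutor wraps tasks
            super.execute(command);         // the JDK forwards this to schedule(command, 0, NANOSECONDS)
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            scheduleCalls.incrementAndGet();
            return super.schedule(command, delay, unit);
        }
    }

    /** Returns {execute() calls, schedule() calls} after one execute() and one submit(Runnable). */
    static int[] countEntryPoints() {
        CountingScheduledExecutor pool = new CountingScheduledExecutor();
        try {
            pool.execute(() -> { });
            pool.submit(() -> { }).get();
            return new int[] {pool.executeCalls.get(), pool.scheduleCalls.get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

`countEntryPoints()` returns `{1, 2}`: `submit()` reached `schedule()` but never `execute()`, so a task submitted that way would run without the MDC wrapper.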

MdcThreadPoolFactory:

package com.mypackage.concurrent;

import org.apache.camel.impl.DefaultThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;

import java.util.concurrent.*;

// The MDC hand-off itself happens in the Mdc* executors; this factory only makes Camel create them.
public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {

    @Override
    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut,
                                         RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {

        // the core pool size must be 0 or higher
        if (corePoolSize < 0) {
            throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
        }

        // validate max >= core
        if (maxPoolSize < corePoolSize) {
            throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " < " + corePoolSize);
        }

        BlockingQueue<Runnable> workQueue;
        if (corePoolSize == 0 && maxQueueSize <= 0) {
            // use a synchronous queue for direct hand-off (no tasks stored on the queue)
            workQueue = new SynchronousQueue<Runnable>();
            // and force 1 as pool size to be able to create the thread pool by the JDK
            corePoolSize = 1;
            maxPoolSize = 1;
        } else if (maxQueueSize <= 0) {
            // use a synchronous queue for direct hand-off (no tasks stored on the queue)
            workQueue = new SynchronousQueue<Runnable>();
        } else {
            // bounded task queue to store tasks on the queue
            workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }

        // create the MDC-aware executor instead of Camel's RejectableThreadPoolExecutor
        ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
        answer.setThreadFactory(threadFactory);
        answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }
        answer.setRejectedExecutionHandler(rejectedExecutionHandler);
        return answer;
    }

    @Override
    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
        RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }

        // create the MDC-aware scheduled executor instead of Camel's RejectableScheduledThreadPoolExecutor
        ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
        //JDK7: answer.setRemoveOnCancelPolicy(true);

        // need to wrap the thread pool in a sized executor to guard against the problem that the
        // JDK created thread pool has an unbounded queue (see class javadoc), which means
        // we could potentially keep adding tasks, and run out of memory.
        if (profile.getMaxPoolSize() > 0) {
            return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
        } else {
            return answer;
        }
    }
}

And finally, the bean instance:

<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>
Dakota Brown
  • To get this to work in Camel 2.16.3 for new threads requested by org.apache.camel.util.component.AbstractApiProducer.process(Exchange, AsyncCallback), I also had to override java.util.concurrent.ScheduledThreadPoolExecutor.submit(Runnable). – Paul M Jun 27 '16 at 16:54
  • I subsequently changed this to override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit), which is what submit() and execute() both delegate to (in JDK 8 at least). I think this would make a good submission to core Camel. If I find time to get round to this, are you OK with signing over the copyright to Apache (or whatever licensing work is required)? – Paul M Jun 29 '16 at 11:43
  • I'm completely fine with it. Anything to make Camel better in the long run :) – Dakota Brown Jun 29 '16 at 12:07
  • @PaulM - did this make it into camel-core? I am hitting a similar problem with the CXF component in asynchronous mode, which results in different MDC data being logged after faults are thrown, and I'm wondering if this solution might help. Thanks – Tom Bunting Jan 31 '19 at 13:53
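Paul M's later approach, overriding `schedule(Runnable, long, TimeUnit)`, can be sketched the same way. Because `execute()` and `submit(Runnable)` both delegate to that method in the JDK, one override covers both; the `Callable` overloads (which `submit(Callable)` uses) and `scheduleAtFixedRate()`/`scheduleWithFixedDelay()` would still need their own overrides. This is a hypothetical JDK-only sketch (Java 8+) with a `ThreadLocal` map standing in for MDC, not the code that was actually written or contributed to Camel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScheduleOverrideDemo {
    // Hypothetical stand-in for SLF4J's MDC; null means "no context".
    static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<>();

    static Map<String, String> copyOfContext() {
        Map<String, String> current = CTX.get();
        return current == null ? null : new HashMap<>(current);
    }

    static class ContextScheduledExecutor extends ScheduledThreadPoolExecutor {
        ContextScheduledExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        // execute(Runnable) and submit(Runnable) both end up here.
        // Not covered: the Callable overloads, scheduleAtFixedRate(), scheduleWithFixedDelay().
        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            final Map<String, String> captured = copyOfContext(); // on the submitting thread
            return super.schedule(() -> {
                Map<String, String> previous = copyOfContext();
                CTX.set(captured);
                try {
                    command.run();
                } finally {
                    CTX.set(previous);
                }
            }, delay, unit);
        }
    }

    /** Returns the orderId that a submit()-ed task observed on the pool thread. */
    static String orderIdSeenViaSubmit() {
        ContextScheduledExecutor pool = new ContextScheduledExecutor(1);
        final String[] seen = new String[1];
        try {
            CTX.set(new HashMap<>());
            CTX.get().put("orderId", "C-3");
            pool.submit(() -> {
                Map<String, String> m = CTX.get();
                seen[0] = m == null ? null : m.get("orderId");
            }).get(); // get() also makes seen[0] visible to this thread
            return seen[0];
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CTX.remove();
        }
    }
}
```

`orderIdSeenViaSubmit()` returns `"C-3"`, whereas an `execute()`-only override would have let that task run with whatever context the pool thread last held.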