
Is there a way to add a custom shutdown hook to Kafka Connect that I can put on the classpath using the plugin.path property?

Use case: The Kafka Connect cluster tries to connect to the Kafka cluster. If it fails, it logs the error and shuts down immediately. The error logs do not reach the remote log server (such as Splunk), so I need to delay the shutdown long enough for the log collector agent to push the logs to the target log server.

I am using Confluent Platform v6.1.

OneCricketeer
Spartacus
  • Sounds like an XY problem. If you log to a file, any log collector is going to be able to pick that up – OneCricketeer Aug 22 '21 at 13:01
  • @OneCricketeer, it's a Docker container, so the container gets killed. The log collector process inside the container is killed once the container dies. Is there a way to add a shutdown hook to the Kafka Connect process? I have a custom SMT. I tried to register a Java class that adds a shutdown hook, but that class is never loaded. – Spartacus Aug 23 '21 at 02:11
  • No, there isn't a way... If it's in a container, then mount a volume for logs or use a docker logging agent that collects stdout to disk – OneCricketeer Aug 23 '21 at 05:00

1 Answer


The best way to accomplish what you are looking for is to write the log file to a location that outlives the container. This could be a persistent volume, as @OneCricketeer mentioned previously, or a network-based logging service.

If for some reason you can't do that, you can create a JVM shutdown hook using a Java Agent. This hook could delay the JVM's shutdown long enough for the logs to be shipped (risky, since the process may be force-killed before the hook finishes), or it could force a flush of the logging library or perform any other cleanup. Since the agent is configured as a JVM command line argument, it should work for your Kafka Connect workers. You just need to modify the command line that runs the workers.

There are good instructions for creating a Java Agent here and an example for setting up a shutdown hook here: Java shutdown hook

Here is a super simple example class that has both the application's main() method and the agent's premain() method in the same class:

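// Package inferred from the command line below (org.example.App).
package org.example;

import java.lang.instrument.Instrumentation;

public class App
{
    // Stand-in for the real application; it only sleeps for one second.
    public static void main( String[] args ) throws InterruptedException {
        System.out.println( System.currentTimeMillis() + ": Main Started!" );
        Thread.sleep(1000);
        System.out.println( System.currentTimeMillis() + ": Main Ended!" );
    }

    // Called by the JVM before main() when the jar is passed via -javaagent.
    public static void premain(String agentArgs, Instrumentation inst) {
        System.out.println(System.currentTimeMillis() + ": Agent Started");
        Runtime.getRuntime()
            .addShutdownHook(
                new Thread() {
                    @Override
                    public void run() {
                        System.out.println(System.currentTimeMillis() + ": Shutdown Hook is running!");
                        try {
                            // Hold the JVM open for 5 seconds before it exits.
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt status and finish early.
                            Thread.currentThread().interrupt();
                        }
                        System.out.println(System.currentTimeMillis() + ": Shutdown hook is completed");
                    }
                });
    }
}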

Note that in your case, you only need the premain method, as the main method is implemented by the connect worker.
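As a rough sketch of what such a premain-only agent could look like, the class below registers a delay hook and reads the delay from the agent argument. The class name `ShutdownDelayAgent`, the helper methods, and the 5000 ms default are all hypothetical names and values chosen for illustration; they are not part of Kafka Connect or any library.

```java
import java.lang.instrument.Instrumentation;

// Hypothetical agent class: only premain() is needed, because the
// application's main() is provided by the Connect worker.
public class ShutdownDelayAgent {

    // Assumed default delay; pick a value that suits your log collector.
    public static final long DEFAULT_DELAY_MILLIS = 5000L;

    // Parses the agent argument (the text after '=' in -javaagent:agent.jar=...)
    // as a delay in milliseconds, falling back to the default on bad input.
    public static long parseDelayMillis(String agentArgs) {
        if (agentArgs == null || agentArgs.trim().isEmpty()) {
            return DEFAULT_DELAY_MILLIS;
        }
        try {
            long value = Long.parseLong(agentArgs.trim());
            return value >= 0 ? value : DEFAULT_DELAY_MILLIS;
        } catch (NumberFormatException e) {
            return DEFAULT_DELAY_MILLIS;
        }
    }

    // Builds the hook thread; it sleeps so the JVM stays alive while
    // an external log collector catches up.
    public static Thread newDelayHook(long delayMillis) {
        return new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and let shutdown proceed.
                Thread.currentThread().interrupt();
            }
        }, "shutdown-delay-hook");
    }

    // Invoked by the JVM before the application's main() method.
    public static void premain(String agentArgs, Instrumentation inst) {
        Runtime.getRuntime().addShutdownHook(newDelayHook(parseDelayMillis(agentArgs)));
    }
}
```

With this sketch, the delay would be passed as `-javaagent:<MY_AGENT_FILE>.jar=10000`, and the agent jar's manifest needs a `Premain-Class` attribute naming the class. Keep in mind that shutdown hooks do not run if the process is killed with SIGKILL, so the delay should stay below whatever grace period your container runtime allows.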

Running the above class with the following command line:

java -javaagent:<MY_AGENT_FILE>.jar -classpath <MY_APP_FILE>.jar org.example.App

generates the following output:

1630187996652: Agent Started
1630187996672: Main Started!
1630187997672: Main Ended!
1630187997673: Shutdown Hook is running!
1630188002675: Shutdown hook is completed

So you have your delay.

Barak
  • Thanks a lot for your suggestion. Sorry for the delayed response. I solved it by using the trap command, but your suggestion may be worth – Spartacus Sep 13 '21 at 22:56
  • If you used a different solution, it would be a good idea to add an answer with the solution you built, so others will use it as well. – Barak Sep 14 '21 at 07:17