
I'm attempting to read from a Kafka topic that contains JSON data and write each record to a new topic based on the value of its "entity" field. I'm using the following code to read from and write to Kafka:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;

public class entityDataLoader {
    public static void main(final String[] args) throws Exception {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Set up serializers and deserializers, which we will use for overriding the default serdes
        // specified above.
        final Serde<String> stringSerde = Serdes.String();
        final Serde<byte[]> byteArraySerde = Serdes.ByteArray();

        // In the subsequent lines we define the processing topology of the Streams application.
        final KStreamBuilder builder = new KStreamBuilder();

        // Read the input Kafka topic into a KStream instance.
        final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events");

        String content = textLines.toString();
        String entity = JSONExtractor.returnJSONValue(content, "entity");
        System.out.println(entity);

        textLines.to(entity);

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.cleanUp();
        streams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Any ideas on how to run this application successfully?

Using NetBeans, I build with dependencies and place the jar file in the /home/kafka path, then attempt to run it by putting the jar on the classpath and specifying the class I've created (using the command java -cp mavenproject.jar postilionkafka.entityDataLoader). I get the following error:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/streams/processor/TopologyBuilder
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.streams.processor.TopologyBuilder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

Thanks to @James, I've been able to solve that. However, I'm still unable to extract the entity data from the records in the topic. The records in the topic are JSON; an example is:

{"date":{"string":"2017-03-20"},"time":{"string":"20:04:13:563"},"event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":{"string":"ISWSnk"},"msg_param_2":{"string":"Application startup"},"msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":{"long":1490040253563},"transmit_count":{"int":1},"team_id":null,"app_id":{"int":4},"logged_by_app_id":{"int":4},"entity_type":{"int":3},"binary_data":null}

I'd like to write to a topic based on the value of the entity field (for the JSON example above, it should write to a topic named Transaction Manager). If I run my current code, I get the error below:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.kafka.streams.kstream.internals.KStreamImpl@568db2f2
No Object found
Unexpected character (o) at position 0.
null
Exception in thread "main" java.lang.NullPointerException: topic can't be null
    at java.util.Objects.requireNonNull(Objects.java:228)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.to(KStreamImpl.java:353)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.to(KStreamImpl.java:337)
    at postilionkafka.dataload.main(dataload.java:35)

The JSONExtractor class is defined as:

import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import org.json.simple.parser.JSONParser;

class JSONExtractor {

    /**
     * Returns the value stored under the given key in a JSON document as a String,
     * or null if the document cannot be parsed or the key is absent.
     */
    public static String returnJSONValue(String args, String value) {
        JSONParser parser = new JSONParser();
        String app = null;
        System.out.println(args);
        try {
            Object obj = parser.parse(args);
            JSONObject JObj = (JSONObject) obj;
            app = (String) JObj.get(value);
            return app;
        } catch (ParseException pe) {
            System.out.println("No Object found");
            System.out.println(pe);
        }
        return app;
    }
}
Zigmaphi
  • I was able to extract the entity from the sample JSON you have provided using your `JSONExtractor` class. Try logging the `args` input parameter to ensure that you are passing what you think you are passing. – James Mar 21 '17 at 13:21

1 Answer


This looks like a simple classpath issue: try adding all the jars that are not standard Java to the classpath parameter, for example:

java -cp kafka-stream.jar:mavenproject.jar postilionkafka.entityDataLoader

This tends to get very convoluted very quickly, and it is one of the reasons we use Maven to manage dependencies. I generally run any application I am working on directly from the IDE; this is an easier approach for debugging as well. If I did have to launch outside of my IDE, I would still try it from the IDE first: IntelliJ logs the execution command, including the required dependencies, which saves me the time of re-establishing what those might be and how to extract them from my local Maven repo.
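One way to avoid hand-assembling that classpath is to have Maven package a self-contained ("fat") jar. A minimal maven-shade-plugin fragment for the pom.xml might look like the following sketch; the plugin version is an assumption, and the main class is the one from this question:

```xml
<!-- goes in the <build><plugins> section of pom.xml -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version> <!-- version is an assumption; use a current one -->
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- sets Main-Class so the shaded jar is runnable with java -jar -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>postilionkafka.entityDataLoader</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With this in place, `mvn package` produces a jar containing all runtime dependencies, which can then be started with `java -jar target/mavenproject.jar` (assuming the artifact keeps that name), with no manual classpath needed.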

An alternative approach, if running from the IDE doesn't work for you, is to use Maven exec. See this answer on running a project from Maven.
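For reference, the Maven exec route mentioned above typically looks like this, run from the project directory (the main class name is the one from this question; `exec:java` resolves the project's runtime classpath automatically):

```
mvn compile exec:java -Dexec.mainClass="postilionkafka.entityDataLoader"
```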

James
  • I noticed the error is for `TopologyBuilder`, but this isn't referenced in your code. When I tried to replicate your issue by running only your generated jar on the classpath, I saw exactly the same error. So the problem is that the kafka-streams jar isn't in your classpath, but just adding that isn't enough. You also need to include all the runtime dependencies of that jar, and its dependencies (etc., etc.) as well. – James Mar 19 '17 at 15:18
  • Thanks James... Testing it in a bit. I'll be home in a few minutes – Zigmaphi Mar 20 '17 at 17:54
  • Hello James... It worked. I've got some issues with my code that I'll need to figure out, but many thanks for the effort – Zigmaphi Mar 20 '17 at 19:19
  • Hello @James, I'm having an issue with my code that I'm unable to figure out. I've added a new answer below. – Zigmaphi Mar 21 '17 at 03:36
  • hi @Zigmaphi, it looks like your exception is caused by a JSON `ParseException` and then you are returning `null` from `returnJSONValue` – James Mar 21 '17 at 09:24
  • Hello James, looking at the data that I'm trying to import, how can I get the value of **entity** from each JSON record and write the whole line to the topic? A code block/review to allow me to do that would be super – Zigmaphi Mar 21 '17 at 21:15
  • @Zigmaphi your JSONExtractor works, but I don't think you are passing in the JSON you expect. If you log that out or step through in debug mode I think you'll find it helpful. – James Mar 21 '17 at 21:26
  • I realize that `String content = textLines.toString();` isn't copying the JSON above into content. I've looked through the KStream documentation and I'm unable to find how to return just the JSON for me to use my JSONExtractor on. – Zigmaphi Mar 22 '17 at 03:38
  • I couldn't find a way you can dynamically `create` topic streams to forward to. But perhaps that's not entirely what you need? If you know what the possible options are then perhaps you can apply a `KStream.predicate` or if that isn't possible then maybe using `KStream.map` to apply a new key value? – James Mar 22 '17 at 06:48
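The per-record routing idea from the comments above can be sketched in plain Java. This is a minimal stand-in, not the Kafka Streams solution itself: the `EntityRouter` class name and the regex-based extraction are assumptions replacing json-simple, so the sketch runs with only the JDK. In the real Streams app this logic would run once per record value (for example inside a `KStream` predicate or `map`), never on `textLines.toString()`, which only yields something like `KStreamImpl@568db2f2` and is why the parser reported "Unexpected character (o) at position 0".

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EntityRouter {

    // Stand-in for JSONExtractor.returnJSONValue(record, "entity") using only
    // the JDK. A regex is enough here because "entity" is a flat string field
    // in the sample record; a real JSON parser remains the safer choice.
    private static final Pattern ENTITY =
            Pattern.compile("\"entity\"\\s*:\\s*\"([^\"]*)\"");

    // Decide the destination topic for a single record's JSON value.
    // Returns null when no entity field is found, so the caller can skip
    // the record instead of hitting the "topic can't be null" NPE.
    public static String topicFor(String json) {
        Matcher m = ENTITY.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Trimmed-down version of the sample record from the question.
        String record = "{\"event_nr\":1572470,\"entity\":\"Transaction Manager\",\"state\":0}";
        System.out.println(topicFor(record)); // prints: Transaction Manager
    }
}
```

If the set of possible entity values is known in advance, the same `topicFor` check can back the `KStream.predicate`/`branch` approach James suggests, with one `.to(...)` call per known topic.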