3

I have a basic Apex stream with a Kafka input operator feeding into a Couchbase set operator. The Kafka sides works fine and have proven it by removing the couch operator. I have tried switching around versions of the Malhar library to see if it was broken in the latest.

Also I am using the DataTorrent flavor of Apex.

When I add in the couch operator I get the following exception

java.lang.RuntimeException: Error creating local cluster

at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:124)
at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:46)
at com.usaa.data.streams.apex.ApplicationTest.testApplication(ApplicationTest.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.codehaus.jackson.map.DeserializationConfig
Serialization trace:
_deserializationConfig (org.codehaus.jackson.map.ObjectMapper)
mapper (com.datatorrent.contrib.couchbase.CouchBaseJSONSerializer)
serializer (com.datatorrent.contrib.couchbase.CouchbasePOJOSetOperator)
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:192)
at com.datatorrent.stram.plan.logical.LogicalPlan$OperatorMeta.readObject(LogicalPlan.java:898)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at com.datatorrent.stram.plan.logical.LogicalPlan.read(LogicalPlan.java:2326)
at com.datatorrent.stram.StramLocalCluster.cloneLogicalPlan(StramLocalCluster.java:323)
at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:285)
at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:122)
... 24 more

Here is the associated Application code:

KafkaSinglePortInputOperator kafkaInput = dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
    kafkaInput.setTopics("testing2");
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "true");
    kafkaInput.setConsumerProps(props);
    kafkaInput.setClusters("localhost:9092");


    CouchbasePOJOSetOperator couchOutput = dag.addOperator("couchOutput", CouchbasePOJOSetOperator.class);

    CouchBaseWindowStore store = new CouchBaseWindowStore();
    store.setBucket("default");
    store.setUriString("localhost:8091,localhost:8091");
    store.setUserConfig("");
    store.setPasswordConfig("");
    store.setPassword("");
    couchOutput.setStore(store);
    try{
        store.connect();
    }catch (Exception e){
        e.printStackTrace();
    }

    CouchBaseJSONSerializer serializer = new CouchBaseJSONSerializer();
    couchOutput.setSerializer(serializer);
    ArrayList<String> expressions = new ArrayList<String>();
    expressions.add("getValue()");
    couchOutput.setExpressions(expressions);


    dag.addStream("kafkaInput", kafkaInput.outputPort, couchOutput.input).setLocality(Locality.CONTAINER_LOCAL);
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Mark Kouba
  • 236
  • 2
  • 12
  • The serialization stack trace says it is not able to de-serialise some mapper object. Check that. – Atom Apr 17 '17 at 21:47
  • Can the mapper object be transient as Kryo by default needs default constructor to be specified to serialise/de-serialise and ObjectMapper doesn't seem to have default constructor . – Atom Apr 17 '17 at 21:49
  • That is the thought I had as well. The DeserializationConfig definitely has no no-arg constructor, but I am at a loss if this has always been a problem when using the couch operators from malhar or if this is set up problem on my side. – Mark Kouba Apr 18 '17 at 02:53
  • To add a bit more info. The problem does specifically exist with the CouchBaseJSONSerializer, with that removed I can get it to mostly work. – Mark Kouba Apr 18 '17 at 15:29
  • 2
    You are right. Looks like CouchBaseJsonSerialiser has a non transient mapper. Not sure why it is non-transient! – Atom Apr 20 '17 at 23:55

0 Answers0