
I am currently writing a Samza job that simply reads data from one Kafka topic and writes it to another Kafka topic. I have written a very basic StreamTask; however, when I run it I get an error.

The error is below:

Exception in thread "main" org.apache.samza.SamzaException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 193 ms.
    at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:112)
    at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.writeConfig(CoordinatorStreamSystemProducer.java:129)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:79)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:48)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)
 Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 193 ms

I'm not entirely sure how to configure Samza, or make the job write, the Kafka metadata it needs. Below are my StreamTask code and my properties file. I added the Metadata section to the properties file to see whether it would help, but it made no difference. Am I heading in the right direction, or am I missing something entirely?

import org.apache.samza.task.StreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;

/*
*   Take all messages received and send them to
*   a Kafka topic called "words"
*/

public class TestStreamTask implements StreamTask{

    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka" , "words");  // create new system stream for kafka topic "words"

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator){

        String message = (String) envelope.getMessage();    // pull message from stream

        for(String word : message.split(" "))
            collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));    // send each word (as key) with message 1 to the Kafka topic "words"
    }   
}
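The loop in `process()` splits on a single space character, so input with consecutive spaces (for example `"hello  samza"`) produces empty "words" that would also be sent to the output topic. A minimal, Samza-free sketch of a more forgiving split is below; the class and method names are hypothetical and it only mirrors the tokenizing step, not the Samza API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone helper: mirrors the word-splitting loop in
// TestStreamTask.process() without any Samza dependencies.
class WordSplitSketch {

    // Split on runs of whitespace and drop empty tokens, so "a  b" yields
    // [a, b] instead of [a, "", b] as message.split(" ") would.
    static List<String> words(String message) {
        List<String> result = new ArrayList<>();
        for (String word : message.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                result.add(word);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(words("hello  samza world")); // [hello, samza, world]
    }
}
```

Inside the task, each element of `words(message)` would then be passed to `collector.send(...)` exactly as the original loop does.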

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=test-words

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=samza.examples.wikipedia.task.TestStreamTask
task.inputs=kafka.test
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1

# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092

# Metadata
systems.kafka.metadata.bootstrap.servers=localhost:9092
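Because the exception is a metadata timeout against the broker configured above (`localhost:9092`), one quick way to rule out a plain connectivity problem is to check that something accepts TCP connections on that address. The sketch below is a hypothetical troubleshooting helper using only the JDK; it is not part of Samza or Kafka. A successful connect only shows the port is open; it does not prove the broker will answer metadata requests (for example, if it advertises a host name the client cannot resolve).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical troubleshooting helper, not part of Samza or Kafka: it only
// checks whether something accepts TCP connections at host:port.
class BrokerReachability {

    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Throws IOException (including SocketTimeoutException) on failure.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 matches systems.kafka.producer.bootstrap.servers above.
        System.out.println("broker reachable: " + isReachable("localhost", 9092, 2000));
    }
}
```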
dimo414
Zerbraxi
  • Do you have kafka (and zookeeper) running on localhost? Depending on how you started them, you can use the `kafka-console-consumer.sh` script that comes with kafka and the `zkCli.sh` script that comes with zookeeper to do some troubleshooting. – palimpsestor Jun 05 '15 at 21:12
  • @palimpsestor I do have kafka and zookeeper running. I am able to use `kafka-console-consumer.sh` and `kafka-console-producer.sh` to produce and consume data. It's only when running the Samza StreamTask that I get the above error. I was able to get Apache's Hello-Samza working in its entirety. – Zerbraxi Jun 08 '15 at 14:13
  • I was not able to reproduce the exact error message you posted, but I think it might be a red herring. In your `OutgoingMessageEnvelope` you are trying to post a "message" with an integer value (1), without telling samza what it should use to serialize that integer. I was able to get your example to work by changing that line to `collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, "string", "string", word, word, "1"));` – palimpsestor Jun 08 '15 at 18:06
  • @palimpsestor Thank you for the help. I changed that line and am sadly still running into the same problem. You said you were able to get it to work, though, so the issue is definitely on my side. I am currently launching the code through Samza's `run-job.sh`, which I assume is the correct way of doing it. My question is: how, or with what specifically, are you launching the application? I'm just using the Samza provided in Apache's Hello-Samza example. Could this be my problem? Thank you again for your patience and assistance! – Zerbraxi Jun 08 '15 at 18:59
  • What version of kafka are you running? Do you have `auto.create.topics.enable` set to true? Just wondering if your issue is related to something like this: https://issues.apache.org/jira/browse/KAFKA-1124. Are you able to produce and consume messages on your 'words' topic independently from samza? – palimpsestor Jun 08 '15 at 20:20
  • @palimpsestor I am running Kafka version 0.8.2.1. I had someone else run my job successfully as well, so this is either user error (totally possible) or a configuration problem on my end. I am not able to produce and consume messages on that topic through Kafka alone, but I am able to do so for other topics. I changed the topic to an existing one that I can send messages through with Kafka alone; however, when I run the Samza job with that topic instead, the same issue occurs. I've shut down Kafka entirely and started from scratch, but still no luck. – Zerbraxi Jun 09 '15 at 15:19
  • If it isn't too much trouble, would you be able to explain the process you go through when you create a Java file and a properties file and execute them? I just want to make sure I'm not executing the job incorrectly. – Zerbraxi Jun 09 '15 at 16:01
  • Hard to describe my process since I've customized quite a bit. Just now I took a clean checkout of the hello-samza project, dropped in `TestStreamTask.java` (with my modification), dropped in properties file (as is), added the properties file to the `assembly/src.xml` to include it in the package, and commented out the rat-plugin stuff from the pom. Then was able to start zk, kafka, and yarn, with `bin/grid start all`, and deploy the task using `run-job.sh`. Then sent some messages on the 'test' topic using `kafka-console-producer.sh` and saw a bunch of '1's show up on the 'words' topic. – palimpsestor Jun 12 '15 at 18:50
  • Did you solve this issue? I'm facing almost the same problem. – Rodrigo Montano Oct 30 '15 at 17:38
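palimpsestor's point about the integer message can be illustrated without Samza. With `systems.kafka.samza.msg.serde=string`, messages sent to the `kafka` system go through the string serde, so a message of `1` (an `Integer`) cannot be serialized, while `"1"` can. The sketch below assumes, as a simplification, that a string serde just casts its input to `String`; it is not Samza's actual `StringSerde`, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a string serializer; NOT Samza's StringSerde.
// It models the assumption that a string serde casts the message to String.
class StringSerdeSketch {

    static byte[] toBytes(Object message) {
        // This cast is where a non-String message (such as Integer 1) fails.
        return ((String) message).getBytes(StandardCharsets.UTF_8);
    }

    static boolean canSerialize(Object message) {
        try {
            toBytes(message);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(1));   // false: Integer is not a String
        System.out.println(canSerialize("1")); // true
    }
}
```

This is why sending `"1"` instead of `1`, as suggested in the comment above, lets the message pass through a string serde.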

1 Answer


This question is about Kafka 0.8, which, if I am not mistaken, should be out of support by now.

Combined with the fact that only some people run into this issue, and that nobody seems to have struggled with it in recent years, this makes me quite confident that upgrading to a more recent version of Kafka will resolve the problem.

Dennis Jaheruddin