
I am quite new to Apache Storm and have been experimenting with a Trident topology for Kafka, i.e. TransactionalTridentKafkaSpout. Everything works fine except the Storm UI. Even though I've produced no data to my topic, the Storm UI keeps showing invalid emitted/transferred values; the counts keep increasing even when there is no data in the topic. I've tried deleting the data/logs stored in ZooKeeper, Storm, and Kafka, recreating the Kafka topics, and also setting

topology.stats.sample.rate: 1.0 

but still the problem persists.
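
(I set this in storm.yaml; as far as I know, the programmatic equivalent when building the Config would be the line below, using Storm's Config constant, so the UI samples 100% of tuples instead of the default 5%.)

// Programmatic equivalent of storm.yaml's "topology.stats.sample.rate: 1.0".
final Config config = new Config();
config.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 1.0);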

I also came across a tool called Capillary for monitoring a Storm cluster. I am using the properties below:

capillary.zookeepers="192.168.125.20:2181"
capillary.kafka.zkroot="192.168.125.20:/home/storm/kafka_2.11-0.8.2.0"
capillary.storm.zkroot="192.168.125.20:/home/storm/apache-storm-0.9.3"
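
(I'm not certain these zkroot values are what Capillary expects; from what I can tell they should be ZooKeeper paths rather than filesystem install directories, so perhaps something like the following, where the exact paths are only my guesses:)

capillary.zookeepers="192.168.125.20:2181"
# Guesses: ZooKeeper paths holding the Kafka spout's consumer offsets and
# Trident's transactional state, not the Kafka/Storm install directories on disk.
capillary.kafka.zkroot="/kafkastorm"
capillary.storm.zkroot="/transactional"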

I am using Kafka's embedded ZooKeeper here. Even this is not working; I'm getting the exception below.

! @6mbg4bp7l - Internal server error, for (GET) [/] ->

play.api.Application$$anon$1: Execution exception[[JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values
 at [Source: java.io.StringReader@24adb083; line: 1, column: 9]]]
        at play.api.Application$class.handleError(Application.scala:296) ~[com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
        at play.api.DefaultApplication.handleError(Application.scala:402) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:205) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:202) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) [org.scala-lang.scala-library-2.10.4.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values
 at [Source: java.io.StringReader@24adb083; line: 1, column: 9]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:495) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1178) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]

Any help on either would be great. Thanks in advance.

Configuration and source code snippet:

final Config config = new Config();
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000);
config.setNumWorkers(2);
config.put(Config.NIMBUS_HOST, "192.168.125.20");
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.125.20"));
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);
config.put(Config.DRPC_SERVERS, Arrays.asList("192.168.125.20"));
config.put(Config.DRPC_PORT, 3772);

final BrokerHosts zkHosts = new ZkHosts("192.168.125.20");
final TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "Test_Topic", "");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;

final TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
final TridentTopology topology = new TridentTopology();
topology.newStream("spout", kafkaSpout)
       .each(new Fields("str"), new TestFunction(), new Fields("test"))
       .each(new Fields("str"), new PrintFilter());
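
(TestFunction and PrintFilter are my own trivial helper classes; for completeness, minimal versions would look roughly like the following, each in its own file.)

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Re-emits the incoming "str" value as the new "test" field.
class TestFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getString(0)));
    }
}

// Prints every tuple and keeps all of them.
class PrintFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.out.println(tuple);
        return true;
    }
}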

Topology summary image: Topology Stats

  • Haven't used Capillary, but it looks like something is wrong with the config file. Looking at their source code, how does your conf/route file look? – user2720864 Jun 02 '15 at 21:01
  • @user2720864 Thanks for your reply. I've updated my conf settings in my question. – DMA Jun 03 '15 at 07:50
  • I believe this is something wrong with Capillary. Might be worth looking at https://github.com/keenlabs/capillary/issues/5 – user2720864 Jun 03 '15 at 09:58
  • @user2720864 Thanks again for your reply. Yes I did look at it. But I was hoping there could be some other solution from Storm end to view correct values in the Storm UI. – DMA Jun 03 '15 at 10:04
  • Why not use the default Storm UI to check if it works? At least that will confirm the code is fine, and you can then focus on the Capillary part. – user2720864 Jun 03 '15 at 10:26
  • @user2720864 That is right. I did test it with the default Storm UI, using a normal Kafka spout with Storm. The values showed up fine when I set **topology.stats.sample.rate: 1.0**, and Trident also works as expected. But as I mentioned, the problem is the emitted/transferred counts showing improper values in the Storm UI. I tried Capillary just to monitor Storm and the Kafka topic, since the Storm UI was showing improper counts. – DMA Jun 03 '15 at 10:39
  • You may be seeing processed tuples on the control streams. These will continue even if your main spout isn't producing new tuples. You should be able to see a breakdown of what stream the tuples are coming from in the bolt's UI page. – Joshua Martell Jun 03 '15 at 12:51
  • @JoshuaMartell Thanks for your reply. Apparently my bolt has not started processing, since the spout has no data to read from the Kafka topic. But I can still see emitted/transferred increasing all the time on the topology summary page as well as on the spout0 component summary page. I don't understand how/why this is happening, and I cannot see anything in the logs either. I've embedded an image in my original question which shows the stats. – DMA Jun 03 '15 at 13:27

1 Answer


Are you possibly seeing what I'd call the UI metric artifacts of Trident? These tuples also show up in the counters of the Storm UI:

Trident executes a batch every 500ms (by default). A batch involves a bunch of coordination messages going out to all the bolts to coordinate the batch (even if the batch is empty). So that's what you're seeing.

(source: Trident Kafka Spout - Ack Count Increasing Even Though No Messages Are Processed)
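
If the steady coordination traffic is a nuisance, one mitigation (which your config already applies) is to raise the Trident batch emit interval; this only slows the artifact counts down rather than removing them. A minimal sketch:

// Emit a (possibly empty) batch every 3 s instead of the 500 ms default,
// which proportionally reduces the coordination tuples counted by the UI.
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000);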

– miguno
  • Thanks so much for your reply. Yes, that is probably what I am seeing in the UI. The coordination messages shouldn't show up in the emitted/processed values, since that makes it confusing to work out the actual counts. Is there a way to get the actual number of messages emitted/processed, leaving out the coordination message counts, as asked in the link you mentioned? – DMA Jun 08 '15 at 14:05
  • @DMA Did you find any way to get around this? – Sachin Nov 22 '15 at 07:35
  • @miguno The link seems to be expired. Do you have an alternate link? – Sachin Nov 22 '15 at 07:35
  • @Sach Nothing till now. – DMA Nov 23 '15 at 09:41