
I am currently working on a project where I have set up a Storm cluster across four Unix hosts.

The topology itself is as follows:

  1. JMS Spout listens to an MQ for new messages
  2. JMS Spout parses and then emits the result to an Esper Bolt
  3. The Esper Bolt then processes the event and emits a result to a JMS Bolt
  4. The JMS Bolt then publishes the message back onto the MQ on a different topic

I realize that Storm is an "at-least-once" framework. However, if I receive 5 events and pass them on to the Esper Bolt for counting, for some reason I receive 5 count results in the JMS Bolt (all with the same value).

Ideally, I want to receive a single result. Is there some way I can tell Storm to ignore duplicate tuples?

I think this has something to do with the parallelism I have set up, because it works as expected when I run with just a single thread:

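    TopologyBuilder builder = new TopologyBuilder();
    // Spout: parallelism hint 2, 2 tasks
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(), 2).setNumTasks(2);
    // Esper bolt: parallelism hint 6, 6 tasks, partitioned by "eventGrouping"
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(), 6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT, new Fields("eventGrouping"));
    // JMS bolt: parallelism hint 2, 2 tasks, partitioned by "eventName"
    builder.setBolt("jmsBolt", new JMSBolt(), 2).setNumTasks(2)
            .fieldsGrouping("esperBolt", new Fields("eventName"));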
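
Fields grouping sends every tuple with the same grouping value to the same task. The plain-Java model below assumes a simple hash-mod partitioner (a simplification of what Storm does internally, not its actual code) to show how events would be spread over the six Esper tasks. `FieldsGroupingSketch`, `chooseTask`, `route` and the event values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FieldsGroupingSketch {

    // Simplified model: pick a task by hashing the grouping value.
    // Math.floorMod keeps the index non-negative for negative hash codes.
    static int chooseTask(Object groupingValue, int numTasks) {
        return Math.floorMod(groupingValue.hashCode(), numTasks);
    }

    // Route each event to a task index and collect what each task receives.
    static Map<Integer, List<String>> route(List<String> events, int numTasks) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (String e : events) {
            byTask.computeIfAbsent(chooseTask(e, numTasks), k -> new ArrayList<>()).add(e);
        }
        return byTask;
    }

    public static void main(String[] args) {
        // Hypothetical "eventGrouping" values for five incoming events.
        List<String> events = List.of("A", "A", "B", "C", "A");
        // With 6 Esper tasks, each task only sees the events for its own keys.
        System.out.println(route(events, 6));
    }
}
```

Because the key alone decides the task, all events sharing an `eventGrouping` value reach one Esper task, while events with different values may be split across tasks. Assuming each bolt task creates its own Esper engine, each of those tasks keeps its own independent counts.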

I have also looked at Trident for its "exactly-once" semantics, but I am not fully convinced it would solve this issue.
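
The idea behind Trident's transactional state, as described in the Trident state documentation, is to store the id of the last applied batch next to each value, so that a replayed batch is recognized and skipped. The class below is a plain-Java model of that idea, not Trident's API; `TxCountState` and `applyBatch` are hypothetical names. It also assumes, as Trident's transactional spouts guarantee, that a replayed txid always carries exactly the same tuples.

```java
import java.util.HashMap;
import java.util.Map;

class TxCountState {

    // Stored value plus the id of the last batch that updated it.
    private static final class Entry {
        final long txid;
        final long count;
        Entry(long txid, long count) { this.txid = txid; this.count = count; }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Apply a batch's count for a key. If this txid was already applied,
    // the batch is a replay and the update is skipped.
    boolean applyBatch(String key, long txid, long batchCount) {
        Entry current = store.get(key);
        if (current != null && current.txid == txid) {
            return false; // replayed batch: already counted
        }
        long base = (current == null) ? 0 : current.count;
        store.put(key, new Entry(txid, base + batchCount));
        return true;
    }

    long count(String key) {
        Entry e = store.get(key);
        return (e == null) ? 0 : e.count;
    }

    public static void main(String[] args) {
        TxCountState state = new TxCountState();
        state.applyBatch("eventName", 1, 5); // first delivery of batch 1
        state.applyBatch("eventName", 1, 5); // replay of batch 1 is ignored
        state.applyBatch("eventName", 2, 3); // new batch is applied
        System.out.println(state.count("eventName")); // 8
    }
}
```

This only protects the stored count against replayed batches; it does not change how many results the Esper statement itself emits per event.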

Matthias J. Sax
mchinaloy
  • Prior to Trident, this problem was usually solved with a transactional topology, but since the latter is deprecated, I think Trident is the way to go. Take a look at the [relevant section of their docs](https://github.com/nathanmarz/storm/wiki/Trident-state). – om-nom-nom May 20 '14 at 15:52
  • Can you post the code for your Esper Bolt? – Chris Gerken May 31 '14 at 12:05
  • I second Chris's request: we need the code for the Esper bolt. In Esper, a statement like `select count(x) from A` produces an output for every A event sent into the engine. If you send 5 events and only want to see a result after all of them, you need to define this "boundary" (for example, by sending a separate event). – xpa1492 Sep 30 '14 at 02:52
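
xpa1492's point can be illustrated without Esper: an unbounded running count emits an updated result for every incoming event, while a batched count emits once per batch. The class below is a plain-Java model of those two behaviours, not Esper's API; `CountOutputSketch` and its methods are hypothetical. In Esper itself the boundary would be expressed in EPL, for example with a length-batch window such as `select count(*) from A.win:length_batch(5)`; treat that statement as an assumption to check against the Esper documentation for your version.

```java
import java.util.ArrayList;
import java.util.List;

class CountOutputSketch {

    // Models "select count(*) from A" with no window or output clause:
    // every incoming event produces an updated count.
    static List<Long> runningCount(int events) {
        List<Long> outputs = new ArrayList<>();
        long count = 0;
        for (int i = 0; i < events; i++) {
            count++;
            outputs.add(count);
        }
        return outputs;
    }

    // Models a batch boundary: one count is emitted per full batch of batchSize events.
    static List<Long> batchedCount(int events, int batchSize) {
        List<Long> outputs = new ArrayList<>();
        long inBatch = 0;
        for (int i = 0; i < events; i++) {
            inBatch++;
            if (inBatch == batchSize) {
                outputs.add(inBatch);
                inBatch = 0;
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(runningCount(5));    // [1, 2, 3, 4, 5]
        System.out.println(batchedCount(5, 5)); // [5]
    }
}
```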

1 Answer


If your Esper Bolt does not explicitly call ack() on each tuple at the end of its execute() method, or use an IBasicBolt implementation (such as BaseBasicBolt, which acks automatically), then each tuple it receives will time out and eventually be replayed by your originating JMS Spout.
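
The replay behaviour can be modelled in plain Java: the spout keeps every emitted message id as pending until it is acked, and re-emits any id still pending once its timeout expires. This is a simplified sketch, not Storm's implementation; `ReplaySketch`, `emit`, `ack` and `expire` are hypothetical names, and time is passed in explicitly instead of being read from a clock. In real Storm, tuples are only tracked when the spout emits them with a message id, the timeout is set by `topology.message.timeout.secs`, and on timeout Storm calls the spout's fail(), leaving it to the spout to re-emit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReplaySketch {

    private final long timeoutMillis;
    // message id -> time it was last emitted
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // every emission, including replays, as seen by downstream bolts
    final List<String> emitted = new ArrayList<>();

    ReplaySketch(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    void emit(String msgId, long now) {
        pending.put(msgId, now);
        emitted.add(msgId);
    }

    // A bolt acking the tuple removes it from the pending set.
    void ack(String msgId) {
        pending.remove(msgId);
    }

    // Re-emit every message whose ack did not arrive within the timeout.
    void expire(long now) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            if (now - e.getValue() >= timeoutMillis) {
                timedOut.add(e.getKey());
            }
        }
        for (String id : timedOut) {
            emit(id, now); // replay: downstream sees the same message again
        }
    }

    public static void main(String[] args) {
        ReplaySketch spout = new ReplaySketch(30_000);
        spout.emit("m1", 0);
        spout.emit("m2", 0);
        spout.ack("m1");       // m1 is acked by the bolt
        spout.expire(30_000);  // m2 was never acked and is replayed
        System.out.println(spout.emitted); // [m1, m2, m2]
    }
}
```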

Alternatively, if you want your bolt to "only process unique messages", consider adding this behavior to your execute() method: it could first check a local Guava cache for the uniqueness of the tuple's value, then process accordingly.
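
A minimal sketch of that uniqueness check, assuming each tuple carries a value that identifies the message (the `key` here is a hypothetical stand-in, for example a JMS message id). To stay self-contained it swaps Guava's cache for a bounded `LinkedHashMap` that evicts its least-recently-used entry, so only the most recent `maxEntries` keys are remembered; a duplicate arriving after its key has been evicted would be processed again. `DedupFilter` and `firstSeen` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DedupFilter {

    private final Map<String, Boolean> seen;

    DedupFilter(int maxEntries) {
        // accessOrder = true makes this an LRU map; removeEldestEntry bounds its size.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns true the first time a key is seen, false for a duplicate
    // that is still remembered by the cache.
    boolean firstSeen(String key) {
        return seen.put(key, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        DedupFilter filter = new DedupFilter(1000);
        for (String id : new String[] {"m1", "m2", "m1"}) {
            System.out.println(id + " -> " + (filter.firstSeen(id) ? "process" : "skip"));
        }
    }
}
```

Inside a bolt, execute() would call `firstSeen` and, when it returns `false`, still ack the tuple (so the spout does not replay it) but skip the processing step.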

Wes Floyd