I'm learning the Trident framework. Trident Streams offer several methods for aggregating tuples within a batch, including Stream.aggregate(Fields, Aggregator, Fields), which performs a stateful mapping of the tuples using the Aggregator interface. Unfortunately, there is no built-in counterpart that additionally persists the map state, i.e. something like the 9 existing overloads of persistentAggregate() but taking an Aggregator as an argument.

So how can I implement this functionality by combining lower-level Trident and Storm abstractions and tools? The API is hard to explore because there is almost no Javadoc.

In other words, the persistentAggregate() methods end stream processing by updating some persistent state:

stream of tuples ---> persistent state

I want to update persistent state and emit different tuples along the way:

stream of tuples ------> stream of different tuples
                  with
            persistent state

Stream.aggregate(Fields, Aggregator, Fields) doesn't provide fault tolerance:

stream of tuples ------> stream of different tuples
                  with
          simple in-memory state
leventov

1 Answer

You can create a new stream from a state using the method TridentState#newValuesStream(). This will allow you to retrieve a stream of the aggregated values.

For illustration, we can extend the example in the Trident documentation by adding this method and a Debug filter:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
    new Values("the cow jumped over the moon"),
    new Values("the man went to the store and bought some candy"),
    new Values("four score and seven years ago"),
    new Values("how many apples can you eat"));
spout.setCycle(true);

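TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
    // split each sentence into individual words
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    // keep a running count per word in the state
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
    // turn the state updates back into a stream and print each updated count
    .newValuesStream().each(new Fields("count"), new Debug());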

Running this topology will output (to the console) the aggregated counts.

Hope it helps

Pierre Merienne
  • I don't need a stream of aggregated counts. I want to transform the values stream statefully. [This](http://nathanmarz.github.io/storm/doc/storm/trident/Stream.html#aggregate%28backtype.storm.tuple.Fields,%20storm.trident.operation.Aggregator,%20backtype.storm.tuple.Fields%29) method solves the issue, except that the `Aggregator` class doesn't support persistence. – leventov Nov 27 '13 at 18:26
  • If I understand, you want to transform tuples according to previous state, then update state and emit transformed tuples. Can you confirm that? – Pierre Merienne Nov 28 '13 at 17:45
  • According to the current state. Simply put, I would be satisfied with a persisting implementation of `Aggregator`, but since one doesn't exist (in the standard Storm distribution, at least), I'm looking for other approaches. – leventov Nov 28 '13 at 20:02
  • Did you try with `stateQuery()`? – schiavuzzi Jan 24 '14 at 10:00
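
As a rough illustration of the semantics asked for in the question (update persistent state per batch and emit different tuples along the way), here is a plain-Java model. It is not Trident API: the class `PersistentCountTransform`, its `processBatch` method, and the in-memory `HashMap` standing in for an external store are all hypothetical names made up for this sketch. It borrows the idea behind Trident's transactional state, which stores the batch transaction id next to each value so that a replayed batch is not applied twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model (no Storm dependency) of a persisted, per-batch stateful transform. */
final class PersistentCountTransform {

    /** A stored value together with the id of the batch that last updated it. */
    static final class Versioned {
        final long txid;
        final long count;

        Versioned(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    // Assumption: this map stands in for an external, durable key-value store.
    private final Map<String, Versioned> store = new HashMap<>();

    /**
     * Processes one batch of words and returns the emitted "word=total" tuples.
     * Assumes, as transactional spouts guarantee, that a given txid always
     * carries the same tuples.
     */
    List<String> processBatch(long txid, List<String> words) {
        // Step 1: aggregate inside the batch, keeping first-seen order.
        Map<String, Long> partial = new LinkedHashMap<>();
        for (String word : words) {
            partial.merge(word, 1L, Long::sum);
        }

        // Step 2: fold each partial count into the store and emit the new total.
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            Versioned old = store.get(e.getKey());
            Versioned current;
            if (old != null && old.txid == txid) {
                // Replayed batch: the update was already applied, so reuse it.
                current = old;
            } else {
                long base = (old == null) ? 0L : old.count;
                current = new Versioned(txid, base + e.getValue());
                store.put(e.getKey(), current);
            }
            emitted.add(e.getKey() + "=" + current.count);
        }
        return emitted;
    }
}
```

In a real topology this shape would most likely map onto `partitionPersist` with a custom `StateUpdater` that emits through its collector, followed by `newValuesStream()`, but that mapping is an untested suggestion rather than a confirmed solution from this thread.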