After reading this and this, I'm having difficulty understanding how to configure my Trident topology.

Basically, my Storm application reads from Kafka, does some data manipulation, and finally writes to Cassandra.

Here is how I'm currently building my topology:

private static StormTopology buildTopology() {
    // connection to kafka
    ZkHosts zkHosts = new ZkHosts(broker_zk, broker_path);
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, topic);
    kafkaConfig.scheme = new RawMultiScheme();
    StateFactoryFields[] cassandraStateFactories = createStateFactories();
    TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(kafkaConfig);
    TridentTopology topology = new TridentTopology();
    Stream kafkaSpout = topology.newStream("kafkaspout", spout).parallelismHint(1).shuffle();
    Stream filterValidatStream = kafkaSpout.each(new Fields("bytes"), new SplitKafkaInput(), EventData.getEventDataFields()).parallelismHint(1);
    for (StateFactoryFields stateFactoryFields : cassandraStateFactories) {
        filterValidatStream.groupBy(stateFactoryFields.groupingFields)
            .persistentAggregate(stateFactoryFields.cassandraStateFactor, new Count(), new Fields("count")).parallelismHint(2);
    }
    logger.info("Building topology");
    return topology.build();
}

So I have a spout and a few operations (filter, groupBy), each with a parallelismHint. I don't understand how to determine the optimal parallelismHint. Moreover, if I set this value in my code, how does it work in conjunction with the standard Storm topology configuration settings, such as:

topology.max.task.parallelism
topology.workers
topology.acker.executors

Thanks in advance

forhas

1 Answer

There is an excellent gist by mrflip here that attempts to outline how to tune a Storm/Trident topology. It should guide you in selecting your parameters, both the ones you mention in your question and others you may not have thought of yet.
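
As a rough illustration of how a parallelismHint in code interacts with one of the settings from the question, here is a minimal sketch. It is a simplified model and not Storm's own code: the class `ParallelismSketch` and the method `effectiveParallelism` are hypothetical names, and the only behaviour modelled is that topology.max.task.parallelism acts as an upper bound on the parallelism of any single component.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (not Storm's implementation): shows how a per-component
// parallelism hint is capped by topology.max.task.parallelism when it is set.
class ParallelismSketch {
    static final String MAX_TASK_PARALLELISM = "topology.max.task.parallelism";

    // Returns the hint unchanged if no cap is configured,
    // otherwise the smaller of the hint and the configured cap.
    static int effectiveParallelism(int hint, Map<String, Object> conf) {
        Object cap = conf.get(MAX_TASK_PARALLELISM);
        if (cap == null) {
            return hint;
        }
        return Math.min(hint, ((Number) cap).intValue());
    }

    public static void main(String[] args) {
        // The same keys as in the question, with example values chosen for illustration.
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.workers", 2);
        conf.put("topology.acker.executors", 1);
        conf.put(MAX_TASK_PARALLELISM, 1);

        // The persistentAggregate step in the question asks for parallelismHint(2);
        // with a cap of 1, this model yields 1.
        System.out.println(effectiveParallelism(2, conf));
    }
}
```

In a real topology these keys are normally set on Storm's `Config` object (for example via `setNumWorkers`, `setNumAckers` and `setMaxTaskParallelism`) and passed in when the topology is submitted. The cap is typically used to limit threads in local mode, so it is usually left unset on a production cluster.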

lorcan