After reading this and this, I'm having difficulty understanding how to configure my Trident topology.
Basically, my Storm application reads from Kafka, does some data manipulation, and finally writes to Cassandra.
Here is how I'm currently building my topology:
    private static StormTopology buildTopology() {
        // connection to kafka
        ZkHosts zkHosts = new ZkHosts(broker_zk, broker_path);
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, topic);
        kafkaConfig.scheme = new RawMultiScheme();

        StateFactoryFields[] cassandraStateFactories = createStateFactories();
        TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(kafkaConfig);

        TridentTopology topology = new TridentTopology();
        Stream kafkaSpout = topology.newStream("kafkaspout", spout).parallelismHint(1).shuffle();
        Stream filterValidatStream = kafkaSpout.each(new Fields("bytes"), new SplitKafkaInput(), EventData.getEventDataFields()).parallelismHint(1);

        for (StateFactoryFields stateFactoryFields : cassandraStateFactories) {
            filterValidatStream.groupBy(stateFactoryFields.groupingFields)
                    .persistentAggregate(stateFactoryFields.cassandraStateFactor, new Count(), new Fields("count")).parallelismHint(2);
        }

        logger.info("Building topology");
        return topology.build();
    }
So I have a spout and a few operations (filter, groupBy), each with a parallelismHint. I don't understand how to determine the optimal parallelismHint. Moreover, if I set this value in my code, how does it interact with the standard Storm topology configuration settings such as:
topology.max.task.parallelism
topology.workers
topology.acker.executors
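
To make clear which settings I mean, here is a minimal sketch that puts the three keys into a plain map. Storm's own `Config` class is a map of this kind and, as far as I know, exposes them through `setNumWorkers`, `setMaxTaskParallelism` and `setNumAckers`. The class name `TopologySettingsSketch` is hypothetical, and the values are placeholders chosen only for illustration, not what I am running and not recommendations.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: a plain map stands in for Storm's Config so the
// sketch runs without Storm on the classpath.
class TopologySettingsSketch {

    // Builds a map holding the three settings from the question.
    // All values are placeholders, not tuned numbers.
    static Map<String, Object> placeholderConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.workers", 2);              // number of worker processes
        conf.put("topology.max.task.parallelism", 4); // upper bound on per-component parallelism
        conf.put("topology.acker.executors", 1);      // number of acker executors
        return conf;
    }

    public static void main(String[] args) {
        // Print each key and its placeholder value.
        for (Map.Entry<String, Object> entry : placeholderConf().entrySet()) {
            System.out.println(entry.getKey() + " = " + entry.getValue());
        }
    }
}
```

What I can't work out is how values like these combine with the parallelismHint calls in buildTopology() above.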
Thanks in advance