We are currently using Apache Storm 0.9.5 in clustered topology mode to process Amazon Kinesis records (spout) and store them in a Redshift data warehouse (bolt). Our Storm cluster is deployed in AWS and consists of 1 Nimbus + UI node, 1 ZooKeeper node and 3 supervisor + logviewer nodes. Our topology configuration supports processing multiple Kinesis streams, and for every stream it includes:
- One Kinesis stream spout to listen for incoming records
- One Redshift bolt to insert records into the data warehouse
Topology:
    final TopologyBuilder topologyBuilder = new TopologyBuilder();

    // for every configured kinesis stream
    final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts();
    for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) {
        final String spoutId = kinesisStreamSpout.getSpoutId();
        topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout());

        // set the corresponding redshift bolt
        final String streamName = kinesisStreamSpout.getStreamName();
        final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName);
        topologyBuilder.setBolt(redshiftBolt.getId(),
                redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId);
    }

    return topologyBuilder.createTopology();
A recurring problem with the system has been its inability to guarantee exactly-once processing of input messages, which results in multiple records with the same business key being inserted into the target database. To gauge the magnitude of the problem, we ran a controlled test and found that roughly a third of all input records were submitted for processing more than once.
Like the author of this thread (which is currently unanswered), we have considered using Trident to guarantee exactly-once processing. However, we have come to the conclusion that it is more important to build idempotency into the system (along with at-least-once semantics) than to add complexity, lower performance and generate state, as this other article suggests.
We are now seeking advice on the best way of implementing idempotency within our existing topology in a manner that supports clustering. So far, we are leaning towards introducing a RedisBolt that would key values by tuple message id. Is there an existing pattern to achieve this using Apache Storm?
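To make the idea concrete, here is a minimal sketch of the kind of check we have in mind. It is not taken from our topology: `SeenStore`, `InMemorySeenStore` and `DedupingWriter` are hypothetical names, the in-memory map only stands in for Redis so that the example runs on its own, and the deduplication key is assumed to be something stable across replays (for example the business key). In a clustered deployment the `claim` call would presumably map to an atomic Redis `SET key value NX EX ttl`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class IdempotentInsertSketch {

    /** Hypothetical shared "seen keys" store; in a cluster this role would be played by Redis. */
    interface SeenStore {
        /** Atomically claims the key; returns true only if it was absent or already expired. */
        boolean claim(String key, long ttlMillis, long nowMillis);
    }

    /** In-memory stand-in so the sketch runs without a Redis server (assumption, not our setup). */
    static final class InMemorySeenStore implements SeenStore {
        private final ConcurrentMap<String, Long> expiryByKey = new ConcurrentHashMap<String, Long>();

        @Override
        public boolean claim(String key, long ttlMillis, long nowMillis) {
            final Long newExpiry = nowMillis + ttlMillis;
            for (;;) {
                final Long existing = expiryByKey.putIfAbsent(key, newExpiry);
                if (existing == null) {
                    return true;              // first time this key is seen
                }
                if (existing > nowMillis) {
                    return false;             // still claimed: this is a duplicate
                }
                if (expiryByKey.replace(key, existing, newExpiry)) {
                    return true;              // previous claim had expired; we re-claimed it
                }
                // another thread changed the entry in between; retry
            }
        }
    }

    /** Stand-in for the bolt's execute() logic: insert only on first sight of a key. */
    static final class DedupingWriter {
        private final SeenStore store;
        private final long ttlMillis;
        final List<String> inserted = new ArrayList<String>();

        DedupingWriter(SeenStore store, long ttlMillis) {
            this.store = store;
            this.ttlMillis = ttlMillis;
        }

        /** Returns true if the record was inserted, false if it was skipped as a duplicate. */
        boolean process(String dedupKey, String record, long nowMillis) {
            if (!store.claim(dedupKey, ttlMillis, nowMillis)) {
                return false;                 // replayed record: skip the insert, still ack the tuple
            }
            inserted.add(record);             // placeholder for the Redshift insert
            return true;
        }
    }

    public static void main(String[] args) {
        final DedupingWriter writer = new DedupingWriter(new InMemorySeenStore(), 60000L);
        final long now = System.currentTimeMillis();
        System.out.println(writer.process("order-1", "record A", now));      // first delivery
        System.out.println(writer.process("order-1", "record A", now + 5));  // replay of the same key
        System.out.println(writer.inserted.size());
    }
}
```

One gap we are aware of in this sketch: if the insert fails after the key has been claimed, a replay of that record would be dropped as a duplicate, so a real bolt would need to release the key (or claim it only after a successful insert) before failing the tuple.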