
We are currently using Apache Storm 0.9.5 in clustered topology mode to process Amazon Kinesis records (spout) and store them in a Redshift data warehouse (bolt). Our Storm cluster is deployed in AWS and consists of 1 Nimbus + UI node, 1 ZooKeeper node and 3 supervisor + logviewer nodes. Our topology configuration supports processing multiple Kinesis streams, and for every stream it includes:

  • One Kinesis stream spout to listen for incoming records
  • One Redshift bolt to insert records into the data warehouse

Topology:

import backtype.storm.topology.TopologyBuilder; // Storm 0.9.x package names
import java.util.List;

final TopologyBuilder topologyBuilder = new TopologyBuilder();

// For every configured Kinesis stream, register its spout...
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts();
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) {
    final String spoutId = kinesisStreamSpout.getSpoutId();
    topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout());

    // ...and the corresponding Redshift bolt, fed by that spout via shuffle grouping.
    // Note: the third argument of setBolt is the parallelism hint (number of
    // executors for this bolt), not the number of worker processes.
    final String streamName = kinesisStreamSpout.getStreamName();
    final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName);
    topologyBuilder.setBolt(redshiftBolt.getId(),
        redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId);
}

return topologyBuilder.createTopology();

A bugbear of the system has been its inability to guarantee once-only processing of input messages, which results in multiple records with the same business key being inserted into the target database. To gauge the magnitude of the problem, we ran a controlled test and found that roughly a third of all input records were submitted for processing more than once.

As per this thread (currently unanswered), we too have considered using Trident to guarantee once-only processing. However, we have come to the same conclusion as this other article: it is more important to build idempotency into the system (on top of at-least-once semantics) than to add complexity, lower performance and generate state.

We are now seeking advice on the best way to implement idempotency within our existing topology in a way that supports clustering. So far, we are leaning towards introducing a RedisBolt that would key values by tuple message ID. Is there an existing pattern for this in Apache Storm?
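A minimal sketch of the RedisBolt idea, assuming the deduplication key is the ShardId:SequenceNumber message ID the Kinesis spout assigns (as described in the comments on this question). `ProcessedIdStore`, `InMemoryProcessedIdStore` and `DedupFilter` are hypothetical names; the in-memory store only stands in for a store shared by all workers. With Redis, `markIfAbsent` would map onto `SET key value NX` (optionally with `EX` for an expiry) and `unmark` onto `DEL`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical abstraction over a store shared by all workers (e.g. Redis).
interface ProcessedIdStore {
    /** Atomically records the id; returns true only for the first caller. */
    boolean markIfAbsent(String messageId);

    /** Forgets the id so that a replay of the same message is processed again. */
    void unmark(String messageId);
}

// In-memory stand-in for illustration only: it is local to one JVM, so it
// cannot deduplicate across the supervisors of a clustered topology.
class InMemoryProcessedIdStore implements ProcessedIdStore {
    private final Map<String, Boolean> seen = new ConcurrentHashMap<>();

    @Override
    public boolean markIfAbsent(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    @Override
    public void unmark(String messageId) {
        seen.remove(messageId);
    }
}

// Hypothetical helper that a Redshift bolt could call from execute().
class DedupFilter {
    private final ProcessedIdStore store;

    DedupFilter(ProcessedIdStore store) {
        this.store = store;
    }

    // Same ShardId:SequenceNumber shape as the spout's message id.
    static String messageId(String shardId, String sequenceNumber) {
        return shardId + ":" + sequenceNumber;
    }

    /**
     * Runs the write only for the first delivery of a message.
     * Returns false for a replay, which the bolt should simply ack.
     */
    boolean process(String shardId, String sequenceNumber, Runnable write) {
        String id = messageId(shardId, sequenceNumber);
        if (!store.markIfAbsent(id)) {
            return false; // already written (or being written) by an earlier delivery
        }
        try {
            write.run();
            return true;
        } catch (RuntimeException e) {
            store.unmark(id); // keep at-least-once: let the replay write it again
            throw e;
        }
    }
}
```

Marking before writing stops two concurrent deliveries of the same message from both inserting, and unmarking on failure preserves at-least-once behaviour. One gap remains: if a worker dies after marking but before writing, the ID stays marked and the replay is skipped, so a production version would need to handle that case (for example by recording the ID in the same transaction as the Redshift insert).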

Lex Luthor
  • Are you sure you don't already have duplicates inside Kinesis? The 1/3 value looks very high to me... – SQL.injection Sep 28 '15 at 09:32
  • Our duplicate-submission analysis is based on the unique messageId that the Amazon Kinesis spout constructs from the ShardId:SequenceNumber values. In other words, multiple Kinesis records carrying the same business payload would each be assigned a different tuple messageId by the system. – Lex Luthor Sep 29 '15 at 23:15
  • Well, then something is very wrong with your topology... Do you have many failed tuples? – SQL.injection Sep 30 '15 at 07:59
  • We don't have any failed tuples in our test run. What we do get, however, is retries of the same message (anchored using the Kinesis record sequence number): `c.a.s.k.s.s.z.InflightRecordTracker [INFO] Retrying record with partition key 1 sequence number 49554939912789135525468194646830500145840472776150876162. Retry attempt 1` – Lex Luthor Oct 01 '15 at 06:17
  • Can you post an image of the Storm UI -> Show Visualization (it should show the topology graph)? – SQL.injection Oct 01 '15 at 07:57
  • Amazon has recently announced the release of Firehose, which will probably do what we want: http://aws.amazon.com/kinesis/firehose/details/ – Lex Luthor Oct 08 '15 at 07:06

1 Answer


If you don't want to use Trident, you might want to read the following article about "transactional topologies". This is the concept behind Trident, and you can still apply it "manually". It seems to be a good fit for your use case: https://storm.apache.org/documentation/Transactional-topologies.html
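The core idea of the transactional-topologies article can be sketched without Trident: store the ID of the last transaction that updated a value alongside the value, and skip the update when a replayed batch arrives with the same ID. This is a minimal in-memory sketch under stated assumptions: batches are committed in strict transaction-ID order (which the transactional design provides), and `TxValue` and `TransactionalCounterStore` are hypothetical names. In practice the map would be a table in the target database, with the check and update done in one database transaction.

```java
import java.util.HashMap;
import java.util.Map;

// A value stored together with the id of the last transaction that changed it.
final class TxValue {
    final long count;
    final long lastTxId;

    TxValue(long count, long lastTxId) {
        this.count = count;
        this.lastTxId = lastTxId;
    }
}

// Hypothetical stand-in for a database table keyed by business key.
class TransactionalCounterStore {
    private final Map<String, TxValue> table = new HashMap<>();

    /**
     * Applies a batch's delta to a key at most once per transaction id.
     * Relies on transactions being committed in strict txid order, so a
     * matching stored txid can only mean this exact batch was already applied.
     */
    boolean apply(String key, long delta, long txId) {
        TxValue current = table.get(key);
        if (current != null && current.lastTxId == txId) {
            return false; // replayed batch: already applied, so do nothing
        }
        long base = (current == null) ? 0 : current.count;
        table.put(key, new TxValue(base + delta, txId));
        return true;
    }

    long get(String key) {
        TxValue value = table.get(key);
        return value == null ? 0 : value.count;
    }
}
```

The stored transaction ID is what turns a non-idempotent operation (incrementing a count) into one that is safe to replay.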

Furthermore, I want to add that Storm (like other systems such as Apache Flink [disclaimer: I am a committer to Flink] and Apache Spark Streaming) can only guarantee exactly-once processing within the system. If data is forwarded to an external system, exactly-once can only be achieved if that external system supports idempotent operations.
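For the Redshift sink in the question, note that Redshift treats primary-key and unique constraints as informational only and does not enforce them, so a replayed INSERT silently creates a duplicate row. One common way to make the write idempotent is to load each batch into a staging table and then, in a single transaction, delete the target rows whose keys appear in the staging table before inserting the batch. This is a minimal JDBC sketch; the table names (`events`, `events_staging`), the key column (`business_key`) and the class name are hypothetical, and loading the staging table (for example with COPY or batched inserts) is not shown.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

class RedshiftIdempotentMerge {
    // Table and column names are concatenated into SQL, so they must come from
    // trusted configuration, never from tuple data.
    static List<String> mergeStatements(String target, String staging, String keyColumn) {
        return List.of(
            // Remove rows that an earlier (replayed) batch already inserted.
            "DELETE FROM " + target + " USING " + staging
                + " WHERE " + target + "." + keyColumn + " = " + staging + "." + keyColumn,
            // Insert the current batch; re-running the whole merge gives the same result.
            "INSERT INTO " + target + " SELECT * FROM " + staging,
            // Empty the staging table for the next batch (DELETE stays inside the transaction).
            "DELETE FROM " + staging);
    }

    static void merge(Connection conn, String target, String staging, String keyColumn)
            throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // run all statements as one transaction
        try (Statement st = conn.createStatement()) {
            for (String sql : mergeStatements(target, staging, keyColumn)) {
                st.executeUpdate(sql);
            }
            conn.commit();
        } catch (SQLException e) {
            conn.rollback(); // leave the target table untouched on failure
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

Because the delete and insert commit together, replaying the same batch replaces its rows instead of duplicating them. This assumes each bolt executor uses its own staging table and that a batch contains at most one row per business key.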

Matthias J. Sax