I was using Storm and needed more batching capabilities, so I searched for batching in Storm and found Trident, which does micro-batching in real time.

But I cannot figure out how Trident handles micro-batching (flow, batch size, batch interval), so I can't tell whether it really has what I need.

What I would like to do is collect/save the tuples emitted by a spout over an interval and re-emit them to a downstream component/bolt/function at another interval. (For example, the spout emits one tuple per second, and the next Trident function collects/saves tuples and emits 50 tuples per minute to the next function.)

Can somebody explain how I can apply Trident in this case? Or is there another way to do this using Storm features?

yelo

1 Answer

Excellent question! But sadly this kind of micro-batching is not supported by Trident out of the box.

But you can try implementing your own frequency-driven micro-batching. Something like this skeleton example:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MicroBatchingBolt extends BaseRichBolt {

    private static final long serialVersionUID = 8500984730263268589L;
    private static final Logger LOG = LoggerFactory.getLogger(MicroBatchingBolt.class);

    protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>();

    /** The threshold after which the batch should be flushed out. */
    int batchSize = 100;

    /**
     * The batch interval in seconds: the minimum time between flushes when
     * the batch size threshold is not reached. This should typically be equal
     * to topology.tick.tuple.freq.secs and about half of
     * topology.message.timeout.secs, so buffered tuples are acked or failed
     * before they time out.
     */
    int batchIntervalInSec = 45;

    /** Time of the last batch flush, in epoch seconds. Used to throttle tick-driven flushes. */
    long lastBatchProcessTimeSeconds = 0;

    private OutputCollector collector;

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      // Check whether this is a tick tuple
      if (isTickTuple(tuple)) {
         // If so, it signals a batch flush. But don't flush if the previous
         // flush happened very recently (either because the batch size
         // threshold was crossed or because of another tick tuple).

        if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
          LOG.debug("Current queue size is " + this.queue.size()
              + ". But received tick tuple so executing the batch");

          finishBatch();
        } else {
          LOG.debug("Current queue size is " + this.queue.size()
              + ". Received tick tuple but last batch was executed "
              + (System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds)
              + " seconds back that is less than " + batchIntervalInSec
              + " so ignoring the tick tuple");
        }
      } else {
        // Add the tuple to queue. But don't ack it yet.
        this.queue.add(tuple);
        int queueSize = this.queue.size();
        LOG.debug("current queue size is " + queueSize);
        if (queueSize >= batchSize) {
          LOG.debug("Current queue size is >= " + batchSize
              + " executing the batch");

          finishBatch();
        }
      }
    }

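    private boolean isTickTuple(Tuple tuple) {
        // Tick tuples come from the system component on the system tick stream.
        return org.apache.storm.Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
            && org.apache.storm.Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }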

    /**
     * Finish batch.
     */
    public void finishBatch() {

      LOG.debug("Finishing batch of size " + queue.size());
      lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
      List<Tuple> tuples = new ArrayList<Tuple>();
      queue.drainTo(tuples);

      for (Tuple tuple : tuples) {
        // Prepare your batch here (be it JDBC, HBase, ElasticSearch, Solr or
        // anything else).
        // List<Response> responses = externalApi.get("...");
      }

      try {
        // Execute your batch here and ack or fail the tuples
        LOG.debug("Executed the batch. Processing responses.");
        //        for (int counter = 0; counter < responses.length; counter++) {
        //          if (response.isFailed()) {
        //            LOG.error("Failed to process tuple # " + counter);
        //            this.collector.fail(tuples.get(counter));
        //          } else {
        //            LOG.debug("Successfully processed tuple # " + counter);
        //            this.collector.ack(tuples.get(counter));
        //          }
        //        }
      } catch (Exception e) {
        LOG.error("Unable to process " + tuples.size() + " tuples", e);
        // Fail entire batch
        for (Tuple tuple : tuples) {
          this.collector.fail(tuple);
        }
      }
    }

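    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // ... 
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Without this, the bolt never receives tick tuples. Ask Storm to send
        // one every batchIntervalInSec seconds (topology.tick.tuple.freq.secs).
        Map<String, Object> conf = new java.util.HashMap<String, Object>();
        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, batchIntervalInSec);
        return conf;
    }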

}

Source: http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/ and the question "Using tick tuples with trident in storm"
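To make the flush rule in the bolt easier to reason about (and to unit test without a cluster), here is a minimal Storm-free sketch of the same size-or-interval logic. The class `MicroBatcher` and its methods `add` and `tick` are hypothetical names invented for this illustration, not part of Storm or Trident, and timestamps are passed in explicitly instead of being read from `System.currentTimeMillis()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Storm class): buffers items and flushes by size or by interval. */
class MicroBatcher<T> {
    private final int batchSize;          // flush as soon as this many items are buffered
    private final long batchIntervalSec;  // minimum seconds between tick-driven flushes
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushSec;

    MicroBatcher(int batchSize, long batchIntervalSec, long startSec) {
        this.batchSize = batchSize;
        this.batchIntervalSec = batchIntervalSec;
        this.lastFlushSec = startSec;
    }

    /** Buffers one item; returns the flushed batch if the size threshold is reached, else an empty list. */
    List<T> add(T item, long nowSec) {
        buffer.add(item);
        return buffer.size() >= batchSize ? flush(nowSec) : new ArrayList<>();
    }

    /** Call on every tick; flushes only if batchIntervalSec has elapsed since the last flush. */
    List<T> tick(long nowSec) {
        return nowSec - lastFlushSec >= batchIntervalSec ? flush(nowSec) : new ArrayList<>();
    }

    /** Records the flush time and hands back everything buffered so far. */
    private List<T> flush(long nowSec) {
        lastFlushSec = nowSec;
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

For the scenario in the question (one tuple per second), `new MicroBatcher<>(100, 60, 0)` with `add` called once per second and `tick` called at second 60 would hand back the 60 buffered items in one batch. Lowering the size threshold to 50 would instead flush every 50 items, before the tick arrives.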

Alma
  • How can we ensure that the same queue is not processed by two different threads? The finishBatch method should not be run by two different executors for the same queue. Any thoughts on this? – phaigeim Apr 04 '18 at 13:09