3

I need some help understanding why merging two streams blocks one of the spouts of class FixedBatchSpout.

Short Description: I’m trying to merge two streams s1 and s2, but calling topology.merge(s1, s2) blocks the FixedBatchSpout (a trident spout) from which s1 originates, whereas the BaseRichSpout (a storm spout) from s2 seems to work properly.

Details: In the below main method, just adding the line topology.merge(s1, s2); prevents the FixedBatchSpout to emit past its first batch. This happens with multireduce as well.

FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 2,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy”));

FixedLoopSpout spout2 = new FixedLoopSpout(new Fields("sentence"),
                new Values("THE COW JUMPED OVER THE MOON"),
                new Values("THE MAN WENT TO THE STORE AND BOUGHT SOME CANDY"));

Stream s1 = topology.newStream("hello", spout1);
Stream s2 = topology.newStream("world", spout2);
topology.merge(s1, s2);

public class FixedLoopSpout extends BaseRichSpout {

    Values[] values;
    List<Values> loop = new LinkedList<Values>();
    Iterator<Values> head;
    private SpoutOutputCollector collector;
    private final Fields outputFields;

    private long emitted = 0;

    public FixedLoopSpout(Fields outputFields, Values... values) {
        this.outputFields = outputFields;
        this.values = values;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        for (Values value: this.values) {
            this.loop.add(value);
        }
        this.head = this.loop.iterator();
    }

    public void nextTuple() {
        if (!this.head.hasNext()) {
            // wrap
            this.head = this.loop.iterator();
        }
        this.collector.emit(this.head.next(), this.emitted++);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(this.outputFields);
    }
}

Help is appreciated, Thanks! Jacques

Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
Colegram
  • 106
  • 3

0 Answers0