I have created a Storm topology with a Spout that emits a number of tuples for benchmarking. I want to stop/kill my topology once all the tuples are emitted from the spout or there are no longer any tuples flowing in the topology.

Here is what my topology looks like.

LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
//Disabled ACK'ing for higher throughput
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); 

LoadGeneratorSource loadGenerator = new LoadGeneratorSource(runtime,numberOfTuplesToBeEmitted);
builder.setSpout("loadGenerator", loadGenerator);

//Some Bolts Here

while (loadGenerator.isRunning()){
//Active Waiting
}
//DO SOME STUFF WITH JAVA
cluster.killTopology("StormBenchmarkTopology");

The problem is that the loadGenerator instance I'm referring to in this scope is not the same object as the one running in the spout thread. Hence, isRunning() always returns true here, even though inside the spout thread its value becomes false once there are no more tuples to emit.
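A minimal sketch of why the two instances diverge, assuming the usual Storm behaviour in which the spout object is Java-serialized at submission time and the executor runs a deserialized copy. FakeSpout and serializedCopy are hypothetical stand-ins written for this illustration; they are not part of Storm or of the code above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedCopyDemo {

    // Hypothetical stand-in for LoadGeneratorSource: only the running flag matters here.
    public static class FakeSpout implements Serializable {
        private static final long serialVersionUID = 1L;
        private boolean running = true;

        public boolean isRunning() { return running; }
        public void setRunning(boolean running) { this.running = running; }
    }

    // Round-trips the object through Java serialization, which yields an independent copy.
    public static FakeSpout serializedCopy(FakeSpout original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FakeSpout) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeSpout submitted = new FakeSpout();            // instance held by the driver code
        FakeSpout executing = serializedCopy(submitted);  // copy the spout thread would run

        executing.setRunning(false);                      // the running copy finishes its work

        System.out.println("executing.isRunning() = " + executing.isRunning()); // false
        System.out.println("submitted.isRunning() = " + submitted.isRunning()); // true
    }
}
```

The flag flipped on the copy never reaches the original, which matches the behaviour described above: the driver loop keeps seeing true.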

Here is part of the LoadGeneratorSource class.


public class LoadGeneratorSource extends BaseRichSpout {

    private final int throughput;
    private boolean running;
    private final long runtime;


    public LoadGeneratorSource(long runtime,int throughput) {
        this.throughput = throughput;
        this.runtime = runtime;
    }

    @Override
    public void nextTuple() {
        ThroughputStatistics.getInstance().pause(false);

        long endTime = System.currentTimeMillis() + runtime;
        while (running) {
            long startTs = System.currentTimeMillis();

            for (int i = 0; i < throughput; i++) {
                try {
                    emitValue(readNextTuple());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            while (System.currentTimeMillis() < startTs + 1000) {
                // active waiting
            }

            if (endTime <= System.currentTimeMillis())
                setRunning(false);
        }
    }

    public boolean isRunning() {
        return running;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    //MORE STUFF

}

Can someone tell me a way to stop my topology once there are no more tuples emitted from the spout or flowing in the topology? Thanks for your help in advance.

Batuhan Tüter

1 Answer


This seems like a duplicate of "Killing storm topology from spout". Please try the answer given there.

Just to give a quick summary: the way you're trying to do it won't work, but you can use a NimbusClient from the spout to ask Nimbus to kill your topology. A side benefit is that this will also work once you deploy to a real cluster.

Stig Rohde Døssing
  • I am aware of the topic. I don't want to kill the topology directly within the spout. I still have things to handle in the above scope once the tuples are finished. Maybe I wasn't clear on that. – Batuhan Tüter Mar 27 '19 at 08:14
  • I'm not clear on what your code is intended to do. Are you just benchmarking locally, and don't intend to deploy the code to a real cluster? If so, just put the load generator in a static field and refer to it from there (and make it thread safe). If you want to use your code on a real cluster, you need to rethink what you're doing. The teardown code needs to happen either in the spout, or in a bolt you trigger with an "end of stream" tuple. The "do some stuff with Java" code you have won't run in the same JVM as the spout on a real cluster. – Stig Rohde Døssing Mar 27 '19 at 10:13
  • I see what you mean. I might end up trying this on a cluster, so I think I should change the structure. Thank you for your comments. – Batuhan Tüter Mar 27 '19 at 14:05
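
For the local-only case mentioned in the comments (shared static state, made thread safe), one possible shape is sketched below. It uses a plain thread in place of the spout so that it runs without Storm; BenchmarkState, markFinished and awaitFinished are hypothetical names introduced for this illustration. It relies on the driver and the spout sharing one JVM, so it would not carry over to a real cluster.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SharedCompletionSignal {

    // Hypothetical static holder: static state is shared by all threads in one JVM,
    // so it is unaffected by the spout object being copied.
    public static final class BenchmarkState {
        private static final CountDownLatch DONE = new CountDownLatch(1);

        private BenchmarkState() { }

        // Would be called by the spout when it has emitted its last tuple.
        public static void markFinished() {
            DONE.countDown();
        }

        // Blocks the caller until markFinished() has run or the timeout expires.
        public static boolean awaitFinished(long timeout, TimeUnit unit)
                throws InterruptedException {
            return DONE.await(timeout, unit);
        }
    }

    // Simulates the spout on a separate thread and waits for it from the calling thread.
    public static boolean runDemo() throws InterruptedException {
        Thread spoutThread = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                // stand-in for emitting a batch of tuples
            }
            BenchmarkState.markFinished();
        });
        spoutThread.start();

        // Replaces the busy-wait loop: blocks without spinning the CPU.
        boolean finished = BenchmarkState.awaitFinished(5, TimeUnit.SECONDS);
        spoutThread.join();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished = " + runDemo());
    }
}
```

In the topology from the question, the spout would call markFinished() at the point where it currently calls setRunning(false), and the driver would call awaitFinished(...) before the post-processing and cluster.killTopology(...). A latch is used instead of a boolean flag because it is thread safe and lets the driver block rather than poll.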