I'm trying to debug Storm topology in local mode in Eclipse. I use Apache Storm 2.4.0. And my code looks like this:
package stormtest;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class TestSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private static int currentNumber = 1;
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
// TODO Auto-generated method stub
this.collector = collector;
}
public void nextTuple() {
// TODO Auto-generated method stub
collector.emit(new Values(new Integer(currentNumber++)));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("number"));
}
}
package stormtest;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class TestBolt extends BaseRichBolt {
OutputCollector collector;
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.collector = collector;
}
public void execute(Tuple input) {
// TODO Auto-generated method stub
int number = input.getInteger(0);
if(isPrime(number)) {
System.out.println(number);
}
collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("number"));
}
private boolean isPrime(int n) {
if (n==1 || n==2 || n==3) {
return true;
}
if (n%2 == 0 || n%3 == 0 || n%5 == 0 || n%7 == 0) {
return false;
}
for (int i = 11; i*i <= n; i+=2) {
if(n%i == 0) {
return false;
}
}
return true;
}
}
package stormtest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
public class TestTopology {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout());
builder.setBolt("prime", new TestBolt()).shuffleGrouping("spout");
Config conf = new Config();
StormSubmitter.submitTopologyWithProgressBar("test_topology", conf, builder.createTopology());
}
}
I'm able to use remote debugging in cluster mode meantioned here How to debug Apache Storm in Eclipse?.
But with this method I need build the project, add the topology to storm and then kill the topology, which is quite lengthy.
Unfortunetly, I'm not able to run topology from Eclipse.
I tried use LocalCluster.withLocalModeOverride
, but the code from documentation https://storm.apache.org/releases/current/Local-mode.html cannot be compiled:
package stormtest;
import org.apache.storm.LocalCluster;
public class LocalRun {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
LocalCluster.withLocalModeOverride(() -> TestTopology.main(args), 20);
}
}
I get this error:
Exception in thread "main" java.lang.Error: Unresolved compilation problems:
The method withLocalModeOverride(Callable<T>, long) in the type LocalCluster is not applicable for the arguments (() -> {}, int)
Cannot return a void result
at stormtest.LocalRun.main(LocalRun.java:10)
When I try:
package stormtest;
import org.apache.storm.LocalCluster;
public class LocalRun {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
LocalCluster.withLocalModeOverride(() -> {TestTopology.main(args); return 0;}, 20);
}
}
I get this error:
java.lang.RuntimeException: Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.
at org.apache.storm.StormSubmitter.submitJarAs(StormSubmitter.java:485) ~[storm-client-2.4.0.jar:2.4.0]
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:350) ~[storm-client-2.4.0.jar:2.4.0]
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:292) ~[storm-client-2.4.0.jar:2.4.0]
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:212) ~[storm-client-2.4.0.jar:2.4.0]
at org.apache.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:429) ~[storm-client-2.4.0.jar:2.4.0]
at org.apache.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:410) ~[storm-client-2.4.0.jar:2.4.0]
at stormtest.TestTopology.main(TestTopology.java:20) ~[classes/:?]
at stormtest.LocalRun.lambda$0(LocalRun.java:9) ~[classes/:?]
at org.apache.storm.LocalCluster.withLocalModeOverride(LocalCluster.java:349) ~[storm-server-2.4.0.jar:2.4.0]
at org.apache.storm.LocalCluster.withLocalModeOverride(LocalCluster.java:324) ~[storm-server-2.4.0.jar:2.4.0]
at stormtest.LocalRun.main(LocalRun.java:9) ~[classes/:?]
Another way to use local mode in Eclipse is to run org.apache.storm.LocalCluster
instead of main class and pass the name as argument.
But I get same error as above.
How should I run Storm topology from Eclipse?