Apologies if this has already been answered; I searched and found some similar questions, but none of them helped. I have the following problem:

603  [main] WARN  b.s.StormSubmitter - Topology submission exception: 
    Component: [escribirFichero] subscribes from non-existent stream: 
               [default] of component [buscamosEnKlout]
Exception in thread "main" java.lang.RuntimeException: 
    InvalidTopologyException(msg:Component: 
               [escribirFichero] subscribes from non-existent stream: 
                   [default] of component [buscamosEnKlout])

I can't understand why I get this exception. I declare the bolt "buscamosEnKlout" before I use "escribirFichero". Below are my topology and the essential lines of the bolts. I know the spout is OK, because I verified it by trial and error.

The code of my topology is:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.stats.RollingWindow;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import bolt.*;
import spout.TwitterSpout;
import twitter4j.FilterQuery;

public class TwitterTopologia {
    private static String consumerKey = "xxx1";
    private static String consumerSecret = "xxx2";
    private static String accessToken = "yyy1";
    private static String accessTokenSecret="yyy2";

    public static void main(String[] args) throws Exception {
        /**************** SETUP ****************/
        String remoteClusterTopologyName = null;
        if (args!=null) { ... } 

        TopologyBuilder builder = new TopologyBuilder();
        FilterQuery tweetFilterQuery = new FilterQuery();
        tweetFilterQuery.track(new String[]{"Vacaciones","Holy Week", "Semana Santa","Holidays","Vacation"});
        tweetFilterQuery.language(new String[]{"en","es"});


        TwitterSpout spout = new TwitterSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, tweetFilterQuery);

        KloutBuscador buscamosEnKlout = new KloutBuscador();
        FileWriterBolt fileWriterBolt = new FileWriterBolt("idUsuarios.txt");

        builder.setSpout("spoutLeerTwitter",spout,1);
        builder.setBolt("buscamosEnKlout",buscamosEnKlout,1).shuffleGrouping("spoutLeerTwitter");
        builder.setBolt("escribirFichero",fileWriterBolt,1).shuffleGrouping("buscamosEnKlout");


        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("twitter-fun", conf, builder.createTopology());
            Thread.sleep(460000);
            cluster.shutdown();
        }
    }
}

The bolt "KloutBuscador", registered as "buscamosEnKlout", contains the following code:

String text = tuple.getStringByField("id");

String cadenaUrl;

cadenaUrl = "http://api.klout.com/v2/identity.json/twitter?screenName=";
cadenaUrl += text.replaceAll("\\[", "").replaceAll("\\]","");
cadenaUrl += "&key=" + kloutKey;
URL url = new URL(cadenaUrl);
HttpURLConnection c = (HttpURLConnection) url.openConnection();
...
c.setRequestMethod("GET");
c.setRequestProperty("Content-length", "0");
c.setUseCaches(false);
c.setAllowUserInteraction(false);
c.connect();
int status = c.getResponseCode();
StringBuilder sb = new StringBuilder();
switch (status) {
    case 200:
    case 201:
        BufferedReader br = new BufferedReader(new InputStreamReader(c.getInputStream()));
        String line;
        while ((line = br.readLine()) != null) sb.append(line + "\n");
        br.close();
}

JSONObject jsonResponse = new JSONObject(sb.toString());
//getJSONArray("id");
String results = jsonResponse.toString();
_collector.emit(new Values(text,results));

The second bolt, fileWriterBolt, registered as "escribirFichero", is the following:

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    _collector = outputCollector;
    try {
        writer = new PrintWriter(filename, "UTF-8");...}...}

public void execute(Tuple tuple) {
    writer.println((count++)+":::"+tuple.getValues());
    //+"+++"+tweet.getUser().getId()+"__FINAL__"+tweet.getUser().getName()
    writer.flush();
    // Confirm that this tuple has been treated.
    //_collector.ack(tuple);
}

If I skip the Klout bolt and only write the spout's output, it works. I don't understand why the Klout bolt causes this failure.

Krakenudo

1 Answer

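Your buscamosEnKlout bolt needs to declare the format of the tuples it will emit, as well as which streams it will emit to. You most likely haven't implemented declareOutputFields correctly in that bolt. A stream only exists once the component declares it, so without that declaration the bolt has no "default" stream for escribirFichero to subscribe to. The method should contain something like `declarer.declare(new Fields("your-text-field", "your-results-field"))`.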
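To make the error message easier to read, here is a toy model of the kind of check that produces it. This is only a sketch: `StreamCheckSketch`, `addComponent` and `checkSubscription` are hypothetical names invented for illustration and are not part of Storm. It assumes nothing more than what the message itself says, namely that a subscription is rejected when the upstream component has not declared the stream.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: these names are NOT Storm's API. The class mimics a check that
// rejects a subscription to a stream the upstream component never declared.
public class StreamCheckSketch {
    // Maps a component id to the names of the streams it has declared.
    private final Map<String, Set<String>> declaredStreams = new HashMap<>();

    // Register a component together with the streams it declares (possibly none).
    public void addComponent(String id, String... streams) {
        declaredStreams.put(id, new HashSet<>(Arrays.asList(streams)));
    }

    // Return an error message if the subscription is invalid, or null if it is fine.
    public String checkSubscription(String subscriber, String source, String stream) {
        Set<String> streams = declaredStreams.get(source);
        if (streams == null || !streams.contains(stream)) {
            return "Component: [" + subscriber + "] subscribes from non-existent stream: ["
                    + stream + "] of component [" + source + "]";
        }
        return null;
    }

    public static void main(String[] args) {
        StreamCheckSketch topology = new StreamCheckSketch();

        // The upstream bolt declares no streams, as in the question.
        topology.addComponent("buscamosEnKlout");
        System.out.println(topology.checkSubscription("escribirFichero", "buscamosEnKlout", "default"));

        // Once output fields are declared, the default stream exists and the check passes.
        topology.addComponent("buscamosEnKlout", "default");
        System.out.println(topology.checkSubscription("escribirFichero", "buscamosEnKlout", "default"));
    }
}
```

In the real bolt, the equivalent of the second `addComponent` call is the `declarer.declare(...)` line inside declareOutputFields.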

Stig Rohde Døssing
  • Thank you. I hadn't implemented it. One more question: I'm sending "Values" to the second bolt, and you say I have to declare "Fields". Does that make sense, or does it not matter? – Krakenudo Mar 25 '18 at 14:44
  • You use Fields to declare the format of your tuples, and Values to wrap the actual values you want to send. For example, you might do `declarer.declare(new Fields("my-counter"))`, and then have the bolt emit `new Values(counter++)`. Here's a complete example https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java – Stig Rohde Døssing Mar 25 '18 at 14:51
  • Also consider taking another look at https://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html, it explains the concepts fairly well. – Stig Rohde Døssing Mar 25 '18 at 14:53
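The Fields/Values distinction from the comments above can be pictured with a small stand-in sketch. `FieldsValuesSketch` and `toTuple` are hypothetical names, not Storm classes, and the field names "id" and "results" are assumptions chosen to match the two values the Klout bolt emits. The idea shown is only that names are declared once, values are emitted per tuple, and the two are matched by position; the size check is a choice made for this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in sketch, not Storm's classes: field names are declared once,
// values are emitted per tuple, and the two are matched by position.
public class FieldsValuesSketch {
    // Pair the declared field names with one emitted list of values.
    public static Map<String, Object> toTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException(
                    "expected " + fields.size() + " values but got " + values.size());
        }
        Map<String, Object> tuple = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            tuple.put(fields.get(i), values.get(i));
        }
        return tuple;
    }

    public static void main(String[] args) {
        // Declared once (the role of Fields); the names are hypothetical.
        List<String> fields = Arrays.asList("id", "results");
        // Emitted for each tuple (the role of Values).
        List<Object> values = Arrays.<Object>asList("someUser", "{}");

        Map<String, Object> tuple = toTuple(fields, values);
        // A downstream consumer can now look a value up by its declared name.
        System.out.println(tuple.get("id"));
    }
}
```

This is why a downstream bolt can call something like `tuple.getStringByField("id")`: the name comes from the upstream declaration, the value from the upstream emit.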