
I am trying to store tweets from the Sample Stream in a database and keep the raw JSON of each tweet at the same time. I am using Twitter4jStatusClient, following the example in the hbc GitHub repository. Since I only store a subset of each tweet's information in the database in real time, I would like to keep the raw JSON as well so that I can retrieve additional information when I need it. However, with Twitter4jStatusClient the listener is executed on a different thread, and in here, it says that the JSON object can only be obtained from the same thread that retrieved it. Is there a way to save the JSON string when using Twitter4jStatusClient? I chose not to use this example because I only want to perform certain actions, and save the JSON string, when the message is a status. Thanks!

    // Create an appropriately sized blocking queue
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);

    // Define our endpoint: By default, delimited=length is set (we need this for our processor)
    // and stall warnings are on (they are switched off below).
    StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
    // Specify the language filter for the endpoint
    endpoint.addQueryParameter(Constants.LANGUAGE_PARAM, Joiner.on(',').join(Lists.newArrayList("en")));
    endpoint.stallWarnings(false);

    Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);

    // Create a new BasicClient. By default gzip is enabled.
    BasicClient client = new ClientBuilder()
            .name("sampleStreamClient")
            .hosts(Constants.STREAM_HOST)
            .endpoint(endpoint)
            .authentication(auth)
            .processor(new StringDelimitedProcessor(queue))
            .build();

    // Create an executor service which will spawn threads to do the actual work of parsing the incoming messages and
    // calling the listeners on each message
    int numProcessingThreads = 4;
    ExecutorService service = Executors.newFixedThreadPool(numProcessingThreads);


    StatusListener listener = new SampleStreamStatusListener(jsonInserter);

    // Wrap our BasicClient with the twitter4j client
    t4jClient = new Twitter4jStatusClient(
            client, queue, Lists.newArrayList(listener), service);
Aithusa

1 Answer

I had a similar problem with Twitter4jStatusClient. Here are a few ideas.

An intermediate queue

You could have a separate thread pool that reads the raw messages from your queue variable, stores them somewhere, and puts them into a new queue, which we'll call hbcQueue. You then pass hbcQueue into the Twitter4jStatusClient constructor instead of queue.

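    // Second queue: the relay below feeds it, and Twitter4jStatusClient reads from it.
    BlockingQueue<String> hbcQueue = new LinkedBlockingQueue<>(10000);
    ExecutorService rawJsonSaver = Executors.newFixedThreadPool(numProcessingThreads);
    for (int i = 0; i < numProcessingThreads; i++) {
      rawJsonSaver.execute(() -> {
        for (;;) {
          try {
            String msg = queue.take();  // raw JSON string delivered by hbc
            JSONObject jobj = new JSONObject(msg);
            // Only statuses are saved and forwarded; other message types are dropped here.
            if (JSONObjectType.determine(jobj) == JSONObjectType.Type.STATUS) {
              System.out.println(msg);  // Save it
              hbcQueue.put(msg);  // blocks while hbcQueue is full; add() would throw instead
            }
          } catch (InterruptedException e) {
            // Restore the interrupt flag and stop this worker.
            Thread.currentThread().interrupt();
            break;
          } catch (JSONException e) {
            continue;  // skip messages that are not valid JSON
          }
        }
      });
    }
    Twitter4jStatusClient t4jClient = new Twitter4jStatusClient(
        client, hbcQueue, Lists.newArrayList(listener), service);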

The drawback is performance: the JSON of every message is parsed a second time, and every message has to pass through a second blocking queue, with the extra locking that involves.
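
Because the second queue is bounded, it is also worth deciding what the relay should do when the listener threads fall behind and that queue fills up. The following is a minimal sketch using only the JDK; the class name `BoundedQueueDemo` and the capacity of 1 are illustrative and not part of hbc. It shows that on a full `LinkedBlockingQueue`, `offer` reports failure and `add` throws, whereas `put` would block until a consumer frees a slot.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedQueueDemo {
    /** Describes what happens when one more element is added to an already full queue. */
    static String addToFullQueue() {
        // Illustrative capacity of 1 so the queue is full after a single element.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("first");

        // offer() returns false immediately instead of waiting for space.
        boolean accepted = queue.offer("second");

        // queue.put("second") is not called here because it would block
        // until another thread takes "first" out of the queue.
        try {
            queue.add("second"); // add() throws when the queue is full
            return "offer=" + accepted + ", add=ok";
        } catch (IllegalStateException e) {
            return "offer=" + accepted + ", add=IllegalStateException";
        }
    }

    public static void main(String[] args) {
        System.out.println(addToFullQueue());
    }
}
```

Whichever method the relay uses to enqueue, the choice determines whether a slow consumer causes dropped messages, an exception in the worker, or back-pressure on the thread reading from the stream.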

Re-serialization

If you're going to process the stored data later in Java, you could use plain Java serialization, because the Status object passed to your StatusListener implements Serializable. This is not far from re-serializing it back into JSON yourself, but at least you don't need to serialize each field manually.

    @Override
    public void onStatus(final Status status) {
      byte[] serializedStatus;
      try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
           ObjectOutputStream objStream = new ObjectOutputStream(byteStream)) {
        objStream.writeObject(status);
        objStream.flush();  // make sure buffered bytes reach byteStream before copying
        serializedStatus = byteStream.toByteArray();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      // store serializedStatus
      // . . .
    }
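
Reading the stored bytes back later is the mirror image, using `ObjectInputStream`. The sketch below is an assumption-laden illustration rather than twitter4j code: `StoredTweet` is a hypothetical stand-in for `Status` so that the example runs without the library, and the helper names `toBytes` and `fromBytes` are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {
    /** Hypothetical stand-in for twitter4j's Status, which is also Serializable. */
    static class StoredTweet implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        final String text;

        StoredTweet(long id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    /** Serializes any Serializable value into a byte array. */
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objStream = new ObjectOutputStream(byteStream)) {
            objStream.writeObject(value);
        } // closing the stream flushes everything into byteStream
        return byteStream.toByteArray();
    }

    /** Restores an object from bytes produced by toBytes. */
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream objStream =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return objStream.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] stored = toBytes(new StoredTweet(42L, "hello"));
        StoredTweet restored = (StoredTweet) fromBytes(stored);
        System.out.println(restored.id + " " + restored.text);
    }
}
```

With real data the cast would be to `Status`. Java serialization needs the serialized classes on the classpath when reading, so it is safest to read the bytes with the same twitter4j version that wrote them.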
Mike Placentra