I have a producer-consumer problem to implement in Java. I want the producer thread to run for a specific amount of time (e.g. 1 day), putting objects in a BlockingQueue (specifically tweets, streamed from the Twitter Streaming API via Twitter4j), and the consumer thread to take these objects from the queue and write them to a file. I've used the producer-consumer logic from the question "Read the 30Million user id's one by one from the big file", where the producer is the FileTask and the consumer is the CPUTask (see the first answer; my approach uses the same loops and try-catch blocks). Of course, I adapted the implementations accordingly. My main function is:

public static void main(String[] args) {
    ....

    final int threadCount = 2;

    // BlockingQueue with a capacity of 200
    BlockingQueue<Tweet> tweets = new ArrayBlockingQueue<>(200);

    // create thread pool with given size
    ExecutorService service = Executors.newFixedThreadPool(threadCount);

    Future<?> f = service.submit(new GathererTask(tweets));
    try {
        f.get(1,TimeUnit.MINUTES); // Give specific time to the GathererTask
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        f.cancel(true); // Stop the Gatherer
    }

    try {
        service.submit(new FileTask(tweets)).get(); // Wait until FileTask completes
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

    service.shutdownNow();

    try {
        service.awaitTermination(7, TimeUnit.DAYS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Now, the problem is that, although it streams the tweets and writes them to the file, it never terminates and never reaches the f.cancel(true) call. What should I change for it to work properly? Also, could you explain in your answer what went wrong with the thread logic, so that I can learn from my mistake? Thank you in advance.

These are the run() methods of my producer and consumer classes:

Producer:

@Override
public void run() {
    StatusListener listener = new StatusListener() {
        public void onStatus(Status status) {
            try {
                tweets.put(new Tweet(status.getText(), status.getCreatedAt(), status.getUser().getName(), status.getHashtagEntities()));
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt(); // Also tried this command
            }
        }

        public void onException(Exception ex) {
            ex.printStackTrace();
        }
    };
    twitterStream.addListener(listener);
    ... // More Twitter4j commands
}

Consumer:

public void run() {
    Tweet tweet;
    try(PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter("out.csv", true)))) {
        while(true) {
            try {
                // block if the queue is empty
                tweet = tweets.take();
                writeTweetToFile(tweet,out);

            } catch (InterruptedException ex) {
                break; // GathererTask has completed
            }
        }
        // poll() returns null if the queue is empty
        while((tweet = tweets.poll()) != null) {
            writeTweetToFile(tweet,out);
        }

    } catch (IOException e) {
        e.printStackTrace();
    }
}
syfantid

1 Answer

You should check whether your task classes handle InterruptedException; if they do not, they will wait forever. This might help.
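As a minimal sketch of what "handling the InterruptedException" can look like, the example below runs a consumer that blocks on take(), stops when its thread is interrupted, and then drains whatever is left with poll(). The class and method names (InterruptDemo, consumeUntilInterrupted, runDemo) and the String payload are hypothetical stand-ins, not the asker's Tweet or FileTask classes. The interrupt comes from shutdownNow() here, which is only one possible way to deliver it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class InterruptDemo {

    // Hypothetical consumer: collects items until its thread is interrupted.
    static List<String> consumeUntilInterrupted(BlockingQueue<String> queue) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                seen.add(queue.take()); // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
        }
        // poll() never blocks; it returns null once the queue is empty.
        String rest;
        while ((rest = queue.poll()) != null) {
            seen.add(rest);
        }
        return seen;
    }

    // Starts the consumer, interrupts it via shutdownNow(), returns what it collected.
    static List<String> runDemo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(200);
        queue.add("a");
        queue.add("b");
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<List<String>> f = service.submit(() -> consumeUntilInterrupted(queue));
        service.shutdownNow(); // interrupts the worker thread running the consumer
        try {
            return f.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("consumer did not finish", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Without an interrupt (or some other stop signal) reaching the consumer thread, the take() loop in this sketch would block indefinitely once the queue is empty.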

User2709
  • I edited my question, so it now also includes the producer and consumer run() methods. – syfantid Feb 22 '16 at 15:23
  • Those look fine to me. I think you need to take a thread dump with jstack and see where the threads are hung. You can find quick instructions at http://middlewaremagic.com/weblogic/?p=2281 – User2709 Feb 23 '16 at 04:24
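
jstack is an external JDK tool that is run against the process ID. As a rough in-process counterpart, the sketch below builds a similar listing with Thread.getAllStackTraces(), assuming it is acceptable to add a diagnostic call to the program itself. The class name ThreadDumpSketch and the output layout are made up for illustration and do not match jstack's exact format.

```java
import java.util.Map;

public class ThreadDumpSketch {

    // Builds a plain-text dump: thread name and state, then one line per stack frame.
    static String dumpThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            sb.append('"').append(t.getName()).append("\" state=")
              .append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpThreads());
    }
}
```

In a dump like this, a thread parked inside BlockingQueue.take() typically shows up in the WAITING state with take() near the top of its stack, which is the kind of evidence the comment above suggests looking for.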