4

I'm reading tweets from the Twitter Streaming API. After connecting to the API, I get a generator.

I'm looping through each tweet received, but I want to exit from the iterator at, say, 18:00. After receiving each tweet, I check whether the specified time has passed and stop if it has.

The issue is that I'm not receiving tweets frequently enough. I could receive one at 17:50 and the next one at 19:00, and only then would I find out that the time has passed and I need to stop.

Is there a way to force the stop at exactly 18:00?

Here's a high-level view of my code:

def getStream(tweet_iter):
    for tweet in tweet_iter:
        #do stuff
        if time_has_passed():
            return

tweet_iter = ConnectAndGetStream()
getStream(tweet_iter)
Stergios
  • Side note: it is a good idea to follow PEP 8 (`getStream` should be `get_stream`, as is officially recommended). – Eric O. Lebigot Nov 11 '16 at 09:21
  • Why does it matter if your script doesn't stop running precisely at six? – jonrsharpe Nov 11 '16 at 09:22
  • I guess the time to get the yielded value back from the tweet generator is dynamic, so you would have to wrap the next() call in some kind of timeout to make room to check what time it is. See http://stackoverflow.com/questions/492519/timeout-on-a-function-call – Moberg Nov 11 '16 at 09:25
  • See [Non-blocking generator on Python](http://stackoverflow.com/q/19736463/12892). One of the answers suggests putting the tweets in a `Queue` in a separate thread and processing them in another using [`Queue.get`](https://docs.python.org/library/queue.html#queue.Queue.get) with the right `block` and `timeout` parameters. – Cristian Ciupitu Nov 11 '16 at 10:10

3 Answers

2

Create a separate thread for the producer and use a Queue to communicate. I also had to use a threading.Event for stopping the producer.

import itertools, queue, threading, time

END_TIME = time.time() + 5  # run for ~5 seconds

def time_left():
    # Queue.get() raises ValueError on a negative timeout, so clamp at zero
    return max(0, END_TIME - time.time())

def ConnectAndGetStream():             # stub for the real thing
    for i in itertools.count():
        time.sleep(1)
        yield "tweet {}".format(i)

def producer(tweets_queue, the_end):   # producer
    it = ConnectAndGetStream()
    while not the_end.is_set():
        tweets_queue.put(next(it))

def getStream(tweets_queue, the_end):  # consumer
    try:
        while True:
            tweet = tweets_queue.get(timeout=time_left())
            print('Got', tweet)
    except queue.Empty:
        print('THE END')
        the_end.set()

tweets_queue = queue.Queue()  # you might want to use the maxsize parameter
the_end = threading.Event()
producer_thread = threading.Thread(target=producer,
                                   args=(tweets_queue, the_end))
producer_thread.start()
getStream(tweets_queue, the_end)
producer_thread.join()
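
The example above stops after a fixed five seconds. To stop at a wall-clock time such as 18:00 instead, END_TIME could be derived from the current local time. The following is only a sketch: `seconds_until` is a hypothetical helper, not part of any library, and it works with naive local datetimes, so it does not account for daylight saving changes.

```python
import datetime

def seconds_until(hour, minute=0, now=None):
    """Seconds from `now` (default: current local time) to the next hour:minute."""
    if now is None:
        now = datetime.datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        # The target time has already passed today, so aim for tomorrow
        target += datetime.timedelta(days=1)
    return (target - now).total_seconds()
```

With this helper, the first line of the example could become `END_TIME = time.time() + seconds_until(18)`.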
Cristian Ciupitu
1

Your problem could be solved by splitting your design into two separate processes:

  1. A twitter process that acts as a wrapper around the Twitter API, and
  2. A monitor process that terminates the twitter process when the exit time is reached.

The following piece of code prototypes the functionality described above using Python's multiprocessing module:

import multiprocessing as mp
import time

EXIT_TIME = '12:21'  # e.g. '18:00'

def twitter():
    # Stand-in for the code that consumes the Twitter stream
    while True:
        print('Twittttttttttt.....')
        time.sleep(5)

def get_time():
    # Current local time as 'HH:MM'
    return time.strftime('%H:%M')

if __name__ == '__main__':

    # Execute the function as a process
    p = mp.Process(target=twitter)
    p.start()

    # Monitor the process p, checking the clock every 5 seconds
    while True:
        print('Checking the hour...')
        if get_time() == EXIT_TIME:
            p.terminate()
            print('Current time:', time.ctime())
            print('twitter process has been terminated...')
            break
        time.sleep(5)

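Of course, instead of the while True loop in my example, you can use p.join(TIMEOUT), as pointed out here. Note that join only waits: if the process is still alive once the timeout expires, you still have to call p.terminate().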
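
A minimal sketch of the p.join(TIMEOUT) variant, under some assumptions: `twitter` is again a stub for the real streaming code, and `run_with_deadline` is a hypothetical helper name. The timeout is given in seconds, so for a wall-clock deadline it would have to be computed from the current time first.

```python
import multiprocessing as mp
import time

def twitter():
    # Stub standing in for the real streaming loop
    while True:
        time.sleep(0.1)

def run_with_deadline(target, seconds):
    """Run target in a child process and stop it if it outlives the deadline."""
    p = mp.Process(target=target)
    p.start()
    p.join(seconds)            # returns when the child exits or the timeout elapses
    timed_out = p.is_alive()
    if timed_out:
        p.terminate()          # join() by itself never stops the child
        p.join()               # wait for the terminated child to be cleaned up
    return timed_out

if __name__ == '__main__':
    print('Stopped at deadline:', run_with_deadline(twitter, 1))
```

The design trade-off compared with the polling loop is that the parent sleeps inside join instead of waking up every few seconds, but it can no longer print progress messages while it waits.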

funk
  • Thank you. Your prototype seems to be working perfectly; however, I'm facing an issue. I'm passing some arguments to the 'twitter' function. Among them is a logger object, and I'm getting the error message "TypeError: can't pickle thread.lock objects". Do you know anything about this? – Stergios Nov 11 '16 at 11:19
  • Check the following post: http://stackoverflow.com/a/7865512/2194843 Includes a workaround for the type of error you faced. – funk Nov 11 '16 at 11:45
1

Here is an example using threading and the schedule library:

import threading
import time
import os
import schedule

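def theKillingJob(message):
    # `message` is the extra argument that schedule passes through from .do()
    print(message)
    print("Kenny and Cartman die!")
    os._exit(1)  # exit the whole process immediately, skipping cleanup handlers

schedule.every().day.at("18:00").do(theKillingJob, 'It is 18:00')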

def getStream(tweet_iter):
    for tweet in tweet_iter:
        pass  # do stuff with each tweet here

def kenny():
    while True:
        print("Kenny alive..")
        schedule.run_pending()
        time.sleep(1)

def cartman():
    while True:
        print("Cartman alive..")

        tweet_iter = ConnectAndGetStream()
        getStream(tweet_iter)

        # You can change whenever you want to check for tweets by changing sleep time here
        time.sleep(1)

if __name__ == '__main__':
    # daemon=True replaces the deprecated setDaemon(True) call
    daemon_kenny = threading.Thread(name='kenny', target=kenny, daemon=True)
    daemon_cartman = threading.Thread(name='cartman', target=cartman, daemon=True)

    daemon_kenny.start()
    daemon_cartman.start()
    daemon_kenny.join()
    daemon_cartman.join()
Abhijay Ghildyal