7

I have a Python script that continuously stores tweets related to tracked keywords to a file. However, the script crashes repeatedly with the error shown below. How do I edit the script so that it restarts automatically? I've seen numerous solutions, including this one (Restarting a program after exception), but I'm not sure how to implement it in my script.

import sys
import tweepy
import json
import os

consumer_key=""
consumer_secret=""
access_key = ""
access_secret = ""

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth)
# directory that you want to save the json file
os.chdir(r"C:\Users\json_files")  # raw string so the backslashes are not read as escapes
# name of json file you want to create/open and append json to
save_file = open("12may.json", 'a')

class CustomStreamListener(tweepy.StreamListener):
    def __init__(self, api):
        self.api = api
        # super() must name this class, otherwise StreamListener.__init__ is skipped
        super(CustomStreamListener, self).__init__(api)

        # self.list_of_tweets = []

    def on_data(self, tweet):
        print tweet
        save_file.write(str(tweet))

    def on_error(self, status_code):
        print >> sys.stderr, 'Encountered error with status code:', status_code
        print "Stream restarted"  # moved above the return so it actually runs
        return True # Don't kill the stream

    def on_timeout(self):
        print >> sys.stderr, 'Timeout...'
        print "Stream restarted"  # moved above the return so it actually runs
        return True # Don't kill the stream

sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
sapi.filter(track=["test"])

===========================================================================

Traceback (most recent call last):
  File "C:\Users\tweets_to_json.py", line 41, in <module>
    sapi.filter(track=["test"])
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 316, in filter
    self._start(async)
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 235, in _start
    self._run()
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 165, in _run
    self._read_loop(resp)
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 206, in _read_loop
    for c in resp.iter_content():
  File "C:\Python27\lib\site-packages\requests-1.2.3-py2.7.egg\requests\models.py", line 541, in generate
    chunk = self.raw.read(chunk_size, decode_content=True)
  File "C:\Python27\lib\site-packages\requests-1.2.3-py2.7.egg\requests\packages\urllib3\response.py", line 171, in read
    data = self._fp.read(amt)
  File "C:\Python27\lib\httplib.py", line 543, in read
    return self._read_chunked(amt)
  File "C:\Python27\lib\httplib.py", line 603, in _read_chunked
    value.append(self._safe_read(amt))
  File "C:\Python27\lib\httplib.py", line 660, in _safe_read
    raise IncompleteRead(''.join(s), amt)
IncompleteRead: IncompleteRead(0 bytes read, 1 more expected)
Eugene Yan
  • Does it go to on_error when a crash occurs? – sundar nataraj May 12 '14 at 05:34
  • I don't think so as it doesn't print `'Encountered error with status code:'` – Eugene Yan May 12 '14 at 05:48
  • Try putting the `sapi = tweepy...` call in a try/except. It is not a good way, but just try it. – sundar nataraj May 12 '14 at 05:56
  • `(''.join(s), amt)`: where is this line in your program? – sundar nataraj May 12 '14 at 05:58
  • like this? `while True: try: sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api)) sapi.filter(track=["Sony", "Xperia", "Samsung", "s4", "s5", "note" "3", "HTC", "Blackberry", "q5", "q10", "z10", "Nokia", "Lumia", "Nexus", "LG", "Huawei", "Motorola"]) except: pass ` – Eugene Yan May 12 '14 at 05:59
  • Yes, but are you getting the error for every call of streaming? – sundar nataraj May 12 '14 at 06:01
  • Okay, I'm running the new program with your suggestion now and I don't see the errors. If it's not a good way, what would be a better way then? The other solution on SO also seems to put the function call within a while-try loop. – Eugene Yan May 12 '14 at 06:08
  • You showed an error in join. We should find out why that is caused and check all the conditions there. Since the except just does pass, that's not a good method: it goes to pass and you never see the error. Can you post the whole program? – sundar nataraj May 12 '14 at 06:11

5 Answers

19

Figured out how to incorporate the while/try loop by writing a new function for the stream:

def start_stream():
    while True:
        try:
            sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
            sapi.filter(track=["Samsung", "s4", "s5", "note" "3", "HTC", "Sony", "Xperia", "Blackberry", "q5", "q10", "z10", "Nokia", "Lumia", "Nexus", "LG", "Huawei", "Motorola"])
        except: 
            continue

start_stream()

I tested the auto-restart by manually interrupting the program with CMD + C: the bare except also catches the keyboard interrupt, so the loop simply starts the stream again. That said, I'm happy to hear of better ways to test such functionality.
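
A variation on the loop above, offered only as a sketch: it still restarts on ordinary errors, but it treats a keyboard interrupt as the way out and waits a little longer after each failure before reconnecting. The names `run_forever`, `connect`, `base_delay`, `max_delay` and `sleep` are made up for this illustration and are not part of tweepy; `connect` stands in for a blocking call such as `sapi.filter(...)`.

```python
import time


def run_forever(connect, base_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Call connect() repeatedly, restarting it after any ordinary exception.

    connect is a hypothetical zero-argument callable that blocks while the
    stream is healthy. Returns the number of restarts once the user
    interrupts the loop.
    """
    delay = base_delay
    restarts = 0
    while True:
        try:
            connect()
            delay = base_delay  # connect() returned normally: reset the backoff
        except KeyboardInterrupt:
            return restarts  # the interrupt is the way out, so it is not retried
        except Exception:
            restarts += 1
            sleep(delay)  # wait before reconnecting
            delay = min(delay * 2, max_delay)  # double the wait, up to a cap
```

With the script from the question, the call would look something like `run_forever(lambda: sapi.filter(track=["test"]))`. The `sleep` parameter exists only so the waiting can be replaced in a test.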

Eugene Yan
  • I had to catch KeyboardInterrupt to have a way to exit the script: `except KeyboardInterrupt: break` – Rocco Dec 20 '15 at 15:34
5

I ran into this problem recently and want to share more detail about it.

The error occurs because the chosen streaming filter ("test") is too broad. You therefore receive tweets at a faster rate than you can accept, which causes an IncompleteRead error.

This can be fixed either by refining the search or by catching a more specific exception:

from http.client import IncompleteRead  # Python 3; on Python 2 use: from httplib import IncompleteRead
...
try:
    sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
    sapi.filter(track=["test"])
except IncompleteRead:
    pass
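
On its own, the try/except above only swallows the error once; to keep collecting, it has to sit inside a loop. A minimal sketch of that combination follows, under the assumption that `read_stream` is a stand-in for the blocking `sapi.filter(...)` call. The function name and the `max_restarts` cap are hypothetical, chosen for this example.

```python
from http.client import IncompleteRead


def stream_until_other_error(read_stream, max_restarts=5):
    """Restart read_stream() only when it fails with IncompleteRead.

    read_stream is a hypothetical stand-in for the blocking stream call.
    Any other exception propagates to the caller unchanged.
    """
    restarts = 0
    while restarts < max_restarts:
        try:
            read_stream()
            return restarts  # stream ended normally
        except IncompleteRead:
            restarts += 1  # connection was cut mid-chunk: reconnect
    return restarts
```

Catching only `IncompleteRead` means a genuine bug, for example a `ValueError` raised in the listener, still stops the script instead of being retried silently.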
Leb
2

It's better to use a recursive call instead of an infinite while loop. Take a look at the filter function below, for example:

from tweepy import Stream
from service.twitter.listener.tweety_listener import TweetyStreamDataListener
from settings import twitter_config

class Tweety(object):
    def __init__(self, listener=TweetyStreamDataListener()):
        self.listener = listener
        self.__auth__ = None

    def __authenticate__(self):
        from tweepy import OAuthHandler
        if self.__auth__ is None:
            self.__auth__ = OAuthHandler(twitter_config['consumer_key'], twitter_config['consumer_secret'])
            self.__auth__.set_access_token(twitter_config['access_token'], twitter_config['access_token_secret'])
        return self.__auth__ is not None

    def __streamer__(self):
        is_authenticated = self.__authenticate__()
        if is_authenticated:
            return Stream(self.__auth__, self.listener)
        return None

    def filter(self, keywords=None, async=True):
        streamer = self.__streamer__()
        try:
            print "[STREAM] Started stream"
            streamer.filter(track=keywords, async=async)
        except Exception as ex:
            print "[STREAM] Stream stopped! Reconnecting to twitter stream"
            print ex.message, ex.args
            self.filter(keywords=keywords, async=async)
Suyash Soni
  • I think just the opposite: handling this situation with recursion instead of a loop will eventually cause a memory issue, especially for services designed to run for a long time in production. – Basa Jun 17 '20 at 09:11
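
As a rough illustration of the concern in the comment above: in CPython every recursive retry adds a stack frame, and once the recursion limit (1000 frames by default) is reached the interpreter raises RecursionError (RuntimeError on Python 2). The functions below are hypothetical stand-ins written for this sketch, not code from the answer.

```python
def reconnect_recursively(connect):
    """Retry by calling itself: every failure adds one more stack frame."""
    try:
        connect()
    except IOError:
        reconnect_recursively(connect)


def reconnect_in_loop(connect, max_failures):
    """Retry in a loop: stack depth stays constant however often it fails."""
    failures = 0
    while failures < max_failures:
        try:
            connect()
            return failures
        except IOError:
            failures += 1
    return failures


def always_fails():
    # Hypothetical stand-in for a stream call that keeps dropping.
    raise IOError("connection dropped")
```

Calling `reconnect_recursively(always_fails)` ends in a RecursionError, while `reconnect_in_loop(always_fails, 5000)` runs through all 5000 failures without growing the stack.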
0

One option would be to try the multiprocessing module. I would argue for it for two reasons:

  1. Ability to run the process for a set period of time without having to "kill" the whole script/process.
  2. You can place it in a for loop, and have it just start over whenever it dies or you choose to kill it.

I have taken a different approach entirely, but that is partly because I am saving my tweets at regular (or supposedly regular) intervals. @Eugene Yan, I think the try/except is a simple and elegant way to deal with the problem. One caveat, and hopefully someone will comment on this: with that method you don't really know when or if it failed. I don't know if that really matters, though, and it would be easy to write a few lines to record it.

import tiipWriter #Twitter & Textfile writer I wrote with Tweepy.
from add import ThatGuy # utility to supply log file names that won't overwrite old ones.
import multiprocessing


if __name__ == '__main__':
        #number of time increments script needs to run        
        n = 60
        dir = "C:\\Temp\\stufffolder\\twiitlog"
        list = []
        print "preloading logs"
        ThatGuy(n,dir,list) #Finds any existing logs in the folder and one-ups it

        for a in list:
            print "Collecting Tweets....."
            # this is my twitter/textfile writer process
            p = multiprocessing.Process(target=tiipWriter.tiipWriter,args = (a,)) 
            p.start()
            p.join(1800) # num of seconds the process will run
            if p.is_alive():
                print " \n Saving Twitter Stream log   @  " + str(a)
                p.terminate()
                p.join()
            a = open(a,'r')
            a.close()
            if a.closed == True:
                print "File successfully closed"
            else: a.close()
            print "jamaica" #cuz why not
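
On the point above that a plain try/except never tells you when or whether the stream failed, one possible way to record it is the standard logging module. This is only a sketch: `run_logged`, `connect`, `max_restarts` and the logger name are hypothetical, and `connect` again stands in for the blocking stream call.

```python
import logging

log = logging.getLogger("stream_supervisor")  # hypothetical logger name


def run_logged(connect, max_restarts):
    """Restart connect() on failure and record every failure in the log."""
    restarts = 0
    while restarts < max_restarts:
        try:
            connect()
            return restarts  # stream ended normally
        except Exception:
            restarts += 1
            # log.exception records the message plus the full traceback
            log.exception("Stream failed, restart %d of %d", restarts, max_restarts)
    return restarts
```

Calling `logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s")` once at startup adds a timestamp to each entry, which answers the "when" part.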
bwp8nt
0

I have written a two-process streamer using tweepy. It downloads, compresses, and dumps the data into files that are rotated every hour. The program is restarted every hour, and it periodically checks the streaming process to see whether any new tweets have been downloaded. If not, it restarts the whole system.

The code can be found here. Note that it uses pipes for compression. If compression is not needed, modifying the source is easy.
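
The linked code is not reproduced here, so the following is not that implementation. It is only a sketch of the "has any new tweet arrived?" check described above, with a hypothetical `StallDetector` class and an injectable `clock` so the logic can be tried without waiting.

```python
import time


class StallDetector:
    """Tracks when the last tweet arrived and reports a stalled stream."""

    def __init__(self, max_idle_seconds, clock=time.time):
        self.max_idle_seconds = max_idle_seconds
        self.clock = clock
        self.last_seen = clock()

    def record_tweet(self):
        # Call this from the listener each time a tweet is received.
        self.last_seen = self.clock()

    def is_stalled(self):
        # True when no tweet has arrived within the allowed idle window.
        return self.clock() - self.last_seen > self.max_idle_seconds
```

A supervising process could poll `is_stalled()` at intervals and restart the streaming process when it returns True.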

Ash