
I'm running a Python script using tweepy that streams a random sample of English tweets (via the Twitter Streaming API) for a minute, then switches to searching (via the Twitter Search API) for a minute, and then returns to streaming. The problem is that after roughly 40 seconds the stream crashes with the following error:

Full Error:

urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))

The number of bytes read varies from 0 to well into the thousands.

The first time the error occurs, the stream cuts out prematurely and the search function starts early. After the search function finishes, the script returns to the stream, and on the second occurrence of the error the code crashes.

The code I'm running is:

# Handles date time calculation
def calculateTweetDateTime(tweet):
    tweetDateTime = str(tweet.created_at)

    tweetDateTime = ciso8601.parse_datetime(tweetDateTime)
    time.mktime(tweetDateTime.timetuple())
    return tweetDateTime

# Checks whether the permitted time has passed.
def hasTimeThresholdPast():
    global startTime
    if time.clock() - startTime > 60:
        return True
    else:
        return False

#override tweepy.StreamListener to add logic to on_status
class StreamListener(StreamListener):

    def on_status(self, tweet):
        if hasTimeThresholdPast():
            return False

        if hasattr(tweet, 'lang'):
            if tweet.lang == 'en':

                try:
                    tweetText = tweet.extended_tweet["full_text"]
                except AttributeError:
                    tweetText = tweet.text

                tweetDateTime = calculateTweetDateTime(tweet)

                entityList = DataProcessing.identifyEntities(True, tweetText)
                DataStorage.storeHotTerm(entityList, tweetDateTime)
                DataStorage.storeTweet(tweet)


    def on_error(self, status_code):
        if status_code == 420:
            # returning False in on_data disconnects the stream
            return False


def startTwitterStream():

    searchTerms = []

    myStreamListener = StreamListener()
    twitterStream = Stream(auth=api.auth, listener=myStreamListener)
    global geoGatheringTag
    if geoGatheringTag == False:
        twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an'], async=True, stall_warnings=True)

    if geoGatheringTag == True:
        twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an', 'they\'re'],
                             async=False, locations=[-4.5091, 55.7562, -3.9814, 55.9563], stall_warnings=True)



# ----------------------- Twitter API Functions ------------------------
# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# --------------------------- Main Function ----------------------------

startTime = 0


def main():
    global startTime
    userInput = ""
    userInput.lower()
    while userInput != "-1":
        userInput = input("Type ACTiVATE to activate the Crawler, or DATABASE to access data analytic option (-1 to exit): \n")
        if userInput.lower() == 'activate':
            while(True):
                startTime = time.clock()

                startTwitterStream()

                startTime = time.clock()
                startTwitterSearchAPI()

if __name__ == '__main__':
    main() 

I've trimmed out the search function and the database handling, since they're separate and would only clutter the code.

If anyone has any idea why this is happening and how I might solve it, please let me know; I'd welcome any insight.


Solutions I have tried:

Wrapping the stream in a try/except block that catches http.client.IncompleteRead:
As per Error-while-fetching-tweets-with-tweepy

Setting stall_warnings to True:
As per Incompleteread-error-when-retrieving-twitter-data-using-python

Removing the English language filter.

Chris Cookman

2 Answers


Solved.

To anyone curious or experiencing a similar issue: after some experimentation I found that the backlog of incoming tweets was the cause. Every time a tweet arrived, my code ran entity identification and storage, which cost a small amount of time. Over several hundred to a few thousand tweets this backlog grew larger and larger until the API couldn't handle it and raised that error.

Solution: strip your on_status/on_data/on_success function down to the bare essentials and handle any computation, i.e. storing or entity identification, separately after the streaming session has closed. Alternatively, you could make the computation efficient enough that the delay becomes insignificant; it's up to you.
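A minimal sketch of that idea, under some assumptions: the listener below is a plain class rather than a tweepy StreamListener subclass so that it runs without a Twitter connection, and process_tweet is a hypothetical stand-in for the entity identification and storage calls in the question. The callback only enqueues; a separate thread does the slow work.

```python
import queue
import threading


def process_tweet(tweet, results):
    """Hypothetical slow work (entity identification, storage)."""
    results.append(tweet["text"].upper())


class QueueingListener:
    """Listener shape whose on_status only enqueues.

    Assumption: in real use this would subclass tweepy's StreamListener;
    it is a plain class here so the sketch runs offline.
    """

    def __init__(self, tweet_queue):
        self.tweet_queue = tweet_queue

    def on_status(self, tweet):
        # Constant-time hand-off: no parsing or database writes here.
        self.tweet_queue.put(tweet)


def worker(tweet_queue, results):
    """Drain the queue until a None sentinel arrives."""
    while True:
        tweet = tweet_queue.get()
        if tweet is None:
            break
        process_tweet(tweet, results)


tweet_queue = queue.Queue()
results = []
consumer = threading.Thread(target=worker, args=(tweet_queue, results))
consumer.start()

listener = QueueingListener(tweet_queue)
for text in ("first tweet", "second tweet"):
    listener.on_status({"text": text})  # simulated incoming tweets

tweet_queue.put(None)  # signal that the streaming session has ended
consumer.join()
```

Whether the worker runs alongside the stream, as here, or only after the session closes is a design choice; either way the stream callback itself stays cheap.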

Chris Cookman
  • this helped me a lot, i was having the same issues. Basically the solution is to just dump the data and do the processing separately as you rightly mention.. – tezzaaa Mar 14 '19 at 21:42
  • Hi there. Thanks for this but what do you mean by stripping away the "on_status/on_data/on_success" function? I guess I am confused by the fact that you don't even implement the function in your StreamListener. – Steak Overflow Sep 26 '20 at 10:32
  • @SteakOverflow Hi there, so "on_status" is the first function declared under StreamListener: "class StreamListener(StreamListener): def on_status(self, tweet):" The other names I've offered, on_data/on_success, are commonly used alternative names for that type of function. Whatever name you choose, the key is to minimize the intensity of the processing done on the data while the stream is active, as it can overload the stream and cause it to crash. Whatever function you have that is reading in the data would be classed as the 'on_data' function. – Chris Cookman Sep 28 '20 at 13:19
  • how do you check backlog of tweepy? i am not quite sure how to validate that process. – A-nak Wannapaschaiyong Jan 29 '21 at 21:04

I'm just sharing my experience after following Chris Cookman's answer. After doing as he advised, the same problem you had disappeared for me. In my case I was using tweepy together with discord.py, so I created a global list (status_list), and whenever tweepy's on_status fires, it appends the tweet to that list.

Then I set up a @tasks.loop(seconds=10) task using discord.py that checks every few seconds whether status_list is non-empty; if it has content, the task loops through the list and processes each item.
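The buffering pattern might look roughly like the sketch below. The lock and the drain_statuses and handle_pending helpers are assumptions added for illustration, not part of my actual bot; with discord.py, handle_pending would be called from inside the @tasks.loop(seconds=10) function.

```python
import threading

status_list = []                # shared buffer filled by the stream callback
status_lock = threading.Lock()  # assumption: guards the list across threads


def on_status(tweet):
    """Stream callback: append and return immediately."""
    with status_lock:
        status_list.append(tweet)


def drain_statuses():
    """Copy out everything buffered so far and empty the shared list."""
    with status_lock:
        pending = status_list[:]
        status_list.clear()
    return pending


def handle_pending(handler):
    """Body of the periodic task: process each buffered item once."""
    pending = drain_statuses()
    for tweet in pending:
        handler(tweet)
    return len(pending)


# Simulated run: two tweets arrive, then the periodic task fires.
on_status("tweet A")
on_status("tweet B")
handled = []
count = handle_pending(handled.append)
```

Copying the list under the lock and processing the copy afterwards keeps the callback from waiting on slow handlers.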

Aeiddius