
I set up a system to filter the Twitter real-time stream sample. The database writes are too slow to keep up with anything more complex than a couple of low-volume keywords, so I implemented django-rq as a simple queuing system to push the tweets into a Redis-based queue as they come in, and that works great. My issue is on the other side. For context: the system is running right now, with 1.5m tweets stored for analysis and another 375,000 queued in Redis. At the current rate, it will take me ~3 days to catch up if I turn off the streams, which I don't want to do. If I keep the streams running, my last estimate is about a month.

The database now has a couple of million rows across two main tables, and the writes are very slow. The optimal number of rq workers seems to be four, and that averages out at 1.6 queue tasks per second. (The code being enqueued is below.) I thought the issue might be the opening of a DB connection for every new queue task, so I set CONN_MAX_AGE to 60, but that hasn't improved anything.
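A quick back-of-the-envelope check of those numbers, as a sketch only. The function name and the inflow_rate parameter are made up for illustration; the backlog and write rate are the figures quoted above.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def days_to_drain(backlog, write_rate, inflow_rate=0.0):
    """Days needed to empty a queue, given tasks/second written and arriving."""
    net_rate = write_rate - inflow_rate
    if net_rate <= 0:
        return float("inf")  # the queue never drains at these rates
    return backlog / net_rate / SECONDS_PER_DAY


# 375,000 queued tweets at 1.6 tasks/second with the streams switched off
print(round(days_to_drain(375000, 1.6), 1))
```

With the streams left on, the same function can be called with a non-zero inflow_rate; the closer the inflow gets to the write rate, the longer the catch-up takes.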

Having just tested this on localhost, I got in excess of 13 writes/second on a 2011 MacBook with Chrome etc. running, but there are only a few thousand rows in that database, which leads me to believe the problem is size related. There are a couple of get_or_create calls I'm using (see below), which could be slowing things down, but I can't see a way around using them: I need to check if the user exists, and I need to check if the tweet already exists. (I suspect I could move the latter to a try/except, on the basis that tweets coming in from the live stream shouldn't already exist.) Would I get much performance gain out of that? As this is still running, I'm keen to optimise the code a bit and get some faster, more efficient workers in there so I can catch up. Would running a pre-vetting worker to batch things up work (i.e. so I can batch-create users that don't exist, or something similar)?

I'm running a 4-core/8 GB RAM droplet on DigitalOcean, so I feel this is pretty terrible performance, and presumably code related. Where am I going wrong here?
(I've posted this here rather than code-review, as I think this is relevant to the Q&A format for SO, as I'm trying to solve a specific code problem, rather than 'how can I do this generally better?')

Note: I'm working in Django 1.6, as this is code that I've had floating around for a while and I wasn't confident about upgrading at the time. It's not public facing, so unless there's a compelling reason right now (like this performance issue), I wasn't going to upgrade this project.

Stream Listener:

import json

import django_rq
import tweepy
from django.db import DataError


class StdOutListener(tweepy.StreamListener):
    def on_data(self, data):
        # Twitter returns data in JSON format - we need to decode it first
        decoded = json.loads(data)
        try:
            # Only English tweets are pushed onto the queue for the workers
            if decoded['lang'] == 'en':
                django_rq.enqueue(read_both, decoded)
        except KeyError, e:
            # Some stream messages (e.g. delete notices) have no 'lang' key
            print "Error on Key", e
        except DataError, e:
            print "DataError", e
        return True

    def on_error(self, status):
        print status

Read User/Tweet/Both

def read_user(tweet):
    from harvester.models import User
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
    #We might get weird results where user has changed their details, so first we check the UID.
    #print "MULTIPLE USER DEBUG", tweet["user"]["id_str"]
    try:
        current_user = User.objects.get(id_str=tweet["user"]["id_str"])
        created=False
        return current_user, created
    except ObjectDoesNotExist:
        pass
    except MultipleObjectsReturned:
        current_user = User.objects.filter(id_str=tweet["user"]["id_str"])[0]
        return current_user, False
    if not tweet["user"]["follow_request_sent"]:
        tweet["user"]["follow_request_sent"] = False
    if not tweet["user"]["following"]:
        tweet["user"]["following"] = False
    if not tweet["user"]["description"]:
        tweet["user"]["description"] = " "
    if not tweet["user"]["notifications"]:
        tweet["user"]["notifications"] = False

    #If that doesn't work, then we'll use get_or_create (as a fallback rather than save())
    from dateutil.parser import parse
    if not tweet["user"]["contributors_enabled"]:
        current_user, created = User.objects.get_or_create(
            follow_request_sent=tweet["user"]["follow_request_sent"],
            _json = {},
            verified = tweet["user"]["verified"],
            followers_count = tweet["user"]["followers_count"],
            profile_image_url_https = tweet["user"]["profile_image_url_https"],
            id_str = tweet["user"]["id_str"],
            listed_count = tweet["user"]["listed_count"],
            utc_offset = tweet["user"]["utc_offset"],
            statuses_count = tweet["user"]["statuses_count"],
            description = tweet["user"]["description"],
            friends_count = tweet["user"]["friends_count"],
            location = tweet["user"]["location"],
            profile_image_url= tweet["user"]["profile_image_url"],
            following = tweet["user"]["following"],
            geo_enabled = tweet["user"]["geo_enabled"],
            profile_background_image_url =tweet["user"]["profile_background_image_url"],
            screen_name = tweet["user"]["screen_name"],
            lang =  tweet["user"]["lang"],
            profile_background_tile = tweet["user"]["profile_background_tile"],
            favourites_count = tweet["user"]["favourites_count"],
            name = tweet["user"]["name"],
            notifications = tweet["user"]["notifications"],
            url = tweet["user"]["url"],
            created_at = parse(tweet["user"]["created_at"]),
            contributors_enabled = False,
            time_zone = tweet["user"]["time_zone"],
            protected = tweet["user"]["protected"],
            default_profile = tweet["user"]["default_profile"],
            is_translator = tweet["user"]["is_translator"]
        )
    else:
        current_user, created = User.objects.get_or_create(
            follow_request_sent=tweet["user"]["follow_request_sent"],
            _json = {},
            verified = tweet["user"]["verified"],
            followers_count = tweet["user"]["followers_count"],
            profile_image_url_https = tweet["user"]["profile_image_url_https"],
            id_str = tweet["user"]["id_str"],
            listed_count = tweet["user"]["listed_count"],
            utc_offset = tweet["user"]["utc_offset"],
            statuses_count = tweet["user"]["statuses_count"],
            description = tweet["user"]["description"],
            friends_count = tweet["user"]["friends_count"],
            location = tweet["user"]["location"],
            profile_image_url= tweet["user"]["profile_image_url"],
            following = tweet["user"]["following"],
            geo_enabled = tweet["user"]["geo_enabled"],
            profile_background_image_url =tweet["user"]["profile_background_image_url"],
            screen_name = tweet["user"]["screen_name"],
            lang =  tweet["user"]["lang"],
            profile_background_tile = tweet["user"]["profile_background_tile"],
            favourites_count = tweet["user"]["favourites_count"],
            name = tweet["user"]["name"],
            notifications = tweet["user"]["notifications"],
            url = tweet["user"]["url"],
            created_at = parse(tweet["user"]["created_at"]),
            contributors_enabled = tweet["user"]["contributors_enabled"],
            time_zone = tweet["user"]["time_zone"],
            protected = tweet["user"]["protected"],
            default_profile = tweet["user"]["default_profile"],
            is_translator = tweet["user"]["is_translator"]
        )
    #print "CURRENT USER:", type(current_user), current_user
    #current_user, created = User.objects.get_or_create(current_user)
    return current_user, created

def read_tweet(tweet, current_user):
    import logging
    logger = logging.getLogger('django')
    from datetime import date, datetime
    #print "Inside read_Tweet"
    from harvester.models import Tweet
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
    from django.db import DataError
    #We might get weird results where user has changed their details, so first we check the UID.
    #print tweet_data["created_at"]
    from dateutil.parser import parse
    tweet["created_at"] = parse(tweet["created_at"])
    try:
        current_tweet = Tweet.objects.get(id_str=tweet["id_str"])
        # Already stored: return the existing tweet (not the user)
        return current_tweet, False
    except ObjectDoesNotExist:
        pass
    except MultipleObjectsReturned:
        # Duplicates exist: use the first one instead of creating another
        current_tweet = Tweet.objects.filter(id_str=tweet["id_str"])[0]
        return current_tweet, False
    try:
        current_tweet, created = Tweet.objects.get_or_create(
        truncated=tweet["truncated"],
        text=tweet["text"],
        favorite_count=tweet["favorite_count"],
        author = current_user,
        _json = {},
        source=tweet["source"],
        retweeted=tweet["retweeted"],
        coordinates = tweet["coordinates"],
        entities = tweet["entities"],
        in_reply_to_screen_name = tweet["in_reply_to_screen_name"],
        id_str = tweet["id_str"],
        retweet_count = tweet["retweet_count"],
        favorited = tweet["favorited"],
        user = tweet["user"],
        geo = tweet["geo"],
        in_reply_to_user_id_str = tweet["in_reply_to_user_id_str"],
        lang = tweet["lang"],
        created_at = tweet["created_at"],
        place = tweet["place"])
        print "DEBUG", current_user, current_tweet
        return current_tweet, created
    except DataError, e:
        #Catchall to pick up non-parsed tweets
        print "DEBUG ERROR", e, tweet
        return None, False

def read_both(tweet):
    current_user, created = read_user(tweet)
    current_tweet, created = read_tweet(tweet, current_user)
Withnail
  • Are you sure the error is from django? Reason for my question is because twitter limits the rate you get the data from the Streaming API. 13 writes/sec is tremendous, what are you getting for django? Also, I would recommend changing the filter for when you call the stream listener to the desired language instead of filtering it after you get your data (that alone should speed things up a bit). – Leb Jun 22 '15 at 00:27
  • The issue isn't with the Stream Listener - it's set to pick up a wide geography, and list of keywords, and can keep up with, at one point over the weekend, in excess of 100 tweets/second to memory- the queue in Redis has over 350,000 queued tweets waiting to be written to the DB, so it's the write side - I can see how many it's done in the last 500 seconds. The 13/second was in django on localhost to a basically empty database. The twitter filter doesn't (afaik) let you filter on language directly, you have to do it as the stream is received, but happy to be corrected. – Withnail Jun 22 '15 at 06:21

1 Answer


I eventually managed to cobble together an answer from some redditors and a couple of other things.

Fundamentally, though, I was doing a double lookup on the id_str field, which wasn't indexed. I added an index (db_index=True) to that field on the models used by both read_tweet and read_user, and changed read_tweet to try Tweet.objects.create first, falling back to get_or_create if there's a problem. That gave a 50-60x speed improvement, and the workers now scale: if I add 10 workers, I get 10x the speed.
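To illustrate why the index matters, here is a minimal sketch using the standard-library sqlite3 module rather than the actual Django models. The table layout, the index name and the save_tweet helper are assumptions made for this example. The index is declared UNIQUE (also an assumption, since db_index=True alone does not enforce uniqueness) so that a duplicate insert fails and the fallback lookup runs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweet (id INTEGER PRIMARY KEY, id_str TEXT, text TEXT)")


def lookup_plan(connection):
    """Return SQLite's query plan text for a lookup by id_str."""
    rows = connection.execute(
        "EXPLAIN QUERY PLAN SELECT id FROM tweet WHERE id_str = ?", ("1",)
    ).fetchall()
    return " ".join(str(row[-1]) for row in rows)


plan_before = lookup_plan(conn)  # no index yet: SQLite scans the whole table
conn.execute("CREATE UNIQUE INDEX tweet_id_str ON tweet (id_str)")
plan_after = lookup_plan(conn)   # with the index: SQLite searches the index


def save_tweet(connection, id_str, text):
    """Insert first; only look the row up if the insert collides."""
    try:
        with connection:  # commits on success, rolls back on error
            cur = connection.execute(
                "INSERT INTO tweet (id_str, text) VALUES (?, ?)", (id_str, text)
            )
        return cur.lastrowid, True
    except sqlite3.IntegrityError:
        row = connection.execute(
            "SELECT id FROM tweet WHERE id_str = ?", (id_str,)
        ).fetchone()
        return row[0], False


print(plan_before)
print(plan_after)
```

The insert-first order means the common case (a tweet from the live stream that is not stored yet) costs one write, and the extra lookup only happens for the rare duplicate.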

I currently have one worker that's happily processing 6 or so tweets a second. Next up I'll add a monitoring daemon to check the queue size and add extra workers if it's still increasing.
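The scaling decision for such a daemon could look roughly like this. It is a sketch under assumptions: the function name, the max_workers cap and the "add one worker while the queue is growing" rule are all hypothetical, and how the queue size is sampled and how a worker is actually started are left out.

```python
def workers_to_add(queue_sizes, current_workers, max_workers=10):
    """Decide how many extra workers to start from recent queue-size samples.

    queue_sizes: queue lengths ordered oldest to newest, sampled at a
    fixed interval (the sampling itself is not shown here).
    """
    if len(queue_sizes) < 2 or current_workers >= max_workers:
        return 0
    # Add a single worker per check while the backlog is still growing
    growing = queue_sizes[-1] > queue_sizes[0]
    return 1 if growing else 0


print(workers_to_add([350000, 351200, 352900], current_workers=1))
```

Adding one worker per check keeps the daemon from overshooting while the effect of the previous worker is still unknown.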

tl;dr - REMEMBER INDEXING!

Withnail