
I'm using tweepy to data-mine the public stream of tweets for keywords. This is pretty straightforward and has been described in multiple places:

http://runnable.com/Us9rrMiTWf9bAAW3/how-to-stream-data-from-twitter-with-tweepy-for-python

http://adilmoujahid.com/posts/2014/07/twitter-analytics/

Copying code directly from the second link:

#Import the necessary methods from tweepy library
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

#Variables that contain the user credentials to access the Twitter API
access_token = "ENTER YOUR ACCESS TOKEN"
access_token_secret = "ENTER YOUR ACCESS TOKEN SECRET"
consumer_key = "ENTER YOUR API KEY"
consumer_secret = "ENTER YOUR API SECRET"


#This is a basic listener that just prints received tweets to stdout.
class StdOutListener(StreamListener):

    def on_data(self, data):
        print(data)
        return True

    def on_error(self, status):
        print(status)


if __name__ == '__main__':

    #This handles Twitter authentication and the connection to the Twitter Streaming API
    l = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, l)

    #This line filters the Twitter stream to capture data matching the keywords: 'python', 'javascript', 'ruby'
    stream.filter(track=['python', 'javascript', 'ruby'])

What I can't figure out is how to stream this data into a Python variable instead of printing it to the screen. I'm working in an IPython notebook and want to capture the stream in some variable, foo, after streaming for a minute or so. Furthermore, how do I get the stream to time out? It runs indefinitely in this manner.
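To make it concrete, here is a rough sketch of what I'm imagining (the names CollectListener, collected, and run_for_seconds are just placeholders I made up):

import time

class CollectListener(StreamListener):
    """Sketch: collect raw tweet JSON in a list, stop after a time limit."""
    def __init__(self, run_for_seconds=60):
        super(CollectListener, self).__init__()
        self.collected = []                    # the variable I want to end up with
        self.deadline = time.time() + run_for_seconds

    def on_data(self, data):
        self.collected.append(data)            # capture instead of printing
        return time.time() < self.deadline     # returning False stops the stream

Is appending inside the listener and reading l.collected afterwards the idiomatic way to do this?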

Related:

Using tweepy to access Twitter's Streaming API

Adam Hughes

2 Answers


Yes, in the post, @Adil Moujahid mentions that his code ran for 3 days. I adapted the same code and, for initial testing, made the following tweaks:

(a) Added a location filter to get a limited set of tweets, instead of tweets from everywhere that contain the keyword. See How to add a location filter to tweepy module. From here, you can create an intermediate variable in the above code as follows:

stream_all = Stream(auth, l)

Suppose we select the San Francisco area; we can add:

stream_SFO = stream_all.filter(locations=[-122.75,36.8,-121.75,37.8])  

The assumption is that filtering by location takes less time than filtering by the keywords.

(b) Then you can filter for the keywords:

tweet_iter = stream_SFO.filter(track=['python', 'javascript', 'ruby']) 

(c) You can then write it to file as follows:

import json

with open('file_name.json', 'w') as f:
    json.dump(tweet_iter, f, indent=1)

This should take much less time. Coincidentally, I wanted to address the same question that you posted today, so I don't have execution timings yet.
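One caveat from my own testing: in the tweepy versions I have tried, filter() blocks and returns None rather than an iterator, so stream_SFO and tweet_iter won't actually be populated as written above. A variant of step (c) that does run is to collect parsed tweets inside the listener and dump them once the stream stops (ListCollector and the 100-tweet cutoff are my own placeholders):

import json

class ListCollector(StreamListener):
    """Collect parsed tweets in memory until a cutoff is reached."""
    def __init__(self):
        super(ListCollector, self).__init__()
        self.tweets = []

    def on_data(self, data):
        self.tweets.append(json.loads(data))
        return len(self.tweets) < 100   # returning False stops the stream

collector = ListCollector()
Stream(auth, collector).filter(locations=[-122.75, 36.8, -121.75, 37.8])

with open('file_name.json', 'w') as f:
    json.dump(collector.tweets, f, indent=1)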

Hope this helps.

KarthikS
  • Ok cool, let me play around with this some. Have you done anything in regard to streaming in a subprocess so it doesn't freeze out the program for 3 days while running? – Adam Hughes Feb 24 '15 at 19:38
  • Yes, what I used to do is run the Python program with nohup, as follows: nohup python python_file.py & exit (see http://unix.stackexchange.com/questions/479/keep-ssh-sessions-running-after-disconnection). Later, if there are problems and you want to kill the process, try ps -ef | grep nohup and use that process ID to kill the process started with nohup, e.g. kill -9 1787 (see http://stackoverflow.com/questions/17385794/how-to-get-the-process-id-to-kill-a-nohup-process). – KarthikS Feb 24 '15 at 19:50
  • You can also use Mosh from MIT, which is essentially a persistent SSH that works on mobile devices. – Adam Erickson Aug 16 '16 at 10:33

I notice that you are looking to stream data into a variable for later use. The way that I have done this is to create a method that streams data into a database, using sqlite3 and SQLAlchemy.

For example, first here is the regular code:

import tweepy
import json
import time
import db_commands
import credentials

API_KEY = credentials.ApiKey 
API_KEY_SECRET = credentials.ApiKeySecret
ACCESS_TOKEN = credentials.AccessToken
ACCESS_TOKEN_SECRET = credentials.AccessTokenSecret

def create_auth_instance():
    """Set up Authentication Instance"""
    auth = tweepy.OAuthHandler(API_KEY, API_KEY_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth, wait_on_rate_limit=True)

    return api

class MyStreamListener(tweepy.StreamListener):
    """ Listen for tweets """
    def __init__(self, api=None):
        self.counter = 0
        # References the auth instance for the listener
        self.api = create_auth_instance()
        # Creates a database command instance
        self.dbms = db_commands.MyDatabase(db_commands.SQLITE, dbname='mydb.sqlite')
        # Creates a database table
        self.dbms.create_db_tables()


    def on_connect(self):
        """Notify when user connected to twitter"""
        print("Connected to Twitter API!")


    def on_status(self, tweet):
        """
        Every time a tweet is tracked, add the contents of the tweet
        (its username, text, and date created) to a sqlite3 database
        """         
        user = tweet.user.screen_name
        text = tweet.text
        date_created = tweet.created_at

        self.dbms.insert(user, text, date_created)


    def on_error(self, status_code):
        """Handle error codes"""
        if status_code == 420:
            # Return False if stream disconnects
            return False  

def main():
    """Create twitter listener (Stream)"""
    tracker_subject = input("Type subject to track: ")
    twitter_listener = MyStreamListener()
    myStream = tweepy.Stream(auth=twitter_listener.api.auth, listener=twitter_listener)
    myStream.filter(track=[tracker_subject], is_async=True)


main()

As you can see in the code, we authenticate, create a listener, and then activate a stream:

twitter_listener = MyStreamListener()
myStream = tweepy.Stream(auth=twitter_listener.api.auth, listener=twitter_listener)
myStream.filter(track=[tracker_subject], is_async=True)
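Because is_async=True starts the stream on a background thread, filter() returns immediately and your own code keeps running. That also answers the timeout part of the question: you can simply wait and then disconnect (a minimal sketch, assuming tweepy 3.x, where Stream exposes a disconnect() method):

import time

time.sleep(60)          # let the stream collect for a minute
myStream.disconnect()   # then shut the background stream down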

Every time we receive a tweet, the on_status function will execute; it can be used to perform a set of actions on the tweet data being streamed.

def on_status(self, tweet):
    """
    Every time a tweet is tracked, add the contents of the tweet
    (its username, text, and date created) to a sqlite3 database
    """         
    user = tweet.user.screen_name
    text = tweet.text
    date_created = tweet.created_at

    self.dbms.insert(user, text, date_created)

The tweet data, tweet, is captured in three variables, user, text, and date_created, and then passed to the database controller that was initialized in the MyStreamListener class's __init__ function. The insert function it calls is defined in the imported db_commands file.

Here is the code in the db_commands.py file, which is imported above with import db_commands:

from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
from sqlalchemy.sql import text as sql_text

# Global Variables
SQLITE                  = 'sqlite'
# MYSQL                   = 'mysql'
# POSTGRESQL              = 'postgresql'
# MICROSOFT_SQL_SERVER    = 'mssqlserver'

# Table Names
TWEETS           = 'tweets'


class MyDatabase:
    # http://docs.sqlalchemy.org/en/latest/core/engines.html
    DB_ENGINE = {
        SQLITE: 'sqlite:///{DB}',
        # MYSQL: 'mysql://scott:tiger@localhost/{DB}',
        # POSTGRESQL: 'postgresql://scott:tiger@localhost/{DB}',
        # MICROSOFT_SQL_SERVER: 'mssql+pymssql://scott:tiger@hostname:port/{DB}'
    }

    # Main DB Connection Ref Obj
    db_engine = None

    def __init__(self, dbtype, username='', password='', dbname=''):
        dbtype = dbtype.lower()

        if dbtype in self.DB_ENGINE.keys():
            engine_url = self.DB_ENGINE[dbtype].format(DB=dbname)

            self.db_engine = create_engine(engine_url)
            print(self.db_engine)

        else:
            print("DBType is not found in DB_ENGINE")


    def create_db_tables(self):
        metadata = MetaData()
        tweets = Table(TWEETS, metadata,
                      Column('id', Integer, primary_key=True),
                      Column('user', String),
                      Column('text', String),
                      Column('date_created', String),
                  )

        try:
            metadata.create_all(self.db_engine)
            print("Tables created")
        except Exception as e:
            print("Error occurred during Table creation!")
            print(e)

    # Insert, Update, Delete
    def execute_query(self, query='', **params):
        if query == '':
            return

        print(query)
        with self.db_engine.connect() as connection:
            try:
                # Bind any named parameters so values are escaped safely
                connection.execute(sql_text(query), **params)
            except Exception as e:
                print(e)

    def insert(self, user, text, date_created):
        # Insert data via bound parameters, so quotes inside the tweet
        # text cannot break (or inject into) the SQL statement
        query = "INSERT INTO {}(user, text, date_created) " \
                "VALUES (:user, :text, :date_created);".format(TWEETS)
        self.execute_query(query, user=user, text=text, date_created=str(date_created))

This code uses the SQLAlchemy package to create a sqlite3 database and post tweets to a tweets table. SQLAlchemy can easily be installed with pip install sqlalchemy. If you use these two files together, you should be able to scrape tweets through a filter into a database.
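To tie this back to the original question of getting the tweets into a Python variable, you can read the table back after the stream has run for a while. A small sketch, assuming SQLAlchemy 1.x (where a raw SQL string can be passed to execute() and rows convert with dict()) and the mydb.sqlite / tweets names used above:

from sqlalchemy import create_engine

engine = create_engine('sqlite:///mydb.sqlite')
with engine.connect() as connection:
    rows = connection.execute("SELECT user, text, date_created FROM tweets")
    captured_tweets = [dict(row) for row in rows]  # the in-memory variable

print(len(captured_tweets), "tweets captured")

Please let me know if this helps and if you have any further questions.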

RedSunDave