I notice that you are looking to stream data into a variable for later use. The way that I have done this is to create a method to stream data into a database using sqlite3 and sqlalchemy.
For example, first here is the regular code:
import tweepy
import json
import time
import db_commands
import credentials
API_KEY = credentials.ApiKey
API_KEY_SECRET = credentials.ApiKeySecret
ACCESS_TOKEN = credentials.AccessToken
ACCESS_TOKEN_SECRET = credentials.AccessTokenSecret
def create_auth_instance():
"""Set up Authentication Instance"""
auth = tweepy.OAuthHandler(API_KEY, API_KEY_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth, wait_on_rate_limit = True)
return api
class MyStreamListener(tweepy.StreamListener):
""" Listen for tweets """
def __init__(self, api=None):
self.counter = 0
# References the auth instance for the listener
self.api = create_auth_instance()
# Creates a database command instance
self.dbms = db_commands.MyDatabase(db_commands.SQLITE, dbname='mydb.sqlite')
# Creates a database table
self.dbms.create_db_tables()
def on_connect(self):
"""Notify when user connected to twitter"""
print("Connected to Twitter API!")
def on_status(self, tweet):
"""
Everytime a tweet is tracked, add the contents of the tweet,
its username, text, and date created, into a sqlite3 database
"""
user = tweet.user.screen_name
text = tweet.text
date_created = tweet.created_at
self.dbms.insert(user, text, date_created)
def on_error(self, status_code):
"""Handle error codes"""
if status_code == 420:
# Return False if stream disconnects
return False
def main():
"""Create twitter listener (Stream)"""
tracker_subject = input("Type subject to track: ")
twitter_listener = MyStreamListener()
myStream = tweepy.Stream(auth=twitter_listener.api.auth, listener=twitter_listener)
myStream.filter(track=[tracker_subject], is_async=True)
main()
As you can see in the code, we authenticate and create a listener and then activate a stream
twitter_listener = MyStreamListener()
myStream = tweepy.Stream(auth=twitter_listener.api.auth, listener=twitter_listener)
myStream.filter(track=[tracker_subject], is_async=True)
Everytime we receive a tweet, the 'on_status' function will execute, which can be used to perform a set of actions on the tweet data that is being streamed.
def on_status(self, tweet):
"""
Everytime a tweet is tracked, add the contents of the tweet,
its username, text, and date created, into a sqlite3 database
"""
user = tweet.user.screen_name
text = tweet.text
date_created = tweet.created_at
self.dbms.insert(user, text, date_created)
Tweet data, tweet, is captured in three variables user, text, date_created and then referenced the Database controller initialized in the MyStreamListener Class's init function. This insert function is called from the imported db_commands file.
Here is the code located in db_commands.py file that is imported into the code using import db_commands.
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
# Global Variables
SQLITE = 'sqlite'
# MYSQL = 'mysql'
# POSTGRESQL = 'postgresql'
# MICROSOFT_SQL_SERVER = 'mssqlserver'
# Table Names
TWEETS = 'tweets'
class MyDatabase:
# http://docs.sqlalchemy.org/en/latest/core/engines.html
DB_ENGINE = {
SQLITE: 'sqlite:///{DB}',
# MYSQL: 'mysql://scott:tiger@localhost/{DB}',
# POSTGRESQL: 'postgresql://scott:tiger@localhost/{DB}',
# MICROSOFT_SQL_SERVER: 'mssql+pymssql://scott:tiger@hostname:port/{DB}'
}
# Main DB Connection Ref Obj
db_engine = None
def __init__(self, dbtype, username='', password='', dbname=''):
dbtype = dbtype.lower()
if dbtype in self.DB_ENGINE.keys():
engine_url = self.DB_ENGINE[dbtype].format(DB=dbname)
self.db_engine = create_engine(engine_url)
print(self.db_engine)
else:
print("DBType is not found in DB_ENGINE")
def create_db_tables(self):
metadata = MetaData()
tweets = Table(TWEETS, metadata,
Column('id', Integer, primary_key=True),
Column('user', String),
Column('text', String),
Column('date_created', String),
)
try:
metadata.create_all(self.db_engine)
print("Tables created")
except Exception as e:
print("Error occurred during Table creation!")
print(e)
# Insert, Update, Delete
def execute_query(self, query=''):
if query == '' : return
print (query)
with self.db_engine.connect() as connection:
try:
connection.execute(query)
except Exception as e:
print(e)
def insert(self, user, text, date_created):
# Insert Data
query = "INSERT INTO {}(user, text, date_created)"\
"VALUES ('{}', '{}', '{}');".format(TWEETS, user, text, date_created)
self.execute_query(query)
This code uses sqlalchemy package to create a sqlite3 database and post tweets to a tweets table. Sqlalchemy can easily be installed with pip install sqlalchemy. If you use these two codes together, you should be able to scrape tweets through a filter into a databse. Please let me know if this helps and if you have any further questions.