I'm fairly new to this kind of task, so please help me identify the concepts I'm missing.
I'm trying to stream data from an API into my SQLite db and let a Flask app consume the data. I defined the models in models.py like this:
# models.py
import os
from flask_sqlalchemy import SQLAlchemy

# Create sqlite db
db = SQLAlchemy()

class MyDataModel(db.Model):
    # Manual table name choice
    __tablename__ = 'table1'
    id = db.Column(db.Integer, primary_key=True)
    created_at = db.Column(db.Text)
    text = db.Column(db.Text)

    def __init__(self, created_at, text):
        self.created_at = created_at
        self.text = text

    def __repr__(self):
        return f"Data: {self.text} ... created at {self.created_at}"
In app.py I have a simple view function that counts the rows and returns a Server-Sent Event stream to the frontend for real-time tracking.
# app.py
import logging
import os
import time
from flask import Flask, render_template, url_for, redirect, Response
from flask_migrate import Migrate
from models import db, MyDataModel
from settings import *

app = Flask(__name__)
logging.basicConfig(level=logging.DEBUG)
basedir = os.path.abspath(os.path.dirname(__file__))
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'data.sqlite')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
Migrate(app, db)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/getcount')
def getcount():
    def count_stream():
        count = db.session.query(MyDataModel).count()
        while True:
            yield f"data:{str(count)}\n\n"
            time.sleep(0.5)
    return Response(count_stream(), mimetype='text/event-stream')

if __name__ == "__main__":
    app.run(debug=True, port=PORT)
I also have another Python script, stream_to_db.py, that gets data from an API as a stream, approximately like this:
# stream_to_db.py
import logging
import os
from models import db, MyDataModel
from settings import *
from SomeExternalAPI import SomeAPI

def stream_to_db():
    api = SomeAPI(
        API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET
    )
    r = api.request()
    for item in r:
        created_at, text = item['created_at'], item['text']
        logging.info(text)
        datum = MyDataModel(created_at, text)
        db.session.add(datum)
        db.session.commit()

# Stream data to sqlite
stream_to_db()
When I run python stream_to_db.py, I get this error:
RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.
I looked at the documentation on the application context but am still confused. If I dropped SQLAlchemy and used plain Python and SQL for data insertion, stream_to_db.py would be completely independent of the Flask app. But if I still want to use SQLAlchemy for its syntax and the model definitions from models.py, how should I do it?
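To make the "plain Python and SQL" alternative concrete, here is a minimal sketch using only the standard library's sqlite3 module. The table layout mirrors table1 from models.py; the insert_items helper, the sample items, and the in-memory database are assumptions made for illustration, not part of my actual project.

```python
import sqlite3

def insert_items(conn, items):
    """Insert items into table1 and return the resulting row count."""
    # The schema is duplicated by hand here, so it has to be kept in sync
    # with models.py manually.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS table1 ("
        "id INTEGER PRIMARY KEY, created_at TEXT, text TEXT)"
    )
    # Parameterized query: values are bound by the driver, not formatted into the SQL.
    conn.executemany(
        "INSERT INTO table1 (created_at, text) VALUES (?, ?)",
        [(item["created_at"], item["text"]) for item in items],
    )
    conn.commit()
    return conn.execute("SELECT COUNT(*) FROM table1").fetchone()[0]

# Hypothetical stand-in for the API stream; the real items would come from SomeAPI.
sample_items = [
    {"created_at": "2020-01-01T00:00:00", "text": "first"},
    {"created_at": "2020-01-01T00:00:01", "text": "second"},
]

conn = sqlite3.connect(":memory:")  # in-memory only for this sketch
row_count = insert_items(conn, sample_items)
```

This version has no dependency on Flask at all, but the price is that the table definition lives in two places.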
Conceptually, I feel the streaming-to-db part is independent of the Flask app and should be a script that is essentially a while True loop running forever. The Flask app just reads the db, sends data to the frontend, and does nothing else. I tried putting the stream_to_db() function into __main__ in app.py, but that doesn't make sense: app.run() and stream_to_db() are both essentially while True loops and can't run one after the other.
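The separation I have in mind can be sketched as a writer and a reader that share nothing except the database file, each opening its own connection. In this sketch a thread stands in for the separate script purely so that everything fits in one runnable file; the file path, the helper names, and the sample rows are all hypothetical.

```python
import os
import sqlite3
import tempfile
import threading
import time

DB_PATH = os.path.join(tempfile.mkdtemp(), "demo.sqlite")  # hypothetical location

def init_db(path):
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS table1 ("
        "id INTEGER PRIMARY KEY, created_at TEXT, text TEXT)"
    )
    conn.commit()
    conn.close()

def writer(path, items):
    """Plays the role of stream_to_db.py: own connection, one commit per item."""
    conn = sqlite3.connect(path)
    for created_at, text in items:
        conn.execute(
            "INSERT INTO table1 (created_at, text) VALUES (?, ?)",
            (created_at, text),
        )
        conn.commit()
    conn.close()

def read_count(path):
    """Plays the role of the Flask view: own connection, read-only query."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("SELECT COUNT(*) FROM table1").fetchone()[0]
    finally:
        conn.close()

init_db(DB_PATH)
items = [(f"t{i}", f"message {i}") for i in range(5)]
worker = threading.Thread(target=writer, args=(DB_PATH, items))
worker.start()

# Poll the row count the way the web side would, until all rows are visible.
deadline = time.monotonic() + 10
final_count = read_count(DB_PATH)
while final_count < len(items) and time.monotonic() < deadline:
    time.sleep(0.01)
    final_count = read_count(DB_PATH)
worker.join()
```

In a real deployment the writer would be its own long-running process (started separately from the web server), and only the database file would connect the two.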
I'm lost and clearly missing key concepts here. Please suggest the right way or best practice to do this. I feel this is a really basic task that should already have a best practice and a set of dedicated tools. Thanks in advance!
EDIT
To experiment further, I imported app into stream_to_db.py and added
with app.app_context():
    stream_to_db()
Now I can run python stream_to_db.py with no problem, but if I start the Flask app at the same time, I get several of these:
Debugging middleware caught exception in streamed response at a point where response headers were already sent.
and also
Traceback (most recent call last):
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/werkzeug/wsgi.py", line 507, in __next__
    return self._next()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/werkzeug/wrappers/base_response.py", line 45, in _iter_encoded
    for item in iterable:
  File "/Users/<username>/webapps/<appname>/app.py", line 33, in count_stream
    count = db.session.query(CardanoTweet).count()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/orm/scoping.py", line 162, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/util/_collections.py", line 1012, in __call__
    return self.registry.setdefault(key, self.createfunc())
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 3214, in __call__
    return self.class_(**local_kw)
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 136, in __init__
    self.app = app = db.get_app()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 982, in get_app
    'No application found. Either work inside a view function or push'
RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.
It looks like SQLAlchemy can't find the application. I'm not sure whether the problem is the combination of db.init_app(app) in app.py and db = SQLAlchemy() in models.py. I avoided db = SQLAlchemy(app) in models.py because importing app into models would create a circular dependency.
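The traceback points into count_stream, which is iterated after the view function has already returned. One possible explanation, offered as an assumption rather than a confirmed diagnosis, is that the context is gone by the time the generator body runs. Flask provides stream_with_context for generators that need the context while streaming. The sketch below shows its shape; get_row_count, the ROW_COUNT config key, and the max_events parameter are hypothetical stand-ins so the example runs without a database.

```python
import itertools

from flask import Flask, Response, current_app, stream_with_context

app = Flask(__name__)
app.config["ROW_COUNT"] = 3  # hypothetical stand-in for the real database query

def get_row_count():
    # Uses current_app, so like db.session it needs an active application context.
    return current_app.config["ROW_COUNT"]

def count_events(max_events=None):
    # Re-read the count on every iteration; max_events=None would stream forever.
    ticks = itertools.count() if max_events is None else range(max_events)
    for _ in ticks:
        yield f"data:{get_row_count()}\n\n"

@app.route("/getcount")
def getcount():
    # max_events=2 keeps this sketch finite; a real endpoint would loop and sleep.
    return Response(
        stream_with_context(count_events(max_events=2)),
        mimetype="text/event-stream",
    )

# Exercise the route with Flask's test client instead of a running server.
body = app.test_client().get("/getcount").get_data(as_text=True)
```

Note that the count is read inside the loop here, so each event reflects the current value rather than the value at the time the request started.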
EDIT2
This time I moved all the code from models.py into app.py, used db = SQLAlchemy(app), removed db.init_app(app), kept the import of app in stream_to_db.py along with the app context, and it worked!
My questions:
- How do I correctly move the model definitions to models.py with no circular dependency?
- What are the implications of having an app context in stream_to_db.py? If I have gunicorn running many workers, there will essentially be multiple Flask app instances. Would that cause a problem?
EDIT3
Thanks for all the replies. I don't think this is a duplicate of flask make_response with large files.
The problem is not streaming data FROM Flask to the client; it is streaming data from an external API into the DB and having Flask consume some statistics from that data. So I don't see why, conceptually, the streaming job should be related to Flask at all; the two are independent. The problem is that I'm using SQLAlchemy in the streaming job for the data model and db transactions, and SQLAlchemy (through Flask-SQLAlchemy) needs a Flask app to be defined. This is the part I'm confused about.
What is the right way to write this ever-running background job that streams data to the db, with data models defined in SQLAlchemy? Should I strip the SQLAlchemy code out of the streaming code, use plain SQL, and manually make sure the schema stays in sync if there are further migrations down the road?
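A middle ground between the two options would be plain SQLAlchemy without the Flask extension, which does not need an application context. The sketch below assumes SQLAlchemy 1.4 or newer (for the declarative_base import from sqlalchemy.orm); the store helper and the in-memory database are hypothetical, and the model is redefined on a plain declarative base rather than on db.Model.

```python
from sqlalchemy import Column, Integer, Text, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class MyDataModel(Base):
    # Same table as in models.py, but declared without Flask-SQLAlchemy.
    __tablename__ = "table1"
    id = Column(Integer, primary_key=True)
    created_at = Column(Text)
    text = Column(Text)

engine = create_engine("sqlite:///:memory:")  # in-memory only for this sketch
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def store(items):
    """Hypothetical helper: persist items and return the total row count."""
    session = Session()
    try:
        for item in items:
            session.add(
                MyDataModel(created_at=item["created_at"], text=item["text"])
            )
        session.commit()
        return session.query(MyDataModel).count()
    finally:
        session.close()

total = store([{"created_at": "2020-01-01", "text": "hello"}])
```

The open question with this approach is how to share one set of model definitions between the script and the Flask app, so that migrations still see a single schema.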