I'm fairly new to this task so please help me identify the concepts I'm missing.

I'm trying to stream data from an API into my SQLite db and let a Flask app consume the data. I defined the models in models.py like this:

# models.py
import os
from flask_sqlalchemy import SQLAlchemy

# Create sqlite db
db = SQLAlchemy()


class MyDataModel(db.Model):
    # Manual table name choice
    __tablename__ = 'table1'

    id = db.Column(db.Integer, primary_key=True)
    created_at = db.Column(db.Text)
    text = db.Column(db.Text)

    def __init__(self, created_at, text):
        self.created_at = created_at
        self.text = text

    def __repr__(self):
        return f"Data: {self.text} ... created at {self.created_at}"

In app.py I have a simple view function that counts the rows and returns a Server-Sent Event to the frontend for realtime tracking.

# app.py
import logging
import os
import time
from flask import Flask, render_template, url_for, redirect, Response
from flask_migrate import Migrate
from models import db, MyDataModel
from settings import *


app = Flask(__name__)
logging.basicConfig(level=logging.DEBUG)

basedir = os.path.abspath(os.path.dirname(__file__))

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'data.sqlite')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
Migrate(app, db)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/getcount')
def getcount():
    def count_stream():
        count = db.session.query(MyDataModel).count()
        while True:
            yield f"data:{str(count)}\n\n"
            time.sleep(0.5)
    return Response(count_stream(), mimetype='text/event-stream')


if __name__ == "__main__":
    app.run(debug=True, port=PORT)

Now I have another python script stream_to_db.py that gets data from an API as a stream, approximately like this

# stream_to_db.py
import logging
import os
from models import db, MyDataModel
from settings import *
from SomeExternalAPI import SomeAPI


def stream_to_db():
    api = SomeAPI(
        API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET
    )

    r = api.request()

    for item in r:
        created_at, text = item['created_at'], item['text']
        logging.info(text)
        datum = MyDataModel(created_at, text)
        db.session.add(datum)
        db.session.commit()

# Stream data to sqlite
stream_to_db()

When I run python stream_to_db.py, I get this error:

RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.

I read the documentation on the application context but am still confused. If I dropped SQLAlchemy and used Python and SQL directly for data insertion, stream_to_db.py would be independent of the Flask app. But if I still want to use SQLAlchemy for its syntax and the model definitions in models.py, how should I do it?
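
For comparison, the SQLAlchemy-free variant mentioned above might look roughly like the sketch below. It uses only the standard-library sqlite3 module and has no Flask dependency. The table and column names follow models.py; the in-memory database and the sample_items list are assumptions standing in for data.sqlite and the real API stream.

```python
import sqlite3


def stream_to_db(conn, items):
    """Insert each item as one row, committing after every insert."""
    # The schema is repeated by hand here, so it must be kept in sync
    # with models.py manually.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS table1 ("
        "id INTEGER PRIMARY KEY, created_at TEXT, text TEXT)"
    )
    for item in items:
        conn.execute(
            "INSERT INTO table1 (created_at, text) VALUES (?, ?)",
            (item["created_at"], item["text"]),
        )
        conn.commit()


# Hypothetical sample data standing in for the API stream.
sample_items = [
    {"created_at": "2019-07-10 16:20", "text": "first"},
    {"created_at": "2019-07-10 16:21", "text": "second"},
]

# Assumption: in-memory database for the demo; the real script would
# connect to the data.sqlite file instead.
conn = sqlite3.connect(":memory:")
stream_to_db(conn, sample_items)
row_count = conn.execute("SELECT COUNT(*) FROM table1").fetchone()[0]
print(row_count)
```

The cost of this approach is the one raised in the question: the INSERT statement and the table definition have to be updated by hand whenever the schema changes.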

Conceptually, I feel the streaming-to-db part is independent of the Flask app and should be a script that is essentially a while True loop that runs forever. The Flask app just reads the db, sends data to the frontend, and does nothing else. I tried putting the stream_to_db() call into __main__ in app.py, but that doesn't make sense: app.run() and stream_to_db() are both essentially while True loops and can't be combined.

I'm lost and am missing key concepts here. Please help and suggest the right way or best practices to do this. I feel this is a really basic task that should already have a best practice and a set of dedicated tools. Thanks in advance!


EDIT

To further experiment, I imported app into stream_to_db.py and added

with app.app_context():
    stream_to_db()

Now I can run python stream_to_db.py with no problem, but if I start the Flask app at the same time, I get several

Debugging middleware caught exception in streamed response at a point where response headers were already sent.

and also

Traceback (most recent call last):
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/werkzeug/wsgi.py", line 507, in __next__
    return self._next()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/werkzeug/wrappers/base_response.py", line 45, in _iter_encoded
    for item in iterable:
  File "/Users/<username>/webapps/<appname>/app.py", line 33, in count_stream
    count = db.session.query(CardanoTweet).count()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/orm/scoping.py", line 162, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/util/_collections.py", line 1012, in __call__
    return self.registry.setdefault(key, self.createfunc())
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 3214, in __call__
    return self.class_(**local_kw)
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 136, in __init__
    self.app = app = db.get_app()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 982, in get_app
    'No application found. Either work inside a view function or push'
RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.

It looks like SQLAlchemy can't find the application. I'm not sure whether the problem is the combination of db.init_app(app) in app.py and db = SQLAlchemy() in models.py. I avoided having db = SQLAlchemy(app) in models.py because importing app into models would create a circular dependency.


EDIT2

This time I moved all the code from models.py to app.py, used db = SQLAlchemy(app), removed db.init_app(app), kept the import of app and the app context in stream_to_db.py, and it worked!

My questions

  • How do I correctly move the model definitions to models.py with no circular dependency?
  • What are the implications of having the app context in stream_to_db.py? If I have gunicorn running many workers, there will essentially be multiple Flask app instances. Would that cause a problem?

EDIT3

Thanks for all the replies. I don't think this is a duplicate of flask make_response with large files.

The problem is not streaming data FROM Flask to the client; it is streaming data from an external API to the DB and having Flask consume some statistics from the data in the DB. So I don't see why, conceptually, the streaming job should be related to Flask at all; they are independent. The problem is that I'm using SQLAlchemy in the streaming job for the data model and db transactions, and SQLAlchemy (through Flask-SQLAlchemy) needs a Flask app to be defined. This is the part that confuses me.

What is the right way to write this ever-running background job that streams data to the db, with data models defined with SQLAlchemy? Should I strip the SQLAlchemy code out of the streaming code and just use SQL, and manually make sure the schema agrees if there are further migrations down the road?
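
One possible middle ground, sketched here under stated assumptions, is to define the model with plain SQLAlchemy instead of Flask-SQLAlchemy, so that the ingestion script needs no Flask app or application context. The sketch assumes SQLAlchemy 1.4 or newer (for sqlalchemy.orm.declarative_base and Session as a context manager). The in-memory engine and the sample_items list are stand-ins for data.sqlite and the real API stream.

```python
from sqlalchemy import Column, Integer, Text, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# Plain SQLAlchemy declarative base: no Flask objects involved.
Base = declarative_base()


class MyDataModel(Base):
    __tablename__ = "table1"

    id = Column(Integer, primary_key=True)
    created_at = Column(Text)
    text = Column(Text)


# Assumption: in-memory database for the demo; a file-based URL such as
# "sqlite:///data.sqlite" would be used for the real job.
engine = create_engine("sqlite://")
Session = sessionmaker(bind=engine)
Base.metadata.create_all(engine)


def stream_to_db(items):
    """Insert each item through an ordinary SQLAlchemy session."""
    session = Session()
    try:
        for item in items:
            session.add(
                MyDataModel(created_at=item["created_at"], text=item["text"])
            )
            session.commit()
    finally:
        session.close()


# Hypothetical sample data standing in for the API stream.
sample_items = [
    {"created_at": "2019-07-10 16:20", "text": "first"},
    {"created_at": "2019-07-10 16:21", "text": "second"},
]
stream_to_db(sample_items)
```

With this layout the Flask app would have to query through the same plain SQLAlchemy models rather than through db.Model, which is a trade-off rather than a free win. Schema migrations could still be handled by Alembic, the tool that Flask-Migrate wraps, since Alembic works against plain SQLAlchemy metadata.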

Logan Yang
  • You need to use `stream_with_context()` here, because the app context is torn down when your view returns, so *before* your generator runs. – Martijn Pieters Jul 10 '19 at 16:20
  • @Logan Yang, I was thinking about using a background job to read data from the API, so you have the Flask app and a background job separated. This is related to your question about "many workers". I am not sure if that is related to what you need? – J.K Jul 10 '19 at 16:30
  • @MartijnPieters I got everything working now. I have this web page that shows a realtime count from the db. There are two things running in the background, one is the Flask app server, the other is `stream_to_db.py` with `app_context` in it because it needs data model from SQLAlchemy. Please take a look at my newest edit, my issue is not streaming FROM Flask to client, but is the relationship between a data ingestion job and the Flask app. – Logan Yang Jul 10 '19 at 16:46
  • @J.K Yeah exactly, I need this data ingestion job which is conceptually independent from the Flask app, the app just consumes the data in the db, there can be a lot of different apps consuming this data, so I don't see why the streaming ingestion job should be related to the Flask app. The only reason I have to import app to the streaming code is that the data model and db transactions depend on SQLAlchemy which is Flask app dependent. – Logan Yang Jul 10 '19 at 16:49
  • @LoganYang, I posted this link as an answer and it was deleted. Maybe because it is not the exact answer and should be as a comment. https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-xxii-background-jobs – J.K Jul 10 '19 at 16:58
  • @J.K Cool! I think this is the right pointer for me to study and clear my concepts, thank you very much! Will read and come back for a final edit, document what I learned. As a beginner in this architecture stuff, I feel a lot of people will share same questions as this one. I don't think the current `Marked as a duplicate` is fair. For people who share the same question as mine, they will just skip this one and read that unrelated question – Logan Yang Jul 10 '19 at 17:08
  • @J.K Also can I get a quick answer to my final question (in EDIT3) about possible schema migration? If I don't use SQLAlchemy in order to be independent of Flask, seems there is no way but to manually make sure the SQL insert clause agrees with whatever new schema. Is there a better way? – Logan Yang Jul 10 '19 at 17:17
  • @LoganYang, I am new to SQLAlchemy as well. I just found this repo that might be related. I will let you know if I find something related, and please keep us updated if you find a solution. https://github.com/mardix/active-alchemy – J.K Jul 10 '19 at 18:18

1 Answer

Please try the following change in models.py, where you create db: db = SQLAlchemy(app)

Deep Bhatt
  • Doesn't that require importing app into models.py, which creates a circular dependency? I ran into that problem and am using `db.init_app(app)` in `app.py`, leaving `db = SQLAlchemy()` without app to avoid importing app in models.py – Logan Yang Jul 10 '19 at 15:39
  • I tried moving all code from models.py to app.py, and used `db = SQLAlchemy(app)` as you suggested, and it worked! Thanks for the suggestion, but I still am confused about how to correctly separate the model code to models.py without circular dependency. I also have a question about using app context in stream_to_db.py. Could you please take a look at my edit? Thank you! – Logan Yang Jul 10 '19 at 16:12
  • This absolutely has nothing to do with the problem. You can create the `SQLAlchemy()` instance first and then *later on* attach it to the `Flask` object with `.init_app()`. The OP is already doing this, *correctly*, with `db.init_app(app)`. – Martijn Pieters Jul 10 '19 at 16:16
  • @LoganYang: no, sorry, this *can't* have worked, because you are not using `stream_with_context()`. You were already attaching `SQLAlchemy()` to your app with `db.init_app(app)` anyway. – Martijn Pieters Jul 10 '19 at 16:22
  • @MartijnPieters The streaming FROM Flask to client part was never the problem. The problem is that in that stream_to_db.py, I have to import models. Before, I import from models.py which didn't have `app` in it, it only has `db = SQLAlchemy()`. So stream_to_db.py complained about no app defined when it tries to do db transaction. Now, I moved all code from models.py to app.py and have `db = SQLAlchemy(app)`, then import app to stream_to_db.py. Now the latter doesn't complain since SQLAlchemy has the app defined. – Logan Yang Jul 10 '19 at 16:55
  • @LoganYang: yes, which is where using `with app.app_context()` comes in. You need to make sure that `db` is registered with the `app` in that expression, of course. – Martijn Pieters Jul 10 '19 at 17:00