I am scratching my head on this one.
I have a Flask app with Flask-Cache and SQLAlchemy:
from flask import Flask
from flask_cache import Cache
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql+pg8000://[Redacted]'
cache = Cache(app, config={'CACHE_TYPE': 'redis', 'CACHE_REDIS_URL': '[Redacted]'})
db = SQLAlchemy(app)
Celery workers:
from celery.signals import worker_process_init, worker_process_shutdown

@worker_process_init.connect
def init_worker(**kwargs):
    global db_session
    print('Initializing database connection for worker.')
    db_session = database.get_session()

@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global db_session
    if db_session.is_active:
        print('Closing database connection for worker.')
        db_session.close()
And a general-purpose get_or_create helper:
@cache.cached(timeout=200, key_prefix="get_or_create")
def get_or_create(model, **kwargs):
    instance = model.query.filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        instance = model(**kwargs)
        db_session.add(instance)
        return instance
I'm trying to use the cache to stop the multiprocessing from causing UniqueConstraint violations (i.e. two workers inserting the same object at the same time, when the second one should instead be updating after the first one inserts).
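To make the race concrete outside of Celery, here is a minimal sketch with SQLite and a made-up Widget model (both are stand-ins, not my real schema): two sessions both check first, both insert, and the second commit trips the unique constraint:

```python
import os
import tempfile

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Widget(Base):
    __tablename__ = 'widget'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

# File-backed SQLite so the two sessions get separate connections.
db_path = os.path.join(tempfile.mkdtemp(), 'race.db')
engine = create_engine('sqlite:///' + db_path)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

s1, s2 = Session(), Session()  # stand-ins for two workers

# Both "workers" look first and see nothing...
assert s1.query(Widget).filter_by(name='spam').first() is None
assert s2.query(Widget).filter_by(name='spam').first() is None

# ...so both insert. The second commit violates the unique constraint.
s1.add(Widget(name='spam'))
s2.add(Widget(name='spam'))
s1.commit()

hit_constraint = False
try:
    s2.commit()
except IntegrityError:
    s2.rollback()
    hit_constraint = True
    print('second insert hit the unique constraint')
```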
The workers are spewing
InvalidRequestError: Instance '<[Redacted]>' is not persistent within this Session
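For what it's worth, that exact error is easy to reproduce without the cache or Celery at all: hand an instance that is persistent in one session to a second session and refresh() it. A minimal sketch, again with SQLite and a made-up Widget model:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Widget(Base):
    __tablename__ = 'widget'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session_a = Session()
w = Widget(name='spam')
session_a.add(w)
session_a.commit()            # w is now persistent in session_a

session_b = Session()         # a different worker's session
message = ''
try:
    session_b.refresh(w)      # w belongs to session_a, not session_b
except InvalidRequestError as exc:
    message = str(exc)
print(message)
```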
Best I can figure out is that I need to expand the session's scope?