I am using SQLAlchemy's sessionmaker and scoped_session to create a pool of connections for my threads so that I can avoid the error in the title, but I still get it. I have read similar questions and several blog posts, but I still can't get my head around the problem.
My app listens on a Pub/Sub subscription and writes to the database whenever a message arrives. It receives a large number of messages, and after a certain number of them I get the error. I thought that using sessionmaker and scoped_session would handle this situation, but I am obviously missing something.
Here is the simplified code:
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from google.cloud import pubsub_v1

TOPIC_SUBSCRIBER = os.environ.get('PUBSUB_SUBSCRIBER')
PROJECT_ID = os.environ.get('PROJECT_ID')

client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(PROJECT_ID, TOPIC_SUBSCRIBER)

db_uri = os.environ.get('DATABASE_URI')
engine = create_engine(db_uri)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)


def event_handler(message):
    # Called by the subscriber client for every incoming message.
    session_db = Session()
    # Do stuff
    Session.remove()


def run():
    streaming_pull_future = client.subscribe(
        subscription_path, callback=event_handler
    )
    print("Listening for messages on {}".format(subscription_path))
    # Calling result() on StreamingPullFuture keeps the main thread from
    # exiting while messages get processed in the callbacks.
    try:
        streaming_pull_future.result()
    except Exception as e:  # noqa
        streaming_pull_future.cancel()
        print("ERROR: {}".format(str(e)))


if __name__ == '__main__':
    run()
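For reference, here is a minimal sketch of the per-message session pattern I am aiming for, written so that Session.remove() runs even when the "Do stuff" part raises. It is only an illustration under assumptions: an in-memory SQLite database stands in for DATABASE_URI, plain threads stand in for the Pub/Sub callbacks, and the handle() function and its SELECT 1 query are hypothetical placeholders. Whether a skipped Session.remove() is what actually causes the error in my app is a guess, not something I have confirmed.

```python
import threading

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, scoped_session

# Assumption: in-memory SQLite replaces the real DATABASE_URI for this sketch.
engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))


def handle(message):
    """Hypothetical handler: one scoped session per message, always released."""
    session_db = Session()  # thread-local session from the scoped_session registry
    try:
        # Placeholder for the real "Do stuff" database work.
        value = session_db.execute(text("SELECT 1")).scalar()
        session_db.commit()
        return value
    except Exception:
        session_db.rollback()
        raise
    finally:
        # Runs on success and on failure, so the session is closed and its
        # connection is handed back to the pool either way.
        Session.remove()


def worker(message, results):
    results.append(handle(message))


if __name__ == "__main__":
    results = []
    threads = [threading.Thread(target=worker, args=(i, results)) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(results)
```

The only difference from my handler above is the try/finally: without it, an exception in the processing step would skip Session.remove() and leave that thread's session holding on to its connection.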