
I am using SQLAlchemy's sessionmaker and scoped_session to give each of my threads its own session, so that I can avoid the error in the title, but unfortunately I still get it. I have read similar questions and blog posts, but I still can't get my head around the problem. My app listens on a Pub/Sub subscription and writes to the database when a message arrives. It receives a lot of messages, so after a certain number of them the error appears. I thought that using sessionmaker and scoped_session would handle this situation, but I am obviously missing something. Here is the simplified code:

import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from google.cloud import pubsub_v1

TOPIC_SUBSCRIBER = os.environ.get('PUBSUB_SUBSCRIBER')
PROJECT_ID = os.environ.get('PROJECT_ID')
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(PROJECT_ID, TOPIC_SUBSCRIBER)

db_uri = os.environ.get('DATABASE_URI')
engine = create_engine(db_uri)

session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)

def event_handler(message):

    # scoped_session returns the session bound to the current thread,
    # creating it on first use in that thread.
    session_db = Session()

    # Do stuff

    # Dispose of this thread's session and return its connection to the pool.
    Session.remove()


def run():
    streaming_pull_future = client.subscribe(
        subscription_path, callback=event_handler
    )
    print("Listening for messages on {}".format(subscription_path))

    # Calling result() on StreamingPullFuture keeps the main thread from
    # exiting while messages get processed in the callbacks.
    try:
        streaming_pull_future.result()
    except Exception as e:  # noqa
        streaming_pull_future.cancel()
        print("ERROR: {}".format(str(e)))


if __name__ == '__main__':
    run()



1 Answer


The error was caused by Pub/Sub spawning too many callback threads, each opening its own database connection and overloading the database. I solved it by limiting the number of messages that can be processed concurrently. The approach is also discussed here. Here is my code:

import concurrent.futures

# Cap both the Pub/Sub flow control and the callback thread pool, so at most
# MAX_WORKERS messages (and therefore database sessions) are in flight at once.
MAX_WORKERS = 10

def run():
    # Limit the subscriber to only MAX_WORKERS outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=MAX_WORKERS)
    custom_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
    custom_scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(custom_executor)

    streaming_pull_future = client.subscribe(
        subscription_path,
        callback=event_handler,
        flow_control=flow_control,
        scheduler=custom_scheduler,
    )
    print("Listening for messages on {}".format(subscription_path))

    # Calling result() on StreamingPullFuture keeps the main thread from
    # exiting while messages get processed in the callbacks.
    try:
        streaming_pull_future.result()
    except Exception as e:  # noqa
        streaming_pull_future.cancel()
        print("ERROR: {}".format(str(e)))

The code above fixed the problem. However, I thought that sessionmaker and scoped_session would manage the concurrency by themselves; instead I had to handle it at the Pub/Sub level. I would love to hear a SQLAlchemy expert's opinion on this. I hope this helps someone in the future.
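As far as I understand it, scoped_session only guarantees one session per thread; each of those sessions still checks out its own connection from the engine, so with an unbounded number of callback threads you still end up with an unbounded number of connections. A possible complementary fix is to cap the engine's own connection pool. This is only a minimal sketch, not something I have battle-tested for this workload; pool_size, max_overflow and pool_timeout are standard create_engine arguments, and the numbers below are just examples:

import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

db_uri = os.environ.get('DATABASE_URI')

# Every thread-local session checks out a connection from this engine's pool,
# so bounding the pool caps the total number of database connections no matter
# how many callback threads Pub/Sub spawns. Sizes below are illustrative.
engine = create_engine(
    db_uri,
    pool_size=5,        # connections kept open in the pool
    max_overflow=5,     # extra connections allowed under load
    pool_timeout=30,    # seconds a thread waits for a free connection
)

session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)

With a bounded pool, threads that cannot get a connection block (and eventually raise a timeout) instead of opening new connections, so limiting the Pub/Sub concurrency as shown above is still the cleaner fix for this workload.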
