I want to run multiple strategies in concurrent processes. I came up with something like this:

import logging
import multiprocessing
import os

from sqlalchemy.orm import scoped_session, Session

from pyutil.sql.interfaces.symbols.symbol import Symbol
from pyutil.sql.session import get_one_or_create


class StratRunner(object):
    def __init__(self, session_scope, logger=None):
        assert isinstance(session_scope, scoped_session)
        self.__session_scope = session_scope
        self.__logger = logger or logging.getLogger(__name__)


    # this function is the target for mp.Process
    def _run(self, strategy):
        self.__logger.debug("Pid {pid}".format(pid=os.getpid()))
        symbols = self.symbols

        self.__logger.info("Run strategy {s}".format(s=strategy))

        configuration = strategy.configuration()
        strategy.upsert(portfolio=configuration.portfolio, symbols=symbols, days=5)

    def run_strategies(self):
        # loop over all active strategies!
        jobs = []
        # we are in the main thread here...
        for s in self.active_strategies:
            # what shall I give to the Process? The strategy object, the strategy_id, a session instance, the session_scope...
            job = multiprocessing.Process(target=self._run, kwargs={"strategy": s})
            job.name = s.name
            jobs.append(job)

        run_jobs(jobs, logger=self.__logger)



    @property
    def symbols(self):
        return {s.name: s for s in self.__session_scope().query(Symbol)}

    @property
    def active_strategies(self):
        return self.__session_scope().query(Strategy).filter(Strategy.active == True).all()

I am aware that there is a ton of documentation on this topic, but I am overwhelmed. I loop over the rows of a table (the active_strategies; the model is class Strategies(Base)...). I then hand each strategy object to the _run method and update the strategy within that same method. Please feel free to shred my code. In particular, I am puzzled about what to give to the _run method: the strategy object, the strategy ID, the session, the scoped_session, ...?

I have now created a runner object:

import abc
import logging
import os

from sqlalchemy.orm import sessionmaker


class Runner(object):
    __metaclass__ = abc.ABCMeta

    def __init__(self, engine, logger=None):
        self.__engine = engine
        self._logger = logger or logging.getLogger(__name__)
        self.__jobs = []

    @property
    def _session(self):
        """ Create a fresh new session... """
        self.__engine.dispose()
        factory = sessionmaker(self.__engine)
        return factory()

    def _run_jobs(self):
        self._logger.debug("PID main {pid}".format(pid=os.getpid()))

        for job in self.jobs:
            # all jobs get the trigger
            self._logger.info("Job {j}".format(j=job.name))
            job.start()

        for job in self.jobs:
            self._logger.info("Wait for job {j}".format(j=job.name))
            job.join()
            self._logger.info("Job {j} done".format(j=job.name))

    @property
    def jobs(self):
        return self.__jobs

    @abc.abstractmethod
    def run(self):
        """ Described in the child class """

In particular, this class can provide a fresh session (via ._session). However, with this setup I see plenty of errors like this:

psycopg2.OperationalError: server closed the connection unexpectedly
         |  This probably means the server terminated abnormally
         |  before or while processing the request.
tschm
  • A scoped session is for handling sessions in *threads*, not processes, and it is a bad idea to share an engine over process boundaries, if forking is used. Here for example you first make a query in the "main" process, launch multiple jobs, and make a query in each job. With some bad luck and forking a connection is shared between multiple processes. – Ilja Everilä Aug 21 '18 at 08:48
  • Related: https://stackoverflow.com/questions/44677915/across-process-boundary-in-scoped-session, and an example of what might happen: https://stackoverflow.com/questions/41279157/connection-problems-with-sqlalchemy-and-multiple-processes/41722129 – Ilja Everilä Aug 21 '18 at 08:59
  • Thanks Ilja, shall I share the connection string? Or forget about multiprocessing and sqlalchemy in combination? – tschm Aug 21 '18 at 09:04
  • Have a look here for some insight on dealing with forking multiprocessing and SQLA: http://docs.sqlalchemy.org/en/latest/core/pooling.html#using-connection-pools-with-multiprocessing – Ilja Everilä Aug 21 '18 at 09:10
  • OK, it seems that in the docs an engine (but not a connection) is shared between the processes. I am sceptical about the engine.dispose(). If I start multiple processes that all dispose the engine, wouldn't that have cross-effects? – tschm Aug 21 '18 at 10:00
  • So, if I go down the engine dispose route, I would then have to create a local session object using sessionmaker(bind=engine)() in every process? And for the objects I hand over from the main thread, I just forward the ids to the process? – tschm Aug 21 '18 at 10:20
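
The idea raised in these comments (share only the connection string and the ids, and build the session inside each process) could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a confirmed fix: the Strategy model below is a hypothetical stand-in for the real one, the last_pid column and the names make_demo_db, run_strategy and run_all are invented for illustration, a SQLite file replaces PostgreSQL, and the "fork" start method is named explicitly because it is the case discussed above (Unix only). Note that it creates a separate engine per process rather than calling engine.dispose() on a shared one.

```python
import multiprocessing
import os
import tempfile

from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Strategy(Base):
    # Hypothetical stand-in for the real Strategy model.
    __tablename__ = "strategy"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    active = Column(Boolean, default=True)
    last_pid = Column(Integer)  # illustrative column: pid of the worker that ran it


def make_demo_db(path):
    """Create a small SQLite database (assumption: stands in for PostgreSQL)."""
    db_url = "sqlite:///" + path
    engine = create_engine(db_url)
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    session.add_all([Strategy(name="a"), Strategy(name="b"),
                     Strategy(name="c", active=False)])
    session.commit()
    session.close()
    engine.dispose()
    return db_url


def run_strategy(db_url, strategy_id):
    """Process target: the engine and session are created in the child only."""
    engine = create_engine(db_url)
    session = sessionmaker(bind=engine)()
    try:
        strategy = session.get(Strategy, strategy_id)
        strategy.last_pid = os.getpid()  # placeholder for strategy.upsert(...)
        session.commit()
    finally:
        session.close()
        engine.dispose()


def run_all(db_url):
    """Parent: read only primary keys, drop all connections, then start jobs."""
    engine = create_engine(db_url)
    session = sessionmaker(bind=engine)()
    ids = [row.id for row in
           session.query(Strategy.id).filter(Strategy.active.is_(True))]
    session.close()
    engine.dispose()  # no open connection exists when the children are forked

    ctx = multiprocessing.get_context("fork")
    jobs = [ctx.Process(target=run_strategy, args=(db_url, i)) for i in ids]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
    return ids
```

Passing plain values (a URL string and integer ids) means no ORM object, session, or pooled connection crosses the process boundary; each child loads its own copy of the row by primary key.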
