I want to run multiple strategies in concurrent processes. I came up with something like this
import logging
import multiprocessing
import os
from sqlalchemy.orm import scoped_session, Session
from pyutil.sql.interfaces.symbols.symbol import Symbol
from pyutil.sql.session import get_one_or_create
class StratRunner(object):
def __init__(self, session_scope, logger=None):
assert isinstance(session_scope, scoped_session)
self.__session_scope = session_scope
self.__logger = logger or logging.getLogger(__name__)
# this function is the target for mp.Process
def _run(self, strategy):
self.__logger.debug("Pid {pid}".format(pid=os.getpid()))
symbols = self.symbols
self.__logger.info("Run strategy {s}".format(s=strategy))
configuration = strategy.configuration()
strategy.upsert(portfolio=configuration.portfolio, symbols=symbols, days=5)
def run_strategies(self):
# loop over all active strategies!
jobs = []
# we are in the main thread here...
for s in self.active_strategies:
# what shall I give to the Process? The strategy object, the strategy_id, a session instance, the session_scope...
job = multiprocessing.Process(target=self._run, kwargs={"strategy": s})
job.name = s.name
jobs.append(job)
run_jobs(jobs, logger=self.__logger)
@property
def symbols(self):
return {s.name: s for s in self.__session_scope().query(Symbol)}
@property
def active_strategies(self):
return self.__session_scope().query(Strategy).filter(Strategy.active == True).all()
I am aware of tons of documentation on this project but I am overwhelmed. I loop over the rows of a table (The active_strategies). class Strategies(Base)... . I then hand over the strategy object to the _run method and update the strategy object within the very same method. Please feel free to shred my code. I am in particular puzzled about what to give to the _run method? Shall I hand over the strategy object, the strategy ID, the session, the scoped_session, ... ?
I have now created a runner object:
import abc
import logging
import os
from sqlalchemy.orm import sessionmaker
class Runner(object):
__metaclass__ = abc.ABCMeta
def __init__(self, engine, logger=None):
self.__engine = engine
self._logger = logger or logging.getLogger(__name__)
self.__jobs = []
@property
def _session(self):
""" Create a fresh new session... """
self.__engine.dispose()
factory = sessionmaker(self.__engine)
return factory()
def _run_jobs(self):
self._logger.debug("PID main {pid}".format(pid=os.getpid()))
for job in self.jobs:
# all jobs get the trigge
self._logger.info("Job {j}".format(j=job.name))
job.start()
for job in self.jobs:
self._logger.info("Wait for job {j}".format(j=job.name))
job.join()
self._logger.info("Job {j} done".format(j=job.name))
@property
def jobs(self):
return self.__jobs
@abc.abstractmethod
def run(self):
""" Described in the child class """
In particular this class can provide a fresh session (via ._session). However, using this setup I see plenty of :
psycopg2.OperationalError: server closed the connection unexpectedly
| This probably means the server terminated abnormally
| before or while processing the request.