
I am building an algorithmic trading platform in Python. Multiple algorithms monitor the market and execute trades accordingly, daily from 09:30 to 16:00.

What I'm looking for is the ability to start and stop algorithms arbitrarily from a client. So I want a server script, built on multiprocessing, and a client which can start/stop/list algorithms (each running in its own process) at any given time.

Any examples of how this can be done? The majority of online examples are for queue servers, which do not seem to fit my problem.

EDIT:

I am trying to do this with the multiprocessing package. Using a queue seems wrong to me, as I know for a fact that an arbitrary number of processes will run for the whole day, or at least until I say stop. I'm not trying to run a short script and let a worker consume the next job from a queue once the previous one is done. Instead, I'm thinking of having a server script using a Manager, which will run forever and just start new scripts in separate processes/threads when requested. I would, however, like to be able to send a stop signal to a process to kill it. I do have a feeling that I'm doing this kind of backwards :-) What I have is:

server.py:

import multiprocessing as mp
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from time import strftime


class Server(object):
    def __init__(self, port=50000, authkey=''):
        self.processes = {}
        self._authkey = authkey
        self.port = port
        self.server = None
        self.running = False
        BaseManager.register('get_process', callable=lambda: self)


    def start_server(self):
        manager = BaseManager(address=('', self.port), authkey=self._authkey)
        self.server = manager.get_server()
        try:
            self._logmessage("Server started")
            self.running = True
            self.server.serve_forever()
        except (KeyboardInterrupt, SystemExit):
            self.shutdown()

    def start_process(self, mod, fcn, *args, **kwargs):
        # Import the module by its dotted name, but keep the name string
        # for the key (rebinding 'mod' itself would put the module's repr
        # into the key and the process name).
        module = __import__(mod, globals(), locals(), [fcn], -1)
        key = "{0}.{1}".format(mod, fcn)
        assert key not in self.processes, \
            "Process named '%s' already exists" % key
        p = Process(target=getattr(module, fcn), name=key,
                    args=args, kwargs=kwargs)  # forward caller args
        self._logmessage("Process '%s' started" % key)
        p.start()
        self.processes[key] = p

    def stop_process(self, key):
        assert key in self.processes, "No process named '%s'" % key
        self.processes[key].terminate()
        self.processes[key].join()
        del self.processes[key]
        self._logmessage("Process '%s' stopped" % key)

    def get_processes(self):
        return self.processes.keys()

    def shutdown(self):
        for child in mp.active_children():
            child.terminate()
        self.server.shutdown()
        self.running = False
        self._logmessage("Shutting down")

    def _logmessage(self, msg):
        print "%s: %s" % (strftime('%Y-%m-%d %H:%M:%S'), msg)


if __name__ == '__main__':
    server = Server(authkey='abc')
    try:
        server.start_server()
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()

client.py:

from multiprocessing.managers import BaseManager
import time


class Client(object):
    def __init__(self, host='', port=50000, authkey=''):
        self.host = host
        self.port = port
        self.manager = None
        self.process = None
        self._type_id = 'get_process'
        self._authkey = authkey
        self.manager = BaseManager(address=(self.host, self.port), authkey=self._authkey)
        BaseManager.register(self._type_id)

    def connect(self):
        try:
            self.manager.connect()
            self._logmessage("Connected to server")
        except Exception:
            self._logmessage("Could not connect to server")
            return
        self.process = getattr(self.manager, self._type_id)()

    def start_process(self, mod, fcn, *args, **kwargs):
        self.process.start_process(mod, fcn, *args, **kwargs)
        self._logmessage("Process '%s' started" % fcn)

    def list_processes(self):
        print self.process.get_processes()

    @property
    def connected(self):
        return self.manager._state.value == self.manager._state.STARTED

    def _logmessage(self, msg):
        print "%s: %s" % (time.strftime('%Y-%m-%d %H:%M:%S'), msg)


def test(*args):
    """Dummy algorithm: prints a timestamp every second until terminated."""
    while True:
        print time.time()
        time.sleep(1.)


if __name__ == '__main__':
    from algotrading.server.process_client import Client
    client = Client(authkey='abc')
    client.connect()
    client.start_process("algotrading.server.process_client", "test")
    client.list_processes()
Morten

2 Answers


Check out Supervisord which allows for remote management of processes, plus automatic start/restart configurability.

Depending on your scalability and disaster-recovery needs, you may be thinking about distributing your "monitoring/trading processes" across multiple servers. While supervisord is really only designed to manage a single machine, you could build a manager app which coordinates multiple servers, each running supervisord, via its XML-RPC interface.
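As a sketch of that coordination idea (not tested against a live daemon): assuming each machine's supervisord config has an `[inet_http_server]` block on port 9001 and a program named `algo_sma` (both assumptions), the manager app could drive it like this, using method names from supervisord's XML-RPC API:

```python
from xmlrpc.client import ServerProxy  # xmlrpclib on Python 2

def restart_algo(host, name):
    # supervisord serves its control API at /RPC2 when an
    # [inet_http_server] block is configured; port 9001 is assumed here.
    proxy = ServerProxy('http://%s:9001/RPC2' % host)
    running = [p['name'] for p in proxy.supervisor.getAllProcessInfo()]
    if name in running:
        proxy.supervisor.stopProcess(name)
    proxy.supervisor.startProcess(name)

# e.g. restart_algo('trading-box-1', 'algo_sma')  -- hypothetical host/program
```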

Cron or Celery could be used for your daily start/stop scheduling.

Dwight Gunning

You could implement a socket server which listens for clients and launches a thread to execute each algorithm.

I think RPC would be the simplest solution.

Some inspiration: What is the current choice for doing RPC in Python?
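A minimal sketch of the RPC route with the standard library (module names are the Python 3 ones; on Python 2 use `SimpleXMLRPCServer` and `xmlrpclib`). The registry dict is a stand-in for real Process handles, and the algorithm name is made up:

```python
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import threading

registry = {}  # name -> running flag; stand-in for real Process objects

def start_algo(name):
    registry[name] = True
    return "started %s" % name

def stop_algo(name):
    registry.pop(name, None)
    return "stopped %s" % name

# Port 0 lets the OS pick a free port; a real server would use a fixed one.
server = SimpleXMLRPCServer(('localhost', 0), logRequests=False)
server.register_function(start_algo)
server.register_function(stop_algo)
port = server.server_address[1]
threading.Thread(target=server.serve_forever, daemon=True).start()

client = ServerProxy('http://localhost:%d' % port)
print(client.start_algo('sma_crossover'))  # prints "started sma_crossover"
```

Starting an actual process instead of setting a flag is then just a matter of putting the `multiprocessing.Process` bookkeeping inside `start_algo`/`stop_algo`.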

synthomat
    Sorry I might not have expressed my question clearly. I'm looking to do this with the package `multiprocessing`. I thought I could be using a `Manager` to handle this. – Morten Jan 22 '14 at 12:58