I have a Node.js application that runs a client interface exposing actions which trigger machine learning tasks. Since Python is a better fit for implementing the machine learning side, I've written a Python application that runs those tasks on demand.
Now I need to integrate both applications. It has been decided that both must run on a single (AWS) instance.
One way I found to do this integration was the python-shell node module, where communication between Python and Node is done over stdin and stdout.
On the Node side, I have something like this:
'use strict';

const express = require('express');
const PythonShell = require('python-shell');

var app = express();

app.listen(8000, function () {
  console.log('Example app listening on port 8000!');
});

var options = {
  mode: 'text',
  pythonPath: '../pythonapplication/env/Scripts/python.exe',
  scriptPath: '../pythonapplication/',
  pythonOptions: ['-u'], // Unbuffered
};

var pyshell = new PythonShell('start.py', options);

pyshell.on('message', function (message) {
  console.log(message);
});

app.get('/task', function (req, res) {
  pyshell.send('extract-job');
});

app.get('/terminate', function (req, res) {
  pyshell.send('terminate');
  pyshell.end(function (err, code, signal) {
    console.log(err);
    console.log(code);
    console.log(signal);
  });
});
On the Python side, I have a main script which loads some stuff and then calls a server script that runs forever, reading lines with sys.stdin.readline() and executing the corresponding task.
start.py is:
if __name__ == '__main__':
    # data = json.loads(sys.argv[1])
    from multiprocessing import Manager, Pool

    import logging
    import provider, server

    # Get logging setup objects
    debug_queue, debug_listener = provider.shared_logging(logging.DEBUG, 'python-server-debug.log')
    info_queue, info_listener = provider.shared_logging(logging.INFO, 'python-server.log')
    logger = logging.getLogger(__name__)

    # Start logger listeners
    debug_listener.start()
    info_listener.start()

    logger.info('Initializing pool of workers...')
    pool = Pool(initializer=provider.worker, initargs=[info_queue, debug_queue])

    logger.info('Initializing server...')
    try:
        server.run(pool)
    except (SystemError, KeyboardInterrupt) as e:
        logger.info('Execution terminated without errors.')
    except Exception as e:
        logger.error('Error on main process:', exc_info=True)
    finally:
        pool.close()
        pool.join()
        debug_listener.stop()
        info_listener.stop()

    print('Done.')
Both info_queue and debug_queue are multiprocessing.Queue instances used to handle logging across processes. If I run my Python application standalone, everything works fine, even when using the pool of workers (logs get properly logged, prints get properly printed...).
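When I say it works standalone, this is the pattern I mean, reduced to a minimal sketch (the single queue, the dummy task and the file name here are simplifications for illustration, not my actual code):

import logging
import os
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import Pool, Queue

def worker_init(log_queue):
    # Every pool worker forwards its log records to the shared queue.
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(QueueHandler(log_queue))

def dummy_task(n):
    logging.getLogger(__name__).info('Worker %s handled item %s', os.getpid(), n)
    return n * 2

if __name__ == '__main__':
    log_queue = Queue()
    file_handler = logging.FileHandler('standalone-test.log')
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    # A listener thread in the main process drains the queue into the file handler.
    listener = QueueListener(log_queue, file_handler)
    listener.start()

    pool = Pool(initializer=worker_init, initargs=(log_queue,))
    print(pool.map(dummy_task, range(4)))
    pool.close()
    pool.join()

    listener.stop()

Run directly from a terminal, the worker records show up in the log file right away.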
But if I try to run it through python-shell, only the prints and logs from my main process get through correctly... Every message (print or log) coming from my pool of workers is held back until I terminate the Python script. In other words, everything is held until the finally block in start.py runs...
Does anyone have any insight into this issue? Have you heard of the python-bridge module? Is it a better solution? Can you suggest a better approach for this integration that does not use two separate servers?
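For completeness: the bare stdin/stdout exchange itself does not seem to be the problem. A main-process-only loop along these lines (a trivial sketch, the echo behaviour is just for illustration) is exactly the kind of output that does reach Node correctly in my setup:

import sys

if __name__ == '__main__':
    for line in sys.stdin:
        command = line.strip()
        if command == 'terminate':
            break
        # Each line written to stdout arrives in Node as a 'message' event.
        print('received: %s' % command)
        sys.stdout.flush()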
Below is my real provider script, plus a quick mock I made of the server script (the real one has too much stuff).
Mock server.py:
import json
import logging
import multiprocessing
import sys
import time
from json.decoder import JSONDecodeError
from threading import Thread


def task(some_args):
    logger = logging.getLogger(__name__)
    results = 'results of machine learn task goes here, as a string'
    logger.info('log whatever im doing')
    # Some machine-learn task...
    logger.info('Returning results.')
    return results


def answer_node(message):
    print(message)
    # sys.stdout.write(message)
    # sys.stdout.flush()


def run(pool, recrutai, job_pool, candidate_queue):
    logger = logging.getLogger(__name__)
    workers = []
    logger.info('Server is ready and waiting for commands')
    while True:
        # Read input stream
        command = sys.stdin.readline()
        command = command.split('\n')[0]

        logger.debug('Received command: %s', command)

        if command == 'extract-job':
            logger.info('Creating task.')
            # TODO: Check data attributes
            p = pool.apply_async(
                func=task,
                args=('args',),  # single-element tuple, not a bare string
                callback=answer_node
            )
            # What to do with workers array?!
            workers.append(p)

        elif command == 'other-commands':
            pass
            # Other tasks here

        elif command == 'terminate':
            raise SystemError

        else:
            logger.warning('Received an invalid command %s.', command)
My provider.py:
import logging
import os
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import Queue


def shared_logging(level, file_name):
    # Create main logging file handler
    handler = logging.FileHandler(file_name)
    handler.setLevel(level)

    # Create logging format
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)

    # Create queue shared between all processes to centralize logging
    logger_queue = Queue()  # multiprocessing.Queue

    # Create queue listener to send records from logger_queue to handler
    logger_listener = QueueListener(logger_queue, handler)

    return logger_queue, logger_listener


def process_logging(info_queue, debug_queue, logger_name=None):
    # Create logging queue handlers
    debug_queue_handler = QueueHandler(debug_queue)
    debug_queue_handler.setLevel(logging.DEBUG)

    info_queue_handler = QueueHandler(info_queue)
    info_queue_handler.setLevel(logging.INFO)

    # Setup level of process logger
    logger = logging.getLogger()
    if logger_name:
        logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # Add handlers to the logger
    logger.addHandler(debug_queue_handler)
    logger.addHandler(info_queue_handler)


def worker(info_queue, debug_queue):
    # Setup worker process logging
    process_logging(info_queue, debug_queue)
    logging.debug('Process %s initialized.', os.getpid())