
I have a Node.js application that runs a client interface and exposes an action that triggers machine-learning tasks. Since Python is a better fit for machine-learning work, I've implemented a Python application that runs those tasks on demand.

Now I need to integrate both applications. It has been decided that we must use a single (AWS) instance to run both of them.

One way I found to do this integration is the python-shell Node module, where communication between Python and Node happens over stdin and stdout.
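
For context, in text mode python-shell (as I understand it) writes each argument of send() as a newline-terminated line on the Python process's stdin and emits a 'message' event for every line the Python process writes to stdout. So the Python side only needs a plain read/answer loop over the standard streams; a minimal sketch of just that transport (not my actual script):

import sys

# Minimal counterpart of python-shell's text mode: read newline-terminated
# commands from stdin, answer on stdout, and flush so Node sees each line.
while True:
    line = sys.stdin.readline()
    if not line:            # stdin closed, e.g. by pyshell.end()
        break
    command = line.rstrip('\n')
    if command == 'terminate':
        break
    print('received: ' + command)
    sys.stdout.flush()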

On the Node side I have something like this:

'use strict';

const express = require('express');
const PythonShell = require('python-shell');

var app = express();
app.listen(8000, function () {
    console.log('Example app listening on port 8000!');
});

var options = {
    mode: 'text',
    pythonPath: '../pythonapplication/env/Scripts/python.exe',
    scriptPath: '../pythonapplication/',
    pythonOptions: ['-u'], // Unbuffered
};

var pyshell = new PythonShell('start.py', options);
pyshell.on('message', function (message) {
    console.log(message);
});

app.get('/task', function (req, res) {
    pyshell.send('extract-job');
});

app.get('/terminate', function (req, res) {
    pyshell.send('terminate');
    pyshell.end(function (err, code, signal) {
        console.log(err);
        console.log(code);
        console.log(signal);
    });
});

On the Python side, I have a main script that loads some things and then calls a server script, which runs forever reading lines with sys.stdin.readline() and executing the corresponding task.

start.py is:

if __name__ == '__main__':
    # data = json.loads(sys.argv[1])
    from multiprocessing import Manager, Pool
    import logging
    import provider, server

    # Get logging setup objects
    debug_queue, debug_listener = provider.shared_logging(logging.DEBUG, 'python-server-debug.log')
    info_queue, info_listener = provider.shared_logging(logging.INFO, 'python-server.log')

    logger = logging.getLogger(__name__)

    # Start logger listener
    debug_listener.start()
    info_listener.start()

    logger.info('Initializing pool of workers...')
    pool = Pool(initializer=provider.worker, initargs=[info_queue, debug_queue])

    logger.info('Initializing server...')
    try:
        server.run(pool)
    except (SystemError, KeyboardInterrupt) as e:
        logger.info('Execution terminated without errors.')
    except Exception as e:
        logger.error('Error on main process:', exc_info=True)
    finally:
        pool.close()
        pool.join()
        debug_listener.stop()
        info_listener.stop()

    print('Done.')

Both info_queue and debug_queue are multiprocessing.Queue instances used to handle logging across processes. If I run my Python application standalone, everything works fine, even when using the pool of workers (logs get properly logged, prints get properly printed...).

But if I run it through python-shell, only the prints and logs from my main process get printed and logged correctly... Every message (print or log) from my pool of workers is held back until I terminate the Python script.

In other words, every message is held back until server.run() returns and the finally block runs...

Does anyone have any insight into this issue? Have you heard of the python-bridge module? Is it a better solution? Can you suggest a better approach for this integration that does not use two separate servers?


Here I post my real provider script, and a quick mock I made of the server script (the real one has too much stuff in it).

mock server.py:

import json
import logging
import multiprocessing
import sys
import time
from json.decoder import JSONDecodeError
from threading import Thread


def task(some_args):
    logger = logging.getLogger(__name__)

    results = 'results of machine learn task goes here, as a string'

    logger.info('log whatever im doing')
    # Some machine-learn task...

    logger.info('Returning results.')
    return results

def answer_node(message):
    print(message)
    # sys.stdout.write(message)
    # sys.stdout.flush()

def run(pool):
    logger = logging.getLogger(__name__)

    workers = []
    logger.info('Server is ready and waiting for commands')
    while True:
        # Read input stream
        command = sys.stdin.readline()
        command = command.split('\n')[0]

        logger.debug('Received command: %s', command)

        if command == 'extract-job':
            logger.info(
                'Creating task.',
            )

            # TODO: Check data attributes
            p = pool.apply_async(
                func=task,
                args=('args',),
                callback=answer_node
            )

            # What to do with workers array?!
            workers.append(p)

        elif command == 'other-commands':
            pass
            # Other task here

        elif command == 'terminate':
            raise SystemError

        else:
            logger.warning(
                'Received an invalid command %s.',
                command
            )
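
As a side note on the structure above: task() runs inside a pool worker, while the answer_node callback is invoked back in my main process by the pool's result-handler thread, which is why answer_node can print directly to the stdout that Node listens on. A tiny throwaway sketch of that split (the names are made up, it is not part of my application), just to make explicit where each print happens:

import os
from multiprocessing import Pool

def work(x):
    # Runs in a pool worker process.
    return 'result %s computed in pid %s' % (x, os.getpid())

def deliver(result):
    # Callback: runs back in the main process, like answer_node above.
    print('%s, delivered from pid %s' % (result, os.getpid()))

if __name__ == '__main__':
    pool = Pool(2)
    for i in range(3):
        pool.apply_async(work, (i,), callback=deliver)
    pool.close()
    pool.join()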

my provider.py:

import logging
import os
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import Queue


def shared_logging(level, file_name):
    # Create main logging file handler
    handler = logging.FileHandler(file_name)
    handler.setLevel(level)

    # Create logging format
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)

    # Create queue shared between all process to centralize logging features
    logger_queue = Queue() # multiprocessing.Queue

    # Create logger queue listener to send records from logger_queue to handler
    logger_listener = QueueListener(logger_queue, handler)

    return logger_queue, logger_listener

def process_logging(info_queue, debug_queue, logger_name=None):
    # Create logging queue handlers
    debug_queue_handler = QueueHandler(debug_queue)
    debug_queue_handler.setLevel(logging.DEBUG)

    info_queue_handler = QueueHandler(info_queue)
    info_queue_handler.setLevel(logging.INFO)

    # Setup level of process logger
    logger = logging.getLogger()
    if logger_name:
        logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # Add handlers to the logger
    logger.addHandler(debug_queue_handler)
    logger.addHandler(info_queue_handler)

def worker(info_queue, debug_queue):
    # Setup worker process logging
    process_logging(info_queue, debug_queue)
    logging.debug('Process %s initialized.', os.getpid())
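
For reference, this is the kind of standalone check I mean when I say the multiprocessing logging works fine outside of python-shell; it is a throwaway sketch (file names are placeholders), not part of the application:

import logging
from multiprocessing import Pool

import provider

def job(i):
    # Logged in the worker; should reach check.log through the queue/listener.
    logging.getLogger(__name__).info('hello from worker, task %s', i)

if __name__ == '__main__':
    debug_queue, debug_listener = provider.shared_logging(logging.DEBUG, 'check-debug.log')
    info_queue, info_listener = provider.shared_logging(logging.INFO, 'check.log')
    debug_listener.start()
    info_listener.start()

    pool = Pool(2, initializer=provider.worker, initargs=[info_queue, debug_queue])
    pool.map(job, range(4))
    pool.close()
    pool.join()

    debug_listener.stop()
    info_listener.stop()
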
  • It is hard for us to test your code. For example, `handleAnswer` is undefined in the JS snippet, and your Python snippet fails to define many variables, such as `logger`, `server`, `debug_listener`, etc. Could you update it to be runnable, maybe by adding imports and defining some variables? – brandizzi Jun 14 '18 at 18:13
  • Done. I left the `info` | `debug_queue` and the `info` | `debug_listener` since they are necessary to log stuff in a multiprocess program, and the logging functionality also stops working when I run with `python-shell`. – leoschet Jun 14 '18 at 18:36
  • This feels like a lot of overengineering when both Python and Node can speak HTTP and you can trivially tell both to listen on a port that the other knows about, so you can do GET/POST calls from one to the other on 127.0.0.1. – Mike 'Pomax' Kamermans Jun 14 '18 at 18:43
  • Mike, using HTTP to exchange data between Node and Python would slow things down; I'm expecting a thousand or more tasks at the same time. I tried to use a messaging service, such as the AWS one, but for 'quick' prototyping it has been decided I should integrate both applications without such a solution. – leoschet Jun 14 '18 at 18:50
  • I had a discussion on my question saying that the best solution would be having both applications running on separate servers and communicating over a messaging service, but since I can't use that solution right now, I ended up erasing it... Ideally, that's how the whole application is going to run. IMHO this is not optimal, since the whole solution implemented to integrate the applications now will be thrown away when the real solution gets approved... – leoschet Jun 14 '18 at 18:52
  • I fixed my scripts and now I'm able to run this structure as a simple mock. – leoschet Jun 14 '18 at 18:53
  • Ah, much better! Yet, you import `provider` and `server` in `start.py` and we do not have these modules. The problem may even be found in one of them! – brandizzi Jun 14 '18 at 19:13
  • @brandizzi, both provider and server were given! Please see the last two blocks of code. I want to note that the problem can't be in `provider.py`; it occurs even if I remove all logging functionality! – leoschet Jun 15 '18 at 10:18
