
I have a parallel program. It reads files sequentially; the tasks in each file are split across processes. When a file is done on all processes, the next file is loaded, and so on. I want to write log files such that there is a new log file for each data file, and I would like all of my processes to write log info without interfering with each other. After reading some posts and the logging documentation, I have come up with the following minimal example:

import multiprocessing, pathos
import logging

def task(x):
    # Note: this relies on the global `logger` being visible inside the worker
    thisID = pathos.core.getpid()
    logger.info("Process " + str(thisID) + ": Processing stuff " + x)
    return 1

for iJob in range(3):
    # Create a file handler for this data file
    fh = logging.FileHandler('log' + str(iJob) + '_pathos.txt')
    fh.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    fh.setFormatter(formatter)

    logger = pathos.logger(level=logging.DEBUG, handler=fh)

    pool = pathos.multiprocessing.ProcessingPool(7)
    results_mp = pool.map(task, list("aalkfnalkgnlkaerngnarngkwlekfwebkwr"))

    logger.removeHandler(fh)
    fh.close()  # flush buffered records to disk

    print(results_mp)

No matter what I try, all of the output goes to the first log file; the other two are created but stay empty. The alternative implementation using bare multiprocessing seems to work fine (see below). The problem is that I require pathos, because it can parallelize code from some imported libraries that regular multiprocessing refuses to work with:

def task(x):
    # _identity is a private attribute that holds the worker's index in the pool
    thisID = multiprocessing.current_process()._identity[0]
    logger.info("Process " + str(thisID) + ": Processing stuff " + x)
    return 1

for iJob in range(3):
    # Create a file handler for this data file
    fh = logging.FileHandler('log' + str(iJob) + '_mp.txt')
    fh.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    fh.setFormatter(formatter)

    logger = logging.getLogger("MyLogger")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(fh)

    pool = multiprocessing.Pool(7)
    results_mp = pool.map(task, list("aalkfnalkgnlkaerngnarngkwlekfwebkwr"))
    pool.close()  # release this iteration's worker processes

    logger.removeHandler(fh)
    fh.close()  # flush buffered records to disk

    print(results_mp)

Perhaps it is useful to mention that I run the code from a Jupyter notebook. I also get somewhat unstable behaviour when I run the same cell twice, deleting the log files in between: sometimes the new log files are all empty.
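
One possible cause I can think of: logging.getLogger("MyLogger") returns the same logger object on every cell execution, so if a run is interrupted, stale handlers stay attached and buffered records may never get flushed to disk. Here is a minimal cleanup sketch that I now run between attempts (assuming that is indeed the issue):

import logging

logger = logging.getLogger("MyLogger")
# Detach and close any handlers left over from a previous cell run,
# flushing their buffered records to disk
for h in list(logger.handlers):
    logger.removeHandler(h)
    h.close()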

Aleksejs Fomins

1 Answer


ProcessPool creates new workers that have their own memory, so you can't (or at least shouldn't) rely on global variables; pass everything you need into pool.map(). On top of that, as far as I can tell, pathos caches its pools, so the workers started in the first loop iteration are reused by the later map() calls and keep whatever logger they inherited at startup. That is why all of your output ends up in the first log file.

This works for me:

import pathos
import logging

def task(x, iJob):
    thisID = pathos.core.getpid()
    # Each worker builds its own handler here, since workers do not share
    # the parent's global logger
    fh = logging.FileHandler('log' + str(iJob) + '_pathos.txt')
    fh.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    fh.setFormatter(formatter)

    logger = pathos.logger(level=logging.DEBUG, handler=fh)
    logger.info("Process " + str(thisID) + ": Processing stuff " + x)
    logger.removeHandler(fh)
    fh.close()  # flush the record to disk
    return 1

for iJob in range(3):
    pool = pathos.multiprocessing.ProcessPool(7)
    data = "aalkfnalkgnlkaerngnarngkwlekfwebkwr"
    results_mp = pool.map(task, list(data), [iJob] * len(data))
    print(results_mp)
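
For completeness, here is an alternative sketch that keeps the logging setup in the parent process. It relies on pathos's pool caching: clearing the cached pool between files forces fresh workers, which then inherit the parent's current global logger. This assumes the default fork start method on Linux, and it is a sketch rather than something I have tested as thoroughly:

import pathos
import logging

def task(x):
    thisID = pathos.core.getpid()
    logger.info("Process " + str(thisID) + ": Processing stuff " + x)
    return 1

for iJob in range(3):
    fh = logging.FileHandler('log' + str(iJob) + '_pathos.txt')
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))

    logger = pathos.logger(level=logging.DEBUG, handler=fh)

    pool = pathos.multiprocessing.ProcessPool(7)
    results_mp = pool.map(task, list("aalkfnalkgnlkaerngnarngkwlekfwebkwr"))
    pool.clear()  # drop the cached pool so the next iteration starts fresh workers

    logger.removeHandler(fh)
    fh.close()
    print(results_mp)

Also note that in both versions several worker processes write to the same log file concurrently, so records can interleave; if that becomes a problem, the standard remedy is the stdlib's logging.handlers.QueueHandler / QueueListener pattern, which funnels all records through a single writer.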
Alexei Andreev