Problem description
I adjusted the code from this answer a little (see below). When I run the script on Linux (command line: python script_name.py), it prints jobs running: x for all the jobs but then seems to get stuck. However, when I use the spawn start method (mp.set_start_method('spawn')), it works fine and immediately starts printing the value of the counter variable (see the listener function).

Question

  • Why does it work only when spawning processes?
  • How can I adjust the code so it works with fork? (because it is probably faster)

Code

import io
import csv
import multiprocessing as mp

NEWLINE = '\n'

def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

def worker(genome_row, q):
    complete_data = []
    #data processing
    #ftp connection to retrieve data
    #etc.
    q.put(complete_data)
    return complete_data

def listener(q):
    '''listens for messages on the q, writes to file. '''
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while True:
        m = q.get()
        counter += 1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)
        f.flush()
    f.close()

if __name__ == "__main__":
    file_searcher('path_to_some_tab_del_file.txt')

Processor info

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                20
On-line CPU(s) list:   0-19
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             20
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz
Stepping:              2
CPU MHz:               2596.501
BogoMIPS:              5193.98
Hypervisor vendor:     VMware
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0-19

Linux kernel version

3.10.0-514.26.2.el7.x86_64

Python version

Python 3.6.1 :: Continuum Analytics, Inc.

LOG
I added the logging code suggested by @yacc, which gives the following log:

[server scripts]$ python main_v3.py
[INFO/SyncManager-1] child process calling self.run()
[INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6
[INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw'
[DEBUG/MainProcess] requesting creation of a shared 'Queue' object
[DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0'
[DEBUG/MainProcess] INCREF '7f0842da56a0'
[DEBUG/MainProcess] created semlock with handle 139673691570176
[DEBUG/MainProcess] created semlock with handle 139673691566080
[DEBUG/MainProcess] created semlock with handle 139673691561984
[DEBUG/MainProcess] created semlock with handle 139673691557888
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-6] child process calling self.run()
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-7] child process calling self.run()
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-17] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-18] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-19] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-20] child process calling self.run()
jobs running: 1
jobs running: 2
jobs running: 3
jobs running: 4
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-21] child process calling self.run()
jobs running: 5
jobs running: 6
jobs running: 7
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
jobs running: 8
written to file
jobs running: 9
jobs running: 10
[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection
[DEBUG/ForkPoolWorker-2] making connection to manager
jobs running: 11
jobs running: 12
jobs running: 13
jobs running: 14
jobs running: 15
[DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2'
jobs running: 16
jobs running: 17
jobs running: 18
jobs running: 19
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
Tiago Martins Peres
CodeNoob
  • Could you provide details about the versions of Linux, Python, MP package and hardware/processor? – yacc Sep 05 '17 at 12:10
  • I added the info you asked for (see edit) @yacc. I wasn't able to figure out how to get the MP package version. I hope you can find the problem – CodeNoob Sep 05 '17 at 13:45
  • multiprocessing is part of the core library, so it's the same version as the rest. For me (Python 3.4.3) the code works all right (the only thing I changed was removing the csv reader and reading a normal file instead). Did you try to reproduce it elsewhere? – MacHala Sep 05 '17 at 17:11
  • @CodeNoob OK, looks like mp had its version info removed since Python 3. Could you then add logging output? `import logging` and under main `logger = mp.log_to_stderr()` and `logger.setLevel(mp.SUBDEBUG)`. Run it with the fork context. – yacc Sep 06 '17 at 00:03
  • This only prints something at the start of the program, but not at the point where it gets "stuck" (see my edit) @yacc – CodeNoob Sep 06 '17 at 10:07
  • @CodeNoob There's lots of debug info missing, things like `[INFO/SyncManager-1] child process calling self.run()` etc. It should have been printed to standard error. Any chance to get this? – yacc Sep 06 '17 at 10:12
  • I created fewer processes, so now the full log is visible (see edit) @yacc – CodeNoob Sep 06 '17 at 10:28
  • Good. The difference from my log is that you got only one `[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection` `[DEBUG/ForkPoolWorker-2] making connection to manager`. I see this for *all* workers, and afterwards lots of progress/termination logging. Looks like the manager stopped servicing and blocks the pool, or did you miss some logging? I have Python 3.5.3 on Ubuntu 17.04; maybe your issue is very version specific, so I cannot help you further. – yacc Sep 06 '17 at 10:51
  • Thank you, at least my problem is more specific now that I know the difference from your log @yacc – CodeNoob Sep 06 '17 at 10:52
  • I've upgraded to 3.6.1 and it still works as expected. Pool size is 3 though. Could you retry it with `mp.Pool(3)`? – yacc Sep 11 '17 at 14:30
  • I have tried that, but it results in the same problem :( @yacc It has something to do with the Linux installation then, I guess – CodeNoob Sep 12 '17 at 14:57
  • It's a bit embarrassing, but at least you have a workaround with spawning. I suggest upgrading to 3.6.2, and if it still occurs, opening a ticket at Python. This could be important to fix for future releases. – yacc Sep 12 '17 at 15:42
  • I didn't know that it was disabled? What did you base that on? @jxh – CodeNoob Sep 22 '17 at 07:47
  • `Thread(s) per core: 1`, with hyper-threading, I expect to see 2 here. – jxh Sep 22 '17 at 07:49
  • But that doesn't influence multiprocessing capacity, right? @jxh – CodeNoob Sep 22 '17 at 07:51
  • It means something is lying to your OS. Is this actually a VM? Ah, I see that it is. – jxh Sep 22 '17 at 07:53
  • Given your performance problem is on the VM (note: `Hypervisor vendor: VMware`), I suspect there is a locking primitive in the hypervisor's hardware abstraction layer that is preventing you from gaining maximal performance. Please see if you have the same problem when running on bare metal. – jxh Sep 22 '17 at 07:59
  • No, I just connect to a server @jxh – CodeNoob Sep 22 '17 at 07:59
  • Oh wait, maybe you are right: I'm working on a VM from which I connect to the server @jxh – CodeNoob Sep 22 '17 at 08:01
  • Can you please explain what "Hypervisor vendor: VMware" means and how it influences the performance? (You are literally the first one who actually found something in the logs that could be the cause) – CodeNoob Sep 22 '17 at 08:03
  • It means that your "machine" is actually virtual, and not bare metal. This means there is a hardware abstraction layer that is pretending to be a machine to Linux, while it is actually a process running over a host OS. `VMware` is a software company that creates hypervisors, which is the software that runs on a host OS to provide VMs their virtual hardware. – jxh Sep 22 '17 at 08:07
  • Thank you, learning something today ;) But it's still not clear to me how this affects mp, because when using mp I only use 1 core per CPU, right? @jxh – CodeNoob Sep 22 '17 at 08:26
  • Since `fork` has to share resources from the parent, Linux likely imposes a lock. On real hardware, this lock would be cheap, but on a VM, it is unclear how cheap the virtual version of that lock would be. – jxh Sep 22 '17 at 09:25
  • Aah, that explains why "spawn" is working fine, because the sharing isn't needed then. Well, you are a genius for figuring this out ;) Thank you! @jxh – CodeNoob Sep 22 '17 at 09:43
  • One last question, haha: would this also influence multithreading? @jxh – CodeNoob Sep 22 '17 at 09:47
  • Multithreading actually uses true sharing of all resources, while forked processes will perform a copy on write. – jxh Sep 22 '17 at 10:30
  • I noticed you're not closing the file you pass to csv.DictReader. – Mikhail Berlinkov May 21 '18 at 13:04
  • Maybe this is related to the Hyper-Threading bug found in Skylake and Kaby Lake CPUs? https://www.theregister.co.uk/2017/06/25/intel_skylake_kaby_lake_hyperthreading/ – ramazan polat Jul 31 '18 at 07:07
  • Chances are the `fork` method somehow leaves internal state inconsistent and causes `io.open` to deadlock, rendering `listener` useless. Perhaps add print statements just before and after the `f = io.open(...)` line? – minmaxavg Sep 19 '18 at 06:37
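
The logging setup that yacc describes in the comments can be wrapped in a small helper. This is only a sketch: the name `enable_mp_logging` is made up for illustration, and it uses `logging.DEBUG` as the default level, where the comment used the lower, more verbose `mp.SUBDEBUG`.

```python
import logging
import multiprocessing as mp


def enable_mp_logging(level=logging.DEBUG):
    """Send multiprocessing's internal log records to stderr.

    Hypothetical helper name; log_to_stderr() itself is part of the
    standard multiprocessing module.
    """
    # log_to_stderr() attaches a stderr handler to the 'multiprocessing'
    # logger and returns that logger.
    logger = mp.log_to_stderr()
    logger.setLevel(level)
    return logger


if __name__ == "__main__":
    logger = enable_mp_logging()
    logger.info("logging enabled before any pool or manager is created")
```

Calling the helper before `mp.Manager()` and `mp.Pool()` are created should make messages such as `[INFO/SyncManager-1] child process calling self.run()` appear on standard error, as in the log shown in the question.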

1 Answer

As @jxh hinted, the differences between fork and spawn matter here. Section 17.2.1.2 of the multiprocessing documentation describes them: with fork, the child process starts out effectively identical to the parent and inherits all of its resources, whereas spawn starts a fresh Python interpreter that inherits only what it needs to run the target. I suspect something in the inherited state causes trouble for the worker function, likely in the code behind your comments about other processing. Spawning gives you a clean slate, and things run fine under those conditions.
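
To compare the two start methods without changing the global default, a context object can be used in place of `mp.set_start_method`. The sketch below is not a fix for the hang. It only shows one way to pick the method per pool, and `choose_context` and `square` are made-up names for illustration.

```python
import multiprocessing as mp


def square(x):
    """Hypothetical stand-in for the real worker function."""
    return x * x


def choose_context(preferred="fork"):
    """Return a context for the preferred start method if this platform
    offers it, otherwise the platform's default context."""
    if preferred in mp.get_all_start_methods():
        return mp.get_context(preferred)
    return mp.get_context()


if __name__ == "__main__":
    ctx = choose_context("fork")
    print("start method:", ctx.get_start_method())
    # Pools (and managers) created from ctx use its start method,
    # leaving the process-wide default untouched.
    with ctx.Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))
```

Creating the manager and the pool from the same context (`ctx.Manager()`, `ctx.Pool(...)`) keeps an experiment consistent when switching between `'fork'` and `'spawn'`.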

To determine what is going on, I would have each worker print diagnostic messages, ideally to a file unique to each worker. Open and close that file each time you write a message, so the contents are flushed right away.
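
A minimal sketch of that idea follows. It assumes the process ID is a good enough key for "unique to each worker"; the helper name `log_diag` and the file naming scheme are made up for illustration.

```python
import os
import tempfile
import time


def log_diag(message, directory=None):
    """Append one timestamped line to a file named after the calling process.

    Hypothetical helper: the file is opened and closed on every call, so
    each line is handed to the OS even if the process hangs afterwards.
    """
    directory = directory or tempfile.gettempdir()
    path = os.path.join(directory, "worker-{}.log".format(os.getpid()))
    with open(path, "a", encoding="utf-8") as f:
        f.write("{:.3f} {}\n".format(time.time(), message))
    return path
```

Calls such as `log_diag('before q.put')` and `log_diag('after q.put')` inside `worker`, and similar calls around `q.get()` in `listener`, would show the last step each process reached before it stalled.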

Fork is normally quicker to start than spawn, not slower: the documentation notes that starting a process with spawn is rather slow compared to fork, because spawn has to launch a fresh interpreter. In any case, this is only a startup cost, which I assume is minimal compared with the computational or I/O-bound work that you want to parallelize in the workers.

Kevin Buchs