
I'm writing a program where there are multiple processes working in parallel.

For every process, I want to collect some information and then, at the end of the program, I want that information to be written into a binary file.

I got everything working fine (the parallel stuff and the file-writing stuff), but there's a problem with the information I'm collecting...

I'm trying to save the info in a multiprocessing.Array, but the thing is that it seems like every process is creating an array of its own.

When the processes finished doing what they needed to do, I printed the array so that I could see what was going on, and this is what was printed:

['6554,a,0.109375,2;', '6554,c,0.099609375,2;']
['6557,g,0.109375,2;']
['6555,b,0.099609375,2;', '6555,f,0.109375,2;']
['6556,d,0.109375,2;']
[]

Note: the strings are the information I'm collecting from the processes.

What's the problem?
My code is long, so I'm trying to explain myself as well as I can.

------------------------- UPDATE -----------------------------
The project I'm doing receives a file, or files, and compresses each file into a zip file. When there are multiple files, there are multiple processes, and each of them, while compressing, collects info from the file it is zipping and saves it into a multiprocessing.Array.
In the end, I call another function that writes a binary file with the information in that array.

I'm expecting one array ( list ) like this:

['6554,a,0.109375,2;', '6554,c,0.099609375,2;', '6557,g,0.109375,2;', '6555,b,0.099609375,2;', '6555,f,0.109375,2;', '6556,d,0.109375,2;']

4 Answers


You have to consider that when you create a new process, the child process gets its own, new and different address space. Address space meaning the space where it stores all its variables, etc.

  Process 1            process 2-child process
  result = []              result = []
                              |
                              |
                              |
                              ↓
                          result = [x1,x2,x3]  --> xi are the values that you want to overwrite.
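
A minimal sketch of that copy problem, using a square function as a stand-in for the real work ( the names here are only illustrative, not the asker's code ):

     import multiprocessing

     result = []                          # a plain, global list in the parent

     def calc_square(numbers):
         for n in numbers:
             result.append(n * n)         # this appends only to the child's copy
         print("child  result:", result)  # [4, 9, 25]

     if __name__ == "__main__":
         p = multiprocessing.Process(target=calc_square, args=([2, 3, 5],))
         p.start()
         p.join()
         print("parent result:", result)  # still [] -- the child worked on its own copy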

You have to consider that in this case result is a global variable ( I don't have a lot of information about your code, but I want you to understand the main point about sharing data and variables ). Notice that the child process copies the global variable into another memory address. So now we have a separate copy of the result variable, and when you run your function ( in my case the square function ) it only updates the copy inside the child process. So, this is the problem. But how can I solve that? By sharing memory. I know that you already know this, but it is better for both of us to clarify the point.
Sharing memory

     Process 1      process 2-child process

         |             | 
         |             |
         |             |
         ↓             ↓
     result = [x1,x2,x3]  --> xi are the values that both processes can now see.

Now, I am sure that you know it. But the solution is not only to use multiprocessing.Array('i', 4) --> you have to specify your datatype, which in my case is integer, and the size, e.g. 4. You also have to consider that the methods of a normal list do not coincide with a multiprocessing Array's methods. And I recommend that you don't forget the process.join().

     import multiprocessing

     def calc_square(numbers, result):
         for idx, n in enumerate(numbers):
             result[idx] = n * n

     if __name__ == "__main__":
         numbers = [2, 3, 5]
         result = multiprocessing.Array('i', 3)
         p = multiprocessing.Process(target=calc_square, args=(numbers, result))

         p.start()
         p.join()

         print(result[:])    # slice the shared Array to see its values: [4, 9, 25]

Good luck!

Notice that if you are using a queue, you only have to add q ( any name will do ) to the function parameters and call q.put(n*n) inside the function, adding q = multiprocessing.Queue() and p = multiprocessing.Process(target=calc_square, args=(numbers, q)) in the main process. And if you want to see what happened, just add: while not q.empty(): print(q.get())
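
A short sketch of that queue variant, reusing the square example ( again only an illustration ):

     import multiprocessing

     def calc_square(numbers, q):
         for n in numbers:
             q.put(n * n)                 # each result travels back through the queue

     if __name__ == "__main__":
         numbers = [2, 3, 5]
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=calc_square, args=(numbers, q))
         p.start()
         p.join()
         while not q.empty():             # drain the queue in the parent
             print(q.get())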


This post does not claim to be a universal, one-size-fits-all solution, but given the pieces of information posted above, this approach does nothing but the requested set of features:

Do just what's needed, not more: ( otherwise you will pay way more than you receive )

a ) processing must perform a fileIO output ( the compressed file )
b ) processing must report a result / status to the central code
c ) processing does not exchange any additional information or synchronisation

Let's mock up such processing:

>>> def aFunToProcessGivenSTRING( aString2PROCESS = "<aStringNotPassedOnCALL>" ):
...     time.sleep( 5 ) # emulate doing compression "efforts"
...     #               # ( yes, w/o loading a CPU-core )
...     return ( "[pid:{0:}]->( {1:} )".format( os.getpid(),
...                                             aString2PROCESS
...                                             )
...               )

Let's also mock up a list of "Tasks" for such an aFunToProcessGivenSTRING():

 aListOfTaskSTRINGs = [ '6554,a,0.109375,2;',
                        '6554,c,0.099609375,2;',
                        '6557,g,0.109375,2;',
                        '6555,b,0.099609375,2;',
                        '6555,f,0.109375,2;',
                        '6556,q,0.109376,2;',
                        '6558,r,0.109375,2;',
                        '6559,s,0.109376,2;',
                        '6553,t,0.109375,2;',
                        '6553,d,0.109375,2;',
                        'Some more Meat for Dr.Jackson',
                        'The last but not the least...'
                        ]

The most lightweight approach, using the multiprocessing.Pool() built-in's .map() method for data-driven parallelism, goes as light as this:

import  multiprocessing, os, time
aPool = multiprocessing.Pool( 2 )    # Here, allow 2-Process-streams to take place

print( aPool.map( aFunToProcessGivenSTRING, aListOfTaskSTRINGs ) )

['[pid:12387]->( 6554,a,0.109375,2; )', '[pid:12388]->( 6554,c,0.099609375,2; )',
 '[pid:12387]->( 6557,g,0.109375,2; )', '[pid:12388]->( 6555,b,0.099609375,2; )',
 '[pid:12387]->( 6555,f,0.109375,2; )', '[pid:12388]->( 6556,d,0.109375,2; )',
 ...,
 '[pid:12387]->( Some more Meat for Dr.Jackson )',
 '[pid:12388]->( The last but not the least... )'
 ]

Check the 2 Process instances via reported "remote"-os.getpid()-#s.

Given no other interactions are intended, this ought to serve best for your file-compression project.
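
Applied to the compression use-case, the same Pool.map() pattern could look roughly like this ( the worker body, the zipfile usage, the info-string format and the file names are assumptions made for illustration, not the asker's actual code ):

import  multiprocessing, os, zipfile

def compressOneGivenFILE( aFileNAME ):                       # one task == one file
    with zipfile.ZipFile( aFileNAME + ".zip", "w",
                          zipfile.ZIP_DEFLATED ) as aZIP:
        aZIP.write( aFileNAME )
    aRatio = ( os.path.getsize( aFileNAME + ".zip" )
             / os.path.getsize( aFileNAME ) )                # compressed / original size
    return "{0:},{1:},{2:},2;".format( os.getpid(), aFileNAME, aRatio )

if __name__ == "__main__":
    aListOfFileNAMEs = [ "a.txt", "b.txt", "c.txt" ]         # placeholder inputs
    aPool            = multiprocessing.Pool( 2 )
    aListOfINFOs     = aPool.map( compressOneGivenFILE, aListOfFileNAMEs )
    aPool.close()
    aPool.join()
    print( aListOfINFOs )      # one flat list, collected back in the parent process
    #                          # ...hand aListOfINFOs to the binary-file writer here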

How "big" the .Pool()-instance ought get?

Well, this requires a bit deeper insight into resources-mapping. Given your processing interacts with fileIO, there might be some bottlenecks at the hardware level ( disk hardware uses a pure-[SERIAL] arrangement of atomic fileIO chunks, located at physical 4D-locations: PLATE:CYL:HEAD:SEC, and as such a disk cannot read from more such 4D-locations "at once" ( sure, the controller's elevator-optimiser helps a lot, caches do too ), yet there is a performance limit that no further [CONCURRENT] scheduling of tasks will ever overcome ).

The next typical ( and negative ) surprise comes when a multiprocessing.Pool()-mapped task already wants to use some process-mapped parallelism of its own ( and some module functions do ). Spawning a .Pool() instance that covers "just enough" of the available CPU cores is always better than an "over-sized" .Pool(), which then coughs while waiting for its tasks to actually get CPU-assigned by the O/S scheduler, as there are more processes than free CPU-cores. In case the .Pool()-mapped functions spawn their own Process instances, the "suffocation" gets multiple times worse.
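
A minimal sizing sketch along these lines, reusing the aListOfTaskSTRINGs mock-up from above ( the one-core margin left for the O/S is a common habit, not a rule ):

nWORKERs = max( 1, multiprocessing.cpu_count() - 1 )         # leave one core for the O/S
aPool    = multiprocessing.Pool( processes = min( nWORKERs,
                                                  len( aListOfTaskSTRINGs )
                                                  )          # never more workers than tasks
                                 )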

A similar colliding resource is memory. In case a process-to-run demands some amount of memory, wise capacity planning ought to take due care of this, as no one would enjoy the process-flow driving the O/S into almost endless swapping.

So, careful resources-planning and "just-enough" resources-mapping is the way to achieve the best performance possible. A "suffocated" process-flow is always a bad sign in such performance-motivated engineering practice. Never underestimate a systematic approach to testing and benchmarking at scales close to your use-case - if you are interested in more details about the actual costs of instantiations under different python tools, this read may give you directions to follow.

  • I believe that what you have here is probably my answer , but idk how to implement it on my code – David Leites Dec 10 '17 at 13:32
  • Simply: create an adequately sized `multiprocessing.Pool( processes = min( multiprocessing.cpu_count() - 1, a_just_enough_N ) )` and next `.map_async( yourFun, yourParameter )` and "inject" `yourFun` compression function's `yourParameter` as needed and enjoy the powers of python `multiprocessing` harnessed to process your needs. The `.map_async()` returns a list-of-return-values, as was demonstrated above. – user3666197 Dec 10 '17 at 13:40
  • I couldn't put this into work, but I already found a way. I believe your idea would work, but I'm still new at this and I didn't know how to make it work :( , but thank you anyway. – David Leites Dec 11 '17 at 10:04
  • Do you tell me, that a copy / paste / modified sample of the code above did not work on your platform to reproduce the demonstrated results ? – user3666197 Dec 11 '17 at 10:50

Unlike threads, Python processes don't share memory. There are some exceptions, though, and you can try to use multiprocessing.Array as a shared-state data structure and see if it behaves like you expect.
Read more about multiprocessing in the official documentation.

  • I'm using multiprocessing.Array. The array I was talking about is that one. – David Leites Dec 10 '17 at 01:56
  • :| Might've been a good idea to mention it. Anyway, can you replace it with a Manager.Queue object ? I've had more success with a Queue than shared dictionaries or arrays. Is that something that could fit your code ? you can transform it to a list after processing has finished. – Evya Dec 10 '17 at 02:00
  • Sorry. I've tried that, both manager and queue, didn't work. A global array has the same result too. – David Leites Dec 10 '17 at 02:05
  • And btw, you know how to do a shared dictionary? – David Leites Dec 10 '17 at 02:05
  • With all due respect, sharing ( of any kind and fashion ) is to be avoided where possible. A lightweight, "just-enough"-equipped design is the direction to go into. Immense instantiation costs are overheads to be ( always ) avoided -- just benchmark the add-on costs and look into the overhead-strict Amdahl's Law: any technically feasible speedup gets devastated in cases where the add-on overheads actually paid have consumed more than any N-CPU-concurrent process-flows can deliver ( also note the atomicity of ZIP-processing, which sets the max( )-part in the re-formulated Amdahl's Law denominator ). – user3666197 Dec 10 '17 at 06:44

[RESOLVED]

To everyone who tried to help: thank you so much. You all gave me ideas and I finally did it.

I used multiprocessing.Manager(). I used it like this:

from multiprocessing import Manager

with Manager() as manager:
    info = manager.list()    # one shared list, proxied by the Manager
    proc = []
    info2 = []

    if opcoes[0] == True:
       (...)

Thank you, and I hope this can help someone with the same problem.
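
For anyone landing on the same problem, here is a more self-contained sketch of that manager.list() pattern ( the worker function and the dummy info strings below are placeholders, not the real compression code ):

from multiprocessing import Manager, Process

def collect_info(file_id, shared_info):
    # ...the real work ( compressing one file ) would happen here...
    shared_info.append("{0},x,0.109375,2;".format(file_id))

if __name__ == "__main__":
    with Manager() as manager:
        info = manager.list()                  # one list, shared through the manager
        proc = [Process(target=collect_info, args=(i, info)) for i in range(4)]
        for p in proc:
            p.start()
        for p in proc:
            p.join()
        info2 = list(info)                     # plain copy, still usable after the manager exits
    print(info2)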