
I have a list of 100,000 python objects that I would like to scatter and gather in mpi4py.

When I try with 8 processors I get:

SystemError: Negative size passed to PyBytes_FromStringAndSize

on the scattering.

When I try with 64 processors I get the same error but on the gather.

When I try making an array of objects out of the list and use Gather and Scatter, I get an error which basically states the dtype of the array cannot be object.

Any way I can get this to work? Or anything else I can use other than MPI?
I'm running this on an 8-node, 64-ppn computer.

Panos Kalatzantonakis
rgurnani

2 Answers


Here is an example that uses Scatter and Allgather to split a NumPy array of 100,000 items across the ranks and then reassemble it on every rank.

import numpy as np
from mpi4py import MPI
from pprint import pprint
comm = MPI.COMM_WORLD

pprint("-" * 78)
pprint(" Running on %d cores" % comm.size)
pprint("-" * 78)

N = 100000
# Each rank receives an equal share, so N must be divisible by comm.size
my_N = N // comm.size

if comm.rank == 0:
    A = np.arange(N, dtype=np.float64)
else:
    A = np.empty(N, dtype=np.float64)

my_A = np.empty(my_N, dtype=np.float64)

# Scatter data 
comm.Scatter([A, MPI.DOUBLE], [my_A, MPI.DOUBLE])

pprint("After Scatter:")
for r in range(comm.size):
    if comm.rank == r:
        print("[%d] %s" % (comm.rank, len(my_A)))
    comm.Barrier()

# Allgather data into A
comm.Allgather([my_A, MPI.DOUBLE], [A, MPI.DOUBLE])

pprint("After Allgather:")
for r in range(comm.size):
    if comm.rank == r:
        print("[%d] %s" % (comm.rank, len(A)))
    comm.Barrier()

You could also look at Scatterv and Gatherv, which handle chunks of unequal size; more examples here and here.
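To illustrate the Scatterv route when N is not a multiple of the number of ranks (for example 100,000 items on 64 ranks), here is a minimal sketch. The helper names `counts_and_displs` and `scatter_uneven` are made up for this example, and the sketch assumes the data is a float64 array created on rank 0, as in the code above.

```python
def counts_and_displs(n, size):
    """Return per-rank element counts and start offsets for n items on size ranks."""
    base, extra = divmod(n, size)
    # The first `extra` ranks take one additional element each.
    counts = [base + 1 if r < extra else base for r in range(size)]
    displs = [sum(counts[:r]) for r in range(size)]
    return counts, displs


def scatter_uneven(comm, n, root=0):
    """Scatter np.arange(n) from root in (possibly) unequal pieces (hypothetical helper)."""
    # Imported lazily so counts_and_displs can be used without MPI installed.
    import numpy as np
    from mpi4py import MPI

    rank, size = comm.Get_rank(), comm.Get_size()
    counts, displs = counts_and_displs(n, size)

    sendbuf = None  # the send buffer is only significant on the root rank
    if rank == root:
        A = np.arange(n, dtype=np.float64)
        sendbuf = [A, counts, displs, MPI.DOUBLE]

    # Each rank allocates exactly as many elements as it will receive.
    my_A = np.empty(counts[rank], dtype=np.float64)
    comm.Scatterv(sendbuf, my_A, root=root)
    return my_A
```

Under these assumptions, each rank would call `scatter_uneven(MPI.COMM_WORLD, 100000)` and get back its own slice; the same counts and offsets can be reused for the matching Gatherv.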

Panos Kalatzantonakis

I'm not sure this is the answer, and also I'm not sure you are still looking for the answer, but...

So you have 100,000 Python objects. If these objects are plain data (dicts, lists, numbers, strings) rather than instances of some class, you can pass them as JSON strings. Something like this:

#!/usr/bin/env python

import json
import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD

if comm.rank == 0:
    # scatter needs exactly comm.size items; this list assumes 3 ranks
    tasks = [
        json.dumps( { 'a':1,'x':2,'b':3 } ),
        json.dumps( { 'a':3,'x':1,'b':2 } ),
        json.dumps( { 'a':2,'x':3,'b':1 } )
    ]
else:
    tasks = None


# Scatter parameter sets, one per rank
unit = comm.scatter(tasks, root=0)

p = json.loads(unit)
print("-"*18)
print("-- I'm rank %d in %d size task" % (comm.rank,comm.size) )
print("-- My parameters are: {}".format(p))
print("-"*18)

comm.Barrier()

calc = p['a']*p['x']**2+p['b']

# gather results
result = comm.gather(calc, root=0)
# do something with result

if comm.rank == 0:
    print("the result is ", result)
else:
    result = None

Note that scatter sends exactly one item to each rank, so with only 8 cores the tasks list must hold 8 records, and you have to scatter and gather repeatedly to work through all 100,000 data sets. If all your data is in a list called ALLDATA, the code could look like this:

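def calc(a=0, x=0, b=0):
    return a * x**2 + b

# ALLDATA must be defined on every rank so that all ranks run the same
# number of loop iterations (only rank 0's contents are actually sent).
if comm.rank == 0:
    collector = []

# zip over one shared iterator yields tuples of comm.size items; a trailing
# partial group (len(ALLDATA) not a multiple of comm.size) is dropped.
for xset in zip(*(iter(ALLDATA),) * comm.size):
    task = [json.dumps(s) for s in xset]
    comm.Barrier()
    unit = comm.scatter(task if comm.rank == 0 else None, root=0)
    p = json.loads(unit)
    res = json.dumps(calc(**p))
    totres = comm.gather(res, root=0)
    if comm.rank == 0:
        collector += [json.loads(x) for x in totres]

if comm.rank == 0:
    print("the result is ", collector)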
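As a variation on the loop above, the whole list could be split into comm.size chunks once, so that each rank receives a list of items in a single scatter instead of one item per round. This is only a sketch: `split_into_chunks` and `scatter_map_gather` are names invented here, and it assumes the items (and the results) can be pickled by the lowercase scatter and gather. If individual chunks are very large, splitting the work into several rounds may still be needed.

```python
def split_into_chunks(items, n_chunks):
    """Split items into n_chunks contiguous lists whose lengths differ by at most one."""
    base, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for r in range(n_chunks):
        # The first `extra` chunks take one additional item each.
        stop = start + base + (1 if r < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks


def scatter_map_gather(comm, items, func, root=0):
    """Scatter items from root, apply func on every rank, gather results on root.

    `comm` is expected to be an mpi4py communicator such as MPI.COMM_WORLD;
    `items` only needs to be defined on the root rank.
    """
    rank, size = comm.Get_rank(), comm.Get_size()
    chunks = split_into_chunks(items, size) if rank == root else None

    my_items = comm.scatter(chunks, root=root)    # one list per rank
    my_results = [func(x) for x in my_items]
    gathered = comm.gather(my_results, root=root)  # list of lists on root

    if rank != root:
        return None
    # Flatten in rank order, which matches the original item order.
    return [r for part in gathered for r in part]
```

With the data from this answer, rank 0 would pass ALLDATA and every rank would pass something like `lambda p: calc(**p)` as `func`; unlike the zip-based loop, no trailing items are dropped when the list length is not a multiple of the number of ranks.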
rth