
I have some code that maps data into a matrix. Most of this is setup so that my problem can be easily reproduced; the only part I need to speed up is what comes after the comment `# the part I want to speed up`.

import numpy as np
# config
matrix_height = 100
matrix_width  = 200

# fake data
x_data = np.array(range(10000))
y_data = [{i:i for i in range(100)}  for t in range(len(x_data))]

# fake mapping
x_to_index = {x: np.random.randint(matrix_width) for x in x_data }
y_to_index = {}
for y_dict in y_data:
  for y_key, y_val in y_dict.items():
    y_start = np.random.randint(matrix_height-2)
    y_to_index[y_key] = (y_start, y_start+2 )

# data that must be stored
total_matrix = np.zeros([matrix_height, matrix_width]).astype(int)
n_matrix     = np.zeros([matrix_height, matrix_width]).astype(int)
latest_value = np.zeros([matrix_height, matrix_width]).astype(int)

# the part I want to speed up
for x, y_dict in zip(x_data, y_data):
    x_index = x_to_index[x]
    for y_key, y_val in y_dict.items():
        y_slice = slice(*y_to_index[y_key])
        total_matrix[ y_slice, x_index ] += y_val
        latest_value[ y_slice, x_index ]  = y_val
        n_matrix[ y_slice, x_index ]     += 1

Again, I'm only concerned with the part below the comment `# the part I want to speed up`.

I'm not sure how to speed this up, but it seems like it should be possible to use a mapping function that can do this all in parallel...
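
For example, it seems like np.add.at could fold the accumulation into two calls by flattening the loop into index arrays first (a rough, unbenchmarked sketch; latest_value needs last-write-wins ordering, so it would stay a loop):

# flatten the nested loops into flat index arrays once
rows, cols, vals = [], [], []
for x, y_dict in zip(x_data, y_data):
    x_index = x_to_index[x]
    for y_key, y_val in y_dict.items():
        y0, y1 = y_to_index[y_key]
        for r in range(y0, y1):
            rows.append(r)
            cols.append(x_index)
            vals.append(y_val)
rows, cols, vals = np.array(rows), np.array(cols), np.array(vals)

# np.add.at accumulates correctly even when (row, col) pairs repeat
np.add.at(total_matrix, (rows, cols), vals)
np.add.at(n_matrix, (rows, cols), 1)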

I'm looking for a marked improvement in that last section. Any ideas?

financial_physician
  • Is the y_to_index mapping correct in the fake mapping section? Even though it iterates 10000 times, only the last iteration is saved? Clarification would help deduce the last section's intent. – Frank Aug 05 '21 at 21:32
  • @Frank It doesn't update the numbers beyond the first iteration so you can `break` if you want to. The point is just to have a mapping for every key that's within scope of the matrix height. I'm happy to clarify so let me know if more clarification is needed! – financial_physician Aug 05 '21 at 21:36
  • Does `y_data` always contain dicts with all the keys from 0 to the max value (i.e. 100 here)? Is that also the case for `x_to_index` and `y_to_index` (i.e. 0..10000 and 0..100 respectively)? – Jérôme Richard Aug 05 '21 at 23:15
  • @JérômeRichard the first and last indices will each have at least one mapping to them (by design) – financial_physician Aug 05 '21 at 23:23
  • I think I found a hint, will post the solution when I get it working https://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory-for-multiprocessing – financial_physician Aug 05 '21 at 23:56
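
(For reference: on Python 3.8+, one way to put a NumPy array in shared memory is the multiprocessing.shared_memory module. A minimal, illustrative sketch, not the posted solution:)

import numpy as np
from multiprocessing import shared_memory

# Back a NumPy array with a shared-memory segment so worker processes
# can update one matrix in place instead of returning pickled copies.
shape, dtype = (100, 200), np.int64
nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
shm = shared_memory.SharedMemory(create=True, size=nbytes)
matrix = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
matrix[:] = 0

# a worker attaches by name and sees the same buffer:
#   existing = shared_memory.SharedMemory(name=shm.name)
#   view = np.ndarray(shape, dtype=dtype, buffer=existing.buf)

shm.close()
shm.unlink()  # release the segment once all processes are done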

1 Answer


The column list is split into chunks, one per core, and each worker builds its own partial matrices.

For total_matrix and n_matrix, addition is commutative, so the partial results can simply be summed at the end.

For latest_value, order matters: the partial results are merged starting from the chunk that processed the highest x indices, filling still-empty cells from progressively earlier chunks so the most recent write wins.
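
To illustrate that merge rule (it relies on 0 meaning "never written", which holds because the matrices start out zeroed):

import numpy as np

later   = np.array([[0, 5], [7, 0]])   # partial result from the last chunk
earlier = np.array([[3, 9], [0, 4]])   # partial result from an earlier chunk
merged  = later.copy()
np.putmask(merged, merged == 0, earlier)
# merged is now [[3, 5], [7, 4]]: 5 and 7 from the later chunk survive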

import numpy as np
import time
import multiprocessing as mp

def func(cols, zz, tm, nm, lv, setOrder):
    # Worker: apply every (row-slice, value) pair in zz to each column in
    # this worker's share of cols; return the partial matrices plus
    # setOrder so the caller knows which chunk this result belongs to.
    for c in cols:
        for t in zz:
            tm[slice(*t[0]), c] += t[1]
            lv[slice(*t[0]), c] = t[1]
            nm[slice(*t[0]), c] += 1
    return [tm, nm, lv, setOrder]

if __name__ == '__main__':
    mp.freeze_support()

    matrix_height = 100
    matrix_width = 200
    total_range = 10000

    x_data = np.array(range(total_range))
    y_data = [{i:i for i in range(matrix_height)} for t in range(len(x_data))]

    x_to_index = {x: np.random.randint(matrix_width) for x in x_data}
    # potential general purpose cols list
    #cols = np.random.randint(0, total_range, (1, total_range))[0]
    cols = [int(x_to_index[k]) for k in x_to_index]  # one column index per x

    y_to_index = {}
    for y_dict in y_data:
        for y_key, y_val in y_dict.items():
            y_start = np.random.randint(matrix_height-2)
            y_to_index[y_key] = (y_start, y_start+2)

    # potential general purpose rows list
    #rows = [(np.random.randint(matrix_height), np.random.randint(matrix_height)) for x in range(matrix_height)]
    rows = [y_to_index[k] for k in y_to_index]

    # potential general purpose y data
    #y_dat = np.random.randint(0, matrix_height, (1, matrix_height))[0]
    y_dat = [i for i in range(matrix_height)]

    # serial baseline, kept for timing and to verify the parallel results
    o_total_matrix = np.zeros([matrix_height, matrix_width]).astype(int)
    o_n_matrix     = np.zeros([matrix_height, matrix_width]).astype(int)
    o_latest_value = np.zeros([matrix_height, matrix_width]).astype(int)

    startTime = time.time()
    for x, y_dict in zip(x_data, y_data):
        x_index = x_to_index[x]
        for y_key, y_val in y_dict.items():
            y_slice = slice(*y_to_index[y_key])
            o_total_matrix[ y_slice, x_index ] += y_val
            o_latest_value[ y_slice, x_index ]  = y_val
            o_n_matrix[ y_slice, x_index ]     += 1
    print('original time was {0:5.2f} sec'.format(time.time() - startTime))

    procs = mp.cpu_count()

    # one independent zero matrix per worker; [arr] * procs would make
    # every element alias the same array
    i_tm = [np.zeros([matrix_height, matrix_width]).astype(int) for _ in range(procs)]
    i_nm = [np.zeros([matrix_height, matrix_width]).astype(int) for _ in range(procs)]
    i_lv = [np.zeros([matrix_height, matrix_width]).astype(int) for _ in range(procs)]

    # (row-slice, value) pairs; every column applies the same set
    zz = list(zip(rows, y_dat))

    # split the columns into one chunk per process
    procs_split = np.array_split(cols, procs)
    itup = []
    for x in range(procs):
        itup.append(((list(procs_split[x])), zz, i_tm[x], i_nm[x], i_lv[x], x))

    startTime = time.time()
    with mp.Pool(processes=procs) as pool:

        ret = pool.starmap(func, itup)

        # addition is commutative, so the partial total and count matrices
        # can be summed in any order
        i_total_matrix = ret[0][0]
        i_n_matrix = ret[0][1]
        for x in range(1, procs):
            i_total_matrix = np.add(i_total_matrix, ret[x][0])
            i_n_matrix = np.add(i_n_matrix, ret[x][1])

        # latest_value is order-dependent: merge from the chunk that saw the
        # highest x indices back to the earliest, filling only cells that are
        # still zero (never written), so the most recent write wins
        colOrder = [0] * procs
        for x in range(procs):
            colOrder[x] = (procs-1) - ret[x][3]

        i_latest_value = ret[colOrder[0]][2]
        for x in range(1, procs):
            np.putmask(i_latest_value, i_latest_value == 0, ret[colOrder[x]][2])

    print('improved time was {0:5.2f} sec'.format(time.time() - startTime))
    comparison = i_total_matrix == o_total_matrix
    if not comparison.all():
        print('ERROR TOTAL MATRIX')
    comparison = i_n_matrix == o_n_matrix
    if not comparison.all():
        print('ERROR N MATRIX')
    comparison = i_latest_value == o_latest_value
    if not comparison.all():
        print('ERROR LATEST VALUE')

A trial run showed the parallel version to be roughly twice as fast:

original time was 7.12 sec

improved time was 2.29 sec
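
Much of the remaining gap to an N-core speedup likely comes from pickling the per-process matrices to and from the pool; backing the matrices with shared memory, as linked in the comments above, is one way to avoid those copies.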

Frank