1

I am trying to use the multiprocessing module to gather statistics from an Oracle table read into memory. I need to iterate over every row, one column at a time, to gather data stats (min length, max length, etc.).

I thought once the table is in memory, I could process the stats for each column in parallel.

However, I am getting the error:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 110, in worker
    task = get()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 354, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'get_column_stats' on <module '__main__' (built-in)>

The code is below:

import cx_Oracle
import csv;
import os;
import glob;
import datetime;
import multiprocessing as mp;

def strip_crlf(value):
    return value.replace('\n', ' ').replace('\r', '')



def get_column_stats(rs, col_name, col_names):
    """Gather basic profiling stats for one column of an in-memory result set.

    Args:
        rs: sequence of row tuples (e.g. the result of ``cursor.fetchall()``).
        col_name: name of the column to profile.
        col_names: all column names, in the same order as the row tuples.

    Returns:
        A list matching the CSV header written by ``get_data()``:
        [owner, table, column_name, record_count, distinct_values,
         min_length, max_length, min_val, max_val].
        The stats fields are ``None`` when the column has no non-null values.
    """
    print("Starting Iteration of Column: " + col_name)

    col_index = col_names.index(col_name)

    row_count = 0
    has_null_values = False

    max_length = 0
    min_length = None   # None until the first non-empty value is seen
    min_value = None    # None sentinels replace the fragile "zzzz..." initialiser,
    max_value = None    # which broke for values that sort after it

    # A set records each distinct non-null value exactly once, so its size
    # at the end of the scan is the distinct count.
    distinct_values = set()

    for row in rs:
        row_count += 1
        row_value = row[col_index]

        if row_value is None:
            has_null_values = True
            continue

        distinct_values.add(row_value)

        text = str(row_value)
        length = len(text)

        if length > max_length:
            max_length = length
        # Zero-length values are ignored for the minimum, as in the original.
        if length > 0 and (min_length is None or length < min_length):
            min_length = length

        if max_value is None or text > max_value:
            max_value = text
        if min_value is None or text < min_value:
            min_value = text

    has_values = bool(distinct_values)

    if not has_values:
        # Column is entirely null (or empty result set): no meaningful stats.
        distinct_value_count = None
        min_length = None
        max_length = None
    else:
        # BUG FIX: nulls were never added to distinct_values, so the old
        # "subtract 1 when nulls are present" correction undercounted by one.
        distinct_value_count = len(distinct_values)
        if min_length is None:
            # Every non-null value stringified to "": the true minimum is 0
            # (the old code returned its 100000 initialiser here).
            min_length = 0

    # BUG FIX: this print sat after the return and was unreachable.
    print("Ending Iteration of Column: " + col_name)

    return ["ARIEL", "DIM_REGISTRATION_SET", col_name, row_count,
            distinct_value_count, min_length, max_length,
            strip_crlf(str(min_value)) if min_value is not None else None,
            strip_crlf(str(max_value)) if max_value is not None else None]


def get_data():
    """Profile every column of ARIEL.DIM_REGISTRATION_SET and write stats to CSV.

    Fetches a sample of rows into memory, farms out the per-column stats
    gathering to a process pool, and writes one CSV row per column.

    NOTE(review): on Windows, multiprocessing uses the "spawn" start method,
    so ``get_column_stats`` must live in an importable module (not be defined
    interactively) and this function must be called from under an
    ``if __name__ == "__main__":`` guard — otherwise workers fail with
    "Can't get attribute 'get_column_stats' on <module '__main__'>".
    """
    print("Starting Job: " + str(datetime.datetime.now()))

    dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='myservice')
    con = cx_Oracle.connect(user='ARIEL', password='password', dsn=dsn_tns)

    sql = ("SELECT * FROM ARIEL.DIM_REGISTRATION_SET "
           "WHERE DATA_STATE = 'C' AND ROWNUM <= 100")

    # BUG FIX: the cursor was closed but the connection never was; close both
    # deterministically, and before the heavy CPU work starts.
    try:
        cur = con.cursor()
        try:
            print("Start Executing SQL: " + str(datetime.datetime.now()))
            cur.execute(sql)
            print("End SQL Execution: " + str(datetime.datetime.now()))
            print("Start SQL Fetch: " + str(datetime.datetime.now()))
            rs = cur.fetchall()
            print("End SQL Fetch: " + str(datetime.datetime.now()))
            col_names = [field[0] for field in cur.description]
        finally:
            cur.close()
    finally:
        con.close()

    print("Start In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))
    # BUG FIX: pool.apply blocks until each call returns, so the old code ran
    # the columns one after another. starmap fans the calls out across the
    # pool and returns the results in column order; the context manager
    # terminates the workers (the old code never joined the pool).
    with mp.Pool(mp.cpu_count()) as pool:
        column_stats = pool.starmap(
            get_column_stats,
            [(rs, col_name, col_names) for col_name in col_names])
    print("End In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))

    # BUG FIX: the header row was assigned and then overwritten by the pool
    # results, so the CSV was written without a header.
    header = ["OWNER", "TABLE", "COLUMN_NAME", "RECORD_COUNT", "DISTINCT_VALUES",
              "MIN_LENGTH", "MAX_LENGTH", "MIN_VAL", "MAX_VAL"]
    stats_results = [header] + column_stats

    # BUG FIX: raw string — the old 'C:\jupyter\...' literal relied on Python
    # leaving the unknown escape '\j' alone; 'with' guarantees the file closes.
    with open(r'C:\jupyter\Experiment\stats_dim_registration_set.csv', 'w') as outfile:
        writer = csv.writer(outfile, quoting=csv.QUOTE_ALL, lineterminator='\n')
        writer.writerows(stats_results)

    print("Ending Job: " + str(datetime.datetime.now()))

in the get_data function the important line is:

# iterate through each column, to gather stats from each column using parallelisation
    stats_results = [pool.apply(get_column_stats, args=(rs, col_name, col_names)) for col_name in col_names];

This should call the get_column_stats function once per column via the pool (I intended the columns to be processed in parallel), but instead I get the "Can't get attribute 'get_column_stats'" error shown above.

Any advice appreciated.

smackenzie
  • 2,880
  • 7
  • 46
  • 99
  • Have a look at https://stackoverflow.com/questions/41385708/multiprocessing-example-giving-attributeerror –  Jun 14 '19 at 10:09
  • Thanks, now I just get: TypeError: can't pickle module objects – smackenzie Jun 14 '19 at 10:21
  • Not all python objects can be pickled. I guess you trying to send module object to another process instead of function/data. Check variable names in the code. –  Jun 14 '19 at 10:43
  • My bad - I was calling the module, not the function in the module – smackenzie Jun 14 '19 at 14:42

0 Answers0