
I am reading an 800,000 row table into a dataframe. I am then looping over every column, and over every row in that column, to gather stats such as max length, min length, max value, distinct values, etc.

I have access to 32-core compute nodes via SLURM, so I thought I would use pool.map_async to process each column of the dataframe in a separate process.

It is much slower than just using a for loop.

I tried scaling the number of CPUs down to 8, 4, etc. to see whether process startup was the cause.

I suspect it is the serialisation of an 800,000-row pandas Series to each child process that is causing it.
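As a rough sanity check of that suspicion (not part of the job itself), timing how long one column takes to pickle approximates the per-task serialisation cost the Pool pays; this is just a sketch, assuming df is the dataframe loaded in the script below:

import pickle
import time

# Rough check: time the pickling of a single 800,000-row column, which is
# roughly what the Pool has to do for every task it sends to a worker.
col = df[df.columns[0]]
start = time.time()
payload = pickle.dumps(col)
print("pickle size (bytes): " + str(len(payload)))
print("pickle time (secs): " + str(time.time() - start))

The main script: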

import cx_Oracle
import csv
import os
import glob
import datetime
import multiprocessing as mp
import get_column_stats as gs
import pandas as pd
import pandas.io.sql as psql


def get_data():
    print("Starting Job: " + str(datetime.datetime.now()))

    # Step 1: Init multiprocessing.Pool()   
    pool = mp.Pool(mp.cpu_count())
    print("CPU Count: " + str(mp.cpu_count()))

    dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='myservice')
    con = cx_Oracle.connect(user='ARIEL', password='zzzzz', dsn=dsn_tns)


    stats_results = [["OWNER","TABLE","COLUMN_NAME","RECORD_COUNT","DISTINCT_VALUES","MIN_LENGTH","MAX_LENGTH","MIN_VAL","MAX_VAL"]]

    sql = "SELECT * FROM ARIEL.DIM_REGISTRATION_SET"

    cur = con.cursor()
    print("Start Executing SQL: " + str(datetime.datetime.now()))

    df = psql.read_sql(sql, con)

    print("End SQL Execution: " + str(datetime.datetime.now()))

    col_names = df.columns.values.tolist()
    col_index = 0


    print("Start In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))
    # we go through every field

    # start at column 0
    col_index = 0

    # iterate through each column, to gather stats from each column using parallelisation
    proc_results = pool.map_async(gs.get_column_stats, df.iteritems()).get()




    # Step 3: Don't forget to close
    pool.close() 
    pool.join()


    for result in proc_results:
        stats_results.append(result)


    print("End In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))
    # end filename for
    cur.close()           

    outfile = open(r'C:\jupyter\Experiment\stats_dim_registration_set.csv', 'w')
    writer=csv.writer(outfile,quoting=csv.QUOTE_ALL, lineterminator='\n')
    writer.writerows(stats_results)
    outfile.close()
    print("Ending Job: " + str(datetime.datetime.now()))





get_data()

Code being called async:

import os
import sys

def strip_crlf(value):
    return value.replace('\n', ' ').replace('\r', '')

def get_column_stats(args):
    # args is a tuple: the first value is the column name, the second is the pandas Series for that column

    col_name, rs = args
    sys.stdout = open("col_" + col_name + ".out", "a")

    print("Starting Iteration of Column: " + col_name)
    max_length = 0 
    min_length = 100000  # arbitrarily large number!!

    max_value = ""
    min_value = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"  # abitrarily large number!!

    distinct_value_count = 0

    has_values = False # does the column have any non-null values
    has_null_values = False

    row_count = 0

    # create a dictionary into which we can add the individual items present in each row of data
    # a dictionary will not let us add the same value more than once, so we can simply count the 
    # dictionary values at the end
    distinct_values = {}

    row_index = 0



    # go through every row, for the current column being processed to gather the stats
    for row_value in rs.values:
        row_count += 1


        if row_value is None:
            value_length = 0
        else:
            value_length = len(str(row_value))


        if value_length > max_length:
            max_length = value_length

        if value_length < min_length:
            if value_length > 0:
                min_length = value_length

        if row_value is not None:
            if str(row_value) > max_value:
                max_value = str(row_value)
            if str(row_value) < min_value:
                min_value = str(row_value)

        # capture distinct values
        if row_value is None:
            row_value = "Null"
            has_null_values = True
        else:
            has_values = True
            distinct_values[row_value] = 1

        row_index += 1
        # end row for

    distinct_value_count = len(distinct_values)

    if has_values == False:
        distinct_value_count = None
        min_length = None
        max_length = None
        min_value = None
        max_value = None
    elif has_null_values == True and distinct_value_count > 0:
        distinct_value_count -= 1

    if min_length == 0 and max_length > 0 and has_values == True:
        min_length = max_length

    print("Ending Iteration of Column: " + col_name)


    return ["ARIEL","DIM_REGISTRATION_SET", col_name,row_count, distinct_value_count, min_length, max_length, 
                            strip_crlf(str(min_value)), strip_crlf(str(max_value))]
smackenzie
    Why not write a bit of SQL to get the statistics you're computing? It will be a million times faster to have the database engine do it, rather than having your script download all the data. – John Zwinck Jun 16 '19 at 00:52
  • @AndréLaszlo: That question is rather different, and the answers to it will not be useful here. – John Zwinck Jun 16 '19 at 00:55
  • @JohnZwinck Ok! Didn't mean to vote close actually :D Was just looking at potential answers using Managers, but it seems like a dead end. – André Laszlo Jun 16 '19 at 00:58
  • Deleted my close vote, here's the thing it referenced: https://stackoverflow.com/questions/22487296/multiprocessing-in-python-sharing-large-object-e-g-pandas-dataframe-between – André Laszlo Jun 16 '19 at 00:58
  • I need to gather stats for each column, and ultimately from a number of databases for comparison @JohnZwinck – smackenzie Jun 16 '19 at 01:07
  • @smackenzie: OK. Of the statistics you need to gather, which ones do you think cannot be done using SQL? – John Zwinck Jun 16 '19 at 01:08
  • I am fully aware of how to write aggregates in SQL, we are exploring using python for regression testing. This is just POC work @JohnZwinck – smackenzie Jun 16 '19 at 01:12

1 Answer


You can make it faster by having each process query one column, rather than querying all the columns up front, which then requires copying them to all the child processes.

One idea is to query just the column names in the central process and send only those names to the child processes, each of which then runs SELECT columnX instead of SELECT *.
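A rough sketch of that approach, reusing the connection details and get_column_stats from the question (untested; the ALL_TAB_COLUMNS query is one common way to fetch the column names in Oracle):

import cx_Oracle
import multiprocessing as mp
import pandas.io.sql as psql
import get_column_stats as gs

def fetch_and_profile(col_name):
    # Each worker opens its own connection and pulls only its own column,
    # so the parent never has to pickle an 800,000-row Series.
    dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='myservice')
    con = cx_Oracle.connect(user='ARIEL', password='zzzzz', dsn=dsn_tns)
    df = psql.read_sql("SELECT " + col_name + " FROM ARIEL.DIM_REGISTRATION_SET", con)
    con.close()
    return gs.get_column_stats((col_name, df.iloc[:, 0]))

def get_data():
    # The parent queries only the column names, then farms the names out.
    dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='myservice')
    con = cx_Oracle.connect(user='ARIEL', password='zzzzz', dsn=dsn_tns)
    cur = con.cursor()
    cur.execute("SELECT column_name FROM all_tab_columns "
                "WHERE owner = 'ARIEL' AND table_name = 'DIM_REGISTRATION_SET'")
    col_names = [row[0] for row in cur]
    cur.close()
    con.close()

    with mp.Pool(mp.cpu_count()) as pool:
        return pool.map(fetch_and_profile, col_names)

Each task now carries only a short column-name string, so the serialisation overhead disappears; the trade-off is that the database scans the table once per column.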

John Zwinck