
I am new to multithreading in Python, so I am not sure how to set this up. I am trying to produce a large output dataframe populated with calculations based on an input dataframe. The output dataframe is like an adjacency matrix of the columns of the input dataframe.

The following non-multithreaded version works perfectly:

import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency
import json
import os
import time

def build_adjacency_matrix(DATA_MATRIX, OUT):

    # READS DATA: data must be a csv with a header and an index column
    my_data = pd.read_csv(DATA_MATRIX, index_col=0)

    # INITIALIZE EMPTY DF WITH COLSNAMES FROM INPUT AS COLUMNS AND INDEX (rownames)
    AM = pd.DataFrame(columns=my_data.columns, index = my_data.columns)

    y=0
    w=2
    for c1 in my_data.columns:
        print (c1)
        y+=1
        if y > w:
            time.sleep(1)   # GIVE THE PROCESSOR A REST AFTER EVERY 10 COLUMNS
            print(y)        #KEEP TRACK OF HOW MANY COLS HAVE BEEN PROCESSED
            w+=10
        for c2 in my_data.columns:
            if c1==c2: AM.loc[c1,c2]=0; continue
            sample_df = pd.DataFrame(my_data, columns=[c1,c2])
            # KEEP ONLY ROWS WITH 1s and 0s
            sample_df = sample_df[sample_df[c1] != 0.5]
            sample_df = sample_df[sample_df[c2] != 0.5]
            sample_df = sample_df.dropna()
            # CALCULATE ChiX
            # Contingency table.
            contingency = pd.crosstab(sample_df[c1], sample_df[c2])
            # Chi-square test of independence.
            try:
                chi2, p, ddof, expected = chi2_contingency(contingency)
                AM.loc[c1,c2] = p
            except ValueError:
                # ASSIGN AS NOT SIGNIFICANT IF THERE IS A PROBLEM
                AM.loc[c1,c2] = 1

    AM.to_csv(OUT, sep=',')

    return

# FILES
data_matrix='input_test.csv'
out='output_mt_test.csv'

# FUNCTION CALL
build_adjacency_matrix(data_matrix, out)

Here are the first few rows of the input file:

,VAR1,VAR2,VAR3,VAR4,VAR5,VAR6,VAR7,VAR8,VAR9,VAR10,VAR11,VAR12,VAR13,VAR14,VAR15,VAR16,VAR17,VAR18,VAR19
SAMPLE1,1,0,0.5,1,1,0.5,0.5,1,0.5,0.5,0.5,0.5,0,0.5,0,0.5,0,0.5,0.5
SAMPLE2,0.5,0.5,0.5,1,1,0.5,0.5,1,0.5,0.5,0,1,0,0.5,0,0.5,0.5,0.5,0.5
SAMPLE3,0.5,0,0.5,1,1,0.5,0.5,1,0.5,0.5,1,0.5,0.5,0.5,0,1,0,0.5,0.5
SAMPLE4,1,0.5,0.5,1,1,0.5,0.5,0,0.5,0.5,0.5,0.5,0.5,0.5,1,1,0.5,0.5,1

And here are the first few rows of the output file:

,VAR1,VAR2,VAR3,VAR4,VAR5,VAR6,VAR7,VAR8,VAR9,VAR10,VAR11,VAR12,VAR13,VAR14,VAR15,VAR16,VAR17,VAR18,VAR19
VAR1,0,0.00326965769624,0.67328997966,0.573642138098,0.573642138098,0.923724918398,0.556975806531,0.665485722686,1.0,0.545971722677,0.125786424639,0.665005542102,0.914326585297,0.843324894877,0.10024407707,0.37367830795,0.894229755473,0.711877649185,0.920167313802
VAR2,0.00326965769624,0,0.67328997966,0.714393037634,0.714393037634,0.829638099719,1.0,0.881545828869,1.0,1.0,0.504985075094,0.665005542102,0.672603817442,0.75946286538,0.365088814029,1.0,0.478520976544,0.698535358303,0.700311372937
VAR3,0.67328997966,0.67328997966,0,1.0,1.0,0.665005542102,1.0,0.672603817442,1.0,1.0,1.0,1.0,0.819476976778,1.0,0.324126587758,1.0,1.0,0.665005542102,0.608407800233

The code works well and produces the expected output for the test file. However, the real input file (exactly the same structure, but with hundreds of rows and thousands of columns) is considerably larger and takes ~48 hours to run, so I need to make it faster.

I tried to implement multithreading as follows:

import pandas as pd
from scipy.stats import chi2_contingency
from threading import Thread


def build_adjacency_matrix(DATA_MATRIX, OUT, THREADS):

    # READS DATA: data must be a csv with a header and an index column
    my_data = pd.read_csv(DATA_MATRIX, index_col=0)

    # INITIALIZE EMPTY DF WITH COLSNAMES FROM INPUT AS COLUMNS AND INDEX (rownames)
    AM = pd.DataFrame(columns=my_data.columns, index = my_data.columns)
    print(len(my_data.columns))
    print(len(my_data.index))



    # BUILD THREAD GROUPS
    thread_groups={}
    chunk=int(len(AM.columns)/THREADS)
    i = 0
    j = chunk
    for t in range(THREADS):
        thread_groups[t] = list(range(i, j))
        i += chunk
        j += chunk
    # DELEGATE REMAINING COLS TO THE LAST THREAD
    # (start one past the last index already assigned, so no column is duplicated)
    if thread_groups[THREADS-1][-1] != len(AM.columns) - 1:
        thread_groups[THREADS-1] = thread_groups[THREADS-1] + \
                                   list(range(thread_groups[THREADS-1][-1] + 1, len(AM.columns)))
    print(thread_groups)


    def populate_DF(section):

        for c1 in AM.columns[section]:
            for c2 in AM.columns:
                if c1==c2: AM.loc[c1,c2]=0; continue
                sample_df = pd.DataFrame(my_data, columns=[c1,c2])
                # KEEP ONLY ROWS WITH 1s and 0s
                sample_df = sample_df[sample_df[c1] != 0.5]
                sample_df = sample_df[sample_df[c2] != 0.5]
                sample_df = sample_df.dropna()
                # CALCULATE ChiX
                # Contingency table.
                contingency = pd.crosstab(sample_df[c1], sample_df[c2])
                #Chi-square test of independence.
                try:
                    # POPULATE AM WITH CHI-SQ p-value
                    chi2, p, ddof, expected = chi2_contingency(contingency)
                    AM.loc[c1,c2] = p
                except ValueError:
                    # ASSIGN A p-value OF 1.0 IF THERE IS A PROBLEM
                    AM.loc[c1,c2] = 1


    for tg in thread_groups:
        t = Thread(target=populate_DF, args=(thread_groups[tg],))
        print(tg)
        print(thread_groups[tg])
        t.start()


    AM.to_csv(OUT, sep=',')

    return


data_matrix='input_test.csv'
out='output_mt_test.csv'

build_adjacency_matrix(data_matrix, out, 4)

I'm not sure whether I should make the output dataframe a global variable, or how to do that. The aim of the 'build thread groups' section is to delegate groups of columns from the input file to separate threads, with each thread's output added to the final dataframe. I have up to 16 cores available, so I thought a multithreading solution would help here. The code as it stands produces an unexpected, partially complete output:

,VAR1,VAR2,VAR3,VAR4,VAR5,VAR6,VAR7,VAR8,VAR9,VAR10,VAR11,VAR12,VAR13,VAR14,VAR15,VAR16,VAR17,VAR18,VAR19
VAR1,0,0.00326965769624,0.67328997966,0.573642138098,0.573642138098,0.923724918398,0.556975806531,0.665485722686,1.0,0.545971722677,0.125786424639,0.665005542102,0.914326585297,0.843324894877,0.10024407707,0.37367830795,0.894229755473,0.711877649185,
VAR2,,,,,,,,,,,,,,,,,,,
VAR3,,,,,,,,,,,,,,,,,,,
VAR4,,,,,,,,,,,,,,,,,,,
VAR5,0.573642138098,0.714393037634,1.0,5.61531250139e-06,0,1.0,1.0,0.859350808026,0.819476976778,0.819476976778,1.0,1.0,0.805020272634,,,,,,
VAR6,,,,,,,,,,,,,,,,,,,
VAR7,,,,,,,,,,,,,,,,,,,
VAR8,,,,,,,,,,,,,,,,,,,
VAR9,1.0,1.0,1.0,0.819476976778,,,,,,,,,,,,,,,
VAR10,,,,,,,,,,,,,,,,,,,
VAR11,,,,,,,,,,,,,,,,,,,
VAR12,,,,,,,,,,,,,,,,,,,
VAR13,0.914326585297,,,,,,,,,,,,,,,,,,
VAR14,,,,,,,,,,,,,,,,,,,
VAR15,,,,,,,,,,,,,,,,,,,
VAR16,,,,,,,,,,,,,,,,,,,
VAR17,,,,,,,,,,,,,,,,,,,
VAR18,,,,,,,,,,,,,,,,,,,
VAR19,,,,,,,,,,,,,,,,,,,
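
One possible reading of the partial output above: `build_adjacency_matrix` starts its threads but never waits for them, so `AM.to_csv` may run while the workers are still filling `AM`. Below is a minimal sketch of the start-then-join pattern, using a plain dict and a stand-in worker instead of the chi-square calculation. The names `fill_rows` and `run_in_threads` are hypothetical and not part of the code above.

```python
import threading


def fill_rows(result, row_ids, n_cols):
    """Stand-in worker: store one placeholder list per assigned row."""
    for r in row_ids:
        result[r] = [r * n_cols + c for c in range(n_cols)]


def run_in_threads(n_rows, n_cols, groups):
    """Start one thread per group of rows, then wait for all of them."""
    result = {}
    threads = [
        threading.Thread(target=fill_rows, args=(result, group, n_cols))
        for group in groups
    ]
    for t in threads:
        t.start()
    # Without these joins, the code below could read `result`
    # while some workers are still writing to it.
    for t in threads:
        t.join()
    return [result[r] for r in range(n_rows)]


table = run_in_threads(n_rows=4, n_cols=3, groups=[[0, 1], [2, 3]])
print(len(table))
```

Joining only makes the result complete. Because of CPython's global interpreter lock, threads are unlikely to make a CPU-bound loop like this one faster, so a process-based pool such as `concurrent.futures.ProcessPoolExecutor` may be a better fit for using several cores.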

I'm not sure whether this is caused by multiple threads trying to write to the same variable or by how I have spread the workload. I would really appreciate any help with fixing this, or any other ways to optimize the code. Thanks in advance!
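
On the question of how the workload is spread: here is a small sketch of splitting column positions into contiguous, near-equal groups with no position repeated. `split_indices` is a hypothetical helper, not something from pandas or from the code above. If there are more workers than columns, some groups simply come out empty.

```python
def split_indices(n_items, n_chunks):
    """Split range(n_items) into n_chunks contiguous lists of near-equal size."""
    base, extra = divmod(n_items, n_chunks)
    chunks = []
    start = 0
    for k in range(n_chunks):
        # The first `extra` chunks each take one additional item.
        stop = start + base + (1 if k < extra else 0)
        chunks.append(list(range(start, stop)))
        start = stop
    return chunks


# 19 columns spread over 4 workers, as in the test file above.
for group in split_indices(19, 4):
    print(group)
```

Each resulting list can be used positionally, for example as `AM.columns[group]`, in the same way the `section` argument is used in `populate_DF`.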

user3062260
  • Threading may not be your answer; have a look through - https://stackoverflow.com/q/4496680/2823755. And if you are curious - [Efficiently Exploiting Multiple Cores with Python](http://python-notes.curiousefficiency.org/en/latest/python3/multicore_python.html) – wwii Sep 17 '17 at 19:49
  • Thanks for this, I hadn't realized it was such a challenge to get multithreading to work in Python! – user3062260 Sep 20 '17 at 05:50
