I am new to multithreading in python so am not sure how to set this up. I am trying to produce a large output dataframe populated with calculations based on another input dataframe. The output dataframe is like an adjacency matrix of the columns of the input dataframe.
The following non-multithreaded version works perfectly:
import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency
import json
import os
import time
def build_adjacency_matrix(DATA_MATRIX, OUT):
# READS DATA: data must be a csv with a header and an index column
my_data = pd.read_csv(DATA_MATRIX, index_col=0)
# INITIALIZE EMPTY DF WITH COLSNAMES FROM INPUT AS COLUMNS AND INDEX (rownames)
AM = pd.DataFrame(columns=my_data.columns, index = my_data.columns)
y=0
w=2
for c1 in my_data.columns:
print (c1)
y+=1
if y > w:
time.sleep(1) # GIVE THE PROCESSER A REST AFTER EACH 10 COLUMNS
print(y) #KEEP TRACK OF HOW MANY COLS HAVE BEEN PROCESSED
w+=10
for c2 in my_data.columns:
if c1==c2: AM.loc[c1,c2]=0; continue
sample_df = pd.DataFrame(my_data, columns=[c1,c2])
# KEEP ONLY ROWS WITH 1s and 0s
sample_df = sample_df[sample_df[c1] != 0.5]
sample_df = sample_df[sample_df[c2] != 0.5]
sample_df = sample_df.dropna()
# CALCULATE ChiX
# Contingency table.
contingency = pd.crosstab(sample_df[c1], sample_df[c2])
# Chi-square test of independence.
try:
chi2, p, ddof, expected = chi2_contingency(contingency)
AM.loc[c1,c2] = p
except:
ValueError;
# ASSIGN AS NOT SIGNIFICANT IF THERE IS A PROBLEM
AM.loc[c1,c2] = 1
AM.to_csv(OUT, sep=',')
return
# FILES
data_matrix='input_test.csv'
out='output_mt_test.csv'
# FUNCTION CALL
build_adjacency_matrix(data_matrix, out)
Here is the top few rows of the input file:
,VAR1,VAR2,VAR3,VAR4,VAR5,VAR6,VAR7,VAR8,VAR9,VAR10,VAR11,VAR12,VAR13,VAR14,VAR15,VAR16,VAR17,VAR18,VAR19
SAMPLE1,1,0,0.5,1,1,0.5,0.5,1,0.5,0.5,0.5,0.5,0,0.5,0,0.5,0,0.5,0.5
SAMPLE2,0.5,0.5,0.5,1,1,0.5,0.5,1,0.5,0.5,0,1,0,0.5,0,0.5,0.5,0.5,0.5
SAMPLE3,0.5,0,0.5,1,1,0.5,0.5,1,0.5,0.5,1,0.5,0.5,0.5,0,1,0,0.5,0.5
SAMPLE4,1,0.5,0.5,1,1,0.5,0.5,0,0.5,0.5,0.5,0.5,0.5,0.5,1,1,0.5,0.5,1
And here is the top few rows of the output file:
,VAR1,VAR2,VAR3,VAR4,VAR5,VAR6,VAR7,VAR8,VAR9,VAR10,VAR11,VAR12,VAR13,VAR14,VAR15,VAR16,VAR17,VAR18,VAR19
VAR1,0,0.00326965769624,0.67328997966,0.573642138098,0.573642138098,0.923724918398,0.556975806531,0.665485722686,1.0,0.545971722677,0.125786424639,0.665005542102,0.914326585297,0.843324894877,0.10024407707,0.37367830795,0.894229755473,0.711877649185,0.920167313802
VAR2,0.00326965769624,0,0.67328997966,0.714393037634,0.714393037634,0.829638099719,1.0,0.881545828869,1.0,1.0,0.504985075094,0.665005542102,0.672603817442,0.75946286538,0.365088814029,1.0,0.478520976544,0.698535358303,0.700311372937
VAR3,0.67328997966,0.67328997966,0,1.0,1.0,0.665005542102,1.0,0.672603817442,1.0,1.0,1.0,1.0,0.819476976778,1.0,0.324126587758,1.0,1.0,0.665005542102,0.608407800233
The code works well and produces the expected output for the test file, however the real input file (exactly the same file structure but with 100s rows and 1000s of cols) is considerably larger and takes ~48 hours to run so I need to make it faster.
I tried the following attempt to implement multithreading:
import pandas as pd
from scipy.stats import chi2_contingency
from threading import Thread
def build_adjacency_matrix(DATA_MATRIX, OUT, THREADS):
# READS DATA: data must be a csv with a header and an index column
my_data = pd.read_csv(DATA_MATRIX, index_col=0)
# INITIALIZE EMPTY DF WITH COLSNAMES FROM INPUT AS COLUMNS AND INDEX (rownames)
AM = pd.DataFrame(columns=my_data.columns, index = my_data.columns)
print(len(my_data.columns))
print(len(my_data.index))
# BUILD THREAD GROUPS
thread_groups={}
chunk=int(len(AM.columns)/THREADS)
i=0; j=chunk
for t in range(THREADS): thread_groups[t]=list(range(i,j)); i+=chunk; j+=chunk;
# DELEGATE REMAINING COLS TO THE LAST THREAD
if thread_groups[THREADS-1][-1] != len(AM.columns):
thread_groups[THREADS-1] = thread_groups[THREADS-1] + \
list(range((thread_groups[THREADS-1][-1]),len(AM.columns)))
print(thread_groups)
def populate_DF(section):
for c1 in AM.columns[section]:
for c2 in AM.columns:
if c1==c2: AM.loc[c1,c2]=0; continue
sample_df = pd.DataFrame(my_data, columns=[c1,c2])
# KEEP ONLY ROWS WITH 1s and 0s
sample_df = sample_df[sample_df[c1] != 0.5]
sample_df = sample_df[sample_df[c2] != 0.5]
sample_df = sample_df.dropna()
# CALCULATE ChiX
# Contingency table.
contingency = pd.crosstab(sample_df[c1], sample_df[c2])
#Chi-square test of independence.
try:
# POPULATE AM WITH CHI-SQ p-value
chi2, p, ddof, expected = chi2_contingency(contingency)
AM.loc[c1,c2] = p
except:
# ASSIGN A p-value OF 1.0 IF THERE IS A PROBLEM
ValueError;
AM.loc[c1,c2] = 1
for tg in thread_groups:
t = Thread(target=populate_DF, args=(thread_groups[tg],))
print(tg)
print(thread_groups[tg])
t.start()
AM.to_csv(OUT, sep=',')
return
data_matrix='input_test.csv'
out='output_mt_test.csv'
build_adjacency_matrix(data_matrix, out, 4)
I'm not sure if I should be making the output dataframe a global variable? Or how to do it? The aim of the section on 'building thread groups' is to delegate groups of columns from the input file to be delegated to separate threads and each of the outputs added to the final dataframe. I have up to 16 cores available so thought a multithreading solution would help here. The code as it is produces an unexpected, partially complete output:
,VAR1,VAR2,VAR3,VAR4,VAR5,VAR6,VAR7,VAR8,VAR9,VAR10,VAR11,VAR12,VAR13,VAR14,VAR15,VAR16,VAR17,VAR18,VAR19
VAR1,0,0.00326965769624,0.67328997966,0.573642138098,0.573642138098,0.923724918398,0.556975806531,0.665485722686,1.0,0.545971722677,0.125786424639,0.665005542102,0.914326585297,0.843324894877,0.10024407707,0.37367830795,0.894229755473,0.711877649185,
VAR2,,,,,,,,,,,,,,,,,,,
VAR3,,,,,,,,,,,,,,,,,,,
VAR4,,,,,,,,,,,,,,,,,,,
VAR5,0.573642138098,0.714393037634,1.0,5.61531250139e-06,0,1.0,1.0,0.859350808026,0.819476976778,0.819476976778,1.0,1.0,0.805020272634,,,,,,
VAR6,,,,,,,,,,,,,,,,,,,
VAR7,,,,,,,,,,,,,,,,,,,
VAR8,,,,,,,,,,,,,,,,,,,
VAR9,1.0,1.0,1.0,0.819476976778,,,,,,,,,,,,,,,
VAR10,,,,,,,,,,,,,,,,,,,
VAR11,,,,,,,,,,,,,,,,,,,
VAR12,,,,,,,,,,,,,,,,,,,
VAR13,0.914326585297,,,,,,,,,,,,,,,,,,
VAR14,,,,,,,,,,,,,,,,,,,
VAR15,,,,,,,,,,,,,,,,,,,
VAR16,,,,,,,,,,,,,,,,,,,
VAR17,,,,,,,,,,,,,,,,,,,
VAR18,,,,,,,,,,,,,,,,,,,
VAR19,,,,,,,,,,,,,,,,,,,
i'm not sure if this is to do with an issue with the multithreads trying to output to the same variable or if this is a problem with how I have spread the workload. I would really appreciate any help with how to fix this, or any other ways to optimize the code? Thanks in advance!