2

I want to iterate over the columns of a data frame and extract ID values from them. I then create new columns from those ID values using the callback passed to the `FindIdInColumn` function. The process was taking a lot of time, and CPU usage was only about 40% according to Windows Task Manager. I then modified the program to run in 8 threads (I have a seventh-generation Core i5 CPU). There were 8 function calls in the script; those 8 calls now run in parallel threads, so I expected CPU usage to shoot up, but it is still almost the same. Below is the code that I am using.

import math
import json
from datetime import datetime
import pandas as pd
import numpy as np
import threading

# Load the training data once at import time; the callbacks below write their
# results into this module-level DataFrame.
dataFrameTrain = pd.read_csv('../Data/train.csv')


def FindIdInColumn(lock,column,callBack,fieldName):
    """Report the digits that follow every occurrence of *fieldName* in *column*.

    Each cell of *column* is read as ``column[i]`` for ``i`` in
    ``0 .. len(column) - 1``. Cells that are not non-empty strings are
    skipped. For every occurrence of *fieldName* in a cell, the characters
    between the key and the next ``','`` (or the end of the cell) are scanned
    and only the numeric ones are kept, so a non-numeric value yields ``''``.

    Args:
        lock: Passed through untouched as the third callback argument.
        column: Indexable sequence of cells (e.g. a list or a pandas Series
            with a default integer index).
        callBack: Called as ``callBack(i, idStr, lock)`` once per occurrence.
        fieldName: The literal key text to search for, e.g. ``"'id'"``.
    """
    # The value is expected one character past the key (skipping the ':' in
    # "'id': 123"). For the 4-character "'id'" key this equals the previously
    # hard-coded offset of 5.
    valueOffset = len(fieldName) + 1
    # Keep index-based access: ``i`` is forwarded to the callback unchanged.
    for i in range(len(column)):
        collectionJson = column[i]
        # isinstance (not an exact type comparison) so str subclasses count too.
        if not isinstance(collectionJson, str) or not collectionJson:
            continue
        idIndex = collectionJson.find(fieldName)
        while idIndex != -1:
            valueStart = idIndex + valueOffset
            valueEnd = collectionJson.find(',', valueStart)
            if valueEnd == -1:
                valueEnd = len(collectionJson)
            # Build the id in one join instead of per-character concatenation.
            idStr = ''.join(
                ch for ch in collectionJson[valueStart:valueEnd] if ch.isnumeric()
            )
            callBack(i, idStr, lock)
            # Resume the search just past the start of the current match.
            idIndex = collectionJson.find(fieldName, idIndex + 2)


def CreateOrAddValueToCol(i,colName,lock):
    """Set ``dataFrameTrain.at[i, colName]`` to 1 while holding *lock*.

    Args:
        i: Row label to write to.
        colName: Column label to write to.
        lock: Lock guarding writes to the shared DataFrame.
    """
    # ``with`` releases the lock even if the assignment raises; a bare
    # acquire()/release() pair would leave it held and block the other threads.
    with lock:
        dataFrameTrain.at[i,colName] = 1

def AddValueToCollectionColumn(i,value,lock):
    """Store *value* in the 'belongs_to_collection' cell of row *i* while holding *lock*.

    Args:
        i: Row label to write to.
        value: Value to store.
        lock: Lock guarding writes to the shared DataFrame.
    """
    # ``with`` releases the lock even if the assignment raises; a bare
    # acquire()/release() pair would leave it held and block the other threads.
    with lock:
        dataFrameTrain.at[i,'belongs_to_collection'] = value


# A single lock is shared by all workers and handed to the callbacks.
lock = threading.Lock()

# (column name, callback, key to search for) for each worker, in launch order.
# NOTE(review): 'Keywords' appears twice, exactly as in the original script.
workerSpecs = [
    ('genres', CreateOrAddValueToCol, '\'id\''),
    ('production_companies', CreateOrAddValueToCol, '\'id\''),
    ('production_countries', CreateOrAddValueToCol, '\'name\''),
    ('Keywords', CreateOrAddValueToCol, '\'id\''),
    ('Keywords', CreateOrAddValueToCol, '\'id\''),
    ('cast', CreateOrAddValueToCol, '\'id\''),
    ('crew', CreateOrAddValueToCol, '\'id\''),
    ('belongs_to_collection', AddValueToCollectionColumn, '\'id\''),
]
workers = [
    threading.Thread(
        target=FindIdInColumn,
        args=(lock, dataFrameTrain[columnName], handler, searchKey),
    )
    for columnName, handler, searchKey in workerSpecs
]
# Keep the individual thread names available at module level.
t1, t2, t3, t4, t5, t6, t7, t8 = workers

print('thread start')
# Start every worker first, then wait for all of them to finish.
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

I tried the same script on an 8th-generation Core i7 CPU; there, CPU usage is only 18–21%. So what am I doing wrong? Here is the link to train.csv

Edit:

I modified the code to use the multiprocessing module, but after that I am getting only 3–4% CPU usage. Below is the modified code.

import math
import json
from datetime import datetime
import pandas as pd
import numpy as np
from multiprocessing import Pool
import threading


# Load the training data at import time; the callbacks below write their
# results into this module-level DataFrame.
dataFrameTrain = pd.read_csv('../Data/train.csv')

# Module-level lock acquired by CreateOrAddValueToCol and
# AddValueToCollectionColumn around each DataFrame write.
# NOTE(review): this is a threading lock, while the work below is dispatched
# through a multiprocessing Pool - confirm it provides the intended protection.
lock = threading.Lock()

def FindIdInColumn(column,callBack,fieldName):
    """Report the digits that follow every occurrence of *fieldName* in *column*.

    Each cell of *column* is read as ``column[i]`` for ``i`` in
    ``0 .. len(column) - 1``. Cells that are not non-empty strings are
    skipped. For every occurrence of *fieldName* in a cell, the characters
    between the key and the next ``','`` (or the end of the cell) are scanned
    and only the numeric ones are kept, so a non-numeric value yields ``''``.

    Args:
        column: Indexable sequence of cells (e.g. a list or a pandas Series
            with a default integer index).
        callBack: Called as ``callBack(i, idStr)`` once per occurrence.
        fieldName: The literal key text to search for, e.g. ``"'id'"``.
    """
    # The value is expected one character past the key (skipping the ':' in
    # "'id': 123"). For the 4-character "'id'" key this equals the previously
    # hard-coded offset of 5.
    valueOffset = len(fieldName) + 1
    # Keep index-based access: ``i`` is forwarded to the callback unchanged.
    for i in range(len(column)):
        collectionJson = column[i]
        # isinstance (not an exact type comparison) so str subclasses count too.
        if not isinstance(collectionJson, str) or not collectionJson:
            continue
        idIndex = collectionJson.find(fieldName)
        while idIndex != -1:
            valueStart = idIndex + valueOffset
            valueEnd = collectionJson.find(',', valueStart)
            if valueEnd == -1:
                valueEnd = len(collectionJson)
            # Build the id in one join instead of per-character concatenation.
            idStr = ''.join(
                ch for ch in collectionJson[valueStart:valueEnd] if ch.isnumeric()
            )
            callBack(i, idStr)
            # Resume the search just past the start of the current match.
            idIndex = collectionJson.find(fieldName, idIndex + 2)


def CreateOrAddValueToCol(i,colName):
    """Set ``dataFrameTrain.at[i, colName]`` to 1 while holding the module-level lock.

    Args:
        i: Row label to write to.
        colName: Column label to write to.
    """
    # ``with`` releases the lock even if the assignment raises; a bare
    # acquire()/release() pair would leave it held indefinitely.
    with lock:
        dataFrameTrain.at[i,colName] = 1

def AddValueToCollectionColumn(i,value):
    """Store *value* in the 'belongs_to_collection' cell of row *i* under the module-level lock.

    Args:
        i: Row label to write to.
        value: Value to store.
    """
    # ``with`` releases the lock even if the assignment raises; a bare
    # acquire()/release() pair would leave it held indefinitely.
    with lock:
        dataFrameTrain.at[i,'belongs_to_collection'] = value




# The entry-point guard is required when worker processes are started by
# re-importing this module (the default on Windows): without it every worker
# would try to create its own Pool while being imported.
if __name__ == '__main__':
    # NOTE(review): each worker process operates on its own copy of
    # dataFrameTrain, so writes made by the callbacks are not expected to show
    # up in this parent process - confirm and return results instead if needed.
    # The context manager shuts the pool down once starmap has returned.
    with Pool(4) as pool:
        pool.starmap(FindIdInColumn, [
            (dataFrameTrain['genres'],CreateOrAddValueToCol,'\'id\''),
            (dataFrameTrain['production_companies'],CreateOrAddValueToCol,'\'id\''),
            (dataFrameTrain['production_countries'],CreateOrAddValueToCol,'\'name\''),
            (dataFrameTrain['Keywords'],CreateOrAddValueToCol,'\'id\''),
            (dataFrameTrain['Keywords'],CreateOrAddValueToCol,'\'id\''),
            (dataFrameTrain['crew'],CreateOrAddValueToCol,'\'id\''),
            (dataFrameTrain['belongs_to_collection'],AddValueToCollectionColumn,'\'id\''),
        ])
V K
  • 1,645
  • 3
  • 26
  • 57

0 Answers0