I have files coming in from an external system into my DB, and I process each new file by passing it through 4 functions in sequence. My code is able to process one file at a time.

Currently, I am trying to process files in parallel using Pool. I am not sure whether my code is actually processing in parallel, because parallel processing is new to me and I can't figure out a way to see details in my console like:

file 1 processing with thread 1
file 2 processing with thread 2
file 1 processing complete with thread 1
file 2 processing complete with thread 2
...and so on.

Can anyone please help me get this kind of output in the console?

My Python code:

import os
import threading
import subprocess
import pyodbc
import time
from multiprocessing.dummy import Pool as ThreadPool

class Workflow:

    def sql_connection(self):
        conn = pyodbc.connect('Driver={SQL Server};'
                              'Server=MSSQLSERVER01;'
                              'Database=TEST;'
                              'Trusted_Connection=yes;')
        print("DB Connected..")
        return conn

    def Function1(self):
        print ("function 1 Started..")


    def Function2(self):
        print ("function 2 Started..")

    def Function3(self):
        print ("function 3 Started..")


    def Function4(self):
        print ("function 4 Started..")

    def ProcessFile(self):
        print (" Processs %s\tWaiting %s seconds" )
        self.Function1()
        self.Function2()
        self.Function3()
        self.Function4()
        print (" Process %s\tDONE" )


    def Start(self):

        #Get number of files in REQUESTED STATE.
        connsql = self.sql_connection()
        query = "select count(*) from [TEST].[dbo].[files] where Status ='REQUESTED'"
        files = connsql.cursor().execute(query).fetchone()
        print(str(files[0]) + " files to be processed..")

        # Get filing ids of files in REQUESTED STATE.
        query = "select distinct filing_id from [TEST].[dbo].[files] where Status ='REQUESTED'"
        resultset = connsql.cursor().execute(query).fetchall()

        filingIds = []

        for id in resultset:
            filingIds.append(id[0])

        connsql.cursor().commit()
        connsql.close()

        #Create Threads based on number of file ids to be processed.
        pool = ThreadPool(len(filingIds))

        results = pool.map(self.ProcessFile(),filingIds) ## Process the FilingIds in parallel.

        print(results)

        # close the pool and wait for the work to finish
        pool.close()
        pool.join()

A = Workflow()
A.Start()
martineau
user2961127
  • Read [multiprocessing-vs-threading-python](https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python) – stovfl Dec 02 '19 at 22:37

2 Answers

I think the issue is simply that you used ThreadPool.map incorrectly. You have to pass self.ProcessFile instead of self.ProcessFile(). Why?

map expects a callable, but self.ProcessFile() calls ProcessFile once, immediately, and hands its return value (None) to map. map then tries to call None for each filing id, which raises a TypeError instead of running your function in the worker threads.
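
A minimal sketch of the corrected call, assuming each worker only needs the filing id. It passes the bound method itself to map and prints the worker thread's name, which gives console output of the kind asked for in the question. The names process_file and start, the ids 101 to 103, the pool size cap of 4, and the time.sleep stand-in for the four functions are illustrative assumptions, not part of the original code.

```python
import threading
import time
from multiprocessing.dummy import Pool as ThreadPool


class Workflow:
    def process_file(self, filing_id):
        # map() passes one item of the iterable as the argument.
        worker = threading.current_thread().name
        print("file {} processing with {}".format(filing_id, worker))
        time.sleep(0.1)  # hypothetical stand-in for Function1..Function4
        print("file {} processing complete with {}".format(filing_id, worker))
        return filing_id, worker

    def start(self, filing_ids):
        # Cap the pool size (assumed limit of 4) and never ask for 0 workers.
        size = max(1, min(4, len(filing_ids)))
        with ThreadPool(size) as pool:
            # Pass the method itself, not the result of calling it.
            return pool.map(self.process_file, filing_ids)


# Hypothetical filing ids; in the question these come from the database.
results = Workflow().start([101, 102, 103])
print(results)
```

If the pool is really running the files concurrently, the "processing" and "processing complete" lines of different files should interleave in the console. The exact thread names depend on the Python version.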

Omni
  • Updated to `pool.map(self.ProcessFile,filingIds)` and got this exception: `TypeError: ProcessFile() takes 1 positional argument but 2 were given`. Any idea about this? – user2961127 Dec 02 '19 at 22:47
  • Your definition of `ProcessFile` is missing an argument (e.g. `def ProcessFile(self, arg):`). – Omni Dec 02 '19 at 22:54
  • Thanks! Got the idea of calling my functions in parallel. – user2961127 Dec 02 '19 at 23:16
0
from multiprocessing import Process

import time
class WorkFlow:
    def __init__(self):
        pass

    def func1(self, *args):
        print('Func1 : {}'.format(args))
        time.sleep(5)
        print('Func1 Completed!')

    def func2(self, *args):
        print('Func2 : {}'.format(args))
        time.sleep(10)
        print('Func2 Completed!')

if __name__ == '__main__':
    wf = WorkFlow()
    processes = [Process(target=wf.func1), Process(target=wf.func2)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

The above code will start 3 Python processes (1 parent process and 2 child processes). The first child process will terminate after 5 seconds and the second after 10 seconds.

This can be seen using the top command on Linux or macOS; the sample output below has the column layout of macOS top.

PID   COMMAND      %CPU TIME     #TH   #WQ  #PORT MEM    PURG   CMPRS
9918  Python       0.0  00:00.00 1     0    8     2148K  0B     0B
9917  Python       0.0  00:00.00 1     0    8     2144K  0B     0B
9916  Python       0.0  00:00.05 1     0    14    6680K  0B     0B
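
As a rough alternative to watching top, each function can report which process and thread it runs in. The sketch below is an assumption-laden variant of the code above: worker_label and func are hypothetical helper names, and the sleep times are shortened to 0.5 and 1 second so that it finishes quickly.

```python
import os
import threading
import time
from multiprocessing import Process, current_process


def worker_label():
    """Return a label identifying the current process and thread."""
    return "pid={} process={} thread={}".format(
        os.getpid(), current_process().name, threading.current_thread().name
    )


def func(name, seconds):
    # Runs in a child process; the label shows that child's own pid.
    print("{} started   [{}]".format(name, worker_label()))
    time.sleep(seconds)
    print("{} completed [{}]".format(name, worker_label()))


if __name__ == "__main__":
    print("parent [{}]".format(worker_label()))
    processes = [
        Process(target=func, args=("func1", 0.5)),
        Process(target=func, args=("func2", 1)),
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```

Each child should print a pid different from the parent's, and both "started" lines should appear before either "completed" line. With a thread pool the pid would stay the same and only the thread name would change.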
CHINTAN VADGAMA
  • `for p in processes: p.start()` Does this start functions 1 and 2 in parallel? – user2961127 Dec 02 '19 at 22:53
  • Yes. This starts func1 and func2 in parallel in 2 different Python processes. If you use `threading`, there will be 1 Python process with 2 threads, but because of the Python GIL the threads can't achieve true parallelism for CPU-bound code. – CHINTAN VADGAMA Dec 02 '19 at 22:54