I am running two jobs at the same time using parallel processing. Sometimes they run fine, but sometimes they fail in random order, and I don't know the reason. Is there any way to lock one process so that while the 1st job is running the 2nd waits, and the 2nd only starts after the 1st completes?
Jobs: ['Im_Xref_Prod_bridge', 'RECORDTYPE']. Both have execution sequence 1, so they start in parallel.
First they create their views in parallel and then run the query, and they fail in random order. Can you please help me solve this? I am sharing a snippet of my code below so you can get a better understanding of my question.
If you need anything else, please let me know.
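Is a shared lock like the one below what I need? This is just a minimal sketch of what I have in mind, not my real code (init_worker and run_job are placeholder names):

import multiprocessing

lock = None  # set in each worker by the initializer

def init_worker(shared_lock):
    # Each worker process stores the shared lock in a module-level global.
    global lock
    lock = shared_lock

def run_job(job_name):
    with lock:                      # the 2nd job waits until the 1st releases the lock
        print("running", job_name)  # view creation + query would go here

if __name__ == '__main__':
    shared_lock = multiprocessing.Lock()
    # The lock cannot be passed through map(), so it goes in via the initializer.
    with multiprocessing.Pool(processes=2, initializer=init_worker,
                              initargs=(shared_lock,)) as pool:
        pool.map(run_job, ['Im_Xref_Prod_bridge', 'RECORDTYPE'])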
Code: in parallel_Execution() I am calling two functions, parallel_view_creation() and parallel_build_query():
import datetime
import logging
from collections import OrderedDict
from functools import partial
from multiprocessing import Pool

import pandas as pd


def parallel_Execution():
    logging.info("parallel_Execution..................[started]")
    par_temp_loc = '/medaff/Temp/'
    df = pd.read_csv(par_temp_loc + 'metadata_file_imedical.txt', delimiter='|')

    # Get the unique execution sequence numbers.
    logging.info("Getting the unique Execution Sequence Number!")
    unique_exec_seq = df['Execution Sequence'].unique().tolist()
    unique_exec_seq.sort()
    num_unique_seq = len(unique_exec_seq)
    logging.info("Total Number of unique sequence Number : %2d" % (num_unique_seq))

    df_main4 = pd.DataFrame()
    for exec_seq in unique_exec_seq:
        temp_df = df[df['Execution Sequence'] == exec_seq].copy()
        unique_master_job = temp_df['Master Job Name'].unique().tolist()
        print(unique_master_job)  # ['Im_Xref_Prod_bridge', 'RECORDTYPE']
        logging.info("%s Master Job Started." % (unique_master_job))
        if len(unique_master_job) > 0:
            num_processes = len(unique_master_job)

            # Parallel view creation: one worker process per master job.
            pool = Pool(processes=num_processes)
            result1 = pool.map(partial(parallel_view_creation, exec_seq, temp_df),
                               unique_master_job)
            pool.close()
            pool.join()

            df_main = pd.DataFrame(result1)
            print("printing df_main")
            print(df_main)

            # Drop the jobs whose view creation failed (status == 0).
            for m_job in df_main.master_job.unique():
                temp_df1 = df_main[df_main['master_job'] == m_job]
                status = temp_df1.status.unique()[0]
                if status == 0:
                    unique_master_job.remove(m_job)

            # Parallel build-query for the jobs that are left.
            pool = Pool(processes=num_processes)
            result2 = pool.map(partial(parallel_build_query, exec_seq, temp_df),
                               unique_master_job)
            pool.close()
            pool.join()

            if result2:
                df_main2 = pd.DataFrame(result2)
                df_main3 = pd.concat([df_main, df_main2])
                # DataFrame.append() is deprecated; pd.concat() does the same thing.
                df_main4 = pd.concat([df_main4, df_main3])
    print(df_main4)
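For reference, this is the pool.map + partial pattern I am using, boiled down to a standalone toy example (work() and its arguments are placeholders, not my real function):

from functools import partial
from multiprocessing import Pool

def work(exec_seq, temp_df, master_job):
    # exec_seq and temp_df are fixed by partial();
    # pool.map supplies one master_job per call.
    return {'master_job': master_job, 'exec_seq': exec_seq}

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        results = pool.map(partial(work, 1, None),
                           ['Im_Xref_Prod_bridge', 'RECORDTYPE'])
    print(results)  # one result dict per master job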
Now I am sharing the parallel_view_creation() code:
def parallel_view_creation(exec_seq, temp_df, master_job):
    error_dict = OrderedDict()
    error_dict['status'] = 0
    error_dict['error_msg'] = ''
    error_dict['error_func'] = ''
    error_dict['start_time'] = datetime.datetime.now().strftime("%m-%d-%Y %H:%M:%S")
    row = None  # so the except block can log safely if we fail before the loop
    try:
        logging.info("parallel_view_creation.................[started]")
        df_data = temp_df[temp_df['Execution Sequence'] == exec_seq]
        df_data = df_data[df_data['Master Job Name'] == master_job]
        pipe_files = ['CDP']
        view_list = []
        metatemp_df_main = df_data[df_data['Master Job Name'] == master_job]
        print(metatemp_df_main)
        for index, row in metatemp_df_main.iterrows():
            if not pd.isna(row['Source File Name Lnd']):
                # Tables listed in pipe_files use the \001 delimiter; the rest use '|'.
                if row['TableName'] in pipe_files:
                    delimiter, label = '\001', "VIEW"
                else:
                    delimiter, label = '|', "CDP_VIEW"
                print(label + ": " + row['Source File Name Lnd'])
                # sqlContext is the Spark SQLContext created elsewhere in my script.
                df_read_file = (sqlContext.read.format('csv')
                                .option("delimiter", delimiter)
                                .options(header='true', quote='', inferSchema='true')
                                .load(row['Source File Name Lnd']))
                # Register the DataFrame as a temp view named after the landing table.
                df_read_file.createOrReplaceTempView(row['landingdfname'])
                view_list.append(row['landingdfname'])
                logging.info("View created for the table %s in %s"
                             % (row['TableName'], row['Master Job Name']))
        logging.info(view_list)
        error_dict['status'] = 1
    except Exception as creation_error:
        print(creation_error)
        logging.error(creation_error)
        if row is not None:
            logging.info("Creation of views failed for %s in %s"
                         % (row['TableName'], row['Master Job Name']))
        error_dict['status'] = 0
        error_dict['error_msg'] = str(creation_error)
        error_dict['error_func'] = 'parallel_view_creation'
    finally:
        error_dict['master_job'] = master_job
        error_dict['exec_seq'] = exec_seq
        error_dict['end_time'] = datetime.datetime.now().strftime("%m-%d-%Y %H:%M:%S")
    return error_dict
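If a lock inside the Pool is not the right approach, would it be acceptable to just drop the Pool and run the jobs of one execution sequence strictly one after another? A sketch of what I mean, reusing the names from my code above:

# Sketch: run each master job to completion before starting the next,
# instead of mapping them over a Pool.
for master_job in unique_master_job:
    result = parallel_view_creation(exec_seq, temp_df, master_job)
    if result['status'] == 1:  # only build the query if view creation succeeded
        parallel_build_query(exec_seq, temp_df, master_job)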