I have a local Linux server with 4 cores. I am running a PySpark job on it locally that reads two tables from a database into two DataFrames. I then process these two DataFrames and save the resulting DataFrames to Elasticsearch. Below is the code:

    import time

    from pyspark.sql import SparkSession, SQLContext

    # merged_python_file and logic1_python_file ... logic5_python_file are
    # separate project modules (imports not shown).


    def save_to_es(df):
        # some_port_no, index_name and es_mappings are placeholders
        df.write.format('es').option('es.nodes', 'es_node').option('es.port', some_port_no).option('es.resource', index_name).option('es.mapping', es_mappings).save()


    def coreFun():
        start_time = int(time.time())

        spark = SparkSession.builder.master("local[1]").appName('test').getOrCreate()

        spark.catalog.clearCache()
        spark.sparkContext.setLogLevel("ERROR")
        sc = spark.sparkContext

        sqlContext = SQLContext(sc)

        # read the master table
        select_sql = """(select * from db."master_table")"""
        df_master = spark.read.format("jdbc").option("url", "jdbcurl").option("dbtable", select_sql).option("user", "username").option("password", "password").option("driver", "database_driver").load()

        # read the child table
        select_sql_child = """(select * from db."child_table")"""
        df_child = spark.read.format("jdbc").option("url", "jdbcurl").option("dbtable", select_sql_child).option("user", "username").option("password", "password").option("driver", "database_driver").load()

        # each step is independent of the others
        merged_df = merged_python_file.merged_function(df_master, df_child, sqlContext)
        logic1_df = logic1_python_file.logic1_function(df_master, sqlContext)
        logic2_df = logic2_python_file.logic2_function(df_master, sqlContext)
        logic3_df = logic3_python_file.logic3_function(df_master, sqlContext)
        logic4_df = logic4_python_file.logic4_function(df_master, sqlContext)
        logic5_df = logic5_python_file.logic5_function(df_master, sqlContext)

        save_to_es(merged_df)
        save_to_es(logic1_df)
        save_to_es(logic2_df)
        save_to_es(logic3_df)
        save_to_es(logic4_df)
        save_to_es(logic5_df)

        end_time = int(time.time())
        print(end_time - start_time)
        sc.stop()


    if __name__ == "__main__":
        coreFun()

The processing logic lives in separate Python files, e.g. logic1 in logic1_python_file. I pass df_master to each function, and each returns a processed DataFrame to the driver, which I then save to Elasticsearch. This works fine, but everything happens sequentially: merged_df is processed first while the others simply wait, even though they do not depend on the output of merged_function; then logic1 is processed while the rest wait, and so on. This is not an ideal design, given that the output of one logic does not depend on another.

I am sure asynchronous processing can help here, but I am not sure how to implement it for my use case. I believe I may need some kind of queue (JMS, Kafka, etc.) to accomplish this, but I don't have the complete picture.

Please let me know how I can use asynchronous processing here. Any other input that could improve the job's performance is welcome.

Gaurav Gupta

1 Answer

If, while a single step such as merged_python_file.merged_function is running, only one CPU core is heavily utilized and the others are nearly idle, multiprocessing can speed things up. This can be done with Python's multiprocessing module. For more details, see the answer to "How to do parallel programming in Python?".
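
A minimal sketch of one way to overlap the independent steps. Note that it swaps the multiprocessing module for a thread pool from concurrent.futures, on the assumption that all steps share a single SparkSession, which cannot be handed to another process; Spark accepts jobs submitted from several threads of the same driver. The names run_concurrently, fake_logic1 and fake_logic2 are hypothetical stand-ins and not part of the original job. The session would probably also need more than one core, e.g. master("local[4]") rather than local[1], for the jobs to actually overlap.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_concurrently(steps, shared_input, max_workers=4):
    """Run independent steps on the same input, each in its own thread.

    steps maps a label to a callable that takes shared_input.
    Returns a dict of label -> result; an exception raised by any
    step is re-raised here when its result is collected.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit every step up front so none waits for another to finish.
        futures = {
            pool.submit(func, shared_input): label
            for label, func in steps.items()
        }
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results


# Hypothetical stand-ins so the sketch runs without Spark. In the job from
# the question, each callable would build and write one DataFrame, e.g.
#   lambda df: save_to_es(logic1_python_file.logic1_function(df, sqlContext))
def fake_logic1(rows):
    return sum(rows)


def fake_logic2(rows):
    return max(rows)


if __name__ == "__main__":
    output = run_concurrently(
        {"logic1": fake_logic1, "logic2": fake_logic2},
        [1, 2, 3],
    )
    print(output)
```

Threads are enough here under the assumption that the heavy work happens inside Spark and Elasticsearch rather than in Python code on the driver; if a step does CPU-bound work in pure Python, a process pool may be the better fit.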

Bipul Ranjan