
I'm using PySpark to extract files, apply basic transformations, and load the data into Hive. I use a for loop to find the extract files and load them into Hive. We have around 60 tables, and looping over each file and loading it sequentially takes time, so I'm using ThreadPoolExecutor to run the loads in parallel. Here is a sample code prototype.

from concurrent import futures

def func(args):
    df = extract(args)
    tbl, status = load(df, args)
    return tbl, status

def extract(args):
    ### find the extract files for this table and read them into a DataFrame ###
    return df

def load(df, tbl):
    ### load the DataFrame into the Hive table ###
    status[tbl] = 'Completed'
    return tbl, status

status = {}
listA = ['ABC', 'BCD', 'DEF']
prcs = []
with futures.ThreadPoolExecutor() as executor:
    for i in listA:
        prcs.append(executor.submit(func, i))

for tsk in futures.as_completed(prcs):
    tbl, status = tsk.result()
    print(tbl)
    print(status)

It works well. I'm redirecting the spark-submit log to a file, but with ThreadPoolExecutor the log lines from all threads are interleaved and I can't debug anything. Is there a better way to group the logs by thread? Here each thread corresponds to one table. I'm new to Python. Kindly help.

Raja

1 Answer


As described in the Spark documentation on configuring logging:

Spark uses log4j for logging. You can configure it by adding a log4j.properties file in the conf directory. One way to start is to copy the existing log4j.properties.template located there.

So you can configure it via log4j.properties, or programmatically as described in How to configure the log level of a specific logger using log4j in pyspark?. That post is about log levels, but the same approach applies to configuring the appender and its pattern layout.
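
As a sketch (based on the log4j.properties.template that Spark ships in conf; the appender below is illustrative, and newer Spark releases use log4j2 instead of log4j 1.x), adding the thread name (%t) and Spark's MDC task name (%X{mdc.taskName}) to the pattern layout lets you grep the redirected log per worker thread, i.e. per table:

# conf/log4j.properties -- sketch based on the shipped template
# [%t] prints the thread name; %X{mdc.taskName} prints Spark's MDC task name
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p [%t] %X{mdc.taskName} %c{1}: %m%n

Since each table is processed in its own ThreadPoolExecutor thread, grouping the driver-side log lines by [%t] effectively groups them by table.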

As for what to put in the logs to make them more meaningful, you need some correlation id to tie related log lines together. The thread id/name may already be sufficient; if not, see the note below about MDC. You can also add your own custom MDC entries (see: Spark application and logging MDC (Mapped Diagnostic Context)).

By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): mdc.taskName, which shows something like task 1.0 in stage 0.0. You can add %X{mdc.taskName} to your patternLayout in order to print it in the logs. Moreover, you can use spark.sparkContext.setLocalProperty(s"mdc.$name", "value") to add user specific data into MDC. The key in MDC will be the string of “mdc.$name”.

If taskName isn't sufficient, you can create your own correlation id, add it to the MDC with setLocalProperty(), and reference it in the pattern layout, for example as sketched below.
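
A minimal PySpark sketch of that idea, assuming Spark 3.1+ (where local properties prefixed with mdc. are propagated into the task MDC, per the docs quoted above) and a pattern layout containing %X{mdc.tableName}; the key mdc.tableName and the spark session variable are illustrative names, not something from your code:

def func(tbl):
    # Local properties are per-thread in Spark, so each ThreadPoolExecutor
    # worker tags only the jobs it submits for its own table.
    spark.sparkContext.setLocalProperty("mdc.tableName", tbl)
    df = extract(tbl)
    return load(df, tbl)

With %X{mdc.tableName} in the ConversionPattern, the executor log lines for that table's tasks carry the table name, so the redirected log can be split per table with grep.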

Kashyap