We have a requirement to read a CSV file containing a name and a path. Each path points to a Hive query file on HDFS. A sample of the structure is:

name|script
name1|hdfs://some/dir/query1.sql
name2|hdfs://some/dir/query2.sql

We are running a report where each query listed in the CSV file needs to be run, and a count of its results needs to be obtained and stored as another column.

Previously I have been doing this:

input_df = spark.read.options(header='True', delimiter='|', inferSchema='True').csv("/input/vikas/input.csv")

for row in input_df.collect():
  # Read the whole query file as one string; spark.sql() expects SQL text, not a DataFrame
  query = spark.sparkContext.wholeTextFiles(row.script).first()[1]
  df = spark.sql(query)
  count = df.count()

  # A non-zero count signals a problem with the process
  if count > 0:
    call_alert_function(row.name, count)

where call_alert_function sends an email to a list of people, indicating that there is an issue with the given name (actually a process).

Basically, each sql/hql file runs a query, and if the query returns no results, i.e. the count is 0, there are no issues with the process.

The issue now is that the number of entries in input.csv has grown to about 2400 so far. Instead of reading the SQL files sequentially in a for loop, I need to read and execute the files in parallel, compute the counts, and report them in a dataframe in the following format:

name|script|count

I am not sure how to do that, and any pointers would be a great help.

Vikas Saxena
  • Run spark.sql along with the count operation in a separate thread and save the count results in a mutable map. After all threads have completed, you can create a dataframe from this result map. – Vikas Nov 22 '21 at 08:41
  • Could you please post a code segment so I can understand your suggested solution? – Vikas Saxena Nov 22 '21 at 09:08
  • @Vikas, the solution has to be in Python. – Vikas Saxena Nov 22 '21 at 09:46
  • Possible duplicate: https://stackoverflow.com/questions/30214474/how-to-run-multiple-jobs-in-one-sparkcontext-from-separate-threads-in-pyspark – Vikas Nov 22 '21 at 12:10

1 Answer

df = spark.sql(query)

This part already runs the query in parallel as a Spark job, using the resources allocated to that Spark application.

If you want more parallelism across queries, one option is to submit them with futures or threads.

Look over a nice example with Futures here
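
As a rough sketch of the futures approach (not a tested production solution), something like the following could work. The names run_counts and make_spark_count_fn are made up for illustration, and max_workers=8 is an arbitrary starting point. It assumes each script file holds exactly one query that spark.sql() can run as is, and that a failed query should be recorded with a count of None rather than stop the whole run.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_counts(rows, count_fn, max_workers=8):
    """Run count_fn(script) for every (name, script) pair on a thread pool.

    Returns a list of (name, script, count) tuples in the input order.
    A query that raises is recorded with count None (an assumption of
    this sketch) so one bad file does not abort the others.
    """
    results = [None] * len(rows)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the index of the row it belongs to
        futures = {
            pool.submit(count_fn, script): i
            for i, (_name, script) in enumerate(rows)
        }
        for fut in as_completed(futures):
            i = futures[fut]
            name, script = rows[i]
            try:
                count = fut.result()
            except Exception:
                count = None
            results[i] = (name, script, count)
    return results


def make_spark_count_fn(spark):
    """Hypothetical helper: build a count_fn bound to a SparkSession."""
    def count_fn(script):
        # Read the whole SQL file as a single string, then count its result
        query = spark.sparkContext.wholeTextFiles(script).first()[1]
        return spark.sql(query).count()
    return count_fn


# Usage inside a PySpark application (not executed here):
# rows = [(r["name"], r["script"]) for r in input_df.collect()]
# results = run_counts(rows, make_spark_count_fn(spark))
# report = spark.createDataFrame(results, ["name", "script", "count"])
```

The threads only submit jobs from the driver; the actual work still runs on the executors, so the useful number of workers depends on how many resources the application has. Rows with a count greater than 0 can then be passed to call_alert_function as before.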

Feroz