1

I am having a dataframe. I need to convert each record in to JSON and then call an API with the JSON payload to insert data into postgress. I have 14000 records in the dataframe and to call api and get response back, it is taking 5 hrs. Is there any way to improve the performance. Below is my code snippet.

df_insert = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "source_table_name") \
.load()

json_insert = df_insert.toJSON().collect()

for row in json_insert:
  line = json.loads(row)
    headers = {
    'Authorization': authorization,
    'content-type': "application/json",
    'cache-control': "no-cache",
    }
  response = requests.request("POST", url_insert, data=payload, headers=headers)
  print(response.text)
  res = response.text
  response_result = json.loads(res)
  #print(response_result["httpStatus"])
  if response_result["message"] == 'success':
      print ("INFO : Record inserted successfully")
  else:
      print ("ERROR : Error in the record")
      status_code = response_result["status"]
      error_message =  response_result["error"]
      my_list = [(status_code,error_message,row)]
      df = sc.createDataFrame(my_list, ['status', 'error', 'json data'])
      df.write.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("dbtable", "error_table") \
      .option("header", "true") \
      .option("truncate_table", "on") \
      .mode("append") \
      .save()

Note : I know by doing "json_insert = df_insert.toJSON().collect()" i am loosing the advantage of a dataframe. Is there any better way to accomplish.

Basant Jain
  • 115
  • 1
  • 12

1 Answers1

1

df_insert.toJSON() returns a RDD that you can flatMap over. 1

source_rdd = df_insert.toJSON()

Perform a flatMap over this RDD and get back an RDD of containing only errors.

headers = {
    'Authorization': authorization,
    'content-type': "application/json",
    'cache-control': "no-cache"
}

def post_service_error(row):
    # requests package may not be available in the node
    # see about adding files to the spark context
    response = requests.request("POST", url_insert, data=row, headers=headers)
    response_result = response.json()
    if response_result['message'] == 'success':
        print ("INFO : Record inserted successfully")
        return []
    print ("ERROR : Error in the record")
    status_code = response_result["status"]
    error_message =  response_result["error"]
    return [(status_code, error_message, row)]

errors_rdd = source_rdd.flatMap(post_service_error)

Convert the errors RDD to a spark DataFrame and persist that to a table.

errors_df = sc.createDataFrame(errors_rdd, ['status', 'error', 'json data'])
(errors_df.write.format(SNOWFLAKE_SOURCE_NAME)
  .options(**sfOptions)
  .option("dbtable", "error_table")
  .option("header", "true")
  .option("truncate_table", "on")
  .mode("append")
  .save())

If you own the API you're doing the request to, I suggest exploring an implementation that accepts a batch of these objects/arrays. This way you can partition the RDD before mapping each partition to a batch request and process the error thereafter.

Oluwafemi Sule
  • 36,144
  • 1
  • 56
  • 81
  • We do not own the API and we have to iterate through each json object and call the api. With the above approach, job is not going inside the function "post_service_error" when are are calling using flatMap. – Basant Jain Mar 02 '19 at 05:46
  • You can flatMap to a list of error tuple if you don't own that API as I've shown in the example. "Job is not going inside the function" — not sure I fully understand what you mean. What error is logged in your master or node console about it? – Oluwafemi Sule Mar 02 '19 at 07:54
  • I started with below code: def post_service_error(row): print ("Inside method ..............") print(row) print('Starting job...........') source_rdd.map(post_service_error) print('Job ended.............') In the output starting job and job ended is printing but not "inside method" – Basant Jain Mar 02 '19 at 08:26
  • You still have invoke `collect` on the Rdd whether you `flatMap` or `map` to distribute the task to the nodes and collect back the task results. `map` and `flatMap` are just ways of declaring the tasks. The way you wrote it in your question, you're running the task on the master and not taking advantage of other nodes that you may have in your cluster to run tasks. – Oluwafemi Sule Mar 02 '19 at 08:41
  • Yes i am not able to take the advantage of other nodes because of collect and as a result the job is taking too much time. So to improve the result i tried the way you provided. Here when i tried to call the function(post_service_error)from flatMap (source_rdd.flatMap(post_service_error)), i am not getting anything. I tried by only giving a print command inside the function, but it is not printing. Sorry for my miscommunication – Basant Jain Mar 02 '19 at 08:49
  • I see. Did you check the Hadoop logs? You may want to check the worker(s) logs to see if anything about the task is logged there. https://stackoverflow.com/a/39911816/5189811 – Oluwafemi Sule Mar 02 '19 at 09:17
  • Sure, you can do that or use multiprocess. – Oluwafemi Sule Mar 02 '19 at 14:55