I have a dataframe. I need to convert each record to JSON and then call an API with the JSON payload to insert the data into Postgres. There are 14,000 records in the dataframe, and calling the API and waiting for each response takes about 5 hours. Is there any way to improve the performance? Below is my code snippet.
import json
import requests

df_insert = spark.read \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "source_table_name") \
    .load()

# collect every record to the driver as a JSON string
json_insert = df_insert.toJSON().collect()

# the headers do not change per record, so build them once
headers = {
    'Authorization': authorization,
    'content-type': "application/json",
    'cache-control': "no-cache",
}

for row in json_insert:
    # each row is already a JSON string, so it can be sent as the request body
    response = requests.request("POST", url_insert, data=row, headers=headers)
    print(response.text)
    response_result = json.loads(response.text)
    if response_result["message"] == 'success':
        print("INFO : Record inserted successfully")
    else:
        print("ERROR : Error in the record")
        status_code = response_result["status"]
        error_message = response_result["error"]
        # append the failed record to the error table
        my_list = [(status_code, error_message, row)]
        df = spark.createDataFrame(my_list, ['status', 'error', 'json_data'])
        df.write.format(SNOWFLAKE_SOURCE_NAME) \
            .options(**sfOptions) \
            .option("dbtable", "error_table") \
            .option("truncate_table", "on") \
            .mode("append") \
            .save()
Note: I know that by doing "json_insert = df_insert.toJSON().collect()" I am losing the advantage of a dataframe. Is there a better way to accomplish this?
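One direction I have been looking at (not tested end to end) is to keep the records on the executors and post them from mapPartitions, so the HTTP calls run in parallel across partitions, the connection is reused within each partition, and the failed records are written to the error table in a single append at the end instead of one tiny write per record. A rough sketch, assuming url_insert and authorization are plain strings defined in the driver script (so they get shipped with the closure), and that the repartition count of 16 is just a placeholder for however much parallelism the API can tolerate:

import requests
from pyspark.sql.types import StructType, StructField, StringType

def post_partition(rows):
    # one Session per partition so the TCP connection is reused across records
    session = requests.Session()
    session.headers.update({
        'Authorization': authorization,
        'content-type': "application/json",
        'cache-control': "no-cache",
    })
    failed = []
    for row in rows:
        response = session.post(url_insert, data=row)
        result = response.json()
        if result.get("message") != 'success':
            failed.append((str(result.get("status", "")),
                           str(result.get("error", "")),
                           row))
    return failed

# run the POSTs on the executors, one task per partition, instead of
# sequentially on the driver
errors = df_insert.toJSON().repartition(16).mapPartitions(post_partition)

# explicit schema so an empty error RDD does not break schema inference
error_schema = StructType([
    StructField("status", StringType()),
    StructField("error", StringType()),
    StructField("json_data", StringType()),
])
errors_df = spark.createDataFrame(errors, error_schema)

# one append of all failed records instead of one write per record
errors_df.write.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "error_table") \
    .mode("append") \
    .save()

Separately, if the API supports batch inserts, sending many records per request would probably help more than any amount of client-side parallelism.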