Since you are working with an API that can accept only one row item at a time, you can use foreach or foreachPartition. This answer provides a great overview of the differences between them.
You can also repartition the DataFrame to control the number of partitions, and therefore how many API sessions are established in parallel.
from pyspark.sql import functions as f

# assumes an existing SparkSession named `spark`
data = [("James", "", "Smith", "36636", "M", 3000),
        ("Michael", "Rose", "", "40288", "M", 4000),
        ("Robert", "", "Williams", "42114", "M", 4000),
        ("Maria", "Anne", "Jones", "39192", "F", 4000),
        ("Jen", "Mary", "Brown", "", "F", -1)]

df = spark.createDataFrame(data, ("firstname", "middlename", "lastname", "id", "gender", "salary"))

def mimic_send_to_api(iterator):
    # mimic initiating a costly session with the API, once per partition
    session = print
    for item in iterator:
        session(f"sent to API: {item.value}")

df.select(f.to_json(f.struct([df[x] for x in df.columns])).alias("value"))\
  .foreachPartition(mimic_send_to_api)
Output
sent to API: {"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}
sent to API: {"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}
sent to API: {"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}
sent to API: {"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}
sent to API: {"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}