
I have a problem with writing a DataFrame to the output row by row. I wrote my own solution, but it's too slow, so maybe there is a built-in solution for this problem.

The goal is to write the dataframe's rows to the output one at a time. My code:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# col1 and col2 are the partition and order columns, respectively
window = Window.partitionBy(col1).orderBy(col2)
df = df.withColumn("nb", f.row_number().over(window))

# one Spark job per row: very slow
for i in range(df.count()):
    (df
     .where(f"nb = {i + 1}")
     .drop("nb")
     .select(f.to_json(f.struct([df[x] for x in df.columns])).alias("value"))
     .write
     .format("console")
     .save())

Is it possible to make this faster? Thanks in advance!

  • what is it that you're trying to achieve? – Atlas Bravoos Dec 15 '21 at 14:33
  • It's slow because you're using a loop. If you want an index, that's not the way to do it. Plus, you're dropping the column, so what's the purpose of the nb column or having each one be a separate DF that's printed? – OneCricketeer Dec 15 '21 at 14:40
  • @OneCricketeer yeah, I know it's because of the loop. I need to send rows from the dataframe (telemetry data) to another system. That is why I need to send them one by one (the system requires it). So how do I achieve that with Spark utilities? – Łukasz Dec 15 '21 at 14:44
  • @Łukasz To confirm, do you need to send them one at a time, or are you sending to an API that accepts only one data item at a time? – Nithish Dec 15 '21 at 16:05
  • @Nithish, yes I need to send only one row at a time; the API accepts only one row at a time. – Łukasz Dec 16 '21 at 11:41

1 Answer


Since you are working with an API that can accept only one row at a time, you can use foreach or foreachPartition. This answer provides a great overview of the differences between them.

You can also repartition the dataframe to control the number of parallel connections established.
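
For instance, a minimal sketch of that idea, where the partition count of 8 and the send_partition callback are placeholders rather than anything from the question:

# Cap concurrent API connections by capping the number of partitions
df.repartition(8).foreachPartition(send_partition)

A full runnable example with foreachPartition: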

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]
 
df = spark.createDataFrame(data, ("firstname", "middlename", "lastname", "id", "gender", "salary", ))


def mimic_send_to_api(iterator):
    # mimic initiating a costly session with the API, once per partition
    session = print
    for item in iterator:
        session(f"sent to API: {item.value}")

df.select(f.to_json(f.struct([df[x] for x in df.columns])).alias("value"))\
  .foreachPartition(mimic_send_to_api)

Output

sent to API: {"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}
sent to API: {"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}
sent to API: {"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}
sent to API: {"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}
sent to API: {"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}