3

I have a PySpark data frame, and for each (batch of) record(s) I want to call an API. So basically, say I have 100000k records, I want to batch items into groups of, say, 1000 and call an API. How can I do this with PySpark? The reason for the batching is that the API probably will not accept a huge chunk of data from a Big Data system.

I first thought of LIMIT, but that won't be "deterministic". Furthermore, it seems like it would be inefficient?

Jiew Meng
  • 84,767
  • 185
  • 495
  • 805

2 Answers

3
df.foreachPartition { ele =>
  ele.grouped(1000).foreach { chunk =>
    postToServer(chunk)
  }
}

The code is in Scala; you can do the same in Python. It will create batches of 1000 within each partition.
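For reference, a minimal PySpark sketch of the same approach; `post_to_server` is a hypothetical placeholder for the actual API call, and `itertools.islice` stands in for Scala's `grouped`, which Python iterators do not have:

from itertools import islice

def post_to_server(chunk):
    pass  # hypothetical placeholder for the real API call

def handle_partition(rows):
    # rows is an iterator over one partition; pull up to 1000 records at a time
    while True:
        chunk = list(islice(rows, 1000))
        if not chunk:
            break
        post_to_server(chunk)

df.foreachPartition(handle_partition)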

Kishore
  • 5,761
  • 5
  • 28
  • 53
  • Just curious, is it better to `repartition` to create partitions of desired sizes and just process the batches in a similar way (without the grouped), or is this better? If so, why? Can I say repartition is too expensive in general? Also, I find that the result is not a dataframe. How should I make it a dataframe? There's https://stackoverflow.com/a/48111699/292291, but I wonder if I can do it without explicitly setting the column names in `toDF()`? – Jiew Meng May 04 '19 at 12:24
  • The `repartition` algorithm does a full shuffle and creates new partitions with data that's distributed evenly. It is expensive, but it gives parallelism to your application. – Kishore May 05 '19 at 09:11
  • For some reason, now I get "'itertools.chain' object has no attribute 'grouped'" any idea why? It seems like `itertools.chain` indeed does not have the function grouped. I am using PySpark btw – Jiew Meng May 11 '19 at 13:39
2

Using foreachPartition and then something like this (how to split an iterable in constant-size chunks) to batch the iterable into groups of 1000 is arguably the most efficient way to do it in terms of Spark resource usage.

from itertools import islice

def handle_iterator(it):
    # batch the iterable into groups of 1000 and call the API once per batch
    while chunk := list(islice(it, 1000)):
        call_api(chunk)  # call_api is a placeholder for the actual API call
df.foreachPartition(handle_iterator)

Note: This would make parallel calls to the API from the executors and might not be the way to go in practice if, e.g., rate limiting is an issue.

ollik1
  • 4,460
  • 1
  • 9
  • 20
  • Is there no way to do the grouping in Spark? Also, if I am doing the chunking in Python, is there any reason to use `foreachPartition` over just `map`? – Jiew Meng May 04 '19 at 06:46
  • 1
    Map is lazy and should not contain any side effects. It also accesses a single row at a time, so it is not suitable for batching. If you wanted to batch in Spark, there is an aggregate function called `collect_list`. However, you'd need to figure out grouping/windowing that produces even 1k batches. For example, with the mentioned 10^8 rows, you could group by hash modulo 10^5, which requires first calculating the df size and then almost certainly shuffling data (a rough sketch of this follows below). – ollik1 May 05 '19 at 16:54
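A rough sketch of that `collect_list` idea, assuming a plain DataFrame `df` and taking the figures from the comment above (10^8 rows, ~10^5 batches of ~1000 records); the column names `batch_id` and `batch` are made up for illustration, and the batches come out only roughly even:

from pyspark.sql import functions as F

num_batches = 100000  # ~10^8 rows / ~1000 rows per batch, per the comment above
batched = (df
    .withColumn("batch_id", F.abs(F.hash(*df.columns)) % num_batches)
    .groupBy("batch_id")
    .agg(F.collect_list(F.struct(*df.columns)).alias("batch")))
# each row of `batched` now holds roughly 1000 of the original records in an array column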