I am sending data from a dataframe to an API that has a limit of 50,000 rows. Say my dataframe has 70,000 rows, how can I split it into separate dataframes, each with a max row count of 50,000? These do not have to be even and the data order does not matter.
-
You could add a condition using `df.count()` and, if it is greater than 50k, use the `randomSplit` function. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit – murtihash May 07 '20 at 00:59
-
Something like `def split(df): if df.count()>50000: df1,df2=df.randomSplit([0.5,0.5],24) return df1,df2 else: return df` – murtihash May 07 '20 at 01:15
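A minimal sketch of that randomSplit idea, generalized to any row count (the names df and max_rows and the seed are assumptions; since the split is random, chunk sizes are only approximately equal):
import math

max_rows = 50000
n_chunks = math.ceil(df.count() / max_rows)   # e.g. 70000 rows -> 2 chunks
weights = [1.0] * n_chunks                    # equal weights -> roughly equal chunks
chunks = df.randomSplit(weights, seed=24)
for chunk in chunks:
    # each chunk holds roughly df.count() / n_chunks rows; send it to the API here
    pass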
-
Josh, you might find your answer [here](https://stackoverflow.com/questions/52958225/split-spark-dataframe-into-two-dataframes-70-and-30-based-on-id-column-by-p) – mango May 07 '20 at 08:07
-
@Josh a better solution would be to leverage the `foreachPartition` method on dataframes; this way you can control the number of rows per partition and send the data directly to your API, as you previously asked here: https://stackoverflow.com/questions/61645936/how-can-i-convert-a-pyspark-dataframe-to-a-csv-without-sending-it-to-a-file. – abiratsis May 07 '20 at 10:20
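A rough sketch of that foreachPartition approach, assuming a hypothetical send_rows_to_api helper and the 50,000-row limit (repartition gives only roughly equal partitions, so leave some headroom if the limit is strict):
import math

def send_rows_to_api(rows):
    # hypothetical helper: runs on each executor, serializes the iterator of Rows
    # and POSTs them to the API in one request
    pass

max_rows = 50000
n_parts = math.ceil(df.count() / max_rows)    # e.g. 70000 rows -> 2 partitions
df.repartition(n_parts).foreachPartition(send_rows_to_api)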
-
Another workaround for this can be to use the `.limit()` function. You can do something like: let's say your main df with 70k rows is `original_df`. So you can do `limited_df = original_df.limit(50000)` the very first time to get the first 50k rows, and for the remaining rows you can do `original_df.subtract(limited_df)`. You can even call `.limit()` on the subtracted df too if needed. – Frosty May 07 '20 at 10:56
-
@frosty This is the best way to go I think as I can repeatedly do this until the dataframe is under 50,000 rows. – Josh May 07 '20 at 16:09
-
You can do it for any number of rows present in the dataframe. Say at some point your dataframe has 30,000 rows; if you do df.limit(50000), it won't throw any error and will just return the 30k rows present in the dataframe. – Frosty May 08 '20 at 07:00
-
@frosty Right, this is a good solution. I was able to use a while loop to batch up some datasets that had over 300k rows into 50k sets perfectly. You should post this as the answer. – Josh May 09 '20 at 01:20
3 Answers
You can achieve this by using `row_number` and then filtering in steps of 50,000 rows:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# order by any column to populate the row number
window = Window.orderBy('ID')
length = df1.count()
df2 = df1.withColumn('row', f.row_number().over(window))
step = 50000
for i in range(1, length + 1, step):
    df3 = df2.filter((f.col('row') >= i) & (f.col('row') <= i + step - 1))
    # here perform your API call; df3 contains at most 50000 rows at a time

A workaround for this can be to use the .limit() function. Let's say your main df with 70k rows is original_df. You can do
limited_df = original_df.limit(50000)
the very first time to get the first 50k rows, and then
rest_df = original_df.subtract(limited_df)
to get the remaining rows. You can even call .limit() on the subtracted df again if needed.
UPDATED: This works for any number of rows present in the dataframe. Say at some point your dataframe has 30,000 rows; df.limit(50000) won't throw any error and will just return the 30k rows present in the dataframe.
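A minimal sketch of the while loop Josh describes in the comments, built on this limit/subtract idea (the send_batch helper is a hypothetical placeholder for the API call):
def send_batch(batch_df):
    # hypothetical helper that posts one dataframe of at most 50000 rows to the API
    pass

max_rows = 50000
remaining_df = original_df
while remaining_df.count() > 0:
    batch_df = remaining_df.limit(max_rows)
    send_batch(batch_df)
    remaining_df = remaining_df.subtract(batch_df)
Note that subtract is a set operation, so it also drops duplicate rows, and, as the next answer points out, caching batch_df before the subtract helps keep the two sides consistent.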

An addition to @Frosty's answer:
limited_df = original_df.limit(50000).cache()
rest_df = original_df.subtract(limited_df)
.cache() is advised for consistency, because without it limited_df and rest_df can end up with overlapping rows. That behaviour is due to PySpark running .limit() in a distributed fashion and possibly more than once (once for limited_df and once for rest_df).
p.s. reason for separate answer: I couldn't comment yet.
