I'm seeing a few scalability problems with a pyspark script I've written and was wondering if anyone would be able to shed a bit of light.
I have a very similar use case to the one presented here:
Separate multi line record with start and end delimiter
In that I have some multi-line data where there is a logical delimiter between records. E.g. the data looks like:
AA123
BB123
CCXYZ
AA321
BB321
CCZYX
...
Using the example in the previous answer, I've separated this into multiple records using a script like...
import os
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import monotonically_increasing_id, expr
from pyspark.sql.functions import sum as sum_  # avoid shadowing Python's built-in sum

# Played around with setting the available memory at runtime
spark = SparkSession \
    .builder \
    .appName("TimetableSession") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

files = os.path.join("data", "*_lots_of_gzipped_files.gz")

# One row per input line, with the raw line in a column called 'entry'
df = spark.sparkContext.textFile(files).map(lambda line: (line,)).toDF(["entry"])
df = df.withColumn("id", monotonically_increasing_id())

# Window over the whole dataset - no partition key, ordered by the generated id
w = Window.partitionBy().orderBy("id")
df = df.withColumn("AA_indicator", expr("case when entry like 'AA%' then 1 else 0 end"))
# !!!Blowing up with OOM errors here at scale!!!
df = df.withColumn("index", sum_("AA_indicator").over(w))
df.show()
+--------------------+---+------------+-----+
| entry| id|AA_indicator|index|
+--------------------+---+------------+-----+
| AA123| 1| 1| 1|
| BB123| 2| 0| 1|
| CCXYZ| 3| 0| 1|
| AA321| 4| 1| 2|
| BB321| 5| 0| 2|
| CCZYX| 6| 0| 2|
+--------------------+---+------------+-----+
This seems to work ok with a reasonably sized dataset (e.g. 50 MB), but when I scale it up to > 1 GB of data I'm seeing Java OOM errors. I'm seeing the same problem even when allocating > 20 GB of memory to spark.driver/executor.
I believe the problem is that the window isn't partitioned, so the whole dataset is being pulled into memory on a single partition rather than the work being parallelised? But I might be way off the mark with this.
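For what it's worth, this is roughly how I've been trying to sanity-check that hypothesis (just a quick sketch against the DataFrame from the snippet above):

# An unpartitioned window shows up as "Exchange SinglePartition" in the
# physical plan, which would mean the whole dataset gets shuffled onto a
# single partition before the cumulative sum runs.
df.withColumn("index", sum_("AA_indicator").over(w)).explain()

# Also, each gzipped input file is non-splittable, so the initial
# parallelism is limited to one partition per file.
print(df.rdd.getNumPartitions())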
I'm running this script in a standalone Docker container using the Jupyter pyspark-notebook image: https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook.
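Because the notebook's JVM may already be running by the time I create the session, I'm not certain the spark.driver.memory setting from inside the notebook is even taking effect, so I've also tried setting it via PYSPARK_SUBMIT_ARGS before creating the session (again, just what I've been attempting, not necessarily the right way):

import os
# Must be set before the SparkSession/JVM is created in this kernel,
# otherwise the driver memory setting is ignored.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 20g --executor-memory 20g pyspark-shell"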
Any help with a better approach to indexing 'records', or with a better way to approach the problem overall, would be much appreciated.
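In case it helps frame what I mean by indexing 'records': one direction I've been toying with (just a sketch, untested at scale, and based on the custom record-delimiter trick I've seen suggested for this kind of input) is to skip the window entirely and split the input into records at read time:

# Treat "\nAA" as the record delimiter so each element is one whole record,
# rather than indexing individual lines and running a global window over them.
delim_conf = {"textinputformat.record.delimiter": "\nAA"}
records = (
    spark.sparkContext.newAPIHadoopFile(
        files,
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf=delim_conf,
    )
    .map(lambda kv: kv[1])
    # re-attach the "AA" prefix that the delimiter strips from all but the first record
    .map(lambda rec: rec if rec.startswith("AA") else "AA" + rec)
)
records_df = records.map(lambda rec: (rec,)).toDF(["record"])

But I don't know whether that actually avoids the memory problem here, or whether there's a more idiomatic DataFrame way to do it, hence the question.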