
I'm seeing a few scalability problems with a pyspark script I've written and was wondering if anyone would be able to shed a bit of light.

I have a very similar use case to the one presented here:

Separate multi line record with start and end delimiter

In that I have some multi-line data where there is a logical delimiter between records. E.g. the data looks like:

AA123
BB123
CCXYZ
AA321
BB321
CCZYX
...

Using the example in the previous answer, I've separated this into multiple records using a script like...

import os

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import monotonically_increasing_id, expr, sum as sum_

# Played around with setting the available memory at runtime via the configs below
spark = SparkSession \
    .builder \
    .appName("TimetableSession") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

files = os.path.join("data", "*_lots_of_gzipped_files.gz")
df = spark.sparkContext.textFile(files).map(lambda line: (line,)).toDF(["entry"])
df = df.withColumn("id", monotonically_increasing_id())

w = Window.partitionBy().orderBy('id')
df = df.withColumn('AA_indicator', expr("case when entry like 'AA%' then 1 else 0 end"))
# !!!Blowing up with OOM errors here at scale!!!
df = df.withColumn('index', sum_('AA_indicator').over(w))
df.show()

+--------------------+---+------------+-----+
|               entry| id|AA_indicator|index|
+--------------------+---+------------+-----+
|               AA123|  1|           1|    1|
|               BB123|  2|           0|    1|
|               CCXYZ|  3|           0|    1|
|               AA321|  4|           1|    2|
|               BB321|  5|           0|    2|
|               CCZYX|  6|           0|    2|
+--------------------+---+------------+-----+

This seems to work OK with data of a reasonable size (e.g. 50 MB), but when I scale up to > 1 GB of data I start seeing Java OOM errors. I see the same problem even when allocating > 20 GB of memory to spark.driver/executor.

I believe the problem is that, because the window isn't partitioned, everything is being collected into memory at once rather than being parallelised? But I might be way off the mark with this.
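
For reference, a quick way to sanity-check this guess (purely a diagnostic, reusing df, w and sum_ from the script above, not a fix) is to print the physical plan for the window step and see whether the data is exchanged into a single partition:

# Diagnostic only: with an empty partitionBy the plan should show an exchange
# into a single partition ahead of the Window operator.
df.withColumn('index', sum_('AA_indicator').over(w)).explain()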

I'm running this script in a standalone docker container using the jupyter pyspark notebook https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook.

Any help in terms of a better approach to indexing 'records' or how to better approach the problem would be much appreciated.

robarthur1

1 Answer


Probably because you use a window without PARTITION BY:

Window.partitionBy().orderBy('id')

In that case Spark doesn't distribute the data and processes all records on a single machine sequentially.

Having a lot of gzipped files makes it even worse, as gzip compression is not splittable. Each file is therefore loaded into a single partition on one machine and can cause an OOM on its own as well.
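
As a quick check (a sketch, assuming the same glob of gzipped files as in the question), the partition count of the input will equal the number of files rather than scale with the data volume:

# Each .gz file becomes exactly one partition because gzip is not splittable,
# so the partition count tracks the number of files, not the amount of data.
rdd = spark.sparkContext.textFile("data/*_lots_of_gzipped_files.gz")
print(rdd.getNumPartitions())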

Overall this is not a workload that plays to Spark's strengths.

Alper t. Turker
  • Thanks for the reply. I believe in this instance there is not a suitable field to partition the data by, which makes me wonder if the approach is the best way to go in the first place? The cumulative sum approach sounds interesting, if I'm understanding it correctly is that possible to do without partitioning the data? – robarthur1 Feb 01 '18 at 15:27
  • Thanks for your replies. When you say 'It seems it depend only on the order, and doesn't require single partition'. Are you able to be more explicit with what you mean? Are you suggesting there's a way to partition that data, or that using a window function is incorrect in the first place for this use case (because there is no sensible partition choice)? – robarthur1 Feb 01 '18 at 15:45
  • The cumulative sum solution can work across partitions, so it should work fine as long as a cumulative sum is what you want. Other functions will require a different approach. But if you cannot find a way to redefine the problem so that it doesn't depend on a sequential read, then Spark won't help you. – Alper t. Turker Feb 01 '18 at 16:39
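
A rough sketch of the cumulative-sum-across-partitions idea mentioned in the comments (not either poster's exact code; it assumes the question's gzipped input glob and relies on each gzip file mapping to a single, in-order partition):

from pyspark.sql import Row

rdd = spark.sparkContext.textFile("data/*_lots_of_gzipped_files.gz")

# Pass 1: count the 'AA' marker lines in each partition.
counts = rdd.mapPartitionsWithIndex(
    lambda i, it: [(i, sum(1 for line in it if line.startswith("AA")))]
).collectAsMap()

# Exclusive prefix sums of those counts give each partition's starting index offset.
offsets, running = {}, 0
for i in sorted(counts):
    offsets[i] = running
    running += counts[i]

# Pass 2: within each partition, bump a local counter at every 'AA' line and add
# the partition's offset, yielding a global record index without a global shuffle.
def index_partition(i, it):
    idx = offsets[i]
    for line in it:
        if line.startswith("AA"):
            idx += 1
        yield Row(entry=line, index=idx)

records = rdd.mapPartitionsWithIndex(index_partition).toDF()
records.show()

Note that any lines appearing before the first 'AA' marker in the very first file would get index 0, so they may need separate handling.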