
I have a large volume of data, and I'm looking to efficiently (i.e., using a relatively small Spark cluster) perform COUNT and DISTINCT operations on one of the columns.

If I do what seems obvious, i.e., load the data into a dataframe:

# Load all the CSV files from the bucket into a DataFrame
df = spark.read.format("csv").load("s3://somebucket/loadsofcsvdata/*")
# Register it as a temp view so it can be queried with Spark SQL
df.createOrReplaceTempView("someview")

and then attempt to run a query:

domains = sqlContext.sql("""SELECT domain, COUNT(id) FROM someview GROUP BY domain""")
# Display up to 1000 of the aggregated rows
domains.show(1000)
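
For reference, the same aggregation can be expressed directly with the DataFrame API (equivalent logical plan; the variable and alias names here are just for illustration):

from pyspark.sql import functions as F

# Same aggregation as the SQL above: per-domain count of non-null id values
domains_df = df.groupBy("domain").agg(F.count("id").alias("id_count"))
domains_df.show(1000)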

my cluster just crashes and burns, throwing out-of-memory exceptions or otherwise hanging, crashing, or never completing the operation.

I'm guessing that somewhere along the way there's some sort of join or shuffle that blows out one of the executors' memory?

What's the ideal method for performing an operation like this, when the source data is at massive scale but the result isn't? (The list of domains in the above query is relatively short, and should easily fit in memory.)


Related info is available at this question: What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

blueberryfields

1 Answer


I would suggest tuning your executor settings. In particular, setting the following parameters correctly can provide a dramatic improvement in performance (a sketch of how to set them follows the list).

spark.executor.instances
spark.executor.memory
spark.yarn.executor.memoryOverhead
spark.executor.cores
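
For example, something along these lines when building the SparkSession, assuming the session is created before any jobs run; the app name and all the values are only placeholders and should be sized to your actual nodes and workload:

from pyspark.sql import SparkSession

# Illustrative values only; tune to your cluster's node sizes
spark = (SparkSession.builder
    .appName("domain-counts")
    .config("spark.executor.instances", "10")
    .config("spark.executor.memory", "8g")
    .config("spark.yarn.executor.memoryOverhead", "2048")  # MB, on older Spark/YARN versions
    .config("spark.executor.cores", "4")
    .getOrCreate())

The same settings can equivalently be passed on the command line with spark-submit --conf.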

In your case, I would also suggest tuning the number of partitions; in particular, bump up the following parameter from its default of 200 to a higher value, as your workload requires (see the example below the parameter).

spark.sql.shuffle.partitions
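
This one can be set at runtime on the session; the value 1000 here is just an illustration, not a recommendation:

# Increase the number of shuffle partitions for SQL aggregations and joins
spark.conf.set("spark.sql.shuffle.partitions", "1000")
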
Ayan Guha