I am using Zeppelin to read Avro files that are GBs in size, with billions of records in total. I have tried with 2 instances and with 7 instances on AWS EMR, but the performance is about the same; with 7 instances it still takes a lot of time. The code is:

// spark.read.avro is provided by the Databricks spark-avro package
import com.databricks.spark.avro._

val snowball = spark.read.avro(snowBallUrl + folder + prefix + "*.avro")
val prod = spark.read.avro(prodUrl + folder + prefix + "*.avro")

// cache both inputs; the counts below materialize them
snowball.persist()
prod.persist()

val snowballCount = snowball.count()
val prodCount = prod.count()

val u = snowball.union(prod)

Output:
snowballCount: Long = 13537690
prodCount: Long = 193885314

The cluster resources can be seen here:

[screenshot of cluster resources]

`spark.executor.cores` is set to 1. If I try to change this number, Zeppelin stops working and the Spark context shuts down. It would be great if someone could give me a hint on how to improve the performance.
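
For reference, executor sizing in Zeppelin is normally configured in the Spark interpreter settings (or spark-defaults.conf) rather than from a notebook paragraph, because the context is already running when a paragraph executes. A minimal sketch of the properties involved, with purely illustrative values (assumptions, not a recommendation for this cluster):

// Illustrative values only; set in the Zeppelin Spark interpreter settings or spark-defaults.conf:
//   spark.executor.instances   6
//   spark.executor.cores       4
//   spark.executor.memory      8g
// What the running context actually got can be checked from the notebook:
spark.sparkContext.getConf.getOption("spark.executor.cores")
spark.sparkContext.getConf.getOption("spark.executor.memory")

If raising spark.executor.cores makes the context shut down, one common cause on EMR is that the request exceeds YARN's maximum allocation for the node (for example yarn.scheduler.maximum-allocation-vcores), so the YARN and Zeppelin interpreter logs are the place to look for the exact reason.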

Edit: I checked how many partitions it created:

snowball.rdd.partitions.size
prod.rdd.partitions.size
u.rdd.partitions.size

res21: Int = 55
res22: Int = 737
res23: Int = 792
  • Can you check how many actual files you have? I have a feeling that this number is above `Integer.MAX_VALUE`. – vvg May 22 '18 at 08:06
  • I had two 3 GB files and roughly 300 files of 250 MB each. That error comes up because the partition size exceeds the limit. – Waqar Ahmed May 22 '18 at 08:14
  • Possible duplicate of [SQL query in Spark/scala Size exceeds Integer.MAX\_VALUE](https://stackoverflow.com/questions/42247630/sql-query-in-spark-scala-size-exceeds-integer-max-value) – philantrovert May 22 '18 at 08:18
  • @philantrovert already pointed you to the root of the problem: `No Spark shuffle block can be larger than 2GB (Integer.MAX_VALUE bytes) so you need more / smaller partitions.` Have you tried increasing the number of partitions on the RDD `u` using the `repartition` function? – jose.goncabel May 22 '18 at 16:10
  • @jose.goncabel Could you please describe how I can define the repartitioning strategy depending on the size? If the size is in GB then repartitioning is worth it, but if the size is small, in MB, then it isn't. Moreover, I don't know how to get the size of the dataframe at runtime to do the repartitioning (one way to estimate it from the input files is sketched after these comments). – Waqar Ahmed May 23 '18 at 07:52
  • Why do you use `persist` when your data has not been modified? – Setop May 23 '18 at 08:19
  • @Setop I was just testing it: I call count first, and the union afterwards would recompute the data frames, so I was checking whether persist would improve the performance or not. – Waqar Ahmed May 23 '18 at 08:21
  • @WaqarAhmed OK. Another way to ask: do you really need the counts, or is the union the most important part of the process? – Setop May 23 '18 at 08:31
  • @Setop The union is the most important task. I'm actually merging two different dataframes of around 90 GB and removing duplicates from them. – Waqar Ahmed May 23 '18 at 08:53
  • @WaqarAhmed, removing duplicates is not a process that scales very well. Maybe you can help it by providing a hash function, but I'm not sure how. – Setop May 23 '18 at 09:05
  • @Setop A hash function will not work, I guess, because the IDs are hashed and it will create a single partition for one ID. – Waqar Ahmed May 23 '18 at 09:54
  • First, you need to check in the Spark UI how many executors you allocated and how many tasks there are for each stage. – zjffdu May 24 '18 at 03:32
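
Picking up the repartition-by-size question from the comments: the total input size can be estimated from the file listing instead of from the DataFrame itself, and then used to choose a partition count. A minimal sketch, assuming the paths are globbable from the driver; inputSizeBytes, the 128 MB target and the name deduped are hypothetical, not part of the original code:

import org.apache.hadoop.fs.Path

// Hypothetical helper: total on-disk size of all files matching a glob pattern
def inputSizeBytes(pattern: String): Long = {
  val path = new Path(pattern)
  val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
  Option(fs.globStatus(path)).map(_.map(_.getLen).sum).getOrElse(0L)
}

val totalBytes = inputSizeBytes(snowBallUrl + folder + prefix + "*.avro") +
                 inputSizeBytes(prodUrl + folder + prefix + "*.avro")

// Aim for roughly 128 MB per partition (illustrative), comfortably below the
// 2 GB (Integer.MAX_VALUE bytes) shuffle-block limit mentioned above
val targetPartitionBytes = 128L * 1024 * 1024
val numPartitions = math.max(1, (totalBytes / targetPartitionBytes).toInt)

// Union, spread the rows evenly, then drop duplicates
val deduped = snowball.union(prod).repartition(numPartitions).dropDuplicates()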

0 Answers