
I am trying to run a simple SQL query over S3 event data using Spark. I am loading ~30 GB of JSON files as follows:

val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");

Then I am trying to write the result of my query to a file:

val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");

But Spark is throwing the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Note that the same query works for smaller amounts of data. What's the problem here?

eexxoo
  • The most likely issue is a partition size exceeding the limit; try `.repartition(100)` etc. This should solve it. – elcomendante Feb 15 '17 at 11:22
  • After reading the data, try repartitioning: `val d2 = spark.read.json("s3n://myData/2017/02/01/1234").repartition(1000)`. Reference: https://issues.apache.org/jira/browse/SPARK-1476 – undefined_variable Feb 15 '17 at 11:22
  • As a side note, you may want to look into using the newer `s3a` instead of `s3n`; see e.g. http://stackoverflow.com/questions/33356041/technically-what-is-the-difference-between-s3n-s3a-and-s3 – sgvd Feb 15 '17 at 13:59
  • Thanks for the answers. The query worked for 30 GB. Now I am trying to run a query on ~200 GB of data and I see this: `Failed to send RPC 6395111411946395180 to /x.x.x.x:yyyy: java.nio.channels.ClosedChannelException` and also `Attempted to get executor loss reason for executor id 165 at RPC address x.x.x.x:yyyyyy, but got no response. Marking as slave lost.` Any ideas? I'm loading the data with 100 repartitions. – eexxoo Feb 15 '17 at 20:47

2 Answers


No Spark shuffle block can be larger than 2 GB (Integer.MAX_VALUE bytes), so you need more / smaller partitions.

You should adjust spark.default.parallelism and spark.sql.shuffle.partitions (default 200) such that the number of partitions can accommodate your data without reaching the 2 GB limit (you could aim for ~256 MB per partition, so for 200 GB you would get ~800 partitions). Thousands of partitions are very common, so don't be afraid to repartition to 1000 as suggested.

FYI, you may check the number of partitions of an RDD with rdd.getNumPartitions (e.g. d2.rdd.getNumPartitions).
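For illustration, here is a minimal sketch of that approach, assuming a Spark 2.x spark-shell session like the one in the question; the partition count of 800 is just the ~256 MB-per-partition rule of thumb applied to ~200 GB, not a measured value:

// Check how many partitions the DataFrame currently has.
val currentPartitions = d2.rdd.getNumPartitions

// Raise the number of shuffle partitions before running the aggregation,
// so that no single shuffle block grows past the 2 GB limit.
spark.conf.set("spark.sql.shuffle.partitions", "800")

// Alternatively, repartition the DataFrame explicitly and re-register it
// (createOrReplaceTempView is the Spark 2.x replacement for registerTempTable).
val d2Repartitioned = d2.repartition(800)
d2Repartitioned.createOrReplaceTempView("d2")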

There's a JIRA issue tracking the effort to address the various 2 GB limits (it has been open for a while now): https://issues.apache.org/jira/browse/SPARK-6235

See http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25 for more info on this error.

Traian
  • Thanks for the explanation! Also have a look at https://stackoverflow.com/questions/45704156/what-is-the-difference-between-spark-sql-shuffle-partitions-and-spark-default-pa for how to adjust the default number of partitions. – Raphvanns Jan 30 '18 at 04:53

When I used Spark Core to process 200 GB of data, I set `--conf spark.default.parallelism=2000` and `.repartition(100)`, but the error still appeared. I finally solved it with the following setting:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName(appName)
  .set("spark.rdd.compress", "true")  // compress serialized RDD partitions

See the description of spark.rdd.compress in the Spark configuration documentation.
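Since the question uses a Spark 2.x SparkSession (the spark variable), here is a sketch of the equivalent setting through the SparkSession builder; the application name is a placeholder, not something from the original post:

import org.apache.spark.sql.SparkSession

// Equivalent configuration via the Spark 2.x SparkSession builder.
// "MyApp" is a placeholder application name.
val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.rdd.compress", "true")
  .getOrCreate()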

I hope it helps you

kai.tian