1

I'm receiving an error and from related questions I've found on here, the answer very well may be increasing the partition number. Where I'm stuck is actually implementing the solution.

Three questions:

(A) Do I need to go back to when I instantiated my spark context and define the partition number there? Example code would be appreciated.

(B) Better yet, can I increase partitions "in place"? Some lines of code took quite a while to execute; I'd like to avoid starting over if possible.

(C) Perhaps I should have asked this first; is increasing the partition number the appropriate solution to the error below?

users = r.select('user_id').distinct().collect()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-56-0c036c22fe44> in <module>()
----> 1 users = r.select('user_id').distinct().collect()

3 frames
/content/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o191.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 1030, localhost, executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: Operation canceled
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:163)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:436)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:257)
    at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:276)
    at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:228)
    at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:196)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
    at org.apache.hadoop.util.LineReader.readCustomLine(LineReader.java:304)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:172)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.io.IOException: Operation canceled
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:255)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:156)
    ... 28 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: Operation canceled
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:163)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:436)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:257)
    at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:276)
    at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:228)
    at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:196)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
    at org.apache.hadoop.util.LineReader.readCustomLine(LineReader.java:304)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:172)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.io.IOException: Operation canceled
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:255)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:156)
    ... 28 more

And here's what I've tried so far, triggering the same error as above:

r2 = r.repartition(4000)
users = r2.select('user_id').distinct().collect()

  • The error log is truncated, can you show the full log? It may be worth checking the log at the executor level as well if the exception occurs in Python. – cylim Nov 21 '19 at 09:55

1 Answers1

0

4000 partitions is probably a way too high for your cluster. Then you perform a collect() over which is a task done on the Master Node (bottleneck). Using a wide batch a small partitions is not a good idea. The parallel processing is not optimal.

Question A:

Yes, it's possible and recommended.

spark.sql.shuffle.partitions

What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism?

Question B:

The number of partitions should be a equal or a multiple (max x 2 or x 3) of the sum of the number of cores available on your cluster executors. The spark documentation recommend a size of partition upper than 128 MB.

https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0

Question C:

Regarding the answer B, it's not.

rbcvl
  • 406
  • 3
  • 13