
I am writing a batch-processing program in Spark using PySpark. Following are the input files and their sizes:

base-track.dat (3.9g)
base-attribute-link.dat (18g)
base-release.dat (543m)

These are text files with one record per line; each field is separated by a special character, \u0001 (see the code below).

I perform some filtering on the attribute links, group them, and join the result with the other tables.

I am submitting this program via spark-submit to a Hadoop cluster with 9 data nodes managed by Ambari. Each data node contains 140 GB of RAM and 3.5 TB of disk space.

Following is my PySpark code:

import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == "__main__":
        sc = SparkContext(appName = "Tracks")
        sqlContext = SQLContext(sc)

        #Load base-track
        track = sc.textFile("base-track/input").map(lambda row: row.split(u'\u0001'))

        #Load base-attribute-link
        attlnk = sc.textFile("base-attribute-link/input").map(lambda row: row.split(u'\u0001'))

        #Load base-release
        release = sc.textFile("base-release/input").map(lambda row: row.split(u'\u0001'))

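        #Keep only the attribute-link rows whose field 2 equals 'MA0000000162'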
        attlnk = attlnk.filter(lambda row: row[2] == 'MA0000000162')

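        #Group the filtered rows by field 1 (later exposed as al_objectid)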
        attlnkg = attlnk.groupBy(lambda row: row[1])

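        #For each group, keep the key and the maximum of field 4 across the group's rows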
        attlnkmax = attlnkg.map( lambda t: (t[0],max([v[4] for v in t[1]])) )

        alg = attlnkmax.map(lambda r: Row(al_objectid=r[0],al_value=r[1]))

        aldf = alg.toDF()

        track = track.map(lambda r:Row(t_tag = r[0], t_trackid= r[1], t_releaseid= r[2], t_songid = r[3], t_med= r[4], t_ph = r[5], t_tn = r[6], t_title= r[7], t_part= r[8], t_dur = r[9], t_pick = r[10], t_amgclid  = r[11], t_amgpopid = r[12], t_compid = r[13], t_muzid = r[14], t_perfid= r[15], t_albumid = r[16]))

        trackdf = track.toDF()

        release = release.map(lambda r:Row(r_tag = r[0], r_relid = r[1], r_albumid = r[2], r_mediafmtid = r[3], r_prodfmtid = r[4], r_reldate = r[5], r_prodcode = r[6], r_prodtypeid = r[7], r_label = r[8], r_relyear = r[9], r_ispurch = r[10], r_amgclassid = r[11], r_amgpopid = r[12], r_eanid = r[13], r_upcid = r[14]))

        releasedf = release.toDF()

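        #Left-join the tracks with the aggregated attribute values on track id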
        trackaldf = trackdf.join(aldf, trackdf['t_trackid'] == aldf['al_objectid'], 'left_outer')


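        #Inner-join the result with releases on release id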
        tracksdf = trackaldf.join(releasedf, trackaldf['t_releaseid'] == releasedf['r_relid'])

        tracksdf = tracksdf.select('t_trackid', 't_releaseid', 't_songid', 't_med', 't_ph', 't_tn', 't_title', 't_part', 't_dur', 't_pick', 't_amgclid', 't_amgpopid', 't_compid', 't_muzid', 'al_objectid', 't_perfid', 't_albumid', 'r_label')


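        #Re-serialize each row as a \u0001-delimited line, coalesce to 100 partitions and write to tracks-out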
        tracksdf.rdd.map(lambda x: u"\u0001".join(map(str, x))).coalesce(100).saveAsTextFile("tracks-out")

I got a bunch of the following errors when trying to execute this:

ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-d88c631e-cec3-4b83-8af6-a38b109b5e3b/0e/temp_shuffle_7dbda3ac-48b1-4c4a-89c7-64eb5d858d90
java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:336)
    at org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:209)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.flush(UnsafeRowSerializer.scala:83)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply$mcV$sp(DiskBlockObjectWriter.scala:157)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
    at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:161)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

There are a couple of questions on SO, here and here, related to the same problem.

Here is what I have tried based on those two questions. First, I tried increasing spark.yarn.executor.memoryOverhead from the default 384 MB to 4 GB.
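That setting can be passed on the command line roughly like this (a sketch; 4096 MB corresponds to the 4 GB I tried):

spark-submit --conf spark.yarn.executor.memoryOverhead=4096 tracks.py

Second, I added the following Java opts: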

SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark -Dhadoop.tmp.dir=/mnt/ephemeral-hdfs"

export SPARK_JAVA_OPTS

The first change did not have any effect. When I added the Java opts, I got an error saying the /mnt directories are not present.

After reading about this problem on multiple forums (including Databricks), I got the vague idea that the job creates temporary files under /tmp on each cluster node as part of the shuffle and exhausts the space there. On each cluster node we have allocated 100 GB for the root (/) partition, on which the tmp directory resides.
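If that is the case, one workaround (a sketch I have not verified on our cluster; /data1 and /data2 are placeholder paths) would be to point Spark's scratch space at larger mounts instead of /tmp, e.g. in spark-defaults.conf:

spark.local.dir /data1/spark-tmp,/data2/spark-tmp

Note that when running on YARN, executor scratch space follows the NodeManager setting yarn.nodemanager.local-dirs (in yarn-site.xml), so on a YARN cluster that is the property that has to point at the larger disks.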

I have been struggling for more than a month to get this job to run by playing with various Spark configuration parameters. As part of that tweaking, I increased spark.driver.memory and spark.executor.memory to 16 GB and later to 64 GB, and increased spark.yarn.executor.memoryOverhead to 4 GB. Unfortunately, none of this solved the space issue.
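For concreteness, the memory settings were passed roughly like this, together with the memoryOverhead setting shown earlier (16g was used in the earlier attempts):

spark-submit --driver-memory 64g --executor-memory 64g tracks.py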

Any guidance on how to proceed further would be of great help.

[Edit-1] I was just checking the disk space of the root directories on all the machines: 7 of the 9 nodes in our cluster have 100+ GB allocated for the root partition, but on 2 nodes only 10 GB is allocated, and only 6+ GB is left on them. This might be causing the disk space issue; I will have to check with our IT team whether the root partition can be extended.

[Edit-2] I worked with the IT team to extend the root partition to 100+ GB on all the machines, but the issue still persists; maybe even 100 GB of /tmp space is not sufficient for this job. I estimate the output of this job to be roughly 4.6 GB.

Ravi Chandra

2 Answers


Given the nature of your error and the fact that you are performing large joins on tens of GB of data, in which Spark workers write intermediate data to disk as they shuffle, a 100 GB disk does not seem to be enough. I recommend allocating a lot more disk for the default worker dir and local dirs, either by mounting them on larger disks or by provisioning a much larger root disk. Also note that if Spark does not shut down properly, this intermediate data may linger and take up a lot of space on the worker nodes, so you may have to inspect those directories and remove any stale files (see the sketch after the config example below).

If you are running spark-standalone on AWS r3, c3, or a similar instance type with large ephemeral SSD disks, I recommend mounting those disks at, say, /mnt and /mnt2 and configuring Spark's scratch space to point to those mounts instead of the (usually) smaller root volume, e.g.:

SPARK_LOCAL_DIRS=/mnt
SPARK_WORKER_DIR=/mnt2
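To check the scratch directories for stale data from earlier runs, something along these lines can be used (paths and directory-name patterns are illustrative and vary by deployment):

# See how much space the scratch and work directories are using
du -sh /mnt/* /mnt2/*
# Remove leftovers from previous applications that are more than two days old
# (typical names are spark-*/blockmgr-* under the local dirs, app-* under the worker dir)
find /mnt /mnt2 -maxdepth 1 -type d \( -name 'spark-*' -o -name 'blockmgr-*' -o -name 'app-*' \) -mtime +2 -exec rm -rf {} +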
Brian Cajes
  • So should these /mnt and /mnt2 directories be empty, just for scratch space? Or can we use a shared directory? – Ravi Chandra Jun 21 '17 at 18:21
  • Our machines are set up in a JBOD configuration (/jbod1, /jbod2, etc.) and have a lot of disk space. But when I pointed SPARK_LOCAL_DIRS and SPARK_WORKER_DIR to these directories, I get the error "Failed to create local dir in /jbod1". – Ravi Chandra Jun 21 '17 at 18:29
  • These directories can be any directories that your Spark cluster has permission to write to. I would make sure that the user your Spark cluster runs as has permission to write to /jbod1, and that it is not hitting any quotas put in place by your systems admin. – Brian Cajes Jun 22 '17 at 01:45

I figured out that I was not submitting the Spark job to the cluster but to a single machine, hence the disk space issues. I was always submitting my script in the following way:

spark-submit tracks.py

Since I want my script to be executed on the Hadoop cluster with YARN as the resource manager, I changed my submit command to the following, and then it worked fine:

spark-submit --master yarn --deploy-mode cluster tracks.py
Ravi Chandra