
I'm using the latest Spark, 1.6.1, released 9 March 2016.

I start a fresh cluster of 10 nodes, each with 256GB RAM and 32 cores. I ensure that SPARK_WORKER_DIR is empty on every node. I load two .csv tables (each 200GB) from HDFS and join them on one column. Despite using .cache(), I'm finding that about 28GB of data is written to the block manager directory on each node (280GB in total). Is there any way to turn off the block manager? I'd just like to join the two tables in memory.
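
To be concrete, the behaviour I'm after is an entirely in-memory join, i.e. something like an explicit memory-only persist instead of the plain cache() used below (a sketch of the intent only; I haven't verified that it changes the disk writes):

import org.apache.spark.storage.StorageLevel

// Intent only: keep the joined result purely in executor memory.
// X, Y and ans are the DataFrames defined further down.
val ans = X.join(Y, "KEY").orderBy("KEY")
ans.persist(StorageLevel.MEMORY_ONLY)   // instead of ans.cache()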

If there's anything else you can spot I'm doing wrong here, that'd be great to know too. Thanks.

These parameters are set on each of the 10 nodes:

export SPARK_PRINT_LAUNCH_COMMAND=1
export SPARK_MASTER_IP="NODE1"
export SPARK_MASTER_PORT="17077"
export SPARK_WORKER_PORT="17087"
export SPARK_WORKER_INSTANCES=1
export MASTER="spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
export SPARK_HOME="/home/mattd/spark-1.6.1-bin-hadoop2.6"
tmpdir=~/tmp
export SPARK_LOG_DIR=$tmpdir/spark/logs
export SPARK_WORKER_DIR=$tmpdir/spark/work
export SPARK_LOCAL_DIRS=$tmpdir/spark/work
$SPARK_HOME/sbin/start-master.sh           # just on NODE1
$SPARK_HOME/sbin/start-slave.sh $MASTER    # on NODE1 to NODE10
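
One workaround I've considered but not yet tried (so this is an assumption, not something I've measured) is pointing SPARK_LOCAL_DIRS at a RAM-backed filesystem so that whatever the block manager writes never touches disk; /dev/shm below is an assumption about the nodes' tmpfs mount:

# Untried idea: keep block manager / shuffle files on tmpfs instead of disk.
# Assumes /dev/shm is a tmpfs mount with enough headroom alongside executor memory.
export SPARK_LOCAL_DIRS=/dev/shm/spark/work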

Then on NODE1 (which is also the HDFS NameNode, if it makes a difference) I run:

$SPARK_HOME/bin/spark-shell --master spark://NODE1:17077 \
    --packages com.databricks:spark-csv_2.10:1.3.0 \
    --executor-memory 220G --driver-memory 20G

import org.apache.spark.sql.SQLContext
import org.joda.time._
val sqlContext = SQLContext.getOrCreate(sc)
val X = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("hdfs://NODE1/datasets/X.csv")
val Y = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("hdfs://NODE1/datasets/Y.csv")

X.show                            Y.show
+----------+-----------+          +----------+-----------+
|       KEY|         X2|          |       KEY|         Y2|
+----------+-----------+          +----------+-----------+
|7632858426| 1476754316|          |2055977176|-5965358667|
|8176486913|-7805583191|          | 358321276| 8309936938|
|3235589527|-9261053875|          |4311730481|-1352785819|
|6017229071| -756595861|          |2084690536|-1218203804|
|1039519144| 8161598094|          |3183436600| 6996336786|
... snip                          ... snip
X.count
10000000000
Y.count
10000000000
val ans = X.join(Y, "KEY").orderBy("KEY")
ans.cache()
ans.show
ans.count

While the cached result is being materialized (ans.cache() itself is lazy; the work happens during ans.show / ans.count), I notice data being written to disk here on every node:

$ pwd
/home/mattd/tmp/spark/work/spark-27273513-649b-4232-83eb-112eec922158/
executor-2154b748-3550-488a-b76c-6935d7c41699/

$ du -sh blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/
29G blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/

$ ls blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/
00  04  08  0c  10  14  18  1c  20  24  28  2c  30  34  38  3c
01  05  09  0d  11  15  19  1d  21  25  29  2d  31  35  39  3d
02  06  0a  0e  12  16  1a  1e  22  26  2a  2e  32  36  3a  3e
03  07  0b  0f  13  17  1b  1f  23  27  2b  2f  33  37  3b  3f
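
A check I could run to separate memory-cached bytes from disk-resident bytes (sketched, not yet run on this cluster; shuffle files in the blockmgr directories may not be reflected in diskUsed):

// Sketch: report per-executor memory vs. disk usage as seen by the block manager.
sc.getExecutorStorageStatus.foreach { s =>
    println(s"${s.blockManagerId.host}: memUsed=${s.memUsed} diskUsed=${s.diskUsed} maxMem=${s.maxMem}")
}
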
Matt Dowle

  • These writes are most likely generated by the shuffle required for `join` and then for `orderBy`. AFAIK it is not possible to avoid this. – zero323 Mar 19 '16 at 00:12
  • @zero323 Many thanks for the quick reply. This writing enables Spark's resiliency feature, if I understand correctly (if one of the 10 nodes were turned off during the join, it would still complete with the remaining 9)? – Matt Dowle Mar 19 '16 at 00:36
  • Officially, `join` might not do any shuffle if both RDDs/DFs have the same partitioner, so that might be one way to tackle it, but I've never tried that with DFs, so I'm not sure how to go about doing it / will it blend, I mean work. Order by... I don't have any tricks here :-) – Mateusz Dymczyk Mar 19 '16 at 00:36
  • @MateuszDymczyk But we cannot repartition without a shuffle, so we are back to square one :/ Matt Dowle: the main reason is to [avoid a reshuffle when the computation is repeated](http://stackoverflow.com/q/34580662/1560062). Data can also be spilled to disk when there is a physical grouping operation, to avoid OOM errors. – zero323 Mar 19 '16 at 01:02
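
For reference, a rough sketch (untested) of the same-partitioner idea from the comment above, done at the RDD level; the column types and the partition count are assumptions noted in the comments:

import org.apache.spark.HashPartitioner

// Sketch of the "same partitioner" idea, at the RDD level. Assumes both columns
// are Long and that 320 partitions (10 nodes x 32 cores) is a reasonable choice.
// Each partitionBy below still shuffles once, but the join itself is then
// shuffle-free because both sides share the same partitioner; the orderBy would
// still need its own range shuffle.
val part = new HashPartitioner(320)
val xByKey = X.rdd.map(r => (r.getLong(0), r.getLong(1))).partitionBy(part)
val yByKey = Y.rdd.map(r => (r.getLong(0), r.getLong(1))).partitionBy(part)
val joined = xByKey.join(yByKey)   // (KEY, (X2, Y2)) pairs, co-partitioned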

0 Answers