I'm using the latest Spark, 1.6.1, released 9 March 2016.
I start a fresh cluster of 10 nodes, each with 256GB RAM and 32 cores, and ensure that SPARK_WORKER_DIR is empty on every node. I load two .csv tables (each 200GB) from HDFS and join them on one column. Despite using .cache, I'm finding that 28GB of data is written to the block manager directory on each node (280GB total). Is there any way to turn off the block manager? I'd just like to join the two tables in-memory.
If there's anything else you can spot I'm doing wrong here, that'd be great to know too. Thanks.
These parameters are set on each of the 10 nodes:
export SPARK_PRINT_LAUNCH_COMMAND=1
export SPARK_MASTER_IP="NODE1"
export SPARK_MASTER_PORT="17077"
export SPARK_WORKER_PORT="17087"
export SPARK_WORKER_INSTANCES=1
export MASTER="spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
export SPARK_HOME="/home/mattd/spark-1.6.1-bin-hadoop2.6"
tmpdir=~/tmp
export SPARK_LOG_DIR=$tmpdir/spark/logs
export SPARK_WORKER_DIR=$tmpdir/spark/work
export SPARK_LOCAL_DIRS=$tmpdir/spark/work
$SPARK_HOME/sbin/start-master.sh # just on NODE1
$SPARK_HOME/sbin/start-slave.sh $MASTER # on NODE1 to NODE10
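For what it's worth, I point SPARK_WORKER_DIR and SPARK_LOCAL_DIRS at the same directory; as I understand it, the blockmgr-* scratch space goes under SPARK_LOCAL_DIRS (i.e. spark.local.dir), while SPARK_WORKER_DIR holds the per-application work dirs and logs. A per-application way to set the same thing would be something like this (a sketch; SPARK_LOCAL_DIRS takes precedence when it's set):
$SPARK_HOME/bin/spark-shell --conf spark.local.dir=$tmpdir/spark/work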
Then on NODE1 (which is also the HDFS NameNode, if it makes a difference) I run:
$SPARK_HOME/bin/spark-shell --master spark://NODE1:17077 \
  --packages com.databricks:spark-csv_2.10:1.3.0 \
  --executor-memory 220G --driver-memory 20G
import org.apache.spark.sql.SQLContext
import org.joda.time._
val sqlContext = SQLContext.getOrCreate(sc)
val X = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("hdfs://NODE1/datasets/X.csv")
val Y = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("hdfs://NODE1/datasets/Y.csv")
X.show                    Y.show
+----------+-----------+ +----------+-----------+
| KEY| X2| | KEY| Y2|
+----------+-----------+ +----------+-----------+
|7632858426| 1476754316| |2055977176|-5965358667|
|8176486913|-7805583191| | 358321276| 8309936938|
|3235589527|-9261053875| |4311730481|-1352785819|
|6017229071| -756595861| |2084690536|-1218203804|
|1039519144| 8161598094| |3183436600| 6996336786|
... snip ... snip
X.count
10000000000
Y.count
10000000000
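So each table is 10^10 rows. By my arithmetic both should fit in memory comfortably; a back-of-envelope sketch, assuming Spark 1.6's unified-memory defaults (300MB reserved, spark.memory.fraction = 0.75):
val usablePerExec = (220.0 - 0.3) * 0.75  // ~165 GB per executor for execution + storage
val clusterBudget = usablePerExec * 10    // ~1.65 TB across 10 nodes, vs ~400 GB of input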
val ans = X.join(Y, "KEY").orderBy("KEY")
ans.cache()
ans.show
ans.count
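Incidentally, my understanding is that DataFrame.cache() defaults to MEMORY_AND_DISK, so I wondered whether an explicit MEMORY_ONLY persist would keep the result off disk, something like:
import org.apache.spark.storage.StorageLevel
ans.persist(StorageLevel.MEMORY_ONLY)  // don't spill cached partitions to disk
ans.count                              // persist is lazy; an action materialises it
though I'd guess the shuffle for the join and orderBy writes through the block manager regardless.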
While the cached join is materialising (cache() is lazy, so the work happens during ans.show and ans.count), I notice data being written to disk here on every node:
$ pwd
/home/mattd/tmp/spark/work/spark-27273513-649b-4232-83eb-112eec922158/
executor-2154b748-3550-488a-b76c-6935d7c41699/
$ du -sh blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/
29G blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/
$ ls blockmgr-13655d08-b6b1-4b85-b6b4-92ab29d939a0/
00 04 08 0c 10 14 18 1c 20 24 28 2c 30 34 38 3c
01 05 09 0d 11 15 19 1d 21 25 29 2d 31 35 39 3d
02 06 0a 0e 12 16 1a 1e 22 26 2a 2e 32 36 3a 3e
03 07 0b 0f 13 17 1b 1f 23 27 2b 2f 33 37 3b 3f
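# 00..3f = 64 subdirectories, presumably the block manager's default hash
# buckets (spark.diskStore.subDirectories defaults to 64); these look like
# shuffle files rather than anything I explicitly asked to be written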