
I've set up Spark on a multi-node cluster as explained in this article: https://medium.com/ymedialabs-innovation/apache-spark-on-a-multi-node-cluster-b75967c8cb2b

I verified that the workers are all in the "ALIVE" state using the web UI: http://MASTER-IP:8080/

I also tested that I can start the spark shell, execute Scala commands and see the job results at http://MASTER-HOSTNAME:4041.

In Python, I'm able to create a PySpark context either without specifying the master or by setting the master to local[*]. Below is the basic Python script I created for testing.

# spark-basic.py
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
# this is the line that fails with the error shown below:
#conf.setMaster('spark://<MASTER-HOSTNAME>:7077')
conf.setMaster('local[*]')  # this works
conf.setAppName('spark-basic')
sc = SparkContext(conf=conf)

def mod(x):
    import numpy as np
    return (x, np.mod(x, 2))

# take(10) returns a plain Python list of the first 10 (x, x mod 2) pairs
rdd = sc.parallelize(range(1000)).map(mod).take(10)
print(rdd)

Why can I not set the master to 'spark://<MASTER-HOSTNAME>:7077' as described in many online tutorials, such as this one? The error I get when I try is pretty long, but here is part of it (hopefully this part of the message is enough for someone to help me out, but I can always paste the whole thing here too):

19/05/01 14:29:17 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master MASTER-HOSTNAME:7077
org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.spark.rpc.netty.RequestMessage$.readRpcAddress(NettyRpcEnv.scala:593)

Then a follow-up question - does it matter? If I just set the master to local[*], am I still getting the full functionality of the Spark cluster? How do I know that I'm actually utilizing the cluster for faster performance? (Sorry, I'm a newbie to this. I tried searching the internet like crazy but didn't find anything that helped me. If someone can point me to some useful resources, that would be great! My ultimate goal is to use the Spark cluster to quickly manipulate very large datasets in Python.)

Last follow-up question - I keep getting the warning 19/04/30 13:19:24 WARN TaskSetManager: Stage 0 contains a task of very large size (5786 KB). The maximum recommended task size is 100 KB. Based on this question, it looks like I need to use the parallelize method in all of my data manipulations. That method returns an RDD, but currently all my code is written to work with PySpark DataFrame objects. Just checking - do I need to rewrite all my code to manipulate RDDs in order to utilize the Spark cluster?

eTothEipiPlus1

1 Answer


It depends on where you are running the script from: your laptop or the master node.

  • If you are running from the master node: you will have to set the SPARK_HOME environment variable to point to your Spark installation (a minimal sketch follows this list).

  • If you are running from your laptop: make sure SPARK_HOME points to the right Spark installation, and copy all the configs from the master's SPARK_HOME to your local SPARK_HOME.
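
Here is a minimal sketch of what the driver side might look like when submitting from the master node; the install path is taken from the comments below and the hostname is the same placeholder used in the question, so treat both as assumptions:

# sketch: assumes the SPARK_HOME path from the comments and the question's hostname placeholder
import os
os.environ.setdefault('SPARK_HOME', '/opt/spark/spark-2.4.0-bin-hadoop2.7')

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.setMaster('spark://<MASTER-HOSTNAME>:7077')
conf.setAppName('spark-basic')
sc = SparkContext(conf=conf)
print(sc.parallelize(range(100)).sum())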

Follow-up Q1: this is tricky, because you can run both the workers and the master on the same node and still configure Spark to operate in a master-worker setup.

Configuring Spark for optimized performance: the out-of-the-box configuration is pretty good, but to optimize performance you will have to tweak a few properties, especially (see the sketch below):

1. spark.executor.cores
2. spark.executor.memory
3. spark.driver.cores
4. spark.driver.memory
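
A minimal sketch of setting those properties on a SparkConf; the numbers are placeholder assumptions, not recommendations, and should be sized to your actual nodes:

# sketch: resource-tuning properties (values are assumptions)
from pyspark import SparkConf

conf = (SparkConf()
        .setMaster('spark://<MASTER-HOSTNAME>:7077')
        .setAppName('spark-basic')
        .set('spark.executor.cores', '4')    # cores per executor
        .set('spark.executor.memory', '8g')  # heap per executor
        .set('spark.driver.cores', '2')
        .set('spark.driver.memory', '4g'))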

Additionally, you want your cluster nodes to be as close together as possible, meaning in the same placement group. Although not required, use HDFS when working with large datasets and save intermediate results on HDFS (see the sketch below). You might also want to use a resource manager such as YARN (again not required, Spark ships with its own standalone manager); that also comes with additional config tuning.
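
For example, a sketch of saving an intermediate result to HDFS and reading it back; the hdfs:/// path is purely hypothetical:

# sketch: persist an intermediate result on HDFS (path is hypothetical)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('hdfs-sketch').getOrCreate()
intermediate = spark.range(1000)  # stand-in for a real intermediate DataFrame
intermediate.write.mode('overwrite').parquet('hdfs:///tmp/intermediate/')
restored = spark.read.parquet('hdfs:///tmp/intermediate/')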

If you find yourself shipping data to the workers and also using a lot of shuffle operations, then you want to consider using a good serializer (see the sketch below).
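
Kryo is one example of such a serializer; a sketch of enabling it, assuming the same app setup as above:

# sketch: enable Kryo serialization (one example of "a good serializer")
from pyspark import SparkConf

conf = (SparkConf()
        .setAppName('spark-basic')
        .set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'))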

Follow-up Q2: well, you would generally not ship all the data from the driver to the workers, but have the workers build the data on their own. For that, you have to split your data into some number of smaller files and let the workers read them via Spark's read APIs (see the sketch below).
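
A sketch of that pattern, assuming the data has already been split into CSV files under a hypothetical HDFS directory:

# sketch: executors read the input themselves (path is hypothetical)
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master('spark://<MASTER-HOSTNAME>:7077')
         .appName('read-example')
         .getOrCreate())

# Each executor reads its own partitions; nothing is parallelize()'d from the
# driver, which is what triggers the "task of very large size" warning.
df = spark.read.csv('hdfs:///data/input/*.csv', header=True, inferSchema=True)
print(df.count())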

Do I need to rewrite code to use RDDs? That depends on the version of Spark you are using, but DataFrames have been available since Spark 1.3 (and are the primary API in the 2.x line), and they are distributed across the cluster just like RDDs, so you should not need to rewrite your DataFrame code (see the sketch below).
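
For instance, a sketch of the mod example from the question written against the DataFrame API; it runs distributed on the cluster just like the RDD version:

# sketch: the same (x, x mod 2) computation with DataFrames instead of RDDs
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('spark-basic-df').getOrCreate()
df = spark.range(1000).withColumn('mod', col('id') % 2)  # distributed, no numpy needed
df.show(10)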

bhavin
  • Thanks for taking the time to answer this, bhavin. I'm running from the master node. SPARK_HOME is /opt/spark/spark-2.4.0-bin-hadoop2.7, which should be correct based on the installation instructions, but I'm still getting the error described in my question. I'll look into the configurations you outline. Do you have a good source to get started (like a good tutorial explaining these configs)? I'm already using PySpark DataFrames. My question is: can I still distribute a computation using DataFrames, or do I need to use RDDs? – eTothEipiPlus1 May 02 '19 at 14:43
  • Here are a few: 1. A spreadsheet to calculate cores etc.: https://c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/ (I used this one to tune [Snowplow](https://snowplowanalytics.com/) processing). 2. https://stackoverflow.com/questions/37871194/how-to-tune-spark-executor-number-cores-and-executor-memory – bhavin May 03 '19 at 14:21