I've set up Spark on a multi-node cluster as explained in this article: https://medium.com/ymedialabs-innovation/apache-spark-on-a-multi-node-cluster-b75967c8cb2b
I verified that the workers are all in the "ALIVE" state in the web UI (http://MASTER-IP:8080/), and I tested that I can start the Spark shell, run Scala commands, and see the job results at http://MASTER-HOSTNAME:4041.
In Python, I can create a PySpark context either without specifying the master at all or by setting the master to local[*]. Below is the basic Python script I wrote for testing:
# spark-basic.py
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
# conf.setMaster('spark://<MASTER-HOSTNAME>:7077')  # this is the line that fails for me (error below)
conf.setMaster('local[*]')
conf.setAppName('spark-basic')
sc = SparkContext(conf=conf)

def mod(x):
    import numpy as np
    return (x, np.mod(x, 2))

# take(10) brings the first 10 (x, x mod 2) pairs back to the driver as a plain list
rdd = sc.parallelize(range(1000)).map(mod).take(10)
print(rdd)
Why can I not set the master to 'spark://<MASTER-HOSTNAME>:7077', as described in many online tutorials such as this one? The error I get when I try is quite long, but here is the relevant part (I can paste the whole thing if needed):
19/05/01 14:29:17 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master MASTER-HOSTNAME:7077
org.apache.spark.SparkException: Exception thrown in awaitResult
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readUTF(DataInputStream.java:609)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at org.apache.spark.rpc.netty.RequestMessage$.readRpcAddress(NettyRpcEnv.scala:593)
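One thing I can check from the machine where I start the driver is whether the master's RPC port is reachable at all. Here is a minimal sketch using only the Python standard library (<MASTER-HOSTNAME> is a placeholder for my actual master host):
# check_master_port.py - is the standalone master's RPC port reachable from this machine?
import socket

MASTER_HOST = '<MASTER-HOSTNAME>'  # placeholder, same host as in spark://<MASTER-HOSTNAME>:7077
MASTER_PORT = 7077

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.settimeout(5)
    try:
        s.connect((MASTER_HOST, MASTER_PORT))
        print('TCP connection to {}:{} succeeded'.format(MASTER_HOST, MASTER_PORT))
    except OSError as exc:
        print('Could not reach {}:{} -> {}'.format(MASTER_HOST, MASTER_PORT, exc))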
Then a follow-up question: does it matter? If I just set the master to local[*], am I still getting the full functionality of the Spark cluster? How can I tell whether I'm actually using the cluster and getting the performance benefit? (Sorry, I'm a newbie at this. I've searched a lot but haven't found anything that helped; pointers to useful resources would also be great. My ultimate goal is to use the Spark cluster to quickly manipulate very large datasets in Python.)
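To answer the "how do I know" part for myself, the only check I've come up with is printing a few attributes of the context. This is a minimal sketch using standard SparkContext attributes; nothing here is specific to my setup:
# check_context.py - confirm where this SparkContext is actually running
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local[*]').setAppName('where-am-i')
sc = SparkContext(conf=conf)

print(sc.master)              # 'local[*]' here; would be 'spark://<MASTER-HOSTNAME>:7077' on the cluster
print(sc.defaultParallelism)  # how many cores/slots Spark thinks it can use
print(sc.uiWebUrl)            # the application UI, e.g. http://<driver-host>:4040

sc.stop()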
Last follow-up question: I keep getting the warning 19/04/30 13:19:24 WARN TaskSetManager: Stage 0 contains a task of very large size (5786 KB). The maximum recommended task size is 100 KB.
Based on this question it looks like I need to use the parallelize method in all of my data manipulations. That method returns an RDD, and currently all of my code is written against PySpark DataFrame objects. Just checking: do I need to rewrite all of my code to manipulate RDDs in order to utilize the Spark cluster?
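In case it helps, here is a simplified sketch of the DataFrame-style code I have today (the real column names, data sources, and aggregations are different; this just shows the shape of it):
# df-sketch.py - simplified stand-in for my DataFrame-based code
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .master('local[*]')     # same question applies: local[*] vs spark://<MASTER-HOSTNAME>:7077
         .appName('df-sketch')
         .getOrCreate())

# the real data comes from large files; this small generated DataFrame stands in for it
df = spark.createDataFrame([(i, i % 2) for i in range(1000)], ['x', 'x_mod_2'])

result = (df.groupBy('x_mod_2')
            .agg(F.count('*').alias('n'))
            .collect())
print(result)

spark.stop()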