3

I have created a cluster with 1 master (clus-m) and two worker nodes (clus-w-0, clus-w-1) in GCP Dataproc. Now, using a PySpark RDD, I want to distribute one task so that all the nodes get involved. Below is my code snippet.

import os

import numpy as np
from pyspark import SparkContext


def pair_dist(row):
    # count how many comma-separated fields differ between the two rows
    dissimilarity = 0
    Z = row[0].split(',')
    X = row[1].split(',')

    for j in range(len(Z)):
        if Z[j] != X[j]:
            dissimilarity += 1

    # tag the result with the hostname of the machine that computed it
    return str(dissimilarity) + os.uname()[1]

sc = SparkContext.getOrCreate()
rdd = sc.textFile("input.csv")

rdd = sc.parallelize(rdd.take(5))
rdd = rdd.cartesian(rdd)
dist = rdd.map(lambda x: pair_dist(x)).collect()
dist = np.array(dist).reshape((5, 5))
print(dist)

sc.stop()

To check whether it happened properly, I appended the host name to the result. But I always get the host name clus-m in the result, never the worker nodes' host names.

Output: [0clus-m 2clus-m...... 1clus-m 0clus-m.......] 5x5

Please suggest what exactly I need to do.

srimanta

1 Answer

2

To distribute work, your input dataset has to be sharded. Since you're using sc.textFile("input.csv"), you will have a single mapper reading the file.

If, for instance, the input dataset is substantially multiplied through transformations, you could use RDD.repartition to make subsequent operations better parallelized.
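For example, a minimal sketch of what that could look like, assuming the same small input.csv from the question (the target of 3 partitions is arbitrary, just enough to give YARN something to spread across the two workers):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

rdd = sc.textFile("input.csv")      # tiny file -> very few partitions
print(rdd.getNumPartitions())

rdd = rdd.repartition(3)            # shuffle the rows into 3 partitions
print(rdd.getNumPartitions())       # now 3, so downstream tasks can run on other nodes

pairs = rdd.cartesian(rdd)          # subsequent operations inherit the wider parallelism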

Your best bet will be to split the input into multiple files.

The Spark programming guide has these points that are relevant to your question:

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
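Applied to the code in the question, that optional second argument would look something like the sketch below (4 is an arbitrary minimum; for a file smaller than one HDFS block, Spark simply cuts it into more, smaller input splits):

sc = SparkContext.getOrCreate()

# ask for at least 4 partitions up front instead of the default of one per block
rdd = sc.textFile("input.csv", minPartitions=4)
print(rdd.getNumPartitions())

# or, once the input is split into multiple files, point textFile at a directory or wildcard
# (the bucket path here is hypothetical)
# rdd = sc.textFile("gs://my-bucket/input-parts/*.csv")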

tix
  • Thanks for the information. I did the partitioning and wanted to check the hostname again, but got the same result. Below is the code snippet: def hostname(row): return os.uname()[1] data = pd.read_csv("input.csv", header=None) data = numpy.array(data) rdd = sc.parallelize(data) rdd = rdd.repartition(3) print(rdd.getNumPartitions()) rdd = rdd.cartesian(rdd) print(rdd.count()) dist = rdd.map(lambda x: hostname(x)).collect() dist = np.array(dist).reshape((120,120)) print(dist) – srimanta Mar 05 '20 at 12:44
  • Basically I want to check that the collection returned by map() shows all the VMs' host names – srimanta Mar 05 '20 at 12:50
  • Or do I need to manually configure Hadoop first, e.g. by listing the different slave nodes' IPs in the /etc/hosts file? – srimanta Mar 05 '20 at 12:58
  • Dataproc should just work out of the box, no need to configure anything. Please keep in mind that distributing data between nodes adds latency so Spark/Yarn will sensibly choose to scale up before scaling out. Additional answers you may find useful: https://stackoverflow.com/questions/27531816 https://stackoverflow.com/questions/48615354 – tix Mar 05 '20 at 17:11
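Following up on the hostname check discussed in the comments above: one way to see which executor actually processed each partition is to report the hostname once per partition rather than per row, e.g. with mapPartitionsWithIndex. This is only a sketch, not the original poster's code, and with this little data YARN may still legitimately schedule every task on a single node:

import socket

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.textFile("input.csv", minPartitions=4)

def report_host(index, iterator):
    # one record per partition: (partition index, executor hostname, row count)
    rows = sum(1 for _ in iterator)
    yield (index, socket.gethostname(), rows)

print(rdd.mapPartitionsWithIndex(report_host).collect())

sc.stop()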