
Context

In PySpark I broadcast a variable to all nodes with the following code:

import csv

from pyspark import SparkFiles

sc = spark.sparkContext # Get context

# Extract stopwords from a file in hdfs
# The result looks like stopwords = {"and", "foo", "bar" ... }
stopwords = set([line[0] for line in csv.reader(open(SparkFiles.get("stopwords.txt"), 'r'))])

# The set of stopwords is broadcasted now
stopwords = sc.broadcast(stopwords)

After broadcasting the stopwords I want to make them accessible inside mapPartitions:

# Some dummy-dataframe
df = spark.createDataFrame([("TESTA and TESTB", ), ("TESTB and TESTA", )], ["text"])


# The method which will be applied to mapPartitions
def stopwordRemoval(partition, passed_broadcast):
    """
    Removes stopwords from "text"-column.

    @partition: iterator-object of partition.
    @passed_broadcast: Broadcast wrapper around the stopword lookup-table.
    """

    # Now the broadcast is passed
    passed_stopwords = passed_broadcast.value

    for row in partition:
        yield [" ".join((word for word in row["text"].split(" ") if word not in passed_stopwords))]


# Repartition so the data is spread over two partitions
df = df.repartition(2)

# Now apply the method
df = df.select("text").rdd \
        .mapPartitions(lambda partition: stopwordRemoval(partition, stopwords)) \
        .toDF(["text"])

# Show the result
df.show()

# Output:
+-----------+
|       text|
+-----------+
|TESTA TESTB|
|TESTB TESTA|
+-----------+


Questions

Even though it works, I'm not quite sure whether this is the right usage of broadcast variables. So my questions are:

  1. Is the broadcast correctly executed when I pass it to mapPartitions in the demonstrated way?
  2. Is using broadcasting within mapPartitions useful, given that stopwords would be distributed with the function to all nodes anyway (stopwords is never reused)?

The second question relates to this question, which partly answers my own. However, the specifics differ, which is why I've chosen to also ask this question.

  • If stopwords is small enough, let's say 1-10 KB, feel free to remove the broadcast. The list will be packed and serialised with each task. If you know a priori that stopwords can be large, i.e. 1-10 MB, then you definitely need to broadcast it. As for whether it works: I assume that since you receive the expected results, it works :) – abiratsis Feb 26 '20 at 19:21
  • Thanks @AlexandrosBiratsis for the advice on the size of the stopwords-object, much appreciated! Regarding the results: Yes, it works in terms of the output. I'm just still uncertain whether `mapPartitions` uses the distributed (broadcasted) variable on the respective node or if the call within `stopwordRemoval` will be processed as _call-by-value_, whereby the entire physical `stopwords`-broadcast-object is passed and then unpacked within the method (rather than just referencing it). – Markus Feb 26 '20 at 20:32
  • The last time stopwords is assigned is on the line `stopwords = sc.broadcast(stopwords)`, therefore I think your code is correct. Also, you should get an error otherwise – abiratsis Feb 26 '20 at 20:54
  • Thanks again for your input - I agree; you should be right about the error :) – Markus Feb 26 '20 at 23:01

1 Answer


Some time went by and I read some additional information which answered the question for me. Thus, I wanted to share my insights.


Question 1: Is the broadcast correctly executed when I pass it to mapPartitions in the demonstrated way?

First, it is worth noting that SparkContext.broadcast() returns a wrapper (a Broadcast object) around the variable to broadcast, as can be read in the docs. This wrapper serializes the variable and adds the information to the execution graph so that the serialized form is distributed to the nodes. Accessing the broadcast's .value attribute deserializes the variable again when it is used. Additionally, the docs state:

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v [the variable] is not shipped to the nodes more than once.

Secondly, I found several sources stating that this also works with UDFs (User Defined Functions), e.g. here. mapPartitions() and udf() can be considered analogous here since, in the case of PySpark, both pass the data to a Python worker process on the respective nodes.
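As a rough illustration of that analogy, here is a minimal sketch of the UDF variant, assuming the df and the stopwords broadcast from the question above (remove_stopwords is a hypothetical name, not from the original code):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical sketch: the UDF closes over the Broadcast wrapper "stopwords"
# from the question; only the lightweight wrapper travels with the closure.
@udf(returnType=StringType())
def remove_stopwords(text):
    # .value is called on the executor side, deserializing the cached set there
    passed_stopwords = stopwords.value
    return " ".join(word for word in text.split(" ") if word not in passed_stopwords)

df = df.withColumn("text", remove_stopwords("text"))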

Regarding this, here is the important part: deserialization has to happen inside the Python function (the udf() or whatever function is passed to mapPartitions()), meaning that .value must be called there and the deserialized value must not be passed as a function parameter.
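A minimal sketch of that distinction, reusing stopwordRemoval() and the stopwords broadcast from the question (plainRemoval is a hypothetical variant that takes an ordinary set):

rdd = df.select("text").rdd

# Recommended: pass the Broadcast wrapper; each executor deserializes the set
# once via .value inside stopwordRemoval().
result = rdd.mapPartitions(lambda partition: stopwordRemoval(partition, stopwords))

# Not recommended: calling .value on the driver materializes the full set there,
# and the plain set is then pickled into every task closure again.
driver_side_set = stopwords.value
result = rdd.mapPartitions(lambda partition: plainRemoval(partition, driver_side_set))  # plainRemoval: hypothetical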

Thus, the broadcast is done the right way here: the Broadcast wrapper is passed as a parameter and the variable is deserialized inside stopwordRemoval().


Question 2: Is using broadcasting within mapPartitions useful, given that stopwords would be distributed with the function to all nodes anyway (stopwords is never reused)?

It is documented that broadcasting only pays off if the cached, serialized form actually yields a benefit for the task at hand:

The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

This might be the case when you have a large reference dataset to distribute to your cluster:

[...] to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

If this applies to your use case, broadcasting has an advantage.
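To make that concrete, here is a hedged sketch assuming the df and the stopwords broadcast from the question: the broadcast starts paying off once the same lookup is used by more than one job or stage, e.g. a cleaning pass followed by a separate counting pass (countStopwords() is a hypothetical second step, not part of the original question):

# First job: remove stopwords, as in the question.
cleaned = df.select("text").rdd \
    .mapPartitions(lambda partition: stopwordRemoval(partition, stopwords)) \
    .toDF(["text"])

# Second, independent job reusing the same broadcast: count stopwords per row.
# The serialized set is cached on each executor and only deserialized per task,
# instead of being shipped with every task closure.
def countStopwords(partition, passed_broadcast):
    passed_stopwords = passed_broadcast.value
    for row in partition:
        yield [sum(1 for word in row["text"].split(" ") if word in passed_stopwords)]

counts = df.select("text").rdd \
    .mapPartitions(lambda partition: countStopwords(partition, stopwords)) \
    .toDF(["n_stopwords"])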
