
I am using PySpark 2.4.4 and I need a UDF to create my desired output. This UDF uses a broadcasted dictionary. First, it looks like I need to modify the code so that the UDF can accept the dictionary. Second, I am not sure that what I am doing is the most efficient approach in Spark 2.4. My code is as follows:

# Imports needed by the code below
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType

# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556, 3, '2012-11-23 22:03:42'), (7854140, 3, '2012-11-28 22:03:42')], ["user", "preacher", "time"])

# I am converting the above dataframe to a pandas dataframe in order to create my dictionary
Dict = df.toPandas().groupby('preacher')[['user', 'time']].apply(lambda g: list(map(tuple, g.values.tolist()))).to_dict()

#Broadcast the dictionary
pcDict = sc.broadcast(Dict)

## Function that calls the dictionary
def example(n):
    nodes = []
    children = [i[0] for i in pcD.value[n]]
    for child in children:
        nodes.append(child)

    return Row('Out1', 'Out2')(nodes, [(n, n+2), (n, n+4)])

## Convert the Python function to UDF
schema = StructType([
    StructField("Out1", ArrayType(IntegerType()), False),
    StructField("Out2", ArrayType(StructType([StructField("_1", IntegerType(), False), StructField("_2", IntegerType(), False)])))])

example_udf = F.udf(example, schema)

# Create sample dataframe to test the UDF function
testDf = spark.createDataFrame([(3, 4), (220,5)], ["user", "Number"])

### Final output
newDf = testDf.withColumn("Output", F.explode(F.array(example_udf(testDf["user"]))))
newDf = newDf.select("user", "Output.*")

My first question is about the dictionary. Should I use it, or is there a more efficient way? I was thinking of collectAsMap(), but since it is available for RDDs, I am not sure if this is the way to go in Spark 2.4.
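For reference, this is roughly what I had in mind with collectAsMap(): a rough, untested sketch that builds the same dictionary straight from the DataFrame's underlying RDD, without going through pandas.

# Rough, untested sketch: build the {preacher: [(user, time), ...]} dictionary via collectAsMap()
pairs = df.rdd.map(lambda r: (r["preacher"], (r["user"], r["time"])))
Dict = pairs.groupByKey().mapValues(list).collectAsMap()
# e.g. {2: [(220, '2012-11-22 22:03:42')], 3: [(2382556, '2012-11-23 22:03:42'), (7854140, '2012-11-28 22:03:42')]}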

The second question is: given that the dictionary is the way to go, how should I modify the UDF function?

Thanks in advance!

morfara
  • If your dictionary is small in memory size and fits in your executors' memory, then it is good. – user238607 Jan 23 '20 at 13:19
  • If it's not, what do you think is the best alternative? – morfara Jan 23 '20 at 13:23
  • Broadcasting is the correct way to ensure static data (i.e. read-only data) is sent to the executors only once. If your UDF doesn't modify it, then it is correct. https://stackoverflow.com/questions/38056774/spark-cache-vs-broadcast – user238607 Jan 23 '20 at 13:30
  • Hi again, I am not sure I understand one part here. pcDict is a dictionary keyed by `preacher`, although you use the user id inside the UDF to access it. Which one will be the key of the dictionary? If n is the same as the dictionary key, you can replace the UDF with a join and achieve much better performance. – abiratsis Jan 24 '20 at 07:44
  • As for the 2nd question, it would be better to explain the problem from scratch, describing the initial dataset and the desired output, in order to provide a better overview. My first guess, though, is that it should be possible to flatten your data and avoid the dictionary altogether, i.e. instead of grouping by preacher you could just join df and testDf (a rough sketch follows below). That way you end up with flat structures, which are much easier to work with. – abiratsis Jan 24 '20 at 08:15
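A hypothetical sketch of the join-based flattening suggested in the last comment, assuming that the value passed to the UDF is actually meant to match df.preacher (the very point the previous comment questions):

# Hypothetical sketch: flatten via a join instead of a broadcast dictionary,
# assuming testDf["user"] is meant to match df["preacher"]
flatDf = testDf.join(df, testDf["user"] == df["preacher"], "left") \
               .select(testDf["user"], df["user"].alias("child"), df["time"])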

1 Answer


Regarding the first question, I think that pandas offers an elegant way to convert your data into a dictionary. However, since pandas executes on a single node, you may want to leverage the power of the cluster and therefore go for a Spark version. One more factor is the size of the dictionary itself. If you are sure that the dictionary easily fits on one node, you can safely keep the pandas version; otherwise, try the following Spark code:

from pyspark.sql import functions as F

# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])

df = df.rdd.map(lambda r: (r[1], (r[0], r[2]))) \
      .toDF(["preacher", "tuple"]) \
      .groupBy("preacher") \
      .agg(F.collect_list("tuple").alias("tuple"))

dict = {}
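# collectAsMap() brings the (preacher, list of Row structs) pairs to the driver;
# the loop below converts each Row struct back into a plain (user, time) tuple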
for k,v in df.rdd.collectAsMap().items():
  dict[k] = list(map(lambda row: (row[0], row[1]), v))

dict
# {3: [(2382556, '2012-11-23 22:03:42'), (7854140, '2012-11-28 22:03:42')],
#  2: [(220, '2012-11-22 22:03:42')]}

It is also good to mention that Spark packs and sends, together with each task, all the local variables used in the program. Broadcasting is therefore suitable for large variables, which are stored on the executors once and are easily accessible by any task.
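As for the second question, a minimal sketch of wiring the broadcast dictionary into the UDF could look like the code below. This is only an illustration: it assumes sc is the SparkContext of your session, that the lookup key passed to the UDF is actually a key of the dictionary (as discussed in the comments, that may need to be the preacher id rather than the user id), and it adds a .get(n, []) guard for missing keys.

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType

# Broadcast the dictionary built above; every task reads it from the executor's memory
pcDict = sc.broadcast(dict)

def example(n):
    # Access the broadcast value through the broadcast variable itself;
    # .get(n, []) returns an empty list when n is not a key of the dictionary
    nodes = [child for child, _ in pcDict.value.get(n, [])]
    return Row('Out1', 'Out2')(nodes, [(n, n + 2), (n, n + 4)])

schema = StructType([
    StructField("Out1", ArrayType(IntegerType()), False),
    StructField("Out2", ArrayType(StructType([
        StructField("_1", IntegerType(), False),
        StructField("_2", IntegerType(), False)])))])

example_udf = F.udf(example, schema)

testDf = spark.createDataFrame([(3, 4), (220, 5)], ["user", "Number"])
newDf = testDf.withColumn("Output", example_udf(testDf["user"])).select("user", "Output.*")

With the sample data above, user 3 matches preacher 3 in the dictionary, while 220 has no entry and falls back to an empty Out1 list.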

abiratsis