I have broadcast a dictionary that I would like to use for mapping column values in my DataFrame. Let's say I call the withColumn() method for that.

I can only get it to work with a UDF, but not directly:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType

sc = SparkContext()
ss = SparkSession(sc)
df = ss.createDataFrame(["a", "b"], StringType()).toDF("key")
# +---+                                                                           
# |key|
# +---+
# |  a|
# |  b|
# +---+
thedict={"a":"A","b":"B","c":"C"}
thedict_bc=sc.broadcast(thedict)

Referencing with a literal or using a UDF works fine:

df.withColumn('upper',lit(thedict_bc.value.get('c',"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|    C|
# |  b|    C|
# +---+-----+
df.withColumn('upper',udf(lambda x : thedict_bc.value.get(x,"--"), StringType())('key')).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|    A|
# |  b|    B|
# +---+-----+

However, accessing the dictionary directly with a column reference doesn't:

df.withColumn('upper',lit(thedict_bc.value.get(col('key'),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key,"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key.cast("string"),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+

Am I missing something obvious?


1 Answer

TL;DR You're mixing up things that belong to completely different contexts: symbolic SQL expressions (lit, col, etc.) and plain Python code.

The following line:

thedict_bc.value.get(col('key'), "--")

is executed in Python on the driver and is literally a local dictionary lookup. thedict doesn't contain the key col('key') (the argument is taken literally, there is no expansion involved), so you always get the default value.
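
To make the point concrete, here is a minimal driver-side sketch (reusing thedict and the session from the question; key_col is just a throwaway name):

from pyspark.sql.functions import col, lit

key_col = col('key')
print(type(key_col))   # <class 'pyspark.sql.column.Column'>

# thedict is keyed by the plain strings "a", "b", "c", so a Column object is
# never one of its keys. The .get(...) call runs once, on the driver, while
# the expression is being built, and (as the output above shows) falls back
# to the default "--". withColumn therefore just receives lit("--"), the
# same constant for every row.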

Personally I would use a simple join:

lookup = sc.parallelize(thedict.items()).toDF(["key", "upper"])
df.join(lookup, ["key"], "left").na.fill("--", ["upper"]).show()
+---+-----+                                                                     
|key|upper|
+---+-----+
|  b|    B|
|  a|    A|
+---+-----+
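
Since the lookup table is tiny, you can additionally hint Spark to broadcast it so the join avoids a shuffle; a minimal sketch reusing lookup from above:

from pyspark.sql.functions import broadcast

# Ship the small lookup table to every executor and perform a broadcast
# (map-side) hash join instead of shuffling both sides.
df.join(broadcast(lookup), ["key"], "left").na.fill("--", ["upper"]).show()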

but a udf (as you've already established) or a literal map would work as well:

from pyspark.sql.functions import coalesce, create_map, col, lit
from itertools import chain

thedict_col = create_map(*chain.from_iterable(
    (lit(k), lit(v)) for k, v in thedict.items()
))

df.withColumn('upper', coalesce(thedict_col[col("key")], lit("--"))).show()
+---+-----+
|key|upper|
+---+-----+
|  a|    A|
|  b|    B|
+---+-----+

Notes:

  • Of course, if you want to convert to upper case, just use pyspark.sql.functions.upper.
  • Using some_broadcast.value as an argument for the function won't work at all. Variable substitution would be applied locally on the driver and broadcasting wouldn't be utilized. value should be called in the function body, so it is executed in the executor context (see the sketch after this list).
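
A minimal sketch of that last point, reusing df, thedict_bc and StringType from the question (the names upper_no_bc and upper_bc are just for illustration):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Anti-pattern: .value is evaluated on the driver when the default argument
# is bound, so only the plain dict is captured in the closure and the
# broadcast mechanism is bypassed.
upper_no_bc = udf(lambda x, d=thedict_bc.value: d.get(x, "--"), StringType())

# Preferred: capture the Broadcast object itself and call .value inside the
# function body, so it is resolved in the executor context.
upper_bc = udf(lambda x: thedict_bc.value.get(x, "--"), StringType())

df.withColumn('upper', upper_bc('key')).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|    A|
# |  b|    B|
# +---+-----+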
  • Thank you. Your explanation makes sense, and yes, I'm a bit hazy on when which context applies. My actual dictionary (a parameter structure) is a multilevel one. For nested dictionaries, I figured I can just keep using create_map, like you suggested, but what if my parameter dictionary looks like this: thedict={"a": ("A","AA","AAA"), "b" : ("B","BB","BBB") }? How do I encode it to access thedict["a"][2]? I'm trying to stay away from UDFs for speed reasons. – Go Erlangen Jan 21 '18 at 21:41
  • Nested dictionaries work pretty much the same way. You can check my answers [here](https://stackoverflow.com/q/48174437/6910411) and [here](https://stackoverflow.com/a/48128037/6910411). Structs are just `Columns`, so `create_map(lit("a"), struct(lit("A"), lit("AA"), lit("AAA")), lit("b"), struct(lit("B"), lit("BB"), lit("BBB")))` would work, but for positional access like `thedict["a"][2]` you'll need `array`: `create_map(lit("a"), array(lit("A"), lit("AA"), lit("AAA")), lit("b"), array(lit("B"), lit("BB"), lit("BBB")))`. With `from pyspark.sql.functions import create_map, struct, array, lit`. – zero323 Jan 21 '18 at 23:16
  • Apologies for pushing. With your last suggestion, I felt I came close to my final goal: broadcast parameter structure of the form: `{"a": { "first" : (0.1,0.2,0.3), "second": 0.4 }, "b" : { "first" : (0.11,0.22,0.33), "second": 0.44 } } ` but then I started getting errors like this: _cannot resolve 'map('first', array(0.1D, 0.2D, 0.3D), 'second', 0.4)' due to data type mismatch. The given values of function map should all be the same type, but they are [array, double]_. Does it mean a heterogeneous parameter structure like this is not the right solution? – Go Erlangen Jan 22 '18 at 03:40
  • That's correct. Heterogeneous structures are not supported. If the overall structure is fixed then you can use `struct`, or arrays like this: `create_map(lit("a"), create_map(lit("first"), array(lit(0.1),lit(0.2),lit(0.3)), lit("second"), array(lit(0.4))))` (a fuller sketch of this is below the thread). – zero323 Jan 22 '18 at 18:14
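
To tie the comment thread together, a minimal sketch of that last suggestion (reusing df from the question; "second" is wrapped in a one-element array so all map values share the array<double> type, and params_col / third_of_first are just illustrative names):

from pyspark.sql.functions import create_map, array, col, lit

# Nested map column: key -> (parameter name -> array of doubles).
params_col = create_map(
    lit("a"), create_map(
        lit("first"), array(lit(0.1), lit(0.2), lit(0.3)),
        lit("second"), array(lit(0.4))),
    lit("b"), create_map(
        lit("first"), array(lit(0.11), lit(0.22), lit(0.33)),
        lit("second"), array(lit(0.44))),
)

# Per-row lookup of params[key]["first"][2], e.g. 0.3 for key "a".
df.withColumn("third_of_first", params_col[col("key")]["first"][2]).show()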