7

I have a DataFrame with a MapType field.

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> fields = StructType([
...         StructField('timestamp',      TimestampType(), True),
...         StructField('other_field',    StringType(), True),
...         StructField('payload',        MapType(
...                                         keyType=StringType(),
...                                         valueType=StringType()),
...                                                     True),   ])
>>> import datetime
>>> rdd = sc.parallelize([[datetime.datetime.now(), 'this should be in', {'akey': 'aValue'}]])
>>> df = rdd.toDF(fields)
>>> df.show()
+--------------------+-----------------+-------------------+
|           timestamp|      other_field|            payload|
+--------------------+-----------------+-------------------+
|2018-01-10 12:56:...|this should be in|Map(akey -> aValue)|
+--------------------+-----------------+-------------------+

I would like to add the other_field as a key in the payload field.

I know I can use a udf:

>>> def _add_to_map(name, value, map_field):
...     map_field[name] = value
...     return map_field
...
>>> add_to_map = udf(_add_to_map, MapType(StringType(),StringType()))
>>> df.select(add_to_map(lit('other_field'), 'other_field', 'payload')).show(1, False)
+------------------------------------------------------+
|PythonUDF#_add_to_map(other_field,other_field,payload)|
+------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue) |
+------------------------------------------------------+

Is there a way to do this without a udf?

zemekeneng

3 Answers

4

Here is one way to do it without a udf if you know the keys ahead of time: use the create_map function. I don't know whether this is more performant than the udf.

from pyspark.sql.functions import col, lit, create_map

df.select(
    create_map(
        lit('other_field'),
        col('other_field'),
        lit('akey'),
        col('payload')['akey']
    )
).show(n=1, truncate=False)

Output:

+-----------------------------------------------------+
|map(other_field, other_field, akey, payload['akey']) |
+-----------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue)|
+-----------------------------------------------------+

Update

Here is a way to do it without having to hardcode the dictionary keys. Unfortunately it involves one collect() operation.

Mockup some data

First, let's modify your original DataFrame to include one more key-value pair in the MapType() field.

from pyspark.sql.functions import col, lit, create_map
import datetime
rdd = sc.parallelize(
    [
        [
            datetime.datetime.now(),
            'this should be in',
            {'akey': 'aValue', 'bkey': 'bValue'}
        ]
    ]
)
df = rdd.toDF(fields)
df.show(n=1, truncate=False)

Which creates the following:

+--------------------------+-----------------+-----------------------------------+
|timestamp                 |other_field      |payload                            |
+--------------------------+-----------------+-----------------------------------+
|2018-01-10 17:37:58.859603|this should be in|Map(bkey -> bValue, akey -> aValue)|
+--------------------------+-----------------+-----------------------------------+

Get the map's keys

Using explode() and collect(), you can get the keys like so:

from pyspark.sql.functions import explode

keys = [
    x['key'] for x in (df.select(explode("payload"))
                        .select("key")
                        .distinct()
                        .collect())
]
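
If you're on Spark 2.3 or later, a slight variation could use map_keys() to extract just the keys before exploding; a minimal sketch under that assumption:

from pyspark.sql.functions import explode, map_keys

# Assumes Spark 2.3+, where map_keys() is available.
keys = [
    x['key'] for x in (df.select(explode(map_keys('payload')).alias('key'))
                         .distinct()
                         .collect())
]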

Create a new map with all of the fields

Now use create_map as above, but use the information from keys to build the key-value pairs dynamically. I used reduce(add, ...) because create_map expects its inputs to be a flat sequence of alternating keys and values; I couldn't think of another way to flatten the list (though an alternative using itertools.chain is sketched after the final result below).

from functools import reduce
from operator import add

df.select(
    create_map(
        *([lit('other_field'), col('other_field')] +
          reduce(add, [[lit(k), col('payload').getItem(k)] for k in keys]))
    )
).show(n=1, truncate=False)

Final result:

+---------------------------------------------------------------------------+
|map(other_field, other_field, akey, payload['akey'], bkey, payload['bkey'])|
+---------------------------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue, bkey -> bValue)      |
+---------------------------------------------------------------------------+
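
As an aside, the reduce(add, ...) flattening can also be written with itertools.chain.from_iterable; a minimal sketch, assuming the same keys list as above, which should produce the same map:

from itertools import chain

from pyspark.sql.functions import col, create_map, lit

# Flatten [[lit(k), col(...)], ...] into a single sequence of alternating
# key/value columns for create_map.
pairs = chain.from_iterable([lit(k), col('payload').getItem(k)] for k in keys)

df.select(
    create_map(lit('other_field'), col('other_field'), *pairs)
).show(n=1, truncate=False)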

References

  1. pyspark: Create MapType Column from existing columns

  2. PySpark converting a column of type 'map' to multiple columns in a dataframe

pault
  • The keys don't change very often in my case, so precomputing the keys might give me a performance gain, I will try it out. Thanks @pault – zemekeneng Jan 10 '18 at 23:35
  • Happy to help. Please do share the performance results. I'm curious to see how the various methods stack up against each other. – pault Jan 10 '18 at 23:54
  • No noticeable performance difference. However, this is still useful for folk on spark <1.6 because of union all. See here: https://stackoverflow.com/questions/38650568/spark-1-5-2-org-apache-spark-sql-analysisexception-unresolved-operator-union – zemekeneng Jan 11 '18 at 21:10
  • But are these methods faster than `udf`? Or did you mean that these methods had no performance difference than `udf`? – pault Jan 11 '18 at 21:12
  • I would have to put together a more rigorous test to see, I will get back to you :) – zemekeneng Jan 12 '18 at 00:50
  • @pault, great explanation. I would like to apply what you mention here in my question. Can you take a look at it as well? https://stackoverflow.com/questions/62070186/pyspark-mapping-values-from-different-dataframes?noredirect=1#comment109790263_62070186 – jgtrz May 28 '20 at 23:42
4

Using map_concat & create_map (pyspark 2.4+):

from pyspark.sql import functions as F

(
    df.withColumn(
        "new_map",
        F.map_concat(
            "old_map",
            F.create_map(F.lit("key"), F.lit("val"))
        )
    )
)

You can add multiple key-value pairs at once thanks to F.create_map, but note that F.map_concat won't replace existing keys; depending on your Spark version and the spark.sql.mapKeyDedupPolicy setting, concatenating maps that share a key may even raise an error.
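
Applied to the DataFrame from the question, that could look roughly like this (assuming the payload and other_field columns defined above):

from pyspark.sql import functions as F

# Merge the existing payload map with a one-entry map built from other_field.
df.withColumn(
    "payload",
    F.map_concat(
        "payload",
        F.create_map(F.lit("other_field"), F.col("other_field"))
    )
).show(n=1, truncate=False)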

utkarshgupta137
0

In case anyone reaches this question but is looking for the Scala solution:

df.withColumn("payload", map_concat(col("payload"), map(lit("other_field"), col("other_field"))))
Ignacio Alorre