25

I'm a newbie in PySpark.

I have a Spark DataFrame df that has a column 'device_type'.

I want to replace every value that is "Tablet" or "Phone" with "Mobile", and replace "PC" with "Desktop".

In Python (pandas) I can do the following:

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df['device_type'] = df['device_type'].replace(deviceDict, inplace=False)

How can I achieve this using PySpark? Thanks!

– Yuehan Lyu (edited by zero323)

6 Answers

33

You can use either na.replace:

df = spark.createDataFrame([
    ('Tablet', ), ('Phone', ),  ('PC', ), ('Other', ), (None, )
], ["device_type"])

# the second argument is ignored when to_replace is a dict,
# but older PySpark versions still require a value to be passed
df.na.replace(deviceDict, 1).show()
+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|      Other|
|       null|
+-----------+

or a map literal:

from itertools import chain
from pyspark.sql.functions import create_map, lit

mapping = create_map([lit(x) for x in chain(*deviceDict.items())])


df.select(mapping[df['device_type']].alias('device_type')).show()
+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|       null|
|       null|
+-----------+

Please note that the latter solution converts values not present in the mapping to NULL. If that is not the desired behavior, you can add coalesce:

from pyspark.sql.functions import coalesce


df.select(
    coalesce(mapping[df['device_type']], df['device_type']).alias('device_type')
).show()
+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|      Other|
|       null|
+-----------+
– zero323
  • Greetings. Even though it is more than a year later: I want to use the mapping approach with pyspark 2.1. However, in contrast to the example, when my table contains a "NULL" entry I get the error: "Py4JJavaError: An error occurred while calling o6564.collectToPython. : java.lang.RuntimeException: Cannot use null as map key!". Am I misunderstanding this, or can you give a hint on where the problem has its source? Thanks – gilgamash Sep 28 '18 at 12:24
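
A possible workaround for the error gilgamash describes is to guard the lookup so a NULL value never reaches the map key. This is only a sketch built on the mapping column defined above, and it has not been verified against Spark 2.1 specifically:

from pyspark.sql.functions import when

# Only look the value up in the map when it is not NULL; NULL rows stay NULL.
df.select(
    when(df['device_type'].isNotNull(), mapping[df['device_type']])
    .alias('device_type')
).show()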
20

After a lot of searching and trying alternatives, I think the simplest way to replace values using a Python dict is with the PySpark DataFrame method replace:

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
df_replace = df.replace(deviceDict,subset=['device_type'])

This will replace all values that match the dict. You can get the same result with df.na.replace() if you pass a dict argument combined with a subset argument. The docs are not clear about this: if you search for the function replace you will find two references, one in pyspark.sql.DataFrame.replace and the other in pyspark.sql.DataFrameNaFunctions.replace, but the sample code for both uses df.na.replace, so it is not obvious that you can also call df.replace directly.
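
For comparison, a minimal sketch of the equivalent df.na.replace call mentioned above, using the same dict and subset (the result name is only illustrative):

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
# same replacement, routed through the DataFrameNaFunctions entry point
df_replace_na = df.na.replace(deviceDict, subset=['device_type'])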

– José
9

Here is a little helper function, inspired by the R recode function, that abstracts the previous answers. As a bonus, it adds the option for a default value.

from itertools import chain
from pyspark.sql.functions import col, create_map, lit, when, isnull
from pyspark.sql.column import Column

df = spark.createDataFrame([
    ('Tablet', ), ('Phone', ),  ('PC', ), ('Other', ), (None, )
], ["device_type"])

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}

df.show()
+-----------+
|device_type|
+-----------+
|     Tablet|
|      Phone|
|         PC|
|      Other|
|       null|
+-----------+

Here is the definition of recode.

def recode(col_name, map_dict, default=None):
    if not isinstance(col_name, Column): # Allows either column name string or column instance to be passed
        col_name = col(col_name)
    mapping_expr = create_map([lit(x) for x in chain(*map_dict.items())])
    if default is None:
        return mapping_expr.getItem(col_name)
    else:
        return when(~isnull(mapping_expr.getItem(col_name)), mapping_expr.getItem(col_name)).otherwise(default)

Creating a column without a default gives null/None for all unmatched values.

df.withColumn("device_type", recode('device_type', deviceDict)).show()

+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|       null|
|       null|
+-----------+

On the other hand, specifying a value for default replaces all unmatched values with this default.

df.withColumn("device_type", recode('device_type', deviceDict, default='Other')).show()

+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|      Other|
|      Other|
+-----------+
– yardsale8 (edited by Daveed)
  • how can you avoid hard coding 'device_type'? @yardsale8 – jgtrz Jun 11 '20 at 01:47
    Since `device_type` is a column name, I am not sure you want to abstract that out. If you did, you could put the expression in a function that had the `df`, column name, and translation dict as arguments. – yardsale8 Aug 26 '20 at 16:26
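
A minimal sketch of what yardsale8 describes in the comment above; the wrapper name recode_column is hypothetical, and it builds on the recode helper defined in this answer:

def recode_column(df, col_name, map_dict, default=None):
    # Return a new DataFrame with col_name recoded via the recode helper above.
    return df.withColumn(col_name, recode(col_name, map_dict, default))

recode_column(df, 'device_type', deviceDict, default='Other').show()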
7

You can do this using df.withColumn too:

from itertools import chain
from pyspark.sql.functions import create_map, lit

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}

mapping_expr = create_map([lit(x) for x in chain(*deviceDict.items())])

df = df.withColumn('device_type', mapping_expr[df['device_type']])
df.show()
– Ali AzG
  • How to do it in scala language ? – mytabi May 29 '20 at 11:25
  • @mytabi I think there is no `create_map` and `lit` for scala and spark. However `match` and `case` in scala can be an alternative solution to achieve the same result. – Ali AzG May 29 '20 at 14:59
  • @AliAzG is there a way to Remove those rows from a pyspark dataframe whose entries from a column [of the pyspark] are not present in a dictionary's list of keys? – mang4521 Apr 10 '22 at 07:18
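
A minimal sketch of one way to do what mang4521 asks in the last comment (keep only rows whose value is among the dictionary keys), assuming the same deviceDict as above:

from pyspark.sql.functions import col

# Drop rows whose device_type is not a key of deviceDict (NULL rows are dropped
# too, since isin never evaluates to true for NULL).
df_filtered = df.filter(col('device_type').isin(list(deviceDict.keys())))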
6

The simplest way to do it is to apply a UDF to your dataframe:

from pyspark.sql.functions import col, udf

deviceDict = {'Tablet':'Mobile','Phone':'Mobile','PC':'Desktop'}
map_func = udf(lambda row: deviceDict.get(row, row))
df = df.withColumn("device_type", map_func(col("device_type")))
– narjes Karmeni
  • thanks... is there an option to set None when the value does not match the column? – GiovaniSalazar Aug 11 '20 at 15:24
  • A proper way to do it : def mapping_func(x,deviceDict): try: return deviceDict.get(x,x) except: return None map_func = udf(lambda row : mapping_func(row)) df = df.withColumn("device_type", map_func(col("device_type"))) – narjes Karmeni Aug 19 '20 at 14:44
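
A cleaned-up sketch of what GiovaniSalazar asks for in the comments above (return None when the value is not a key of deviceDict instead of passing it through), reusing the same UDF pattern as this answer; the name map_func_none is only illustrative:

from pyspark.sql.functions import col, udf

# dict.get(row) returns None for keys that are not in deviceDict
map_func_none = udf(lambda row: deviceDict.get(row))
df = df.withColumn("device_type", map_func_none(col("device_type")))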
2

Another way of solving this is with a plain SQL CASE WHEN expression, using an f-string and the Python dictionary together with .join to generate the CASE WHEN statement automatically:

from pyspark.sql import functions as F

column = 'device_type'  # column to replace

e = f"""CASE {' '.join([f"WHEN {column}='{k}' THEN '{v}'" 
             for k,v in deviceDict.items()])} ELSE {column} END"""

df.withColumn(column,F.expr(e)).show()

+-----------+
|device_type|
+-----------+
|     Mobile|
|     Mobile|
|    Desktop|
|      Other|
|       null|
+-----------+

Note: if you want to return NULL where the keys do not match, just change ELSE {column} END to ELSE NULL END in the CASE statement for the variable e:

column = 'device_type'  # column to replace

e = f"""CASE {' '.join([f"WHEN {column}='{k}' THEN '{v}'" 
             for k,v in deviceDict.items()])} ELSE NULL END"""

df.withColumn('New_Col',F.expr(e)).show()

+-----------+-------+
|device_type|New_Col|
+-----------+-------+
|     Tablet| Mobile|
|      Phone| Mobile|
|         PC|Desktop|
|      Other|   null|
|       null|   null|
+-----------+-------+
– anky