9

I would like to create a function in PySpark that takes a DataFrame and a list of parameters (codes/categorical features) and returns the DataFrame with additional dummy columns for the categories of the features in the list. Please find attached the before and after DataFrames as an example.

The equivalent code in Python (pandas) looks like this:

import pandas as pd

# data is an existing pandas DataFrame containing the categorical columns
enum = ['column1', 'column2']

for e in enum:
    print(e)
    temp = pd.get_dummies(data[e], drop_first=True, prefix=e)
    data = pd.concat([data, temp], axis=1)
    data.drop(e, axis=1, inplace=True)

data.to_csv('enum_data.csv')
T.c

5 Answers

21

First you need to collect the distinct values of TYPE and CODE. Then either add a column for each value using withColumn, or use select with an expression for each value. Here is sample code using the select approach:

import pyspark.sql.functions as F
df = sqlContext.createDataFrame([
    (1, "A", "X1"),
    (2, "B", "X2"),
    (3, "B", "X3"),
    (1, "B", "X3"),
    (2, "C", "X2"),
    (3, "C", "X2"),
    (1, "C", "X1"),
    (1, "B", "X1"),
], ["ID", "TYPE", "CODE"])

# Collect the distinct values of TYPE and CODE
types = df.select("TYPE").distinct().rdd.flatMap(lambda x: x).collect()
codes = df.select("CODE").distinct().rdd.flatMap(lambda x: x).collect()

# Build one 0/1 expression per distinct value
types_expr = [F.when(F.col("TYPE") == ty, 1).otherwise(0).alias("e_TYPE_" + ty) for ty in types]
codes_expr = [F.when(F.col("CODE") == code, 1).otherwise(0).alias("e_CODE_" + code) for code in codes]

df = df.select("ID", "TYPE", "CODE", *(types_expr + codes_expr))
df.show()

OUTPUT

+---+----+----+--------+--------+--------+---------+---------+---------+
| ID|TYPE|CODE|e_TYPE_A|e_TYPE_B|e_TYPE_C|e_CODE_X1|e_CODE_X2|e_CODE_X3|
+---+----+----+--------+--------+--------+---------+---------+---------+
|  1|   A|  X1|       1|       0|       0|        1|        0|        0|
|  2|   B|  X2|       0|       1|       0|        0|        1|        0|
|  3|   B|  X3|       0|       1|       0|        0|        0|        1|
|  1|   B|  X3|       0|       1|       0|        0|        0|        1|
|  2|   C|  X2|       0|       0|       1|        0|        1|        0|
|  3|   C|  X2|       0|       0|       1|        0|        1|        0|
|  1|   C|  X1|       0|       0|       1|        1|        0|        0|
|  1|   B|  X1|       0|       1|       0|        1|        0|        0|
+---+----+----+--------+--------+--------+---------+---------+---------+
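For completeness, here is a minimal sketch of the withColumn variant mentioned above. It assumes df still holds the original three columns (i.e., run it in place of the select statement rather than after it):

# withColumn alternative: add one 0/1 column per distinct value
df_wc = df
for ty in types:
    df_wc = df_wc.withColumn("e_TYPE_" + ty, F.when(F.col("TYPE") == ty, 1).otherwise(0))
for code in codes:
    df_wc = df_wc.withColumn("e_CODE_" + code, F.when(F.col("CODE") == code, 1).otherwise(0))
df_wc.show()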
Rakesh Kumar
11

The solutions provided by Freek Wiekmeijer and Rakesh Kumar are perfectly adequate. However, since I had already coded it up, I thought it was worth posting this generic solution, as it doesn't require hard-coding the column names.

pivot_cols = ['TYPE','CODE']
keys = ['ID','TYPE','CODE']

before = sc.parallelize([(1,'A','X1'),
                         (2,'B','X2'),
                         (3,'B','X3'),
                         (1,'B','X3'),
                         (2,'C','X2'),
                         (3,'C','X2'),
                         (1,'C','X1'),
                         (1,'B','X1')]).toDF(['ID','TYPE','CODE'])                         

#Helper function to recursively join a list of dataframes
#Can be simplified if you only need two columns
def join_all(dfs,keys):
    if len(dfs) > 1:
        return dfs[0].join(join_all(dfs[1:],keys), on = keys, how = 'inner')
    else:
        return dfs[0]

combined = []
for pivot_col in pivot_cols:
    pivotDF = before.groupBy(keys).pivot(pivot_col).count()
    new_names = pivotDF.columns[:len(keys)] +  ["e_{0}_{1}".format(pivot_col, c) for c in pivotDF.columns[len(keys):]]        
    df = pivotDF.toDF(*new_names).fillna(0)    
    combined.append(df)

join_all(combined,keys).show()

This gives the following output:

+---+----+----+--------+--------+--------+---------+---------+---------+
| ID|TYPE|CODE|e_TYPE_A|e_TYPE_B|e_TYPE_C|e_CODE_X1|e_CODE_X2|e_CODE_X3|
+---+----+----+--------+--------+--------+---------+---------+---------+
|  1|   A|  X1|       1|       0|       0|        1|        0|        0|
|  2|   C|  X2|       0|       0|       1|        0|        1|        0|
|  3|   B|  X3|       0|       1|       0|        0|        0|        1|
|  2|   B|  X2|       0|       1|       0|        0|        1|        0|
|  3|   C|  X2|       0|       0|       1|        0|        1|        0|
|  1|   B|  X3|       0|       1|       0|        0|        0|        1|
|  1|   B|  X1|       0|       1|       0|        1|        0|        0|
|  1|   C|  X1|       0|       0|       1|        1|        0|        0|
+---+----+----+--------+--------+--------+---------+---------+---------+
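As a side note on the helper: the same chained inner join can be written without recursion using functools.reduce, which is arguably more idiomatic. A sketch equivalent to join_all above:

from functools import reduce

# Inner-join all pivoted DataFrames on the key columns, left to right
result = reduce(lambda left, right: left.join(right, on=keys, how='inner'), combined)
result.show()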
Alex
  • I am trying to run this code on a large dataframe and it's taking a very long time. I am super new to Spark; do you think this code is not suitable for large data sets? If so, is there a way to improve the performance? – rahul Aug 21 '19 at 13:59
  • Have you tried Kumar's solution? If you have many columns, this solution will not be particularly fast. – Alex Aug 21 '19 at 21:07
  • I need to be able to do this over a lot of columns without hardcoding; that's the reason I used your solution. Is there a way to run Kumar's solution without the hardcoding? – rahul Aug 22 '19 at 05:49
  • When I am trying to run df.show(), it is not showing all the dummy columns. – Abhay kumar Apr 22 '20 at 09:14
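Regarding the comment above about avoiding hard-coded column names: below is a minimal sketch that wraps Rakesh Kumar's select-expression approach in a function taking a DataFrame and a list of categorical columns. The function name with_dummies and the e_ prefix are illustrative, not from the original answers:

import pyspark.sql.functions as F

def with_dummies(df, categorical_cols, prefix="e"):
    # Build one 0/1 expression per distinct value of every listed column
    exprs = []
    for c in categorical_cols:
        values = [row[0] for row in df.select(c).distinct().collect()]
        exprs += [F.when(F.col(c) == v, 1).otherwise(0).alias("{}_{}_{}".format(prefix, c, v))
                  for v in values]
    # Keep the original columns and append the dummy columns
    return df.select(*(df.columns + exprs))

# Called on the example DataFrame from Rakesh Kumar's answer:
with_dummies(df, ["TYPE", "CODE"]).show()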
4

I was looking for the same solution but in Scala; maybe this will help someone:

import org.apache.spark.sql.functions.{col, when}

val list = df.select("category").distinct().rdd.map(r => r(0)).collect()
val oneHotDf = list.foldLeft(df)((acc, category) =>
  acc.withColumn("category_" + category, when(col("category") === category, 1).otherwise(0)))
ysegal
  • (The question is about Python. This post does not seem to provide a [quality answer](https://stackoverflow.com/help/how-to-answer) to the question.) – sɐunıɔןɐqɐp Aug 01 '18 at 06:57
  • 2
    I think it's always a win-win when somebody shares knowledge, and the only thing you can do is profit from ysegal's answer. I don't like it when people get discouraged from contributing by others who think they have to strictly follow some ruleset (no matter the context or situation). – Markus Aug 21 '20 at 17:23
2

If you'd like to get a PySpark version of the pandas pd.get_dummies function, you can use the following function:

import itertools
import pyspark.sql.functions as F

def spark_get_dummies(df):

    # Collect the distinct values of every column
    categories = []
    for col_name in df.columns:
        categories.append(df.select(col_name).distinct().rdd.flatMap(lambda x: x).collect())

    # Build one 0/1 expression per (column, value) pair
    expressions = []
    for i, col_name in enumerate(df.columns):
        expressions.append([F.when(F.col(col_name) == value, 1).otherwise(0).alias(str(col_name) + "_" + str(value))
                            for value in categories[i]])

    expressions_flat = list(itertools.chain.from_iterable(expressions))

    # Note: only the dummy columns are selected; the original columns are dropped
    df_final = df.select(*expressions_flat)

    return df_final

The reproducible example is:

df = sqlContext.createDataFrame([
    ("A", "X1"),
    ("B", "X2"),
    ("B", "X3"),
    ("B", "X3"),
    ("C", "X2"),
    ("C", "X2"),
    ("C", "X1"),
    ("B", "X1"),
], ["TYPE", "CODE"])

dummies_df = spark_get_dummies(df)
dummies_df.show()

You will get a DataFrame containing only the dummy columns (the original answer showed the result as a screenshot).
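Based on the example input, the result should look roughly like this (row and column order may vary, since they depend on the order in which the distinct values are collected):

+------+------+------+-------+-------+-------+
|TYPE_A|TYPE_B|TYPE_C|CODE_X1|CODE_X2|CODE_X3|
+------+------+------+-------+-------+-------+
|     1|     0|     0|      1|      0|      0|
|     0|     1|     0|      0|      1|      0|
|     0|     1|     0|      0|      0|      1|
|     0|     1|     0|      0|      0|      1|
|     0|     0|     1|      0|      1|      0|
|     0|     0|     1|      0|      1|      0|
|     0|     0|     1|      1|      0|      0|
|     0|     1|     0|      1|      0|      0|
+------+------+------+-------+-------+-------+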

0

The first step is to make a DataFrame from your CSV file.

See Get CSV to Spark dataframe; the first answer gives a line-by-line example.

Then you can add the columns. Assume you have a DataFrame object called df, and the columns are: [ID, TYPE, CODE].

The rest can be done with DataFrame.withColumn() and pyspark.sql.functions.when:

from pyspark.sql.functions import when

df_with_extra_columns = df.withColumn("e_TYPE_A", when(df.TYPE == "A", 1).otherwise(0)) \
                          .withColumn("e_TYPE_B", when(df.TYPE == "B", 1).otherwise(0))

(This adds the first two columns; you get the point.)

Freek Wiekmeijer