18

I have to add a column to a PySpark DataFrame based on a list of values.

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")], ["Animal", "Enemy"])

I have a list called rating, which contains a rating for each pet.

rating = [5,4,1]

I need to append a column called Rating to the dataframe, such that

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+

I have done the following, however it returns only the first value in the list in the Rating column:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def add_labels():
    return rating.pop(0)

labels_udf = udf(add_labels, IntegerType())

new_df = a.withColumn('Rating', labels_udf()).cache()

Output:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     5|
| Mouse|  Cat|     5|
+------+-----+------+

7 Answers

27
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window

#sample data
a = sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])
a.show()

#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])

#add 'sequential' index and join both dataframe to get the final result
a = a.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))

final_df = a.join(b, a.row_idx == b.row_idx).drop("row_idx")
final_df.show()

Input:

+------+-----+
|Animal|Enemy|
+------+-----+
|   Dog|  Cat|
|   Cat|  Dog|
| Mouse|  Cat|
+------+-----+

Output is:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Cat|  Dog|     4|
|   Dog|  Cat|     5|
| Mouse|  Cat|     1|
+------+-----+------+
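
Note: ordering a window without a partitionBy moves all the rows into a single partition (Spark logs a warning about serious performance degradation when this happens), so this approach is fine for small data but can be slow at scale.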
  • @IgorS I agree with _The generated ID is guaranteed to be monotonically increasing and unique_, but it cannot give an inconsistent result here, because the answer is not using `monotonically_increasing_id()` to compare rows directly; rather it uses it to generate consecutive row numbers starting from 1 via the `row_number()` function. – Ankit Agrawal Aug 24 '20 at 07:10
  • this worked for me, just creating the dataframe from the rating list can be improved as: df = spark.createDataFrame(rating.astype(IntegerType), IntegerType()) – user3322581 Apr 06 '22 at 16:50
  • Tried this answer on a larger dataframe and the columns got mismatched. – The Singularity Apr 26 '22 at 10:15
9

As mentioned by @Tw UxTLi51Nus, if you can order the DataFrame, let's say, by Animal, without this changing your results, you can then do the following:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def add_labels(indx):
    return rating[indx-1]  # row numbers begin from 1
labels_udf = udf(add_labels, IntegerType())

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
a.createOrReplaceTempView('a')
# note: "Animal" in double quotes is a SQL string literal (a constant), so the
# rows keep their current order; to sort by the column use: order by Animal
a = spark.sql('select row_number() over (order by "Animal") as num, * from a')

a.show()


+---+------+-----+
|num|Animal|Enemy|
+---+------+-----+
|  1|   Dog|  Cat|
|  2|   Cat|  Dog|
|  3| Mouse|  Cat|
+---+------+-----+

new_df = a.withColumn('Rating', labels_udf('num'))
new_df.show()
+---+------+-----+------+
|num|Animal|Enemy|Rating|
+---+------+-----+------+
|  1|   Dog|  Cat|     5|
|  2|   Cat|  Dog|     4|
|  3| Mouse|  Cat|     1|
+---+------+-----+------+

And then drop the num column:

new_df.drop('num').show()
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+

Edit:

Another way, perhaps ugly and a bit inefficient, if you cannot sort by a column, is to go back to the RDD API and do the following:

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])

# or create the rdd from the start:
# a = spark.sparkContext.parallelize([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")])

a = a.rdd.zipWithIndex()
a = a.toDF()
a.show()

+-----------+---+
|         _1| _2|
+-----------+---+
|  [Dog,Cat]|  0|
|  [Cat,Dog]|  1|
|[Mouse,Cat]|  2|
+-----------+---+

a = a.select(a._1.getItem('Animal').alias('Animal'),
             a._1.getItem('Enemy').alias('Enemy'),
             a._2.alias('num'))

def add_labels(indx):
    return rating[indx] # indx here will start from zero

labels_udf = udf(add_labels, IntegerType())

new_df = a.withColumn('Rating', labels_udf('num'))

new_df.show()

+------+-----+---+------+
|Animal|Enemy|num|Rating|
+------+-----+---+------+
|   Dog|  Cat|  0|     5|
|   Cat|  Dog|  1|     4|
| Mouse|  Cat|  2|     1|
+------+-----+---+------+

(I would not recommend this approach if you have a lot of data.)

Hope this helps, good luck!

9

I might be wrong, but I believe the accepted answer will not work. monotonically_increasing_id only guarantees that the ids will be unique and increasing, not that they will be consecutive. Hence using it on two different dataframes will likely create two very different columns, and the join will mostly return empty.
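
As a quick illustration (my own sketch, not part of the original answer), the gaps are easy to see once the data spans more than one partition:

from pyspark.sql import functions as F

# with several partitions, the generated ids jump by a large
# per-partition offset instead of being consecutive
df = spark.range(6).repartition(3)
df.withColumn("mid", F.monotonically_increasing_id()).show()
# mid comes out as e.g. 0, 1, 8589934592, 8589934593, ... rather than 0..5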

Taking inspiration from this answer to a similar question (https://stackoverflow.com/a/48211877/7225303), we could change the incorrect answer to:

from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

a = sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])

a.show()

+------+-----+
|Animal|Enemy|
+------+-----+
|   Dog|  Cat|
|   Cat|  Dog|
| Mouse|  Cat|
+------+-----+



#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])
b.show()

+------+
|Rating|
+------+
|     5|
|     4|
|     1|
+------+


a = a.withColumn("idx", F.monotonically_increasing_id())
b = b.withColumn("idx", F.monotonically_increasing_id())

windowSpec = W.orderBy("idx")
a = a.withColumn("idx", F.row_number().over(windowSpec))
b = b.withColumn("idx", F.row_number().over(windowSpec))

a.show()
+------+-----+---+
|Animal|Enemy|idx|
+------+-----+---+
|   Dog|  Cat|  1|
|   Cat|  Dog|  2|
| Mouse|  Cat|  3|
+------+-----+---+

b.show()
+------+---+
|Rating|idx|
+------+---+
|     5|  1|
|     4|  2|
|     1|  3|
+------+---+

final_df = a.join(b, a.idx == b.idx).drop("idx")
final_df.show()

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
    this is okay! but without `partition` this can cause serious performance degradation on a large dataset. – theshemul Feb 18 '20 at 20:40
3

You can convert your rating list into an RDD:

rating = [5,4,1]
ratingrdd = sc.parallelize(rating)

And then convert your dataframe to an RDD, attach each value of ratingrdd to it using zip, and convert the zipped RDD back to a dataframe:

sqlContext.createDataFrame(
    a.rdd.zip(ratingrdd).map(lambda x: (x[0][0], x[0][1], x[1])),
    ["Animal", "Enemy", "Rating"]
).show()

It should give you

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
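
One caveat (my addition, not part of the original answer): RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition, which holds in this small example but can fail on real data. A more defensive sketch pairs the rows by an explicit index instead:

# pair by explicit index via zipWithIndex; no partitioning requirement,
# at the cost of a shuffle for the join
indexed_rows = a.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
indexed_ratings = ratingrdd.zipWithIndex().map(lambda x: (x[1], x[0]))
joined = indexed_rows.join(indexed_ratings) \
                     .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1]))
sqlContext.createDataFrame(joined, ["Animal", "Enemy", "Rating"]).show()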
1

What you are trying to do does not work, because the rating list is in your driver's memory, whereas the a dataframe is in the executors' memory (the udf runs on the executors too).

What you need to do is add the keys to the ratings list, like so:

ratings = [('Dog', 5), ('Cat', 4), ('Mouse', 1)]

Then you create a ratings dataframe from the list and join both to get the new column added:

ratings_df = spark.createDataFrame(ratings, ['Animal', 'Rating'])
new_df = a.join(ratings_df, 'Animal')
  • The problem is I can't put in a key. It's basically indexed in a specific order. – Bryce Ramgovind Jan 09 '18 at 09:13
  • Spark Dataframes do not guarantee a specific ordering, unless you call ``orderBy`` on it. So if you know the ordering of both the ``a`` dataframe and the ``ratings`` dataframe, you can come up with a key to combine them. If you do not know this, there is no way to combine the two dataframes... – akoeltringer Jan 09 '18 at 09:54
1

We can add a new column to a pandas DataFrame, and PySpark provides functions to convert a Spark DataFrame to a pandas DataFrame and back.

test_spark_df = spark.createDataFrame([(1,'A'), (2, 'B'), (3, 'C')], schema=['id', 'name'])
test_spark_df.show()

+---+----+
| id|name|
+---+----+
|  1|   A|
|  2|   B|
|  3|   C|
+---+----+

Convert this Spark DataFrame to a pandas DataFrame:

new_pandas_df = test_spark_df.toPandas()
new_pandas_df['gender'] = ['M', 'F', 'M']
new_pandas_df

    id  name  gender
0   1   A     M
1   2   B     F
2   3   C     M

Convert the pandas DataFrame back to a Spark DataFrame:

converted_spark_df = spark.createDataFrame(new_pandas_df)
converted_spark_df.show()

+---+----+------+
| id|name|gender|
+---+----+------+
|  1|   A|     M|
|  2|   B|     F|
|  3|   C|     M|
+---+----+------+
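
If you go this route on non-trivial data, Arrow-backed conversion can at least reduce the round-trip cost. A minimal sketch (my addition; assumes Spark 3.x with the pyarrow package installed, on Spark 2.3/2.4 the key is spark.sql.execution.arrow.enabled):

# enable Arrow for faster toPandas()/createDataFrame round-trips
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")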
  • Converting between PySpark and Pandas is inefficient, as all the data needs to be collected on the master node from the different worker nodes. So for this problem the solution works but is not scalable. – Bryce Ramgovind Apr 11 '22 at 15:09
-1

Following the initial idea of using udf, you can do the following:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

def add_labels(idx):
    lista = [5,4,1]
    return lista[idx]

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
a = a.withColumn("idx", F.monotonically_increasing_id())
a.show()

+------+-----+---+
|Animal|Enemy|idx|
+------+-----+---+
|   Dog|  Cat|  0|
|   Cat|  Dog|  1|
| Mouse|  Cat|  2|
+------+-----+---+

labels_udf = F.udf(add_labels, IntegerType())
new_df = a.withColumn('Rating', labels_udf(F.col('idx'))).drop('idx')
new_df.show()

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
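
A caveat (my addition): this relies on monotonically_increasing_id producing exactly 0, 1, 2, which only holds while the data sits in a single partition. A more defensive sketch pins down a consecutive zero-based index with row_number first:

from pyspark.sql import Window

# derive a guaranteed-consecutive, zero-based index for the list lookup
w = Window.orderBy(F.monotonically_increasing_id())
a = a.withColumn('idx', F.row_number().over(w) - 1)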