
I have a list `lists = [0, 1, 2, 3, 5, 6, 7]`. The values are not sequential. I have a PySpark dataframe with 9 columns.

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    1| NaN|

I need to add my list as a column to my existing dataframe. My list is not in order, so I am not able to use a udf. Is there a way to do it? Please help me. I want it to be like this:

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+-----+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|lists|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+-----+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|    0|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    1| NaN|    1|
  • can you show us how you would like to add this list to your existing dataframe? – vikrant rana Oct 01 '19 at 16:31
  • I have added how the dataframe should look. I have added only two rows, but the main problem is to add a column to my dataframe from a list I have. – user-2147482338 Oct 01 '19 at 16:41
  • Not iterate, but just add my list of values as a new column to my existing dataframe. – user-2147482338 Oct 01 '19 at 16:48
  • Possible duplicate of [How do I add a new column to a Spark DataFrame (using PySpark)?](https://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark) – Daniel Oct 01 '19 at 18:18
  • @Daniel it's a different question. I have a list and I want to add it as a column to my dataframe. – user-2147482338 Oct 01 '19 at 18:29
  • Should the first item of your list be assigned to the row with the earliest date (i.e. is your dataframe ordered by date)? – cronoik Oct 01 '19 at 19:21
  • @cronoik It is ordered by date. Suppose I have a dataframe with 7 rows and 9 columns. I have a list whose length is 7. I need this list to be added as a new column to my dataframe, so my resulting dataframe will have 10 columns. – user-2147482338 Oct 01 '19 at 19:45
  • I just saw that you have an index column. Is it consecutive, starting from `0`, and can I also say that the first item of your list belongs to the row with index `0`? I'm asking because pyspark dataframes are not ordered (unlike pandas), and such an operation requires a column which allows you to order your dataframe. – cronoik Oct 01 '19 at 19:56
  • Will something like below work? `list = [(1,'DEF'),(2,'KLM')]; df = spark.createDataFrame(list, ['id', 'value']); lists = [5,6]; rdd = sc.parallelize(lists); df = df.rdd.zip(rdd).map(lambda x: (x[0][0], x[0][1], x[1])).toDF(["id", "Value", "index"])` – vikrant rana Oct 01 '19 at 20:40
  • You can create an rdd from the given list, zip it with the existing dataframe, and use a map operation on it. But the number of list items and dataframe rows has to be the same for the above method. – vikrant rana Oct 01 '19 at 20:41
  • Thanks, but can you explain `x[0][0],x[0][1],x[1]`? Is it dynamic? The solution should work for any number of columns. The length of the list is equal to the number of dataframe rows. – user-2147482338 Oct 03 '19 at 06:50
  • @cronoik yes, it is consecutive. But the elements in the list are not consecutive. – user-2147482338 Oct 03 '19 at 06:51
  • @user-2147482338 If the length of the list and the number of dataframe rows are equal, the above will work for any number of columns. You just need to include each column element in your map function. – vikrant rana Oct 03 '19 at 10:51
  • Or you can use a list comprehension to avoid coding it separately for each element. – vikrant rana Oct 03 '19 at 10:52
  • @vikrantrana thanks a lot. So `x[0][0],x[0][1],x[1]` will work for any number of columns, right? – user-2147482338 Oct 03 '19 at 13:05
  • No, it was specific to my dataframe, which has two columns. You may need to include each of your columns as an element, or use a list comprehension. I will give it a try after some time. – vikrant rana Oct 03 '19 at 13:30

2 Answers


Not too sure if it has to be something like this or if you were expecting something else. If your number of list items and dataframe rows has to be the same, then here's a simple approach.

For a given sample dataframe with three columns:

l = [(1, 'DEF', 33), (2, 'KLM', 22), (3, 'ABC', 32), (4, 'XYZ', 77)]
df = spark.createDataFrame(l, ['id', 'value', 'age'])

Let's say here's a list:

lists=[5,6,7,8]

You can create an rdd from this list, zip it with the dataframe, and use a map function over it.

listrdd = sc.parallelize(lists)

newdf = df.rdd.zip(listrdd).map(lambda x: list(x[0]) + [x[1]]).toDF(["id", "value", "age", "List_element"])

>>> ziprdd=df.rdd.zip(listrdd)
>>> ziprdd.take(50)
[(Row(id=1, value=u'DEF', age=33), 5), (Row(id=2, value=u'KLM', age=22), 6), (Row(id=3, value=u'ABC', age=32), 7), (Row(id=4, value=u'XYZ', age=77), 8)]

The zip function returns pairs in which the first element contains data from the first rdd and the second element contains data from the second rdd. I convert the Row in the first element to a list and concatenate it with the second element.

It's dynamic and can work for any number of columns, but the number of list elements and dataframe rows has to be the same.
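
If you don't want to hard-code the column names in `toDF`, here is a minimal sketch (assuming the same `df`, `lists` and `listrdd` as above) that reuses the existing column names and only adds the new one; it produces the same result shown below:

# keep every existing column name and append the new one,
# so this works for a dataframe with any number of columns
newdf = df.rdd.zip(listrdd) \
    .map(lambda x: list(x[0]) + [x[1]]) \
    .toDF(df.columns + ["List_element"])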

>>> newdf.show()
+---+-----+---+------------+
| id|value|age|List_element|
+---+-----+---+------------+
|  1|  DEF| 33|           5|
|  2|  KLM| 22|           6|
|  3|  ABC| 32|           7|
|  4|  XYZ| 77|           8|
+---+-----+---+------------+

Note: Both rdds must have the same number of partitions (and the same number of elements in each partition) to use the zip method, otherwise you will get an error:

ValueError: Can only zip with RDD which has the same number of partitions
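
One way to avoid that (a sketch, not from the original answer) is to create the list rdd with the same number of partitions as the dataframe's rdd. Since zip also requires the same number of elements in every partition, a safer fallback is to index both sides with zipWithIndex and join on that index:

# match the partition count of the dataframe's underlying rdd
listrdd = sc.parallelize(lists, df.rdd.getNumPartitions())

# fallback: key both rdds by position and join instead of zipping
indexed_rows = df.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))                 # (pos, Row)
indexed_vals = sc.parallelize(lists).zipWithIndex().map(lambda x: (x[1], x[0]))  # (pos, value)
newdf = indexed_rows.join(indexed_vals) \
    .sortByKey() \
    .map(lambda kv: list(kv[1][0]) + [kv[1][1]]) \
    .toDF(df.columns + ["List_element"])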
vikrant rana

You can join two dataframes, like this:

# df2 pairs each list value with the row index it belongs to
df2 = spark.createDataFrame(list(enumerate(lists)), ['index', 'lists'])
df = df.join(df2, on=['index']).drop('index')

df2 will contain the columns you wish to add to the main df.
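
If your dataframe does not already have an `index` column to join on, here is a sketch of one way to create it (an assumption on my part, not part of the original answer; `row_number` needs a column that defines the order, here `date`):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# assign consecutive positions 0, 1, 2, ... ordered by date
# (a window without partitionBy pulls all rows into one partition)
w = Window.orderBy('date')
df_indexed = df.withColumn('index', F.row_number().over(w) - 1)

df2 = spark.createDataFrame(list(enumerate(lists)), ['index', 'lists'])
result = df_indexed.join(df2, on=['index']).drop('index')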

Elad Cohen