I have read similar questions but couldn't find a solution to my specific problem.

I have a list

l = [1, 2, 3]

and a DataFrame

df = sc.parallelize([
    ['p1', 'a'],
    ['p2', 'b'],
    ['p3', 'c'],
]).toDF(('product', 'name'))

I would like to obtain a new DataFrame where the list l is added as a further column, namely

+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+

Approaches with JOIN, where I was joining df with an

 sc.parallelize([[1], [2], [3]])

have failed. Approaches using withColumn, as in

new_df = df.withColumn('new_col', l)

have failed because the list is not a Column object.
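For context, withColumn needs a Column expression; the closest built-in, pyspark.sql.functions.lit, only wraps a single constant, which illustrates why a Python list cannot be passed through (a hypothetical sketch, not a solution):

from pyspark.sql.functions import lit

# lit() does produce a Column, but only a constant one,
# so every row would get the same value rather than one list element each
df.withColumn('new_col', lit(1))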

mar tin
    I think this is a great question, because it shows a functionality that is seriously missing in Spark DataFrames API. – Katya Willard Mar 21 '16 at 17:40

3 Answers

So, from reading some interesting stuff here, I've ascertained that you can't really just append a random / arbitrary column to a given DataFrame object. It appears what you want is more of a zip than a join. I looked around and found this ticket, which makes me think you won't be able to zip given that you have DataFrame rather than RDD objects.

The only way I've been able to solve your issue involves leaving the world of DataFrame objects and returning to RDD objects. I've also needed to create an index for the purpose of the join, which may or may not work with your use case.

l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)

rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)

# just in case!
assert(rdd.count() == l.count())
# perform an inner join on the index we generated above, then map it to look pretty.
# joined records look like (index, ((product, name), value));
# Python 3 no longer allows tuple unpacking in lambda arguments
new_rdd = rdd_index.join(z).map(lambda kv: [kv[1][0][0], kv[1][0][1], kv[1][1]])
new_df = new_rdd.toDF(['product', 'name', 'new_col'])

When I run new_df.show(), I get:

+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+

Sidenote: I'm really surprised this didn't work; with no join condition it behaves as a cross join:

from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)

When I run new_df.show(), I get:

+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
|     p1|   a|           1|
|     p1|   a|           2|
|     p1|   a|           3|
|     p2|   b|           1|
|     p3|   c|           1|
|     p2|   b|           2|
|     p2|   b|           3|
|     p3|   c|           2|
|     p3|   c|           3|
+-------+----+------------+
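In hindsight this makes sense: a join with no condition is a Cartesian product, so each of the 3 rows is paired with each of the 3 values (9 rows). A sketch of the explicit spelling, assuming Spark 2.1+ where crossJoin is available:

# explicit Cartesian product; same result as df.join(l_as_df) with no condition
new_df = df.crossJoin(l_as_df)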
Katya Willard
  • Note that you can transform a DataFrame into an RDD using df.rdd directly, for example on my df in the question. – mar tin Mar 21 '16 at 17:25
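A minimal sketch of the comment's point (using zipWithIndex rather than the hand-built index, since df.rdd's partitioning need not line up with a freshly parallelized index):

# df.rdd exposes the DataFrame's underlying RDD of Row objects directly,
# so the data never has to be re-parallelized by hand
rdd = df.rdd.map(list)                                      # Row('p1', 'a') -> ['p1', 'a']
rdd_index = rdd.zipWithIndex().map(lambda x: (x[1], x[0]))  # (index, [product, name])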

If the product column is unique, then consider the following approach:

original dataframe:

df = spark.sparkContext.parallelize([
    ['p1', 'a'],
    ['p2', 'b'],
    ['p3', 'c'],
]).toDF(('product', 'name'))

df.show()

+-------+----+
|product|name|
+-------+----+
|     p1|   a|
|     p2|   b|
|     p3|   c|
+-------+----+

new column (and new index column):

lst = [1, 2, 3]
indx = ['p1','p2','p3']

create a new dataframe from the list above (with an index):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

myschema = StructType([
    StructField("indx", StringType(), True),
    StructField("newCol", IntegerType(), True),
])
df1 = spark.createDataFrame(list(zip(indx, lst)), schema=myschema)
df1.show()
+----+------+
|indx|newCol|
+----+------+
|  p1|     1|
|  p2|     2|
|  p3|     3|
+----+------+

join this to the original dataframe, using the index created:

# left join on the generated index, then drop the helper column
dfnew = df.join(df1, df.product == df1.indx, how='left')\
          .drop(df1.indx)\
          .sort("product")

to get:

dfnew.show()

+-------+----+------+
|product|name|newCol|
+-------+----+------+
|     p1|   a|     1|
|     p2|   b|     2|
|     p3|   c|     3|
+-------+----+------+
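One consequence of the left join worth noting: every row of df is kept, so a product value missing from indx would simply yield a null in newCol; the result lines up row for row only because product is unique, per the assumption above.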
Grant Shannon

This is achievable via RDDs.

1. Convert the DataFrame and the list to indexed RDDs:

# (index, (product, name)) pairs
df_rdd = df.rdd.zipWithIndex().map(lambda row: (row[1], (row[0][0], row[0][1])))
# (index, value) pairs
l_rdd = sc.parallelize(l).zipWithIndex().map(lambda row: (row[1], row[0]))

2. Join the two RDDs on the index, drop the index and rearrange the elements:

# joined records look like (index, ((product, name), value))
res_rdd = df_rdd.join(l_rdd).map(lambda row: [row[1][0][0], row[1][0][1], row[1][1]])

3. Convert the result to a DataFrame:

res_df = res_rdd.toDF(['product', 'name', 'new_col'])
res_df.show()

+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+
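Since the inner join silently drops any index that appears on only one side, a length check in the spirit of the first answer is a cheap safeguard (zipWithIndex also triggers a Spark job when the RDD has more than one partition):

# the join would silently drop rows if the two sides differed in length
assert df_rdd.count() == l_rdd.count()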
Val