So, from reading some interesting stuff here, I've ascertained that you can't really just append an arbitrary column to a given DataFrame object. It appears what you want is more of a zip than a join. I looked around and found this ticket, which makes me think you won't be able to zip given that you have DataFrame rather than RDD objects.
The only way I've been able to solve your issue involves leaving the world of DataFrame objects and returning to RDD objects. I've also needed to create an index for the purpose of the join, which may or may not work with your use case.
# the new column's values, plus an index to zip/join on
l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)

# the original data, keyed by the same index
rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)

# just in case!
assert(rdd.count() == l.count())

# perform an inner join on the index we generated above, then map it to look pretty
# (the lambda avoids tuple unpacking, which is Python-2-only syntax)
new_rdd = rdd_index.join(z).map(lambda kv: [kv[1][0][0], kv[1][0][1], kv[1][1]])
new_df = new_rdd.toDF(['product', 'name', 'new_col'])
When I run new_df.show(), I get:
+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
| p1| a| 1|
| p2| b| 2|
| p3| c| 3|
+-------+----+-------+
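If it helps, here's a rough sketch of the same idea using the RDD zipWithIndex method instead of building a separate index RDD. I haven't tested it beyond this toy data, but it avoids the extra parallelize/count round trip:

# zipWithIndex pairs each element with its position, so both RDDs can be
# keyed by that position and joined exactly as above
l = sc.parallelize([1, 2, 3])
rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
z = l.zipWithIndex().map(lambda pair: (pair[1], pair[0]))            # (index, value)
rdd_index = rdd.zipWithIndex().map(lambda pair: (pair[1], pair[0]))  # (index, [product, name])
new_rdd = rdd_index.join(z).map(lambda kv: [kv[1][0][0], kv[1][0][1], kv[1][1]])
new_df = new_rdd.toDF(['product', 'name', 'new_col'])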
Sidenote: I'm really surprised this didn't work. Since no join condition is given, it produces the Cartesian product (every row of df paired with every row of l_as_df) rather than the row-by-row pairing you'd want:
from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)
When I run new_df.show(), I get:
+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
| p1| a| 1|
| p1| a| 2|
| p1| a| 3|
| p2| b| 1|
| p3| c| 1|
| p2| b| 2|
| p2| b| 3|
| p3| c| 2|
| p3| c| 3|
+-------+----+------------+
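If you wanted to stay closer to the DataFrame world, one option (again untested beyond this toy data, and the idx column name is just my own) would be to give both sides an explicit index via zipWithIndex and join on that column, something like:

from pyspark.sql import Row

# give both sides an explicit index column, then join on it and drop it;
# assumes the same sc, df and l as above
indexed_l = l.zipWithIndex().map(lambda pair: Row(new_col_name=pair[0], idx=pair[1])).toDF()
indexed_df = df.rdd.zipWithIndex().map(lambda pair: Row(product=pair[0]['product'], name=pair[0]['name'], idx=pair[1])).toDF()
new_df = indexed_df.join(indexed_l, 'idx').drop('idx')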