
I'm working on a DataFrame with two initial columns, id and colA.

+---+-----+
|id |colA |
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |
+---+-----+

I need to add one more column, colB, to that DataFrame. I know that colB has the same number of rows and lines up with the DataFrame row by row; I just need some way to join them together.

+-----+
|colB |
+-----+
|  8  |
|  7  | 
|  0  | 
|  6  |
+-----+

As a result, I need to obtain a new DataFrame like the one below:

+---+-----+-----+
|id |colA |colB |
+---+-----+-----+
| 1 |  5  | 8   |
| 2 |  9  | 7   |
| 3 |  3  | 0   |
| 4 |  1  | 6   |
+---+-----+-----+

This is the PySpark code to obtain the first DataFrame:

l=[(1,5),(2,9), (3,3), (4,1)]
names=["id","colA"]
db=sqlContext.createDataFrame(l,names)
db.show()

How can I do it? Could anyone help me, please? Thanks

Thaise
  • You cannot add an arbitrary column to a DataFrame in Spark - see extensive answer here: https://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark – desertnaut Oct 18 '17 at 12:56
  • Possible duplicate of [How do I add a new column to a Spark DataFrame (using PySpark)?](https://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark) – desertnaut Oct 18 '17 at 12:56

1 Answer

Done! I solved it by adding a temporary column holding the row index to each DataFrame, joining the two on that column, and then dropping it.

Code:

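from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# row_number() needs an ordered window; ordering by monotonically_increasing_id()
# keeps the current row order. Without partitionBy, Spark moves all rows to a
# single partition, so this is only suitable for small data.
w = Window.orderBy(monotonically_increasing_id())

l=[(1,5),(2,9), (3,3), (4,1)]
names=["id","colA"]
db=sqlContext.createDataFrame(l,names)
db.show()

# Values for the new column colB (taken from the question), in the same row order as db
l=[8,7,0,6]
rdd = sc.parallelize(l).map(lambda x: Row(x))
test_df = rdd.toDF()
dbB = test_df.selectExpr("_1 as colB")

# Add a temporary 1-based row index to both DataFrames
db = db.withColumn("columnindex", row_number().over(w))
dbB = dbB.withColumn("columnindex", row_number().over(w))

# Join on the index, restore the row order, then drop the helper column
testdf_out = db.join(dbB, on="columnindex", how="inner").orderBy("columnindex").drop("columnindex")
testdf_out.show()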
Thaise
  • You can use monotonically_increasing_id to create the temporary index directly on each DataFrame and join them. Check that as well. – Suresh Oct 18 '17 at 15:06