I have a target dataframe, df_tg, built like this:
rdd_1 = sc.parallelize([(0,"A",2), (1,"B",1), (2,"A",2)])
rdd_2 = sc.parallelize([(0,223,"201603"), (1,3213,"201602"), (2,2321,"201601")])
df_tg = sqlContext.createDataFrame(rdd_1, ["id", "route_a", "route_b"])
df_data = sqlContext.createDataFrame(rdd_2, ["id", "cost", "date"])
df_tg.show()
+---+-------+-------+
| id|route_a|route_b|
+---+-------+-------+
| 0| A| 2|
| 1| B| 1|
| 2| A| 2|
+---+-------+-------+
and more information in another dataframe, df_data, which holds a cost and a date for each id:
df_data.show()
+---+----+------+
| id|cost| date|
+---+----+------+
| 0| 223|201603|
| 1|3213|201602|
| 2|2321|201601|
+---+----+------+
I need to create new columns by joining the dataframes, but split by date, so first I slice df_data into one dataframe per month:
df_data_m1 = df_data[df_data.date == "201603"]
df_data_m2 = df_data[df_data.date == "201602"]
df_data_m3 = df_data[df_data.date == "201601"]
Now, to create the new columns, I join each monthly slice onto df_tg:
df = df_tg.join(df_data_m1, df_data_m1.id == df_tg.id)
but this produces duplicated columns, so I have to drop the extra id and rename cost so that later months don't overwrite earlier ones:
df = df_tg.join(df_data_m1, df_data_m1.id == df_tg.id).drop(df_data_m1.id).withColumnRenamed("cost", "cost_201603")
df = df.join(df_data_m2, df_data_m2.id == df_tg.id).drop(df_data_m2.id).withColumnRenamed("cost", "cost_201602")
and I have to repeat this for each column and each month, which is a lot of code even if I write a UDF to handle it. Is there a way to do this directly?