I have a target dataframe like this:

rdd_1 = sc.parallelize([(0,"A",2), (1,"B",1), (2,"A",2)])
rdd_2 = sc.parallelize([(0,223,"201603"), (1,3213,"201602"), (2,2321,"201601")])
df_tg = sqlContext.createDataFrame(rdd_1, ["id", "route_a", "route_b"])
df_data = sqlContext.createDataFrame(rdd_2, ["id", "cost", "date"])

 df_tg.show()

+---+-------+-------+
| id|route_a|route_b|
+---+-------+-------+
|  0|      A|      2|
|  1|      B|      1|
|  2|      A|      2|
+---+-------+-------+

and more information on another dataframe with some timestamps:

 df_data.show()

+---+----+------+
| id|cost|  date|
+---+----+------+
|  0| 223|201603|
|  1|3213|201602|
|  2|2321|201601|
+---+----+------+

and I need to create new columns joining the dataframes, but by date:

df_data_m1 = df_data[df_data.date == "201603"]
df_data_m2 = df_data[df_data.date == "201602"]
df_data_m3 = df_data[df_data.date == "201601"]

and now I need to create new columns:

df = df_tg.join(df_data_m1, df_data_m1.id == df_tg.id)

but this produces duplicated columns, which we have to drop, and we also have to rename the cost column so that it does not collide with the other months:

 df = df_tg.join(df_data_m1, df_data_m1.id == df_tg.id).drop(df_data_m1.id).withColumnRenamed("cost", "cost_201603")
 df = df.join(df_data_m2, df_data_m2.id == df_tg.id).drop(df_data_m2.id).withColumnRenamed("cost", "cost_201602")

and we have to do this for each column, for each month, so that's a lot of code, even if we write a UDF to handle this. Is there a way to do it directly?

Ivan

1 Answer

The simplest way to deal with duplicate columns is not to generate them at all. If all you have is equi-joins, all you need is the correct join. Spark provides a special syntax for cases like this, where you simply enumerate the join columns:

tmp = df_tg.join(df_data, ["id"])
tmp.printSchema() 
## root
##  |-- id: long (nullable = true)
##  |-- route_a: string (nullable = true)
##  |-- route_b: long (nullable = true)
##  |-- cost: long (nullable = true)
##  |-- date: string (nullable = true)

On a side note, the remaining part of your code smells fishy. If the DataFrame is not partitioned, each filter like this:

df_data[df_data.date == "201603"]

requires a linear scan over all records.

It is not exactly clear to me what your goal is, but if you want to separate the data for writes, you can use the partitionBy method in supported writers:

tmp.write.partitionBy("date")

In other cases pivoting can be a better choice.
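
As a rough sketch of the pivoting route, assuming a Spark version that has GroupedData.pivot (1.6 or later) and at most one row per (id, date) in df_data. The names cost_column and costs_by_month are made up for this illustration and are not part of Spark:

```python
def cost_column(month):
    """Column name for one month, e.g. "201603" -> "cost_201603"."""
    return "cost_" + month


def costs_by_month(df_tg, df_data, months):
    """Hypothetical helper: join df_tg with one cost column per month."""
    wide = (
        df_data
        .groupBy("id")
        # Passing the month values explicitly fixes the output columns and
        # spares Spark a separate job to discover the distinct dates.
        .pivot("date", months)
        # Assumes at most one row per (id, date), so sum() is just that cost.
        .sum("cost")
    )
    # With a single aggregate, the pivoted columns are named after the values.
    for month in months:
        wide = wide.withColumnRenamed(month, cost_column(month))
    # Joining on a list of names keeps a single "id" column.
    return df_tg.join(wide, ["id"])
```

With the data from the question, costs_by_month(df_tg, df_data, ["201603", "201602", "201601"]) should give id, route_a, route_b and one cost_<month> column per month, with nulls where an id has no row for that month.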

Finally, if you plan to apply some aggregations, it makes more sense to shuffle once (groupBy or repartition) than to aggregate each separated subset on its own.
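
For example, per-month totals can come from a single groupBy instead of one filter per month. This is only a minimal sketch, assuming a sum over cost is the aggregation you are after; monthly_totals is a hypothetical name:

```python
def monthly_totals(df_data):
    """Hypothetical helper: total cost per month from a single groupBy."""
    # One groupBy covers every month at once; the result has one row per
    # date, with the total in a column whose name Spark generates.
    return df_data.groupBy("date").sum("cost")
```

The result can then be joined or pivoted as needed, without first splitting df_data into df_data_m1, df_data_m2 and so on.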

zero323