
I have the following data in a DataFrame:

col1    col2     col3    col4
1       desc1    v1      v3
2       desc2    v4      v2
1       desc1    v4      v2
2       desc2    v1      v3

I need only the first row for each unique combination of (col1, col2), like below.

Expected Output:

col1    col2     col3    col4
1       desc1    v1      v3
2       desc2    v4      v2

How can I achieve this in PySpark (version 1.3.1)?

I have achieved this by converting the DataFrame into an RDD, applying map and reduceByKey, and then converting the resulting RDD back into a DataFrame, roughly as sketched below. Is there any other way to perform this operation using DataFrame functions?
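A minimal sketch of that RDD workaround, assuming the DataFrame above is named df (reduceByKey simply keeps whichever value it happens to combine first per key):

# Key by (col1, col2) and keep one arbitrary (col3, col4) pair per key.
pairs = df.rdd.map(lambda r: ((r.col1, r.col2), (r.col3, r.col4)))
firsts = pairs.reduceByKey(lambda a, b: a)

# Flatten the pairs back into rows and rebuild a DataFrame.
result = firsts.map(lambda kv: (kv[0][0], kv[0][1], kv[1][0], kv[1][1])) \
               .toDF(["col1", "col2", "col3", "col4"])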

  • Is there a logical order to consider? – eliasah Apr 09 '16 at 14:58
  • To make it more clear: the first/last row is not mandatory. I need distinct values of (col1, col2) along with any values of (col3, col4) from the same row. I am not able to use aggregate functions like min or max, because they will bring values of col3 and col4 from different rows. – Mohan Apr 09 '16 at 15:20
  • If I can introduce one more column (col5) with values of row_number() over (partition by col1,col2 order by col1,col2 asc), I can filter out the first rows using the new column: col5 = 1. – Mohan Apr 09 '16 at 15:28
  • http://stackoverflow.com/a/35226857/1560062 – zero323 Apr 09 '16 at 15:34
  • Window functions are not available in spark 1.3.1. Is there a way to bring them in?? – Mohan Apr 09 '16 at 15:57
  • No, but you can use struct ordering. – eliasah Apr 09 '16 at 16:12
  • @eliasah Not in 1.3. – zero323 Apr 09 '16 at 16:40
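For reference, the row_number() approach mentioned in the comments needs a newer release, so it does not apply to 1.3.1. On Spark 1.6+ (where pyspark.sql.functions.row_number is available; pre-2.0 versions also need a HiveContext for window functions) a rough sketch would be:

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Number the rows within each (col1, col2) partition and keep only the first.
w = Window.partitionBy("col1", "col2").orderBy("col1", "col2")
first_rows = (df
    .withColumn("rn", row_number().over(w))
    .where("rn = 1")
    .drop("rn"))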

1 Answer


If you want an arbitrary row, you can try to use first or last, but it is far from pretty and I would seriously consider upgrading Spark:

from pyspark.sql.functions import first

df = sc.parallelize([
  (1, "desc1", "v1", "v3"), (2, "desc2", "v4", "v2"),
  (1, "desc1", "v4", "v2"), (2, "desc2", "v1", "v3")
]).toDF(["col1", "col2", "col3", "col4"])

keys = ["col1", "col2"]
values = ["col3", "col4"]

# first() over the keys keeps them in the output (Spark 1.3 does not
# automatically retain grouping columns in agg), and first() over the
# packed struct picks one (col3, col4) pair per group.
agg_exprs = [first(c).alias(c) for c in keys + ["vs_"]]

# Unpack the struct's fields back into the original value column names.
select_exprs = keys + [
    "vs_.col{0} AS {1}".format(i + 1, v) for (i, v) in enumerate(values)]

df_not_so_first = (df
  # Pack the value columns into a single struct so they travel together.
  .selectExpr("struct({}) AS vs_".format(",".join(values)), *keys)
  .groupBy(*keys)
  .agg(*agg_exprs)
  .selectExpr(*select_exprs))

Note that in this particular context first doesn't choose any specific row, so the results may not be deterministic. Moreover, depending on the Spark version, individual aggregations can be scheduled separately. This means that

df.groupBy("col1", "col2").agg(first("col3"), first("col4"))

doesn't guarantee col3 and col4 will be selected from the same row.

zero323
  • Thanks a lot. I used first() function and tried the following. It is working fine. df.groupBy(df.col1,df.col2).agg(df.col1,df.col2,first(df.col3).alias('col3'),first(df.col4).alias('col4')).show() – Mohan Apr 09 '16 at 17:15
  • You shouldn't use it alone. Depending on the version and settings, the behavior you get won't be deterministic. In `groupBy` context it doesn't choose any particular row, so if individual aggregations are scheduled individually there is no guarantee you get values from the same row, or even the same values every time. – zero323 Apr 09 '16 at 17:18