
I have a pyspark RDD (myRDD) that is a variable-length list of IDs, such as

[['a', 'b', 'c'], ['d','f'], ['g', 'h', 'i','j']]

I have a pyspark dataframe (myDF) with columns ID and value.

I want to query myDF with the query:

outputDF = myDF.where(col("ID").isin(id_list)).select(F.collect_set("value").alias("my_values"))

where id_list is an element from the myRDD, such as ['d','f'] or ['a', 'b', 'c'].

An example would be:

outputDF = myDF.where(col("ID").isin(['d','f'])).select(F.collect_set("value").alias("my_values"))

What is a parallelizable way to use the RDD to query the DF like this?

Itchydon
Eka

1 Answer


Assuming the "ID" column of your dataframe is of type StringType(), you want to keep the ID values that appear in any row of your RDD.

First, let's transform the RDD into a one-column dataframe with a unique ID for each row:

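from pyspark.sql import HiveContext
import pyspark.sql.functions as psf

hc = HiveContext(sc)
# Wrap each list so that it becomes a single array-typed column named "ID"
ID_df = hc.createDataFrame(
    myRDD.map(lambda row: [row]),
    ['ID']
).withColumn("uniqueID", psf.monotonically_increasing_id())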

We'll explode it so that each row has only one ID value:

import pyspark.sql.functions as psf
ID_df = ID_df.withColumn('ID', psf.explode(ID_df.ID))

We can now join it with the original dataframe; the inner join serves as a filter:

myDF = myDF.join(ID_df, "ID", "inner")

collect_set is an aggregation function, so you need some kind of groupBy before using it, for instance on the newly created row ID:

myDF.groupBy("uniqueID").agg(
    psf.collect_set("ID").alias("ID")
)
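
To make the result of the explode, join and groupBy steps concrete, here is a minimal plain-Python sketch of the same logic, without Spark. The sample data (id_lists, rows) and all variable names are hypothetical, and enumerate only stands in for monotonically_increasing_id (which yields unique but not consecutive IDs). It also collects the values alongside the IDs, which is an assumption about the output you want.

```python
from collections import defaultdict

# Hypothetical sample data standing in for myRDD and myDF (ID, value).
id_lists = [['a', 'b', 'c'], ['d', 'f'], ['g', 'h', 'i', 'j']]
rows = [('a', 'v1'), ('b', 'v2'), ('c', 'v3'),
        ('d', 'v4'), ('f', 'v5'), ('z', 'v6')]

# Tag each list with a unique ID, then "explode" it into (uniqueID, ID) pairs.
exploded = [(uid, id_) for uid, ids in enumerate(id_lists) for id_ in ids]

# Index the dataframe rows by ID to emulate the join key lookup.
values_by_id = defaultdict(set)
for id_, value in rows:
    values_by_id[id_].add(value)

# Inner join on ID, then group by uniqueID and collect sets.
# IDs with no matching row contribute nothing, as with an inner join.
grouped = defaultdict(lambda: {"ID": set(), "values": set()})
for uid, id_ in exploded:
    for value in values_by_id.get(id_, ()):
        grouped[uid]["ID"].add(id_)
        grouped[uid]["values"].add(value)

# Sort the sets only to get a stable, readable output.
result = {uid: (sorted(g["ID"]), sorted(g["values"]))
          for uid, g in grouped.items()}
print(result)
```

Each original list keeps its own group, so the values matched by ['a', 'b', 'c'] are not mixed with those matched by ['d', 'f']. A list with no matching ID at all (the third one here) disappears from the result, because the inner join leaves no rows for its uniqueID.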
MaFF
  • Thanks, @Marie. This works with one exception: if I feed it [['a', 'b', 'c'], ['d','f']], I get back the joined values for both the element ['a', 'b', 'c'] and the element ['d','f']. The reason I was thinking of using the "in" clause was that it would only return matching values for that particular element. The hope was to create a dataframe like [(['a','b','c'],('v1','v2','v3','v4','v5'))] where v1 to v5 are all of the values that match the IDs a, b, and c. – Eka Aug 23 '17 at 22:58
  • You can create a unique ID for each row to identify them later. I have edited my answer – MaFF Aug 24 '17 at 05:39
  • Thanks, again, @Marie! I changed the last line to DF.groupBy("uniqueID").agg(psf.collect_set("ID").alias("ID"), psf.collect_set("value").alias("values")).drop("uniqueID") to get the final version of the table i wanted. Thank you for all of your help! – Eka Aug 24 '17 at 18:25
  • you were so helpful with this, I thought you may be able to help with my latest question: https://stackoverflow.com/questions/46289068/pyspark-merge-wrappedarrays-within-a-dataframe – Eka Sep 19 '17 at 01:56