val df1 = Seq(("Brian", 29, "0-A-1234")).toDF("name", "age", "client-ID")
val df2 = Seq(("1234", "555-5555", "1234 anystreet")).toDF("office-ID", "BusinessNumber", "Address")

I'm trying to run a function on each row of a dataframe (in streaming). This function will contain a combination of Scala code and Spark DataFrame API code. For example, I want to take the 3 features from df1 and use them to filter a second dataframe called df2. My understanding is that a UDF can't accomplish this. I have all the filtering code working just fine, just without the ability to apply it to each row of df1.

My goal is to be able to do something like

df1.select("ID", "preferences").map(row => ( /* filter df2 using row(0), row(1) and row(3) */ ))

The dataframes can't be joined; there is no joinable relationship between them.

Although I'm using Scala, an answer in Java or Python would probably be fine.

I'm also fine with alternative ways of accomplishing this. If I could extract the data from the rows into separate variables (keep in mind this is streaming), that would also be fine.

Brian

1 Answer


My understanding is that a UDF can't accomplish this.

That is correct, but neither can map (local Datasets seem to be an exception; see Why does this Spark code make NullPointerException?). Nested logic like this can be expressed only with joins:

  • If both Datasets are streaming, it has to be an equijoin. That means that even though:

    The dataframes can't be joined; there is no joinable relationship between them.

    you have to derive a key in some way that approximates the filter condition well (see the sketch after this list).

  • If one Dataset is not streaming, you can brute-force things with crossJoin followed by filter, but it is of course hardly recommended.
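
A minimal sketch of the derived-key idea, using the sample frames from the question. The choice of key here (the trailing token of `client-ID`, which happens to match `office-ID` in the sample data) is an assumption for illustration only:

import org.apache.spark.sql.functions._

// Derive a common key on both sides that approximates the filter condition.
// Assumption: the last "-"-separated token of client-ID ("0-A-1234" -> "1234")
// corresponds to office-ID; substitute whatever approximation fits the real data.
val keyed1 = df1.withColumn("joinKey", substring_index(col("client-ID"), "-", -1))
val keyed2 = df2.withColumn("joinKey", col("office-ID"))

// With a shared key, an equijoin works even when both Datasets are streaming.
val joined = keyed1.join(keyed2, Seq("joinKey"))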

Alper t. Turker
  • I don't even see how you can define a predicate on another dataframe without having a derived set of keys. It feels like a crossJoin kind of solution... – eliasah Apr 13 '18 at 11:51
  • 1
    I don't even, because there OP omitted `filter` logic. But if you can filter you can always derive one, even if a dummy one (`1 = 1`). Think of LSH here as a representation of this pattern... :) – Alper t. Turker Apr 13 '18 at 11:54
  • OK, thank you very much. I'll contemplate this, and when I have my solution I'll post it as an update. – Brian Apr 13 '18 at 11:57
  • 1
    @user9613318 I do it all the time :) – eliasah Apr 13 '18 at 12:00
  • Is there a way to take the values out of df1 and save them to a Scala variable, i.e. `val name = df1.name`, in a streaming context? – Brian Apr 13 '18 at 12:03
  • Do you mean something like `collect`? There is the memory sink (see the first sketch after these comments), but I don't think it will do you much good. – Alper t. Turker Apr 13 '18 at 12:06
  • 1
    so I followed the suggestion of joining.. I used a cross join. For my use case the first dataframe "should" have only 1 row. If it has more then this could be a really bad path. Another solution I thought of was to embed each row of df1 as array in an element of a column in df2. This would scale better I believe. – Brian Apr 13 '18 at 12:58
  • @Brian crossJoins don't actually scale well; they have a complexity of O(N^2). What you are doing is a bit confusing, because if both your DFs are streamed you should have followed the first suggestion with equijoins, not the one with cross joins... – eliasah Apr 13 '18 at 13:04
  • @eliasah Let me back up. I could do both streaming, but I'm likely not going to for other reasons. df1, the single-row dataframe, will be streaming from a Kafka topic; df2 will likely be a df from a Hive table that is refreshed to stay updated. I know cross joins will not scale. What I want to do is filter df2 using values in df1. – Brian Apr 13 '18 at 13:20
  • @eliasah One filter would be to take the value of one column in df1 and find all rows in df2 that are <= it; another filter will do something similar, etc. There are several layers of filters (the second sketch after these comments illustrates the pattern). – Brian Apr 13 '18 at 13:55
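
For the `collect` / memory-sink idea mentioned in the comments, a minimal sketch in a streaming context might look like the following. The query name `snapshot` and the `name` column are illustrative assumptions, and, as the answerer notes, pulling streaming results back to the driver this way rarely does much good at scale:

// Assumes an active SparkSession named `spark`, with spark.implicits._ imported.
import spark.implicits._

val query = df1.writeStream
  .format("memory")       // memory sink: accumulates results in an in-memory table
  .queryName("snapshot")  // hypothetical table name
  .outputMode("append")
  .start()

// Later, on the driver, values can be pulled into plain Scala variables:
val name: Option[String] =
  spark.table("snapshot").select("name").as[String].collect().headOption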
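
And a sketch of the layered-filter pattern from the last comments, assuming df1 is a (usually single-row) stream from Kafka and df2 is a static Dataset loaded from Hive. The columns `limit`, `value`, `category` and `wantedCategory` are hypothetical stand-ins; each filter layer moves into the join condition:

import org.apache.spark.sql.functions.col

// Stream-static join: the "<=" filter becomes the join condition itself.
// `limit` lives on the streaming df1, `value` on the static df2 (assumed names).
val filtered = df1.join(df2, col("value") <= col("limit"))

// Further layers chain into the same condition:
val filtered2 = df1.join(
  df2,
  col("value") <= col("limit") && col("category") === col("wantedCategory"))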