Using Spark SQL, I have two DataFrames that are both created from the same one:
df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]
I want to filter df1, keeping a row if part of its 'path' equals any path in df2. So if df1 has a row with path 'a/b/c/d/e', I want to find out whether df2 contains a row whose path is 'a/b/c'. In SQL it would be something like
SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)
where udf is a user-defined function that shortens the original path from df1. The naive solution is to use a JOIN and then filter the result, but that is slow, since df1 and df2 each have more than 10 million rows.
I also tried the following code, for which I first had to create a broadcast variable from df2:
static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext
sqlContext.createDataFrame(df1.javaRDD().filter(
new Function<Row, Boolean>(){
@Override
public Boolean call(Row row) throws Exception {
String foo = shortenPath(row.getString(0));
return bdf.value().filter("path = '"+foo+"'").count()>0;
}
}
), myClass.class)
The problem I'm having is that Spark gets stuck when the return statement is evaluated, i.e. when the filtering of df2 is performed.
I would like to know how to work with two DataFrames to do this. I would really like to avoid a JOIN. Any ideas?
EDIT>>
In my original code df1 has the alias 'first' and df2 has the alias 'second'. This join is not Cartesian, and it does not use a broadcast either.
df1 = df1.as("first");
df2 = df2.as("second");
df1.join(df2, df1.col("first.path").
lt(df2.col("second.path"))
, "left_outer").
filter("isPrefix(first.path, second.path)").
na().drop("any");
isPrefix is a UDF:
UDF2<String, String, Boolean> isPrefix = new UDF2<String, String, Boolean>() {
@Override
public Boolean call(String p, String s) throws Exception {
// true if s is exactly four characters longer than p and contains p
return p.length() + 4 == s.length() && s.contains(p);
}};
shortenPath removes the last two '/'-separated segments of the path:
UDF1<String, String> shortenPath = new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
String[] foo = s.split("/");
String result = "";
for (int i = 0; i < foo.length-2; i++) {
result += foo[i];
if(i<foo.length-3) result+="/";
}
return result;
}
};
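To make the behaviour of the two helpers concrete, here is a minimal plain-Java sketch of the same logic outside Spark. The class name PathHelpers is only a placeholder for illustration, and the isPrefix rule is taken literally from the comment in the UDF above.

```java
import java.util.Arrays;

// Plain-Java stand-ins for the two UDFs; the class name is hypothetical.
class PathHelpers {

    // Drops the last two '/'-separated segments, like the shortenPath UDF.
    static String shortenPath(String path) {
        String[] parts = path.split("/");
        int keep = Math.max(parts.length - 2, 0);
        return String.join("/", Arrays.copyOfRange(parts, 0, keep));
    }

    // Same rule as the isPrefix comment: s is exactly four characters
    // longer than p and contains p.
    static boolean isPrefix(String p, String s) {
        return p.length() + 4 == s.length() && s.contains(p);
    }

    public static void main(String[] args) {
        System.out.println(shortenPath("a/b/c/d/e"));       // prints a/b/c
        System.out.println(isPrefix("a/b/c", "a/b/c/d/e")); // prints true
    }
}
```

Note that the length check in isPrefix only matches when the two dropped segments are single characters, as in my sample paths.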
Example records (the path is unique):
a/a/a/b/c abc
a/a/a qwe
a/b/c/d/e abc
a/b/c qwe
a/b/b/k foo
a/b/f/a bar
...
So df1 consists of
a/a/a/b/c abc
a/b/c/d/e abc
...
and df2 consists of
a/a/a qwe
a/b/c qwe
...
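To pin down the result I am after, here is a small plain-Java sketch (no Spark) that applies the intended filter to the sample paths. The class and method names are placeholders, and the row 'a/x/y/z/w' is a hypothetical non-matching path that I added only to show a rejected record. It is meant to illustrate the semantics, not to be a solution at the scale of 10 million rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration of the desired result on the sample records; names are hypothetical.
class ExpectedResultSketch {

    // Same rule as shortenPath: drop the last two '/'-separated segments.
    static String shortenPath(String path) {
        String[] parts = path.split("/");
        return String.join("/", Arrays.copyOfRange(parts, 0, Math.max(parts.length - 2, 0)));
    }

    // Keeps the df1 paths whose shortened form occurs among the df2 paths.
    static List<String> keepMatching(List<String> df1Paths, List<String> df2Paths) {
        Set<String> lookup = new HashSet<>(df2Paths);
        List<String> kept = new ArrayList<>();
        for (String path : df1Paths) {
            if (lookup.contains(shortenPath(path))) {
                kept.add(path);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // "a/x/y/z/w" is a made-up row that has no counterpart in df2.
        List<String> df1Paths = Arrays.asList("a/a/a/b/c", "a/b/c/d/e", "a/x/y/z/w");
        List<String> df2Paths = Arrays.asList("a/a/a", "a/b/c");
        System.out.println(keepMatching(df1Paths, df2Paths)); // prints [a/a/a/b/c, a/b/c/d/e]
    }
}
```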