I have a Dataset as follows:
+----+---------+-------------------------+
|key |value |vector |
+----+---------+-------------------------+
|key0|[a, d] |(5,[0,2],[1.0,1.0]) |
|key1|[c] |(5,[1],[1.0]) |
|key2|[b, d, e]|(5,[0,3,4],[1.0,1.0,1.0])|
|key3|[a, c, d]|(5,[0,1,2],[1.0,1.0,1.0])|
+----+---------+-------------------------+
The vector column of this dataset is what is called the "characteristic matrix": it represents which document contains which elements. Let me write the characteristic matrix out in a format that is easier on the eyes:
+--------+------+------+------+------+
|element | doc0 | doc1 | doc2 | doc3 |
+--------+------+------+------+------+
| d | 1 | 0 | 1 | 1 |
| c | 0 | 1 | 0 | 1 |
| a | 1 | 0 | 0 | 1 |
| b | 0 | 0 | 1 | 0 |
| e | 0 | 0 | 1 | 0 |
+--------+------+------+------+------+
If you look carefully, you can see that key0 maps to doc0, which contains elements a and d; that is why it has the vector (5,[0,2],[1.0,1.0]) (the vector is the column under doc0). Note that the characteristic matrix itself does not contain the element column or the header row; they are shown only to make the table easier to read.
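To make the encoding concrete, here is a minimal sketch of how I understand that column is represented as a Spark ML sparse vector (using org.apache.spark.ml.linalg.Vectors; the variable name doc0Column is just for illustration):
<pre>
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;

// Column under doc0: 5 rows in total, rows 0 (d) and 2 (a) are set to 1.0
Vector doc0Column = Vectors.sparse(5, new int[]{0, 2}, new double[]{1.0, 1.0});
System.out.println(doc0Column);  // prints (5,[0,2],[1.0,1.0])
</pre>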
Now, my goal is to use N hash functions to get the MinHash signature of this characteristic matrix. For example, let N = 2; in other words, two hash functions are used, and let us say these two hash functions are:
(x + 1) % 5
(3x + 1) % 5
where x is the row number in the characteristic matrix. After applying these two hash functions, I expect the "minhash signature matrix" to look like this:
+--------+------+------+------+------+
| | doc0 | doc1 | doc2 | doc3 |
+--------+------+------+------+------+
| h1 | 1 | 3 | 0 | 1 |
| h2 | 0 | 2 | 0 | 0 |
+--------+------+------+------+------+
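For reference, here is a small standalone sketch of how I computed that signature matrix by hand (plain Java, no Spark). One assumption to note: the rows are indexed in the order a=0, b=1, c=2, d=3, e=4, which is the ordering that reproduces the numbers in the table above; it is not the ordering CountVectorizer ends up using later.
<pre>
import java.util.Arrays;

public class MinHashByHand {
    public static void main(String[] args) {
        // Assumed row indices: a=0, b=1, c=2, d=3, e=4
        int numRows = 5;
        // Rows that are set to 1 in each document's column of the characteristic matrix
        int[][] docs = {
            {0, 3},       // doc0 = {a, d}
            {2},          // doc1 = {c}
            {1, 3, 4},    // doc2 = {b, d, e}
            {0, 2, 3}     // doc3 = {a, c, d}
        };

        // The two hash functions: h1(x) = (x + 1) % 5, h2(x) = (3x + 1) % 5
        int numHashes = 2;
        int[][] signature = new int[numHashes][docs.length];

        for (int d = 0; d < docs.length; d++) {
            int minH1 = Integer.MAX_VALUE;
            int minH2 = Integer.MAX_VALUE;
            // MinHash: for each hash function, take the minimum hash value
            // over the rows in which this document has a 1
            for (int x : docs[d]) {
                minH1 = Math.min(minH1, (x + 1) % numRows);
                minH2 = Math.min(minH2, (3 * x + 1) % numRows);
            }
            signature[0][d] = minH1;
            signature[1][d] = minH2;
        }

        System.out.println("h1: " + Arrays.toString(signature[0])); // [1, 3, 0, 1]
        System.out.println("h2: " + Arrays.toString(signature[1])); // [0, 2, 0, 0]
    }
}
</pre>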
Now, using Spark with Java, how do I specify the two hash functions I would like to use, and how do I then generate the above minhash signature matrix as an RDD from the dataset given at the beginning of this question? In a real test case I would probably use about 1000 hash functions, but understanding how to use 2 is good enough for now.
I have been searching and reading the Spark documentation, but it seems quite difficult to get a handle on this. Any hints/guidance would be greatly appreciated.
Thank you in advance!
Now, I did look at the documentation and I have the following code:
<pre>
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

// "spark" is an existing SparkSession and "jsc" its JavaSparkContext
List<Tuple2<String, String[]>> data = Arrays.asList(
        new Tuple2<>("key0", new String[]{"a", "d"}),
        new Tuple2<>("key1", new String[]{"c"}),
        new Tuple2<>("key2", new String[]{"b", "d", "e"}),
        new Tuple2<>("key3", new String[]{"a", "c", "d"})
);
JavaPairRDD<String, String[]> rdd = JavaPairRDD.fromJavaRDD(jsc.parallelize(data));

// Sanity check: print the value arrays
rdd.values().foreach(new VoidFunction<String[]>() {
    public void call(String[] rows) throws Exception {
        for (int i = 0; i < rows.length; i++) {
            System.out.print(rows[i] + "|");
        }
        System.out.println();
    }
});

// Build a DataFrame with the (key, value) columns
StructType schema = StructType.fromDDL("key string, value array<string>");
Dataset<Row> df = spark.createDataFrame(
        rdd.map((Function<Tuple2<String, String[]>, Row>) value -> RowFactory.create(value._1(), value._2())),
        schema
);
df.show(false);

// Turn the value arrays into the sparse "characteristic matrix" vectors
CountVectorizer vectorizer = new CountVectorizer().setInputCol("value").setOutputCol("vector").setBinary(false);
Dataset<Row> matrixDoc = vectorizer.fit(df).transform(df);

// Apply MinHash LSH with 5 hash tables
MinHashLSH mh = new MinHashLSH()
        .setNumHashTables(5)
        .setInputCol("vector")
        .setOutputCol("hashes");
MinHashLSHModel model = mh.fit(matrixDoc);
model.transform(matrixDoc).show(false);
</pre>
Here is what I got:
+----+---------+-------------------------+---------------------------------------------------------------------------------------+
|key |value    |vector                   |hashes                                                                                 |
+----+---------+-------------------------+---------------------------------------------------------------------------------------+
|key0|[a, d]   |(5,[0,1],[1.0,1.0])      |[[7.57939931E8], [-1.974869772E9], [-1.974047307E9], [4.95314097E8], [7.01119548E8]]   |
|key1|[c]      |(5,[2],[1.0])            |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [1.247220523E9], [1.702128832E9]]|
|key2|[b, d, e]|(5,[0,3,4],[1.0,1.0,1.0])|[[-1.278435698E9], [-1.542629264E9], [-1.974047307E9], [4.95314097E8], [-1.59182918E9]]|
|key3|[a, c, d]|(5,[0,1,2],[1.0,1.0,1.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8], [7.01119548E8]] |
+----+---------+-------------------------+---------------------------------------------------------------------------------------+
Now, I have no idea how to interpret the result... What hash functions are used by Spark? Do I have any control over what these functions are? And how can I then hash the results into buckets so that the documents in the same bucket are the "same" documents? After all, we do not want to do a pair-wise comparison...
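From skimming the API docs, my guess is that the bucketing/joining step is meant to be done with MinHashLSHModel.approxSimilarityJoin (or approxNearestNeighbors), something like the sketch below; the 0.6 Jaccard-distance threshold and the "distCol" column name are just placeholders I made up, and I am not sure this is the intended way to avoid the pair-wise comparison:
<pre>
// Self-join the documents on their MinHash buckets; "distCol" will hold the
// approximate Jaccard distance for the candidate pairs that share a bucket.
Dataset<Row> candidatePairs =
        model.approxSimilarityJoin(matrixDoc, matrixDoc, 0.6, "distCol").toDF();

// Drop the trivial self-matches and look at the remaining candidate pairs
candidatePairs
        .filter("datasetA.key != datasetB.key")
        .select("datasetA.key", "datasetB.key", "distCol")
        .show(false);
</pre>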