
I have a DataSet as follows:

+----+---------+-------------------------+
|key |value    |vector                   |
+----+---------+-------------------------+
|key0|[a, d]   |(5,[0,2],[1.0,1.0])      |
|key1|[c]      |(5,[1],[1.0])            |
|key2|[b, d, e]|(5,[0,3,4],[1.0,1.0,1.0])|
|key3|[a, c, d]|(5,[0,1,2],[1.0,1.0,1.0])|
+----+---------+-------------------------+

The vector part of this dataset is what is called the "characteristic matrix", and it represents which document contains which element. Let me write the characteristic matrix out in a format that is easier on the eyes:

+--------+------+------+------+------+
|element | doc0 | doc1 | doc2 | doc3 |
+--------+------+------+------+------+
|   d    |   1  |  0   |  1   |  1   |
|   c    |   0  |  1   |  0   |  1   |
|   a    |   1  |  0   |  0   |  1   |
|   b    |   0  |  0   |  1   |  0   |
|   e    |   0  |  0   |  1   |  0   |  
+--------+------+------+------+------+

If you look carefully, you can see that key0 maps to doc0, which contains elements a and d; that is why it has the vector (5,[0,2],[1.0,1.0]) (the vector is the column under doc0). Note that the characteristic matrix itself does not contain the element column or the header row; they are only there to make it easier to read.
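
For example, doc0's column can be written out directly as a Spark sparse vector, just to illustrate the encoding (this uses org.apache.spark.ml.linalg.Vectors; it is not part of my pipeline):

    import org.apache.spark.ml.linalg.Vector;
    import org.apache.spark.ml.linalg.Vectors;

    // doc0 contains d (row 0) and a (row 2), so its column of the characteristic
    // matrix is a length-5 vector with 1.0 at indices 0 and 2.
    Vector doc0 = Vectors.sparse(5, new int[]{0, 2}, new double[]{1.0, 1.0});
    System.out.println(doc0);  // prints (5,[0,2],[1.0,1.0])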

Now, my goal is to use N hash functions to get the Minhash signature of this characteristic matrix.

For example, let N = 2; in other words, two hash functions are used. Let us also say these two hash functions are:

h1(x) = (x + 1) % 5
h2(x) = (3x + 1) % 5

where x is the element's row number. (For these hash values I index the elements alphabetically, a = 0, b = 1, c = 2, d = 3, e = 4, which is not the same order as the rows in the table above.) After applying these two hash functions, I expect the "minhash signature matrix" to look like this:

+--------+------+------+------+------+
|        | doc0 | doc1 | doc2 | doc3 |
+--------+------+------+------+------+
|   h1   |   1  |  3   |  0   |  1   |
|   h2   |   0  |  2   |  0   |  0   |
+--------+------+------+------+------+
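
For reference, here is how I would compute this expected signature by hand in plain Java (no Spark, elements indexed alphabetically a = 0 ... e = 4 as above), just to make explicit what result I am after:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.IntUnaryOperator;

    public class MinhashByHand {
        public static void main(String[] args) {
            // Each document is the set of element indices it contains (a=0 ... e=4).
            List<int[]> docs = Arrays.asList(
                    new int[]{0, 3},      // doc0 = {a, d}
                    new int[]{2},         // doc1 = {c}
                    new int[]{1, 3, 4},   // doc2 = {b, d, e}
                    new int[]{0, 2, 3});  // doc3 = {a, c, d}

            // The two hash functions from above.
            List<IntUnaryOperator> hashes = Arrays.asList(
                    x -> (x + 1) % 5,
                    x -> (3 * x + 1) % 5);

            // One signature row per hash function: the minimum hash value over the
            // rows that are set in each document's column of the characteristic matrix.
            for (int h = 0; h < hashes.size(); h++) {
                StringBuilder row = new StringBuilder("h" + (h + 1) + ":");
                for (int[] doc : docs) {
                    int min = Integer.MAX_VALUE;
                    for (int x : doc) {
                        min = Math.min(min, hashes.get(h).applyAsInt(x));
                    }
                    row.append("  ").append(min);
                }
                System.out.println(row);  // prints "h1:  1  3  0  1" and "h2:  0  2  0  0"
            }
        }
    }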

Now, using Spark Java, how do I specify these two hash functions that I would like to use, and then how do I generate the above RDD from the given dataset (at the beginning of this question)? In a real test case, I would probably use about 1000 hash functions, but understanding how to use 2 is good enough for now.

I have been searching and reading the Spark documentation, but it just seems quite difficult to get a handle on this. Any hints/guidance will be greatly helpful to me.

Thank you in advance!

Now, I did look at the documentation and I have the following code:

    // jsc is an existing JavaSparkContext and spark the corresponding SparkSession.
    List<Tuple2<String, String[]>> data = Arrays.asList(
            new Tuple2<>("key0", new String[] {"a", "d"}),
            new Tuple2<>("key1", new String[] {"c"}),
            new Tuple2<>("key2", new String[] {"b", "d", "e"}),
            new Tuple2<>("key3", new String[] {"a", "c", "d"})
    );
    JavaPairRDD<String, String[]> rdd = JavaPairRDD.fromJavaRDD(jsc.parallelize(data));

    // Sanity check: print each value array.
    rdd.values().foreach(new VoidFunction<String[]>() {
        public void call(String[] rows) throws Exception {
            for (int i = 0; i < rows.length; i++) {
                System.out.print(rows[i] + "|");
            }
            System.out.println();
        }
    });

    // Build a DataFrame with the key and the array of elements.
    StructType schema = StructType.fromDDL("key string, value array<string>");
    Dataset<Row> df = spark.createDataFrame(
            rdd.map((Function<Tuple2<String, String[]>, Row>) value -> RowFactory.create(value._1(), value._2())),
            schema
    );
    df.show(false);

    // Turn each element array into a sparse vector, i.e. one column of the characteristic matrix.
    CountVectorizer vectorizer = new CountVectorizer().setInputCol("value").setOutputCol("vector").setBinary(false);
    Dataset<Row> matrixDoc = vectorizer.fit(df).transform(df);

    // MinHash LSH with 5 hash tables over the characteristic-matrix columns.
    MinHashLSH mh = new MinHashLSH()
            .setNumHashTables(5)
            .setInputCol("vector")
            .setOutputCol("hashes");

    MinHashLSHModel model = mh.fit(matrixDoc);
    model.transform(matrixDoc).show(false);


Here is what I got:
+----+---------+-------------------------+---------------------------------------------------------------------------------------+
|key |value    |vector                   |hashes                                                                                 |
+----+---------+-------------------------+---------------------------------------------------------------------------------------+
|key0|[a, d]   |(5,[0,1],[1.0,1.0])      |[[7.57939931E8], [-1.974869772E9], [-1.974047307E9], [4.95314097E8], [7.01119548E8]]   |
|key1|[c]      |(5,[2],[1.0])            |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [1.247220523E9], [1.702128832E9]]|
|key2|[b, d, e]|(5,[0,3,4],[1.0,1.0,1.0])|[[-1.278435698E9], [-1.542629264E9], [-1.974047307E9], [4.95314097E8], [-1.59182918E9]]|
|key3|[a, c, d]|(5,[0,1,2],[1.0,1.0,1.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8], [7.01119548E8]] |
+----+---------+-------------------------+---------------------------------------------------------------------------------------+
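
One thing I did notice: the vector for key0 comes out as (5,[0,1],[1.0,1.0]) here rather than (5,[0,2],[1.0,1.0]) as in my table at the top. I believe that is just because CountVectorizer orders its vocabulary by term frequency, with ties broken arbitrarily, so a and c can swap positions between runs. The vocabulary (i.e., the row order of the characteristic matrix) can be inspected like this, reusing the vectorizer and df variables from my code above:

    // Fit the CountVectorizer once, look at its vocabulary, then transform.
    // vocabulary()[i] is the element that ends up at row i of the characteristic matrix.
    CountVectorizerModel cvModel = vectorizer.fit(df);
    System.out.println(Arrays.toString(cvModel.vocabulary()));  // for the run above: d, a, c first, then b and e in some order
    Dataset<Row> matrixDoc = cvModel.transform(df);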

Now, I have no idea how to interpret the hashes in the result... Which hash functions does Spark use? Do I have any control over what these functions are? And how can I then hash the results into buckets so that documents in the same bucket are the "same" documents? After all, we do not want to do pair-wise comparisons...
