5

Question

Please help finding the ways to create a distributed matrix from the (user, feature, value) records in a DataFrame where features and their values are stored in a column.

Excerpts of the data is below but there are large number of users and features, and no all features are tested for users. Hence lots of feature values are null and to be imputed to 0.

For instance, a blood test may have sugar level, cholesterol level, etc as features. If those levels are not acceptable, then 1 is set as the value. But not all the features will be tested for the users (or patients).

+----+-------+-----+
|user|feature|value|
+----+-------+-----+
|  14|      0|    1|
|  14|    222|    1|
|  14|    200|    1|
|  22|      0|    1|
|  22|     32|    1|
|  22|    147|    1|
|  22|    279|    1|
|  22|    330|    1|
|  22|    363|    1|
|  22|    162|    1|
|  22|    811|    1|
|  22|    290|    1|
|  22|    335|    1|
|  22|    681|    1|
|  22|    786|    1|
|  22|    789|    1|
|  22|    842|    1|
|  22|    856|    1|
|  22|    881|    1|
+----+-------+-----+

If features are alredy columns, then there are ways explained.

But this is not the case. So one way could be pivoting the dataframe to apply those methods.

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|  22|  1|  1|  1|  1|  0|  0|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

Then use row to vector conversion. I suppose using one of these:

  • VectorAssembler
  • org.apache.spark.mllib.linalg.Vectors.fromML
  • org.apache.spark.mllib.linalg.distributed.MatrixEntry

However, since there will be many null values to be imputed to 0, the pivoted dataframe will consume far more memory space. Also pivoting a large dataframe distributed among multiple nodes would be causing large shuffling.

Hence, seek for advices, ideas, suggestions.

Related

Environment

Spark 2.4.4

mon
  • 18,789
  • 22
  • 112
  • 205
  • Interesting question. But it's not clear - why you need this? And - are you limited with spark only? It looks like that you could just keep all data in HBase or any other columnar storage. – Vladislav Varslavans Nov 20 '19 at 15:16
  • What about using pivot as shown [here](https://stackoverflow.com/questions/50749596/pyspark-boolean-pivot)? – abiratsis Nov 24 '19 at 16:13

2 Answers2

1

Solution

  1. Create a RDD[(user, feature)] for each input line.
  2. groupByKey to create a RDD[(user, [feature+])].
  3. Create a RDD[IndexedRow] where each IndexedRow represents below for all the features existing.
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
  1. Convert the RDD[IndexedRow] into IndexedRowMatrix.

For product operation, convert RowIndexedMatrix into BlockMatrix which supports product operation in distributed manner.

Convert each original record into IndexedRow

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def toIndexedRow(userToFeaturesMap:(Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
    userToFeaturesMap match {
        case (userId, featureIDs) => {
            val featureCountKV = featureIDs.map(i => (i, 1.0)).toSeq
            new IndexedRow (
                userId,
                Vectors.sparse(maxFeatureId + 1, featureCountKV)
            )
        }
    }
}

val userToFeatureCounters= featureData.rdd
    .map(rowPF => (rowPF.getInt(0), rowPF.getInt(1)))  // Out from ROW[(userId, featureId)]
    .groupByKey()                                      // (userId, Iterable(featureId))
    .map(
        userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)
    )                                                 // IndexedRow(userId, Vector((featureId, 1)))

Created IndexedRowMatrix

val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)

Trasponsed IndexedRowMatrix via BlockMatrix as IndexedRowMatrix does not support transpose

val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix
    .transpose

Created product with BlockMatrix as IndexedRowMatrix requires Local DenseMatrix on the right.

val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
    .multiply(userFeatureBlockMatrixTransposed)
    .toIndexedRowMatrix
mon
  • 18,789
  • 22
  • 112
  • 205
0

Maybe you could transform each row into json representation, e.g:

{ 
  "user": 14
  "features" : [
    {
      "feature" : 0
      "value"   : 1
    },
    {
      "feature" : 222
      "value"   : 1
    }
  ]
}

But all depends on how you would use your "distributed matrix" later on.

Vladislav Varslavans
  • 2,775
  • 4
  • 18
  • 33