I can use StringIndexer and OneHotEncoder stages to create the features column on the far right. Notice that id 1 has multiple rows. How can I aggregate the sparse vectors in features, using a pipeline or some alternative, so that the feature for id 1 becomes (7,[0,3,5],[1.0,1.0,1.0])?

I want to take this input:

+---+------+----+-----+
| id|houses|cars|label|
+---+------+----+-----+
|  0|     M|   A|  1.0|
|  1|     M|   C|  1.0|
|  1|     M|   B|  1.0|
|  2|     F|   A|  0.0|
|  3|     F|   D|  0.0|
|  4|     Z|   B|  1.0|
|  5|     Z|   C|  0.0|
+---+------+----+-----+

then one-hot encode the houses and cars columns, combine them, and aggregate by id

and generate this output:

+-------------------------+
|                 features|
+-------------------------+
|      (7,[0,4],[1.0,1.0])|
|(7,[0,3,5],[1.0,1.0,1.0])|
|      (7,[2,4],[1.0,1.0])|
|      (7,[2,6],[1.0,1.0])|
|      (7,[1,3],[1.0,1.0])|
|      (7,[1,5],[1.0,1.0])|
+-------------------------+
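
My current code:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SQLContext

def oneHotEncoderExample(sqlContext: SQLContext): Unit = {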

// define data
val df = sqlContext.createDataFrame(Seq(
  (0, "M", "A", 1.0),
  (1, "M", "C", 1.0),
  (1, "M", "B", 1.0),
  (2, "F", "A", 0.0),
  (3, "F", "D", 0.0),
  (4, "Z", "B", 1.0),
  (5, "Z", "C", 0.0)
)).toDF("id", "houses", "cars", "label")
df.show()

// define stages of pipeline
val indexerHouse = new StringIndexer()
  .setInputCol("houses")
  .setOutputCol("housesIndex")

val encoderHouse = new OneHotEncoder()
  .setDropLast(false)
  .setInputCol("housesIndex")
  .setOutputCol("typeHouses")

val indexerCar = new StringIndexer()
  .setInputCol("cars")
  .setOutputCol("carsIndex")

val encoderCar = new OneHotEncoder()
  .setDropLast(false)
  .setInputCol("carsIndex")
  .setOutputCol("typeCars")

val assembler = new VectorAssembler()
  .setInputCols(Array("typeHouses", "typeCars"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)

// define pipeline
val pipeline = new Pipeline()
  .setStages(Array(
    indexerHouse, encoderHouse,
    indexerCar, encoderCar,
    assembler, lr))

// Fit the pipeline to the training data.
val pipelineModel = pipeline.fit(df)

// Helper code that runs the stages by hand up to the assembler (generates the table below).
// It stays inside the function because it reuses the stages defined above.
val indexedHouse = indexerHouse.fit(df).transform(df)
indexedHouse.show()
val encodedHouse = encoderHouse.transform(indexedHouse)
encodedHouse.show()
val indexedCar = indexerCar.fit(df).transform(df)
indexedCar.show()
val encodedCar = encoderCar.transform(indexedCar)
encodedCar.show()
// Join the two encoded frames back together row by row before assembling.
val assembledFeature = assembler.transform(encodedHouse.join(encodedCar, usingColumns = Seq("id", "houses", "cars")))
assembledFeature.show()
}

[Image: aggregated input and output of current pipeline]

lapolonio
  • the result of the pipeline after assembly is the features column on the far right. That is used as input in the logistic regression. I want to aggregate the features column (a column of sparse vectors) by a primary key (in this instance id). I am asking the best way to do this. Should it be done inside or outside the pipeline and if so how? – lapolonio Mar 23 '16 at 04:26
  • Give an example output, because what you are asking isn't clear! – eliasah Mar 23 '16 at 07:23
  • i cleaned up the question, please tell me if you need something else. – lapolonio Mar 23 '16 at 12:45
  • How much different levels do you expect? – zero323 Mar 29 '16 at 01:46
  • i'm actually working on analyzing patient data. 4 levels? (age, medication, gender, diagnosis) about 7000 diagnosis codes, 3100 medication codes, 10 ages. Is that what you are asking for? – lapolonio Mar 29 '16 at 01:49
  • More or less. AFAIK there is no built-in method which can be used here. You can combine an UDAF [I've provided here](http://stackoverflow.com/a/33901072/1560062) and wrap it with a transformer (see http://stackoverflow.com/a/35183614/1560062) – zero323 Mar 30 '16 at 11:19
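
To make the merge step concrete, here is a minimal plain-Scala sketch of what an aggregation like the UDAF suggested in the last comment would need to do for each id. It does not use Spark: `Sparse`, `SparseMerge`, `merge` and `mergeById` are hypothetical names introduced only for illustration, and `Sparse` is a stand-in for a sparse vector, not Spark's own class. It also assumes that the aggregate should keep the larger value per index (a logical OR for one-hot vectors) rather than sum them, because the expected output for id 1 has 1.0 at index 0 even though both of its rows set that index.

```scala
// Hypothetical stand-in for a sparse vector: (size, sorted indices, values).
// This is NOT Spark's SparseVector; it only mirrors the (7,[0,3,5],[...]) notation.
final case class Sparse(size: Int, indices: Vector[Int], values: Vector[Double])

object SparseMerge {
  // Merge two sparse vectors of equal size, keeping the larger value per index.
  // Assumption: for one-hot (0/1) vectors this acts as a logical OR, so an index
  // set in several rows of the same id stays at 1.0 instead of summing to 2.0.
  def merge(a: Sparse, b: Sparse): Sparse = {
    require(a.size == b.size, "vectors must have the same size")
    val combined = (a.indices.zip(a.values) ++ b.indices.zip(b.values))
      .groupBy(_._1)                                       // collect entries per index
      .map { case (i, pairs) => i -> pairs.map(_._2).reduce(_ max _) }
      .toVector
      .sortBy(_._1)                                        // keep indices ascending
    Sparse(a.size, combined.map(_._1), combined.map(_._2))
  }

  // Group (id, vector) rows by id and reduce each group with merge.
  def mergeById(rows: Seq[(Int, Sparse)]): Map[Int, Sparse] =
    rows.groupBy(_._1).map { case (id, group) => id -> group.map(_._2).reduce(merge) }
}
```

Applied to the two rows for id 1 from the question, (7,[0,5],[1.0,1.0]) and (7,[0,3],[1.0,1.0]), `mergeById` yields (7,[0,3,5],[1.0,1.0,1.0]). In Spark, the same pairwise merge could presumably be applied per key, for example with `reduceByKey` on an RDD of (id, vector) pairs or inside a UDAF, but that wiring is not shown here and would depend on the Spark version and its vector classes.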
