
One of our business cases requires us to run the PCA algorithm separately for each subgroup of our larger dataset (Apple and Orange in the example below), i.e., we would like to run one PCA for the records whose Type == Apple and another PCA for the records whose Type == Orange.

The overall dataset typically contains around 5-6 million records with 100-200 features, and the number of unique values in the Type column could be around 50K-60K.

We would like to run PCA per Type value in a Spark environment using Python. Below is the Python code in which I standardize the records and run PCA; however, the PCA runs over the whole dataset in PySpark instead of over each subgroup.

Currently, I am stuck on implementing an optimized version of PCA for the subgrouped data. Any help or direction is much appreciated.

Code

from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA

df = spark.createDataFrame(
        [("20181028", "Apple", 1, 3, 4, 5),
         ("20181029", "Apple", 10, 2, 6, 7),
         ("20181128", "Orange", 12, 19, 7, 16),
         ("20181129", "Orange", 9, 7, 3, 5),
         ("20181120", "Orange", 6, 3, 4, 5)],
        ("Date", "Type", "F1", "F2", "F3", "F4")
     )

df.show()

# Assemble the feature columns into a single vector column
assembler = VectorAssembler(inputCols=["F1","F2","F3","F4"], outputCol="features")
VectorOutput = assembler.transform(df)

# Standardize the input features (zero mean, unit variance)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",withStd=True, withMean=True)
scalerModel = scaler.fit(VectorOutput)
scaledData = scalerModel.transform(VectorOutput)

# Run the PCA algorithm (k=2)
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
model = pca.fit(scaledData)
result = model.transform(scaledData).select("Date","Type","pcaFeatures")

#Show the result
result.show(50,truncate=False)

Output

+--------+------+---+---+---+---+
|    Date|  Type| F1| F2| F3| F4|
+--------+------+---+---+---+---+
|20181028| Apple|  1|  3|  4|  5|
|20181029| Apple| 10|  2|  6|  7|
|20181128|Orange| 12| 19|  7| 16|
|20181129|Orange|  9|  7|  3|  5|
|20181120|Orange|  6|  3|  4|  5|
+--------+------+---+---+---+---+

+--------+------+-----------------------------------------+
|Date    |Type  |pcaFeatures                              |
+--------+------+-----------------------------------------+
|20181028|Apple |[1.5045340541317065,-0.9528611894036658] |
|20181029|Apple |[-0.19719398621559137,0.6812534757461064]|
|20181128|Orange|[-2.9560149391613217,-0.3754979381651896]|
|20181129|Orange|[0.6717104866798362,0.5707559972381819]  |
|20181120|Orange|[0.9769643845653697,0.07634965458456777] |
+--------+------+-----------------------------------------+

Update

I already have a Python implementation for this case, in which I loop through each Type value and build a model per value. Below is the pseudo code of my implementation, followed by a rough PySpark translation of that loop.

foreach item in Type.Distinct():
    filter the records whose Type == item
    run the PCA algorithm on that subset
    append the result to the output dataset
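
Translated to PySpark, the loop looks roughly like this (it reuses df and the assembler defined above and is only a sketch; it still fits one model at a time from the driver, which is what I want to avoid at 50K-60K Type values):

types = [row["Type"] for row in df.select("Type").distinct().collect()]

output = None
for t in types:
    # Keep only the records of this Type value
    subset = assembler.transform(df.filter(df.Type == t))

    # Standardize and run PCA on this subset alone
    scaled = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                            withStd=True, withMean=True).fit(subset).transform(subset)
    pca_model = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures").fit(scaled)
    res = pca_model.transform(scaled).select("Date", "Type", "pcaFeatures")

    # Append the result to the output dataset
    output = res if output is None else output.union(res)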

However, I would like to implement this in Spark and use its capabilities, such as partitioning the data so that multiple instances of PCA run in parallel across the executors. I am also looking into whether it can be solved with pair RDDs, where the key is the Type value and the value is all the records belonging to that particular Type. A sketch of the direction I am considering follows.
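
One idea is a grouped map pandas UDF (available since Spark 2.3), with the PCA done by scikit-learn inside each group. The sketch below is only an assumption on my part: it presumes pandas/pyarrow and scikit-learn are available on the executors, uses the column names from the toy example above, and requires every Type group (and at least 2 rows per group for 2 components) to fit in a single executor's memory.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from sklearn.preprocessing import StandardScaler as SkStandardScaler
from sklearn.decomposition import PCA as SkPCA

feature_cols = ["F1", "F2", "F3", "F4"]

# One output row per input row: the two principal components per record
out_schema = StructType([
    StructField("Date", StringType()),
    StructField("Type", StringType()),
    StructField("pc1", DoubleType()),
    StructField("pc2", DoubleType()),
])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def pca_per_type(pdf):
    # pdf is a pandas DataFrame holding all rows of a single Type value
    X = SkStandardScaler().fit_transform(pdf[feature_cols].values)
    pcs = SkPCA(n_components=2).fit_transform(X)
    return pd.DataFrame({
        "Date": pdf["Date"].values,
        "Type": pdf["Type"].values,
        "pc1": pcs[:, 0],
        "pc2": pcs[:, 1],
    })

# Spark shuffles the data by Type and runs the UDF once per group, in parallel across executors
result = df.groupby("Type").apply(pca_per_type)
result.show(truncate=False)

Whether this scales to 50K-60K keys, or whether a pair-RDD groupByKey with the same per-group logic would be a better fit, is exactly what I am hoping to get guidance on.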

  • Possible duplicate of [How to use Spark MlLib/Pipelines to build 1 model per each user](https://stackoverflow.com/questions/45640295/how-to-use-spark-mllib-pipelines-to-build-1-model-per-each-user) – 10465355 Nov 08 '18 at 09:26
  • Thanks for the link. I feel the answer mentioned in the link is the traditional way of resolving the issue, which doesn't require Spark. I had the same implementation in Python using option 1 mentioned in the link. However, I would like to have a per-user ML model using Spark capabilities, such as partitioning so that every record of a user belongs to one partition, and running the PCA algorithm for each user in parallel. – Learner Nov 08 '18 at 19:16
  • [Run ML algorithm inside map function in Spark](https://stackoverflow.com/q/43428297/10465355) is probably more descriptive. – 10465355 Nov 09 '18 at 10:01
