One of our business cases requires us to run the PCA algorithm separately on each subgroup of a larger dataset (in the example below, the subgroups are Apple and Orange), i.e., we would like to run one PCA for the records whose Type == Apple and another PCA for the records whose Type == Orange.
The overall dataset typically contains around 5-6 million records and 100-200 features. The number of unique values in the Type column could be around 50K-60K.
We would like to run PCA per Type value in a Spark environment using Python. Below is the Python code where I standardize the records and run PCA. However, the PCA runs over the whole dataset in PySpark instead of per subgroup.
Currently, I'm stuck on implementing an optimized version of PCA for the subgrouped dataset. Any help or direction is much appreciated.
Code
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA

# Sample data: one subgroup per Type value (Apple, Orange)
df = spark.createDataFrame(
    [("20181028", "Apple", 1, 3, 4, 5),
     ("20181029", "Apple", 10, 2, 6, 7),
     ("20181128", "Orange", 12, 19, 7, 16),
     ("20181129", "Orange", 9, 7, 3, 5),
     ("20181120", "Orange", 6, 3, 4, 5)],
    ("Date", "Type", "F1", "F2", "F3", "F4")
)
df.show()
#Create vector
assembler = VectorAssembler(inputCols=["F1","F2","F3","F4"], outputCol="features")
VectorOutput = assembler.transform(df)
#standardize the input feature
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scalerModel = scaler.fit(VectorOutput)
scaledData = scalerModel.transform(VectorOutput)
#Run PCA algo
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
model = pca.fit(scaledData)
result = model.transform(scaledData).select("Date","Type","pcaFeatures")
#Show the result
result.show(50,truncate=False)
Output
+--------+------+---+---+---+---+
| Date| Type| F1| F2| F3| F4|
+--------+------+---+---+---+---+
|20181028| Apple| 1| 3| 4| 5|
|20181029| Apple| 10| 2| 6| 7|
|20181128|Orange| 12| 19| 7| 16|
|20181129|Orange| 9| 7| 3| 5|
|20181120|Orange| 6| 3| 4| 5|
+--------+------+---+---+---+---+
+--------+------+-----------------------------------------+
|Date |Type |pcaFeatures |
+--------+------+-----------------------------------------+
|20181028|Apple |[1.5045340541317065,-0.9528611894036658] |
|20181029|Apple |[-0.19719398621559137,0.6812534757461064]|
|20181128|Orange|[-2.9560149391613217,-0.3754979381651896]|
|20181129|Orange|[0.6717104866798362,0.5707559972381819] |
|20181120|Orange|[0.9769643845653697,0.07634965458456777] |
+--------+------+-----------------------------------------+
Update
I already have a Python implementation for this case, in which I loop through each Type and build a model per group. Below is the pseudocode of my implementation:
foreach item in Type.Distinct()
    Filter the records corresponding to item
    Run PCA algorithm
    Append the result to the output dataset
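Expressed against the DataFrames from the code above, that loop would look roughly like the following sketch (assuming the scaledData DataFrame and the PCA import from earlier). It works, but it drives one Spark job per Type value from the driver, so it will not scale well to 50K-60K distinct types:

from functools import reduce
from pyspark.sql import DataFrame

# Naive per-Type loop: one PCA fit per distinct Type, run sequentially from the driver.
# Note: scaledFeatures below was standardized over the whole dataset; fully independent
# groups would also need a per-Type scaler fit.
per_type_results = []
for row in scaledData.select("Type").distinct().collect():
    subset = scaledData.filter(scaledData["Type"] == row["Type"])
    model = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures").fit(subset)
    per_type_results.append(model.transform(subset).select("Date", "Type", "pcaFeatures"))

looped_result = reduce(DataFrame.union, per_type_results)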
However, I would like to implement this in Spark and use its capabilities, such as partitioning the data by Type so that multiple instances of PCA run in parallel across the executors. I'm also wondering whether it can be solved with pair RDDs, where the key is the Type and the value is all the records belonging to that Type. A sketch of one possible direction is below.
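One way to let Spark fit the PCAs in parallel is a grouped pandas UDF: Spark shuffles the rows by Type and hands each group to a Python function as a pandas DataFrame, so the per-group fits run concurrently across executors. The following is only a sketch under several assumptions: Spark 3.0+ with pyarrow available, pandas and scikit-learn installed on every executor (scikit-learn's PCA and StandardScaler stand in for the Spark ML versions inside each group), every single Type's rows fitting in one executor's memory, and each Type having at least k rows.

import pandas as pd
from sklearn.decomposition import PCA as SkPCA
from sklearn.preprocessing import StandardScaler as SkScaler

feature_cols = ["F1", "F2", "F3", "F4"]
k = 2

def pca_per_type(pdf: pd.DataFrame) -> pd.DataFrame:
    # Standardize and fit PCA on this group's rows only
    X = SkScaler().fit_transform(pdf[feature_cols].to_numpy(dtype=float))
    comps = SkPCA(n_components=k).fit_transform(X)
    out = pdf[["Date", "Type"]].copy()
    for i in range(k):
        out["pc" + str(i + 1)] = comps[:, i]
    return out

# One output column per principal component
schema = "Date string, Type string, pc1 double, pc2 double"
grouped_result = df.groupBy("Type").applyInPandas(pca_per_type, schema)
grouped_result.show(truncate=False)

The same grouping can also be expressed with a pair RDD (keyBy Type, groupByKey, then a local numpy or scikit-learn PCA per key), but the grouped-map form keeps the input and output as DataFrames. On Spark 2.3/2.4 the equivalent construct is a GROUPED_MAP pandas_udf used with groupBy().apply().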