sc.version => u'2.2.0'
I'm having trouble extracting individual columns from grouped RDD data in order to build a Spark ML pipeline. I first read in the data, used the struct function to combine the columns, grouped by "A", and then applied collect_list. However, I'm now stuck converting the result to the Spark ML label/features format, like so:
df = sqlContext.createDataFrame(input_data, ["label", "features"])
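where input_data would hold (label, DenseVector) pairs along these lines (the values here are invented purely to illustrate the shape):

from pyspark.ml.linalg import Vectors

# label taken from column J, features from columns B..I (made-up numbers)
input_data = [
    (30.6, Vectors.dense([-999.0, 210.0, -999.0, -999.0, 16.0, -999.0, 7.6, -999.0])),
    (29.2, Vectors.dense([-999.0, 210.5, -999.0, -999.0, 18.9, -999.0, 15.7, -999.0])),
]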
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql import functions as F

sqlContext = SQLContext(sc)

Below is the initial table:
+---+--------------+------+-----+------+------+--------+------+------+------+--------+
|_c0|             A|     B|    C|     D|     E|       F|     G|     H|     I|       J|
+---+--------------+------+-----+------+------+--------+------+------+------+--------+
|  0|50009100010000|-999.0| 90.0|-999.0|-999.0|264.7466|-999.0|-999.0|-999.0|  -999.0|
|  1|50009100010000|-999.0| 90.5|-999.0|-999.0|258.5411|-999.0|-999.0|-999.0|  -999.0|
|  2|50009100010000|-999.0| 91.0|-999.0|-999.0|252.3356|-999.0|-999.0|-999.0|  -999.0|
|  3|50009100010000|-999.0| 91.5|-999.0|-999.0|246.1301|-999.0|-999.0|-999.0|  -999.0|
|  4|50009100010000|-999.0| 92.0|-999.0|-999.0|239.9246|-999.0|-999.0|-999.0| 39.8812|
+---+--------------+------+-----+------+------+--------+------+------+------+--------+
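For reference, the DataFrame above was read along these lines (the path and CSV options are placeholders, not my exact call):

df = sqlContext.read.csv("/path/to/data.csv", header=True, inferSchema=True)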
Combining the columns into a single struct before grouping the data by A:
df_next = df.select("A", F.struct(["B","C","D","E","F","G","H","I","J"]).alias("allcol"))
df_next.show(5)
+--------------+--------------------+
| A| allcol|
+--------------+--------------------+
|50009100010000|[-999.0,90.0,-999...|
|50009100010000|[-999.0,90.5,-999...|
|50009100010000|[-999.0,91.0,-999...|
|50009100010000|[-999.0,91.5,-999...|
|50009100010000|[-999.0,92.0,-999...|
+--------------+--------------------+
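Note that at this point the struct fields are still addressable by name, e.g.:

df_next.select("A", "allcol.C").show(5)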
Grouping by A and collecting the struct columns into a nested list:
df_next_list = df_next.groupBy("A").agg(F.collect_list("allcol").alias("collected_cols"))
df_next_list.show(5)
+--------------+--------------------+
| A| collected_cols|
+--------------+--------------------+
|50009100090000|[[-999.0,210.0,-9...|
|50009100070000|[[-999.0,1110.0,-...|
|50009100170000|[[10.14438,303.0,...|
|50283200140000|[[9.8958,36.0,-99...|
|50009100040000|[[-999.0,290.5,-9...|
+--------------+--------------------+
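Single elements of the collected array can at least be pulled out with getItem/getField; for example, the C value of the first collected row per group:

df_next_list.select(
    "A", F.col("collected_cols").getItem(0).getField("C").alias("first_C")
).show(5)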
This is what calling df_next_list.rdd.take(n) returns:
[Row(A=50009100090000, collected_cols=[Row(B=-999.0, C=210.0, D=-999.0, E=-999.0, F=16.016660690307617, G=-999.0, H=7.6022491455078125, I=-999.0, J=30.627119064331055), Row(B=-999.0, C=210.5, D=-999.0, E=-999.0, F=18.973539352416992, G=-999.0, H=15.784810066223145, I=-999.0, J=29.249160766601562), ...])]
From here on, I'm stuck on extracting the right element for each variable (B through J) from the collected_cols list for each group in column A, and on converting df_next_list into the Spark ML form of a label (J) and features (B, C, D, E, F, G, H, I). With that, I could fit a Spark ML algorithm. This is how I would like the RDD to look, which would make it easy to extract the elements:
Row(A=50009100090000, B=-999.0, C=210.0, D=-999.0, E=-999.0, F=264.7466125488281, G=-999.0, H=-999.0, I=-999.0, J=-999.0),....
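The closest I can think of is to explode collected_cols back into one row per element and then assemble the features; a rough, untested sketch (column names as above):

from pyspark.ml.feature import VectorAssembler

# one row per element of collected_cols, then expand the struct back into columns
df_flat = df_next_list.select("A", F.explode("collected_cols").alias("c")).select("A", "c.*")

# pack B..I into a features vector and rename J to label
assembler = VectorAssembler(inputCols=["B", "C", "D", "E", "F", "G", "H", "I"],
                            outputCol="features")
df_ml = assembler.transform(df_flat).select(F.col("J").alias("label"), "features")

But this effectively undoes the groupBy, so I may be missing a cleaner way.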
Any help would be immensely appreciated. Thanks!