
sc.version => u'2.2.0'

I'm having trouble extracting individual columns from RDD data to construct a Spark ML pipeline. I first read in the data, used the struct function to combine the columns, grouped by "A", and then applied collect_list. However, I'm stuck converting the result to the Spark ML label/features format, like so:

df = sc.createDataFrame(input_data, ["label", "features"]) 

Below was the initial table:

import pyspark
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

+---+--------------+------+-----+------+------+--------+------+------+------+--------+
|_c0|             A|     B|    C|     D|     E|       F|     G|     H|     I|       J|
+---+--------------+------+-----+------+------+--------+------+------+------+--------+
|  0|50009100010000|-999.0| 90.0|-999.0|-999.0|264.7466|-999.0|-999.0|-999.0|  -999.0|
|  1|50009100010000|-999.0| 90.5|-999.0|-999.0|258.5411|-999.0|-999.0|-999.0|  -999.0|
|  2|50009100010000|-999.0| 91.0|-999.0|-999.0|252.3356|-999.0|-999.0|-999.0|  -999.0|
|  3|50009100010000|-999.0| 91.5|-999.0|-999.0|246.1301|-999.0|-999.0|-999.0|  -999.0|
|  4|50009100010000|-999.0| 92.0|-999.0|-999.0|239.9246|-999.0|-999.0|-999.0| 39.8812|

Combining the columns into a single struct column before grouping the data by A:

df_next = df.select("A", F.struct(["B","C","D","E","F","G","H","I","J"]).alias("allcol"))
df_next.show(5)

+--------------+--------------------+
|             A|              allcol|
+--------------+--------------------+
|50009100010000|[-999.0,90.0,-999...|
|50009100010000|[-999.0,90.5,-999...|
|50009100010000|[-999.0,91.0,-999...|
|50009100010000|[-999.0,91.5,-999...|
|50009100010000|[-999.0,92.0,-999...|
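Spark aside, the packing step above can be sketched in plain Python (a minimal illustration, not the actual `F.struct` implementation): every column except the key A goes into one nested record, mirroring the shape of the `allcol` column.

```python
# First row of the table above, truncated to a few columns for brevity.
row = {"A": 50009100010000, "B": -999.0, "C": 90.0, "F": 264.7466}

def pack(row, key="A"):
    # Everything except the key column goes into the nested "allcol" record,
    # the same shape F.struct([...]).alias("allcol") produces per row.
    return {key: row[key], "allcol": {k: v for k, v in row.items() if k != key}}

packed = pack(row)
```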

Grouping by A and collecting the columns in the nested list:

df_next_list = df_next.groupBy("A").agg(F.collect_list("allcol").alias("collected_cols"))

+--------------+--------------------+
|             A|      collected_cols|
+--------------+--------------------+
|50009100090000|[[-999.0,210.0,-9...|
|50009100070000|[[-999.0,1110.0,-...|
|50009100170000|[[10.14438,303.0,...|
|50283200140000|[[9.8958,36.0,-99...|
|50009100040000|[[-999.0,290.5,-9...|
+--------------+--------------------+
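The effect of `groupBy("A")` plus `collect_list("allcol")` can likewise be sketched in plain Python (an illustration of the resulting shape, not Spark's implementation): all the nested records sharing the same A key end up in one list, which is why `collected_cols` is a list of rows, i.e. an array of arrays.

```python
from collections import defaultdict

# Packed rows as produced by the struct step, truncated for brevity.
packed_rows = [
    {"A": 50009100010000, "allcol": {"B": -999.0, "C": 90.0}},
    {"A": 50009100010000, "allcol": {"B": -999.0, "C": 90.5}},
    {"A": 50009100090000, "allcol": {"B": -999.0, "C": 210.0}},
]

# groupBy("A") + collect_list("allcol"): one list of nested records per key.
collected = defaultdict(list)
for r in packed_rows:
    collected[r["A"]].append(r["allcol"])
```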

This is what the output of df_next_list.rdd.take(n) looks like:

Row(A=50009100090000, collected_cols=[Row(B=-999.0, C=210.0, D=-999.0, E=-999.0, F=16.016660690307617, G=-999.0, H=7.6022491455078125, I=-999.0, J=30.627119064331055), Row(B=-999.0, C=210.5, D=-999.0, E=-999.0, F=18.973539352416992, G=-999.0, H=15.784810066223145, I=-999.0, J=29.249160766601562......)])]

From here onwards, I'm stuck on extracting the right element for each variable (B through J) from the collected_cols list, grouped by column A, and on converting df_next_list to the Spark ML form of label (J) and features (B,C,D,E,F,G,H,I). With that, I could construct the Spark ML algorithm. This is how I would like the RDD to look, which would make it easy to extract the elements:

Row(A=50009100090000, B=-999.0, C=210.0, D=-999.0, E=-999.0, F=264.7466125488281, G=-999.0, H=-999.0, I=-999.0, J=-999.0),....
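The reshaping this desired output implies can be sketched in plain Python (Spark aside): each grouped record is expanded back into one flat row per inner record, with the group key A re-attached. This is the per-record logic one would pass to something like `df_next_list.rdd.flatMap(...)`; the inner values here are truncated sample data from above.

```python
# One grouped record: key A plus a list of per-observation dicts.
grouped = {
    "A": 50009100090000,
    "collected_cols": [
        {"B": -999.0, "C": 210.0, "J": 30.627119064331055},
        {"B": -999.0, "C": 210.5, "J": 29.249160766601562},
    ],
}

def flatten(record):
    # One flat output dict per inner record, with the group key A prepended.
    return [dict(A=record["A"], **inner) for inner in record["collected_cols"]]

flat_rows = flatten(grouped)
```

From flat rows like these, the label (J) and features (B..I) could presumably be assembled with pyspark.ml.feature's VectorAssembler, though that last step is beyond this sketch.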

Any help will be immensely appreciated. Thanks

  • It is quite unclear what *exactly* your issue is - please clarify – desertnaut Feb 06 '18 at 19:33
  • @desertnaut, I want to be able to convert df_next_list to spark ml format like so: df_ml = sc.createDataFrame(df_next_list, ["label", "features"]) . That way I could use it to build the ml model – dlvr Feb 06 '18 at 23:08
  • Please include an example of your exact *desired output*; see [guidelines](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples/48427953#48427953) – desertnaut Feb 06 '18 at 23:24
  • Updated question. I hope that helps – dlvr Feb 07 '18 at 02:49
  • Not really. What are the `...`? Do you expect your `Row` to end there or what? And how exactly do you expect to handle features `B`, `C` etc that have different values for the same `A` (after you have grouped by `A`)? Your `collected_cols` is clearly not an array, but an array of arrays. Your question is ill-posed... – desertnaut Feb 07 '18 at 12:02

0 Answers