
First, I tried everything in the link below to fix my error, but none of it worked.

How to convert RDD of dense vector into DataFrame in pyspark?

I am trying to convert a dense vector into a dataframe (Spark preferably), along with column names, and I am running into issues.

My column in the Spark dataframe is a vector that was created using VectorAssembler, and I now want to convert it back to a dataframe, as I would like to create plots of some of the variables in the vector.
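
For context, all_features was built roughly like this (the feature names and input dataframe below are placeholders, not my real ones):

from pyspark.ml.feature import VectorAssembler

# assemble the ~200 numeric feature columns into a single vector column
assembler = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="all_features")
output = assembler.transform(df)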

Approach 1:

from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml.linalg import Vectors

temp = output.select("all_features")
temp.rdd.map(
    lambda row: (DenseVector(row[0].toArray()))
).toDF()

Below is the Error

TypeError: not supported type: <type 'numpy.ndarray'>

Approach 2:

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.linalg import *

as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())
result = output.withColumn("all_features", as_ml("all_features"))
result.head(5)

Error:

AttributeError: 'numpy.ndarray' object has no attribute 'asML'

I also tried converting the dataframe into a Pandas dataframe, but after that I am not able to split the values into separate columns.

Approach 3:

import pandas as pd

pandas_df = temp.toPandas()
pandas_df1 = pd.DataFrame(pandas_df.all_features.values.tolist())

The code above runs fine, but I still have only one column in my dataframe, with all the values in a single comma-separated list.
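
Presumably the vectors need to be converted to plain arrays before pandas can expand them; a minimal sketch, assuming all_features holds DenseVectors:

# convert each DenseVector to a numpy array so pandas can expand it into columns
pandas_df1 = pd.DataFrame([v.toArray() for v in pandas_df["all_features"]])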

Any help is greatly appreciated!

EDIT:

Here is what my temp dataframe looks like. It has just one column, all_features. I am trying to create a dataframe that splits all of these values into separate columns (all_features is a vector that was created from 200 columns):

+--------------------+
|        all_features|
+--------------------+
|[0.01193689934723...|
|[0.04774759738895...|
|[0.0,0.0,0.194417...|
|[0.02387379869447...|
|[1.89796699621085...|
+--------------------+
only showing top 5 rows

The expected output is a dataframe with all 200 columns separated out:

+----------------------------+
|        col1| col2| col3|...
+----------------------------+
|0.01193689934723|0.0|0.5049431301173817...
|0.04774759738895|0.0|0.1657316216149636...
|0.0|0.0|7.213126372469...
|0.02387379869447|0.0|0.1866693496827619|...
|1.89796699621085|0.0|0.3192169213385746|...
+----------------------------+
only showing top 5 rows

Here is what my Pandas DF output looks like:

              0
0   [0.011936899347238104, 0.0, 0.5049431301173817...
1   [0.047747597388952415, 0.0, 0.1657316216149636...
2   [0.0, 0.0, 0.19441761495525278, 7.213126372469...
3   [0.023873798694476207, 0.0, 0.1866693496827619...
4   [1.8979669962108585, 0.0, 0.3192169213385746, ...
kkumar
  • Can you tell us explicitly what input you have, what output you want, and what output you're getting as of now? It helps us understand your problem better (and faster). Usually, a [mcve] is required. For instance, I am not sure what values you have in your column "all_features", and so I can't know for sure what results the use of `.values.tolist()` will give. – IMCoins Sep 27 '18 at 22:03
  • Did you try `rdd.map(lambda x: (x, )).toDF( )` as given in the link you specified? That usually works. – mayank agrawal Sep 28 '18 at 10:11
  • @IMCoins Apologies. I have added the output and expected output now – kkumar Sep 28 '18 at 14:27
  • @mayankagrawal I tried rdd.map(lambda x: (x, )).toDF( ) – kkumar Sep 28 '18 at 14:28
  • @mayankagrawal it was again returning just one column named "all_features". I then tried converting it to Pandas DF and did the .values.tolist() which gave just one column with values separated by commas. – kkumar Sep 28 '18 at 14:35
  • check my answer. – mayank agrawal Sep 28 '18 at 14:36

1 Answer


Since you want all the features in separate columns (as I understood from your EDIT), the answer at the link you provided is not the solution.

Try this,

# column_names is your list of 200 column names
temp = temp.rdd.map(lambda x: [float(y) for y in x['all_features']]).toDF(column_names)
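
Here column_names is the list of 200 names you want for the new columns, for example (placeholder names, use your own list):

column_names = ['col' + str(i) for i in range(1, 201)]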

EDIT:

Since your temp is originally a dataframe, you can also use this method without converting it to an rdd:

import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

# bind i as a default argument so each udf captures its own index
# (otherwise all 200 lambdas would share the loop variable's final value)
splits = [F.udf(lambda val, i=i: float(val[i].item()), FloatType()) for i in range(200)]
temp = temp.select(*[s(F.col('all_features')).alias(c) for c, s in zip(column_names, splits)])
temp.show()
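
On Spark 3.0+ there is also a built-in vector_to_array function that avoids one Python udf per column; a sketch under that assumption:

from pyspark.ml.functions import vector_to_array

# expand the vector into an array column, then index it once per output column
arr = temp.withColumn('arr', vector_to_array('all_features'))
temp = arr.select(*[arr['arr'][i].alias(c) for i, c in enumerate(column_names)])
temp.show()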
mayank agrawal
  • Thank you Mayank! Tried your first solution and it worked great! Is there a way I can assign column names to the newly created dataframe using a list of column names that I already have? – kkumar Sep 28 '18 at 14:57