
I want to unfold a vector column into ordinary columns in a DataFrame. Using .transform and a pair of UDFs I can create the individual columns, but something about the datatypes or the 'nullable' flag raises an error when I call .show() – see the example code below. How can I fix the problem?

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf

spark = SparkSession\
        .builder\
        .config("spark.driver.maxResultSize", "40g") \
        .config('spark.sql.shuffle.partitions', '2001') \
        .getOrCreate()

data = [(0.2, 53.3, 0.2, 53.3),
        (1.1, 43.3, 0.3, 51.3),
        (2.6, 22.4, 0.4, 43.3),
        (3.7, 25.6, 0.2, 23.4)]     
df = spark.createDataFrame(data, ['A','B','C','D'])
df.show(3)
df.printSchema() 

vecAssembler = VectorAssembler(inputCols=['C','D'], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.printSchema()
new_df.show(3)

# UDFs meant to split the vector into two double columns
split1_udf = udf(lambda value: value[0], DoubleType())
split2_udf = udf(lambda value: value[1], DoubleType())
new_df = new_df.withColumn('c1', split1_udf('features')) \
               .withColumn('c2', split2_udf('features'))
new_df.printSchema()
new_df.show(3)
    What's the error? – pault Apr 08 '19 at 22:16
  • The error message is a mile long, the key lines in my understanding are the following: Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) – Goodfaithuser Apr 09 '19 at 13:36
  • Py4JJavaErrorTraceback (most recent call last) ----> 1 new_df.show(3) /opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical) --> 350 print(self._jdf.showString(n, 20, vertical)) – Goodfaithuser Apr 09 '19 at 13:37
  • /usr/local/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args) 1256 return_value = get_return_value( -> 1257 answer, self.gateway_client, self.target_id, self.name) /opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw) ---> 63 return f(*a, **kw) /usr/local/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) --> 328 format(target_id, ".", name), value) – Goodfaithuser Apr 09 '19 at 13:37
  • @Goodfaithuser don't paste the error message in the comments. Instead [edit] your question to include the full traceback. I understand it will be long, but it is what it is. – pault Apr 09 '19 at 15:08

2 Answers


I don't know what the problem with the UDFs is, but I found another solution, shown below.

data = [(0.2, 53.3, 0.2, 53.3),
        (1.1, 43.3, 0.3, 51.3),
        (2.6, 22.4, 0.4, 43.3),
        (3.7, 25.6, 0.2, 23.4)]      
df = spark.createDataFrame(data, ['A','B','C','D'])  

vecAssembler = VectorAssembler(inputCols=['C','D'], outputCol="features")
new_df = vecAssembler.transform(df)

# Convert each Row into a plain tuple, unpacking the DenseVector
# into native Python floats via toArray().tolist()
def extract(row):
    return (row.A, row.B, row.C, row.D) + tuple(row.features.toArray().tolist())

extracted_df = new_df.rdd.map(extract).toDF(['A', 'B', 'C', 'D', 'col1', 'col2'])
extracted_df.show()
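
On Spark 3.0+ there is also pyspark.ml.functions.vector_to_array, which does the same split without the round trip through the RDD API. A minimal sketch, assuming that version is available (the intermediate column name arr is just for illustration):

# Sketch assuming Spark 3.0+, where vector_to_array is available
from pyspark.ml.functions import vector_to_array

arr_df = new_df.withColumn("arr", vector_to_array("features"))
extracted_df = arr_df.select(
    "A", "B", "C", "D",
    arr_df.arr[0].alias("col1"),  # first vector element
    arr_df.arr[1].alias("col2"),  # second vector element
)
extracted_df.show()

This keeps the work in the JVM, whereas new_df.rdd.map(...) serializes every row out to Python and re-infers the schema in toDF.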

The features column contains values of type pyspark.ml.linalg.DenseVector, and the vector's elements are of type numpy.float64.

To convert the numpy dtypes to native Python types, call .item() on each value:

# .item() converts numpy.float64 to a native Python float,
# which Spark can serialize as a DoubleType
split1_udf = udf(lambda value: value[0].item(), DoubleType())
split2_udf = udf(lambda value: value[1].item(), DoubleType())

Using this fix produces the following output:

+---+----+---+----+----------+---+----+
|  A|   B|  C|   D|  features| c1|  c2|
+---+----+---+----+----------+---+----+
|0.2|53.3|0.2|53.3|[0.2,53.3]|0.2|53.3|
|1.1|43.3|0.3|51.3|[0.3,51.3]|0.3|51.3|
|2.6|22.4|0.4|43.3|[0.4,43.3]|0.4|43.3|
|3.7|25.6|0.2|23.4|[0.2,23.4]|0.2|23.4|
+---+----+---+----+----------+---+----+
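
The mismatch is easy to see in plain Python (a small sketch, assuming numpy is installed; the variable x is only for illustration):

import numpy as np

x = np.float64(0.2)
print(type(x))         # <class 'numpy.float64'> - not a plain Python float,
                       # which is what triggers the PickleException above
print(type(x.item()))  # <class 'float'> - a native type Spark can store as DoubleType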
Ranga Vure