I have a DataFrame built as shown below:

from pyspark import SparkContext, SparkConf,SQLContext
import numpy as np
from scipy.spatial.distance import cosine
from pyspark.sql.functions import lit,countDistinct,udf,array,struct
import pyspark.sql.functions as F
config = SparkConf().setMaster("local")
sc = SparkContext(conf=config)
sqlContext=SQLContext(sc)

@udf("float")
def myfunction(x):
    y=np.array([1,3,9])
    x=np.array(x)
    return cosine(x,y)


df = sqlContext.createDataFrame([("doc_3",1,3,9), ("doc_1",9,6,0), ("doc_2",9,9,3) ]).withColumnRenamed("_1","doc").withColumnRenamed("_2","word1").withColumnRenamed("_3","word2").withColumnRenamed("_4","word3")


df2=df.select("doc", array([c for c in df.columns if c not in {'doc'}]).alias("words"))

df2=df2.withColumn("cosine",myfunction("words"))

This throws the following error:

19/10/02 21:24:58 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)

I am not sure why I can't convert a list to a NumPy array. Any help is appreciated.

Ricky
  • I assume that cosine returns a numpy scalar? If so, try `cosine(x,y).item()`. Please also include the relevant imports in your sample code. – cronoik Oct 02 '19 at 13:39
  • TypeError: expected string or Unicode object, NoneType found – Ricky Oct 02 '19 at 13:44
  • Add the import of cosine to your question and I will have a look at it. – cronoik Oct 02 '19 at 13:45
  • @cronoik `x=np.array(x)` Is this conversion correct? I am converting a pyspark column to a numpy array – Ricky Oct 02 '19 at 13:50
  • Yes, that is correct. During execution, x is a single value of a certain row and column. – cronoik Oct 02 '19 at 13:57

1 Answer

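This is basically the same issue as in your previous question. You created a UDF and told Spark that it returns a float, but the function actually returns an object of type numpy.float64. Spark cannot deserialize that NumPy type on the JVM side, which is what the PickleException is reporting.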
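The type difference can be checked without Spark. The snippet below is only a sketch: `cosine_distance` is a hypothetical NumPy-only stand-in for `scipy.spatial.distance.cosine`, used so the check does not depend on SciPy.

```python
import numpy as np


def cosine_distance(x, y):
    """Hypothetical stand-in for scipy's cosine(): 1 - cosine similarity."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    return 1.0 - np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))


d = cosine_distance([9, 6, 0], [1, 3, 9])

# The raw result is a NumPy scalar, not a built-in float.
print(type(d))         # <class 'numpy.float64'>

# .item() unwraps it into a native Python float.
print(type(d.item()))  # <class 'float'>
```

Note that `isinstance(d, float)` is True because numpy.float64 subclasses float, so comparing with `type(...)` is the more telling check here.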

You can convert NumPy scalar types to native Python types by calling item(), as shown below:

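import numpy as np
from scipy.spatial.distance import cosine
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, array

# Reuse the active session if there is one (e.g. in the pyspark shell), else create it
spark = SparkSession.builder.master("local").getOrCreate()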


@udf("float")
def myfunction(x):
    y = np.array([1, 3, 9])
    x = np.array(x)
    # cosine() returns numpy.float64; .item() converts it to a plain Python float
    return cosine(x, y).item()


df = spark.createDataFrame([("doc_3",1,3,9), ("doc_1",9,6,0), ("doc_2",9,9,3) ]).withColumnRenamed("_1","doc").withColumnRenamed("_2","word1").withColumnRenamed("_3","word2").withColumnRenamed("_4","word3")


df2=df.select("doc", array([c for c in df.columns if c not in {'doc'}]).alias("words"))

df2=df2.withColumn("cosine",myfunction("words"))

df2.show(truncate=False)

Output:

+-----+---------+----------+
|doc  |words    |cosine    |
+-----+---------+----------+
|doc_3|[1, 3, 9]|0.0       |
|doc_1|[9, 6, 0]|0.7383323 |
|doc_2|[9, 9, 3]|0.49496463|
+-----+---------+----------+
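If NumPy is not otherwise needed inside the UDF, one alternative is to compute the distance with built-in types only, so nothing has to be converted before returning. This is a sketch, not part of the original answer: `cosine_distance_py` and `REFERENCE` are hypothetical names, and returning None for a zero vector is an assumption (Spark would show it as null).

```python
import math

# Hypothetical reference vector, matching y in the UDF above.
REFERENCE = [1.0, 3.0, 9.0]


def cosine_distance_py(x, y=REFERENCE):
    """Cosine distance using only built-in Python numbers."""
    dot = sum(a * b for a, b in zip(x, y))
    norm_x = math.sqrt(sum(a * a for a in x))
    norm_y = math.sqrt(sum(b * b for b in y))
    if norm_x == 0 or norm_y == 0:
        # Assumption: treat an all-zero vector as "no distance available".
        return None
    return 1.0 - dot / (norm_x * norm_y)


for doc, words in [("doc_3", [1, 3, 9]), ("doc_1", [9, 6, 0]), ("doc_2", [9, 9, 3])]:
    print(doc, cosine_distance_py(words))
```

The function body could be wrapped with `@udf("float")` in the same way as `myfunction`, since its return value is already a plain float.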
cronoik