
I have a large dataframe (more than 100 thousand records).

dataframe example:

+-----+---+-----+
|index|  X|    Y|
+-----+---+-----+
|    0|  1|    8|
|    1|  3|    9|
|    2|  5|    4|
|    3|  7|    0|
+-----+---+-----+

For each row I need to add a new column that contains an object initialized with the data from the original columns. Is that possible?

I know that when using pandas you can put objects in the dataframe, but I don't know if something like this can be done in PySpark. The desired output looks something like this:

+-----+---+-----+---------------------------------+
|index|  X|    Y|                              obj|
+-----+---+-----+---------------------------------+
|    0|  1|    8|<__main__.MyPoint object at 0x01>|
|    1|  3|    9|<__main__.MyPoint object at 0x02>|
|    2|  5|    4|<__main__.MyPoint object at 0x03>|
|    3|  7|    0|<__main__.MyPoint object at 0x04>|
+-----+---+-----+---------------------------------+

Can I somehow add a new column and store my object there?
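
For reference, a minimal pandas sketch of what I mean (MyPoint here is just an illustrative class):

import pandas as pd

class MyPoint:
    def __init__(self, x, y):
        self.x = x
        self.y = y

# pandas stores arbitrary Python objects in an object-dtype column
pdf = pd.DataFrame({"X": [1, 3, 5, 7], "Y": [8, 9, 4, 0]})
pdf["obj"] = pdf.apply(lambda row: MyPoint(row["X"], row["Y"]), axis=1)
print(pdf)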

Misha
  • Include an example to help us understand what you are trying to do. Follow [Minimally Reproducible Example](https://stackoverflow.com/a/48427186/7989581) to better structure your question. – Nithish Dec 14 '21 at 16:24
  • @snithish Thanks for the link, updated the question – Misha Dec 14 '21 at 18:04

1 Answer


Spark does not allow user-defined data types (UDTs) in versions 2.0+ (this answer summarizes the situation well); the API has been made available again as of Spark 3.2.0.

To achieve what you want, you can pickle the Python object to a byte array and store it in a BinaryType column in PySpark. Un-pickling the binary value brings the object back.

For pickling to work seamlessly, move the class to a separate file in the module; this is a well-documented issue.

Working Example

test_classes.py

# Kept in its own module so pickle can import it when un-pickling on the executors.
class MyPoint:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def add(self):
        return self.x + self.y

main.py

from test_classes import MyPoint
import pickle

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import BinaryType, IntegerType

# In spark-shell or a notebook `spark` already exists; otherwise create one.
spark = SparkSession.builder.getOrCreate()

data = [(0, 1, 8, ),
        (1, 3, 9, ),
        (2, 5, 4, ),
        (3, 7, 0, ),]

df = spark.createDataFrame(data, ("index", "X", "Y", ))

# Serialize a MyPoint instance to bytes so it fits in a BinaryType column.
@F.udf(BinaryType())
def point(x, y):
    return pickle.dumps(MyPoint(x, y))

# Deserialize the bytes back into a MyPoint and use it like a normal object.
@F.udf(IntegerType())
def point_add(point_pickled):
    return pickle.loads(point_pickled).add()

df.withColumn("obj", point(F.col("X"), F.col("Y"))) \
  .withColumn("val", point_add(F.col("obj"))) \
  .show()

Output

# show
+-----+---+---+--------------------+---+
|index|  X|  Y|                 obj|val|
+-----+---+---+--------------------+---+
|    0|  1|  8|[80 04 95 30 00 0...|  9|
|    1|  3|  9|[80 04 95 30 00 0...| 12|
|    2|  5|  4|[80 04 95 30 00 0...|  9|
|    3|  7|  0|[80 04 95 30 00 0...|  7|
+-----+---+---+--------------------+---+

# schema

root
 |-- index: long (nullable = true)
 |-- X: long (nullable = true)
 |-- Y: long (nullable = true)
 |-- obj: binary (nullable = true)
 |-- val: integer (nullable = true)
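
If you need the actual Python objects back on the driver, you can collect the rows and un-pickle the binary column. A minimal sketch, assuming the df and point UDF defined above:

# Materialize the pickled column, collect to the driver and restore the objects
df_obj = df.withColumn("obj", point(F.col("X"), F.col("Y")))
points = [pickle.loads(row["obj"]) for row in df_obj.collect()]
print(points[0].add())  # 1 + 8 = 9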
Nithish