As of today (Snowpark Python 0.8.0), it works as follows. The example calculates the median of a group/partition, i.e. the UDTF acts as a UDAF:
from statistics import median
from snowflake.snowpark.functions import col, udtf
from snowflake.snowpark.types import FloatType, StringType, StructField, StructType
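class MyMedian:
    def __init__(self):
        # Collect the values of the current partition on the instance
        self.values = []

    def process(self, value: float):
        self.values.append(value)
        # No per-row output: the empty loop only turns process() into a generator
        for _ in range(0):
            yield

    def end_partition(self):
        # Emit one summary row per partition
        yield ("partition_total", median(self.values))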
output_schema = StructType([
    StructField("label", StringType()),
    StructField("median", FloatType())
])

my_median = udtf(
    MyMedian,
    output_schema=output_schema,
    input_types=[FloatType()]
)
example_df = session.create_dataframe(
    [["A", 2.0],
     ["A", 2.0],
     ["A", 4.0],
     ["B", -1.0],
     ["B", 0.0],
     ["B", 1.0]],
    StructType([
        StructField("Key", StringType()),
        StructField("Value", FloatType())
    ])
)
example_df.show()
-------------------
|"KEY"  |"VALUE"  |
-------------------
|A      |2.0      |
|A      |2.0      |
|A      |4.0      |
|B      |-1.0     |
|B      |0.0      |
|B      |1.0      |
-------------------
Now the usage of my_median:
example_df.join_table_function(my_median("VALUE").over(partition_by=col("KEY")))\
    .show()
------------------------------------------------
|"KEY"  |"VALUE"  |"LABEL"          |"MEDIAN"  |
------------------------------------------------
|A      |NULL     |partition_total  |2.0       |
|B      |NULL     |partition_total  |0.0       |
------------------------------------------------
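To see why only one row per key comes back, the call order can be imitated in plain Python without a Snowflake session. This is only a rough sketch: simulate_partitioned_call is a hypothetical helper, not part of Snowpark, and it merely assumes that each partition gets a fresh handler instance, that process() is called once per row, and that end_partition() is called once at the end. It says nothing about how Snowflake actually schedules the work.

```python
from itertools import groupby
from operator import itemgetter
from statistics import median


class MyMedian:
    """Same handler shape as above, without any Snowpark dependency."""

    def __init__(self):
        self.values = []

    def process(self, value: float):
        self.values.append(value)
        # Empty loop: makes process() a generator that yields no rows
        for _ in range(0):
            yield

    def end_partition(self):
        yield ("partition_total", median(self.values))


def simulate_partitioned_call(handler_cls, rows):
    """Hypothetical helper: one handler instance per partition key (assumption)."""
    result = []
    ordered = sorted(rows, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        handler = handler_cls()
        for _, value in group:
            # process() must be iterated, otherwise its body never runs
            for out in handler.process(value):
                result.append((key, value, *out))
        # Rows from end_partition() have no input row, hence None (NULL)
        for out in handler.end_partition():
            result.append((key, None, *out))
    return result


rows = [("A", 2.0), ("A", 2.0), ("A", 4.0),
        ("B", -1.0), ("B", 0.0), ("B", 1.0)]
simulated = simulate_partitioned_call(MyMedian, rows)
for row in simulated:
    print(row)
```

Because process() yields nothing, the only rows produced are the ones from end_partition(), which is why VALUE shows up as NULL in the joined result.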