There are many differences between SAS and Python/PySpark. Here are some highlights:
We transform the data by defining steps in an execution pipeline.
(ABC
    # Filter the data in the pipeline to just rows where A is 1234.
    .filter(ABC.A == 1234)
    # We can only create one column at a time, but these columns can be nested.
    # We have to define the transformation that produces this column separately.
    .withColumn('X', my_udf(ABC.A)))
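These steps are lazy: each call returns a new DataFrame rather than modifying ABC in place, and nothing actually runs until an action is called. A minimal usage sketch, assuming we assign the pipeline to the placeholder name ABC_transformed:

# Assign the pipeline to a new DataFrame; transformations are lazy,
# so nothing executes until an action such as show() is called.
ABC_transformed = (ABC
    .filter(ABC.A == 1234)
    .withColumn('X', my_udf(ABC.A)))
ABC_transformed.show()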
Here we define the function to perform our transformation in the execution pipeline:
def my_func(A):
    # Exit early if A is negative.
    if A < 0:
        return (None, None, None, None)  # NOTE: we must still return 4 values, one per column in the schema.
    C = A * 50000  # This is the same in almost every language ;)
    padded = str(C).zfill(14)  # Convert C to a string and zero-pad ("z-fill") it to 14 characters, saved to be reused below
    X1 = int(padded[0:2])  # Slice the string to get the substring from (and including) index 0 up to (but not including) index 2
    X2 = int(padded[2:4])
    X3 = int(padded[0] + padded[2])
    X4 = int(padded[1] + padded[3])
    return (X1, X2, X3, X4)  # We return all four values packed in a "tuple". They'll be nested below our parent column in the new dataset.
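Because my_func is plain Python, we can sanity-check it locally before wrapping it in a UDF. The input values below are arbitrary, chosen just for illustration:

# Quick local check of my_func with arbitrary example inputs.
print(my_func(987654321))  # 987654321 * 50000 = 49382716050000 -> (49, 38, 43, 98)
print(my_func(-5))         # negative input -> (None, None, None, None)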
Here we define the types and columns that our transformation function will return.
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([  # A struct is a data structure that holds other data structures
    StructField("X1", IntegerType()),
    StructField("X2", IntegerType()),
    StructField("X3", IntegerType()),
    StructField("X4", IntegerType())
])

my_udf = udf(my_func, schema)  # We create a function for use in PySpark by combining a plain Python function with a PySpark schema.
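With the UDF defined, the pipeline from the start of this section can run. Calling printSchema() on the result (the placeholder ABC_transformed from the earlier sketch) displays the nested layout:

# Print the schema of the transformed DataFrame defined earlier.
ABC_transformed.printSchema()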
Here's what the schema looks like.
root
 |-- X: struct (nullable = true)
 |    |-- X1: int (nullable = true)
 |    |-- X2: int (nullable = true)
 |    |-- X3: int (nullable = true)
 |    |-- X4: int (nullable = true)
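Once the struct column exists, its nested fields can be selected individually or flattened back out into top-level columns. A short sketch using the same placeholder names as above:

# Select individual nested fields by their dotted path, or flatten every
# field in the struct into its own top-level column with 'X.*'.
ABC_transformed.select('A', 'X.X1', 'X.X2').show()
ABC_flat = ABC_transformed.select('A', 'X.*')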