I'm very new to Spark, and I was wondering whether the following changes anything with regard to memory consumption and how the work is assigned to the workers. Below is a minimal example so you can understand what I'm asking.
# imports for the pandas UDF
import pyspark.sql.functions as F
import pyspark.sql.types as T

# for creating the minimal example
import pandas as pd
import numpy as np

# create the minimal example: 50 rows with two numeric columns
df_minimal_example = pd.DataFrame({"x": np.arange(0, 50, 1), "y": np.arange(50, 100, 1)})

# add a random integer column with values 0 or 1
df_minimal_example["PARTITION_ID"] = np.random.randint(0, 2, size=len(df_minimal_example))

sdf_minimal_example = spark.createDataFrame(df_minimal_example)
Here is the output:
x y PARTITION_ID
0 0 50 1
1 1 51 0
2 2 52 1
3 3 53 1
4 4 54 0
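Each of the two PARTITION_ID values gets roughly half of the 50 rows; a quick way to check the group sizes:

# rows per PARTITION_ID value (roughly 25 each)
print(df_minimal_example["PARTITION_ID"].value_counts())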
Now I define the pandas UDF so I can use my Python function in Spark:
schema = T.StructType([
    T.StructField("xy", T.FloatType()),
    T.StructField("x2", T.FloatType()),
    T.StructField("y2", T.FloatType()),
    T.StructField("PARTITION_ID", T.LongType()),
])
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def newfunction(pdf):
    # pdf is a pandas DataFrame holding all rows of one PARTITION_ID group
    pdf["xy"] = pdf["x"] * pdf["y"]
    pdf["x2"] = pdf["x"] * pdf["x"]
    pdf["y2"] = pdf["y"] * pdf["y"]
    cols2retrieve = ["PARTITION_ID", "xy", "x2", "y2"]
    return pdf[cols2retrieve].copy()
newpdf = sdf_minimal_example.groupby("PARTITION_ID").apply(newfunction)
# to see the results
display(newpdf)
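(In case the API version matters for the answer: I understand that on Spark 3.0+ the same grouped map is usually written with applyInPandas instead of the decorator. A sketch of what I believe is the equivalent, reusing the schema above:)

# Spark 3.0+ equivalent of the decorated UDF, as far as I understand
def newfunction_plain(pdf):
    pdf["xy"] = pdf["x"] * pdf["y"]
    pdf["x2"] = pdf["x"] * pdf["x"]
    pdf["y2"] = pdf["y"] * pdf["y"]
    return pdf[["PARTITION_ID", "xy", "x2", "y2"]].copy()

newpdf = sdf_minimal_example.groupby("PARTITION_ID").applyInPandas(newfunction_plain, schema=schema)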
As you can see, I use .groupby("PARTITION_ID") when applying the pandas UDF, and the column "PARTITION_ID" contains either 0 or 1. The question is: what if PARTITION_ID contained integers between 0 and 100? For example:
# instead of this
df_minimal_example["PARTITION_ID"] = np.random.randint(0, 2, size=len(df_minimal_example))
# use this
df_minimal_example["PARTITION_ID"] = np.random.randint(0, 100, size=len(df_minimal_example))
Does this change anything with regard to memory consumption and how the work is assigned to each worker? If anyone could provide a little more information about this, that would be great.
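For what it's worth, here is how I have been inspecting how the rows end up distributed after the groupby (assuming the default spark.sql.shuffle.partitions of 200; the row counts are tiny here, so collecting them is safe):

# number of partitions and rows per partition after the grouped map
newpdf = sdf_minimal_example.groupby("PARTITION_ID").apply(newfunction)
print(newpdf.rdd.getNumPartitions())
print(newpdf.rdd.glom().map(len).collect())  # rows in each partition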