I have a unit test (using pytest) that runs my PySpark code, with the usual conftest.py that creates the SQLContext fixture.
I want uuid.uuid4() to return the same value everywhere, so I patched uuid4 in my test.
If I call uuid.uuid4() from the test function itself, all is good.
However, when the PySpark job runs, it also calls uuid4, but that call is not patched:
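The direct-call behavior I rely on can be sketched without Spark (the function name is illustrative, not my real test):

```python
import uuid
from unittest.mock import patch

# check_direct_call stands in for my real test function.
@patch.object(uuid, "uuid4", return_value="1-1-1-1")
def check_direct_call(mock_uuid4):
    # Inside the decorated function, uuid.uuid4 is the mock.
    return str(uuid.uuid4())

print(check_direct_call())  # prints "1-1-1-1"
```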
My PySpark function (simplified):
import uuid

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import udf


def create_uuid_if_needed(current, prev):
    if current > prev:
        return str(uuid.uuid4())
    else:
        return None


def my_df_func(df):
    my_udf = udf(create_uuid_if_needed, T.StringType())
    my_window = Window.partitionBy(F.col(PARTITIONING_KEY)).orderBy(F.col(ORDER))
    # lag() is the window function, so over() belongs on it, not on the udf column
    return df.withColumn('new_col', my_udf(df.col, F.lag(df.col, 1).over(my_window)))
My test looks like this:
@patch.object(uuid, 'uuid4', return_value='1-1-1-1')
def test_add_activity_period_start_id(mock_uuid4, sql_context, input_fixture):
    input_df = sql_context.createDataFrame(input_fixture, [... schema...])
    good_uuid = str(uuid.uuid4())
    another_good_uuid = create_uuid_if_needed(2, 1)
    actual_df = my_df_func(input_df)
    ...
Both good_uuid and another_good_uuid get the mocked value '1-1-1-1', but the udf applied to the DataFrame still calls the unpatched uuid4.
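To rule Spark itself out, I reproduced what I suspect is the analogous situation with a plain subprocess: a patch applied in my test process is invisible to code running in a separately launched Python interpreter (all names here are illustrative):

```python
import subprocess
import sys
import uuid
from unittest.mock import patch

with patch.object(uuid, "uuid4", return_value="1-1-1-1"):
    # Same process: the mock is visible.
    local_value = str(uuid.uuid4())

    # Fresh interpreter: it imports the real uuid module from scratch,
    # so the patch applied in this process never reaches it.
    remote_value = subprocess.run(
        [sys.executable, "-c", "import uuid; print(uuid.uuid4())"],
        capture_output=True, text=True, check=True,
    ).stdout.strip()

print(local_value)   # "1-1-1-1"
print(remote_value)  # a real random UUID
```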
What is wrong here? Is it something the udf() function is doing?
Thanks!