
I need to replace values of multiple columns (100s-1000s of columns) of a large parquet file. I am using pyspark.

I have a working implementation using `replace` that works for a small number of columns, but when the number of columns is in the order of 100s, it takes a long time to even generate the Spark plan from what I can see (> 3-4 s per column). So, I am looking for a faster implementation.

value_label_map = {"col1": {"val1": "new_val1"}, "col2": {"val2": "new_val2"}}
columns_to_replace = []
for k, v in value_label_map.items():
    print(f"replacing {k}")
    columns_to_replace.append(k)
    df = df.replace(to_replace=v, subset=k)

I tried an alternate approach, but I couldn't find a way to access the value of a pyspark `Column` object in order to look up the dict.

Alternate implementation:

from pyspark.sql.functions import when

def replace_values(col, value_map):
    if value_map:
        # value_map[col] is the broken part: col is a Column, not a plain value
        return when(col.isin(list(value_map.keys())), value_map[col]).otherwise(col)
    else:
        return col

df = spark.read.parquet("some-path")
updated_cols = [
    replace_values(df[col_name], value_label_map.get(col_name)).alias(col_name)
    for col_name in df.columns
]

The problem with this is that I can't look up `value_label_map` using a `Column` object.

vangap
  • Try looking at [Pyspark: Replacing value in a column by searching a dictionary](https://stackoverflow.com/questions/43976237/pyspark-replacing-value-in-a-column-by-searching-a-dictionary). – Alias Cartellano Mar 31 '23 at 16:23
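
For reference, here is a rough sketch of the map-column lookup technique the linked question describes, applied to a single column of this problem; the `mapping_expr` name and the single-column example are illustrative assumptions, not part of the original question:

from itertools import chain
from pyspark.sql import functions as F

# build a literal map column from the python dict for one column
mapping_expr = F.create_map(*[F.lit(x) for x in chain(*value_label_map["col1"].items())])

# look up each row's value in the map; keep the original value when the key is absent
df = df.withColumn(
    "col1",
    F.coalesce(mapping_expr.getItem(F.col("col1")), F.col("col1")),
)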

1 Answer


You could try packing everything into one `select`. Since `replace` is based on `when` statements, let's use them directly:

from pyspark.sql import functions as F

def replace_from_dict(col_name, mapping):
    """For each (k, v) item in mapping, replace value k in column col_name with value v."""
    res = None
    for k, v in mapping.items():
        if res is None:
            res = F.when(F.col(col_name) == k, F.lit(v))
        else:
            res = res.when(F.col(col_name) == k, F.lit(v))
    return res.otherwise(F.col(col_name)).alias(col_name)

def replace_or_not(col_name):
    """generate a column replacement if need be, keeping the column otherwise"""
    if col_name in value_label_map:
        return replace_from_dict(col_name, value_label_map[col_name])
    else:
        return col_name

result = df.select(*[replace_or_not(c) for c in df.columns])
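
A quick illustration of how this could be used, on a made-up two-row DataFrame (the sample data is hypothetical, only meant to show that mapped columns are rewritten and other columns pass through unchanged):

value_label_map = {"col1": {"val1": "new_val1"}, "col2": {"val2": "new_val2"}}

df = spark.createDataFrame(
    [("val1", "val2", 1), ("other", "val2", 2)],
    ["col1", "col2", "col3"],
)

result = df.select(*[replace_or_not(c) for c in df.columns])
result.show()
# "val1" in col1 becomes "new_val1", "val2" in col2 becomes "new_val2",
# and col3 is untouched because it has no entry in value_label_map.
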
Oli
  • Thanks Oli, I think the part I missed was iterating the dict and using `when` to replace; I was trying to directly look up values from the dict based on the `Column` object. Before seeing this, I also tried doing the transformation 100 columns at a time, writing to disk, and then doing the next 100 (see the sketch after these comments). This was much faster than my original implementation even though I am writing and reading multiple times. It is not elegant, though, and would be slower for really large files. – vangap Apr 01 '23 at 01:41
  • Your solution seems elegant; just curious whether it is going to be O(n^2) unnecessarily, since we iterate over the (k, v) pairs of the dict and then each column value has to be processed? And would this be an issue because of the number of `when` conditions the overall plan might generate? I have 1000s of columns to replace and some of the columns have 1000s of values to replace. Overall, the performance seemed to be OK on the datasets that I tried, some of which have 6000 columns. – vangap Apr 01 '23 at 01:42
  • What is `n`? This implementation follows the same logic as your first implementation; it just compresses everything into one select. Spark seems to struggle when we iteratively increase the size of the execution plan. Here we have two loops because we have nested dicts: `replace_from_dict` iterates over the inner dicts while the loop inside the select iterates over `value_label_map`. – Oli Apr 01 '23 at 07:35
  • Please ignore my comment about O(n^2). I didn't realize that iterating the dict is only the initial step that generates the `when` conditions (I understand now that for every column, each (k, v) pair of its dict becomes one chained `when` condition). – vangap Apr 02 '23 at 10:16
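
For completeness, here is a rough sketch of the batching workaround mentioned in the first comment, built on the question's original `replace`-based loop; the batch size, temporary path, and `spark` session are illustrative placeholders, not part of the original post:

BATCH_SIZE = 100
cols_to_fix = list(value_label_map.keys())

current = df
for i in range(0, len(cols_to_fix), BATCH_SIZE):
    # apply the replacements for this batch of columns
    for c in cols_to_fix[i:i + BATCH_SIZE]:
        current = current.replace(to_replace=value_label_map[c], subset=c)
    # write the intermediate result to disk so each Spark plan stays small
    tmp_path = f"/tmp/replace_batch_{i}"  # hypothetical path
    current.write.mode("overwrite").parquet(tmp_path)
    current = spark.read.parquet(tmp_path)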