
Spark DataFrames have a withColumn method to add one new column at a time. To add multiple columns, a chain of withColumn calls is required. Is this the best practice?

I feel that using mapPartitions has advantages. Let's say I have a chain of three withColumn calls and then one filter to remove rows based on certain conditions. These are four different operations (I am not sure if any of them are wide transformations, though). But I can do it all in one pass with mapPartitions. It also helps when I have a database connection that I would prefer to open once per RDD partition.
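
For context, this is roughly the kind of chain I mean; the udf and the extra columns below are just hypothetical placeholders:

from pyspark.sql import functions as F

# add_suffix stands in for whatever per-row logic I actually need
add_suffix = F.udf(lambda v: v + "_NEW")

df2 = (df
       .withColumn("new_col_1", add_suffix(F.col("existing_col_2")))
       .withColumn("new_col_2", F.col("existing_col_2"))
       .withColumn("new_col_3", F.length("existing_col_1"))
       .filter(F.col("new_col_3") > 0))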

My question has two parts.

The first part: this is my implementation using mapPartitions. Are there any unforeseen issues with this approach? And is there a more elegant way to do it?

from pyspark.sql import Row

def add_new_cols(rows):
    # one connection per partition, not per row
    db = open_db_connection()
    new_rows = []
    # Row "class" carrying the output column names
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

The second part: what are the tradeoffs of using mapPartitions over a chain of withColumn and filter calls?

I read somewhere that using the available methods on Spark DataFrames is always better than rolling your own implementation. Please let me know if my argument is wrong. Thank you! All thoughts are welcome.

void
  • Can you share an example of a problem you are unable to solve? As of now your question is too broad and sort of unclear. – mtoto Apr 04 '18 at 12:33
  • I have updated the question. – void Apr 04 '18 at 14:27
  • A chain of `withColumn`s will not be executed serially if that's what you're worried about. Lazy Spark will optimize the operations. – pault Apr 04 '18 at 14:57
  • @pault Let's say I can add three columns as a result of a single db query inside a `mapPartitions` method. But with `withColumn` I will need three separate queries to do this, right? – void Apr 04 '18 at 15:02
  • Not necessarily. You could also just use `select()`: `new_df = old_df.select("*", new_column1, new_column2, ...)`. It's hard to say without a [mcve] that explains what you're trying to do. – pault Apr 04 '18 at 15:06
  • @pault Please see the edited question. `db_result` has been used in two column values (it took only one db query). But with `withColumn`, it would take two separate db calls, one in each of the `udf`s fed to the respective `withColumn` method. – void Apr 04 '18 at 15:12
  • Well, to avoid opening the db connection twice, you could return a list and then split the output into columns. Something like `df = df.withColumn('list_output', myUDF()).select("*", col('list_output')[0].alias('new_col1'), col('list_output')[1].alias('new_col2')).drop("list_output")`. Converting to rdd and back to DF is slow, but I'm not an expert on this. – pault Apr 04 '18 at 15:49
  • Using SELECT like that looks impressive. Could you provide me with any documentation which points towards the same? Especially for the `col('name')[0]` part. – void Apr 04 '18 at 16:07
  • @void take a look at [this post](https://stackoverflow.com/questions/45789489/how-to-split-a-list-to-multiple-columns-in-pyspark). You could also get fancy and return a `StructType()` from your udf and then use `list_output.*` – pault Apr 04 '18 at 16:13
  • See also: [How to assign the result of udf to multiple columns](https://stackoverflow.com/questions/35322764/apache-spark-assign-the-result-of-udf-to-multiple-dataframe-columns) and [How to add multiple columns using udf](https://stackoverflow.com/questions/47669895/how-to-add-multiple-columns-using-udf?rq=1). – pault Apr 04 '18 at 16:42

2 Answers


Are there any unforeseen issues with this approach?

Multiple. The most severe implications are:

  • A memory footprint a few times higher compared to plain DataFrame code, and significant garbage collection overhead.
  • High cost of serialization and deserialization required to move data between the JVM and Python execution contexts.
  • Introducing a breaking point in the query planner: the optimizer cannot look inside the RDD transformation.
  • As written, the cost of schema inference on the toDF call (which can be avoided if a proper schema is provided; see the sketch after this list) and possible re-execution of all preceding steps.
  • And so on...
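
For the toDF point, a minimal sketch of what providing an explicit schema could look like; the field types below are assumptions about your data:

from pyspark.sql.types import StructType, StructField, StringType

# Assumed field types; adjust to the real columns.
schema = StructType([
    StructField("existing_col_1", StringType()),
    StructField("existing_col_2", StringType()),
    StructField("new_col_1", StringType()),
    StructField("new_col_2", StringType()),
])

df2 = df.rdd.mapPartitions(add_new_cols).toDF(schema)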

Some of these can be avoided with udf and select / withColumn, others cannot.
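
To illustrate the udf-and-select route (a sketch only: get_some_result stands in for your lookup logic, and this does not give you the per-partition connection scope), you can compute both values in one udf that returns a struct and split it afterwards, so the expensive call runs once per row rather than once per output column:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

result_type = StructType([
    StructField("new_col_1", StringType()),
    StructField("new_col_2", StringType()),
])

def derive_cols(existing_col_2):
    # placeholder for the actual lookup logic
    db_result = get_some_result(existing_col_2)
    return (db_result + "_NEW", db_result)

derive_cols_udf = F.udf(derive_cols, result_type)

df2 = (df
       .withColumn("derived", derive_cols_udf("existing_col_2"))
       .select("*", "derived.new_col_1", "derived.new_col_2")
       .drop("derived"))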

Let's say I have a chain of three withColumn calls and then one filter to remove rows based on certain conditions. These are four different operations (I am not sure if any of them are wide transformations, though). But I can do it all in one pass with mapPartitions.

Your mapPartitions doesn't remove any operations, and it doesn't provide any optimization that the Spark planner couldn't apply on its own. Its only advantage is that it provides a nice scope for expensive connection objects.

I read somewhere that using the available methods on Spark DataFrames is always better than rolling your own implementation

When you start using executor-side Python logic, you have already diverged from Spark SQL. It doesn't matter whether you use udf, RDD, or the newly added vectorized udf. At the end of the day, you should make the decision based on the overall structure of your code: if it is predominantly Python logic executed directly on the data, it might be better to stick with RDD or skip Spark completely.
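
For reference, a minimal sketch of the vectorized (pandas) udf mentioned above, available since Spark 2.3; the string transformation is just a placeholder:

from pyspark.sql.functions import pandas_udf

# Receives a pandas.Series per batch instead of one Python call per row.
@pandas_udf("string")
def add_suffix(s):
    return s + "_NEW"

df2 = df.withColumn("new_col_1", add_suffix("existing_col_2"))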

If it is just a fraction of the logic and doesn't cause severe performance issues, don't sweat it.

Alper t. Turker

Using df.withColumn() is the best way to add columns. They are all added lazily.
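
As a quick illustration (the column names below are made up), nothing runs until an action is called, and the optimized plan from explain() typically shows the chained projections collapsed together:

from pyspark.sql import functions as F

df2 = (df
       .withColumn("a_plus_1", F.col("a") + 1)
       .withColumn("a_plus_2", F.col("a") + 2)
       .filter(F.col("a_plus_1") > 0))

# Nothing has executed yet; inspect the optimized plan.
df2.explain(True)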

Chitral Verma
  • I agree. Aren't all transformations lazily evaluated? Even `mapPartitions` should be. Please go through the question's comments; I have raised a concern there to pault. – void Apr 04 '18 at 15:29
  • OK, so I read the other comments and what you're asking. I am not that familiar with Python syntax, so I'm guessing here, as it should be the same as Scala. In the Scala API, the `rows` you pass to `add_new_cols` is actually an `Iterator[Row]`. In order to use the contents of those rows in your query, you materialise these iterators, and doing that exhausts them. – Chitral Verma Apr 04 '18 at 18:42
  • I'll suggest you not use `withColumn`s in this case, as you end up making/using separate connections for each row. Alternatively, use mapPartitions and make connections to the external DB at the partition level, where connections come from some singleton pool. – Chitral Verma Apr 04 '18 at 18:45
  • I'm not really familiar with the problem statement here, but if I were to solve this, I'd probably read the external table as a JDBC source using the Spark API, find a common key between this DF and my existing one, and then join them. On this joined DF, I can do all my processing (see the sketch below). – Chitral Verma Apr 04 '18 at 18:47
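
A rough sketch of that last idea; the JDBC URL, table, credentials, and join key below are all made up:

# All connection details and the join key are placeholders.
lookup_df = (spark.read
             .format("jdbc")
             .option("url", "jdbc:postgresql://dbhost:5432/mydb")
             .option("dbtable", "lookup_table")
             .option("user", "db_user")
             .option("password", "db_password")
             .load())

joined = df.join(lookup_df, df.existing_col_2 == lookup_df.lookup_key, "left")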