-1

I need to iterate over a data frame in a specific order and apply some complex logic to calculate a new column.

I would also strongly prefer a generic approach, so that I do not have to list every column of a row and use df.as[my_record] or case Row(...) =>, as shown here. Instead, I want to access the row's columns by name and simply append the result column(s) to the source row.

The approach below works fine, but I would like to avoid specifying the schema twice: once so that I can access columns by name while iterating, and a second time to process the output.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)

def f_row(iter: Iterator[Row]) : Iterator[Row] = {
  if (iter.hasNext) {
    def complex_logic(p: Int): Integer = if (p == 3) null else p * 10;

    val head = iter.next
    val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
    val r =
      new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)

    iter.scanLeft(r)((r1, r2) =>
      new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
    )
  } else iter
}

val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show

What information is lost after applying mapPartitions, such that the output cannot be processed without an explicit encoder? How can I avoid specifying it?

Dr Y Wit
  • Can you use Dataset and provide a function that maps from T => U? – Terry Dactyl Nov 08 '18 at 13:00
  • @TerryDactyl, can you elaborate a bit more? I use `df.repartition($"part").sortWithinPartitions($"id")` which is `org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]` and I supply `f_row(iter: Iterator[Row]) : Iterator[Row]` for `mapPartitions`. – Dr Y Wit Nov 08 '18 at 13:09
  • A DataFrame is a Dataset[Row] and, as discussed below, Row is untyped. If you were to provide case classes T and U which correspond to the input and output shapes of your map function Iterator[T] => Iterator[U] and import spark.implicits._, then Spark may be able to provide an Encoder and you would effectively map from Dataset[T] => Dataset[U]. This is just a guess but may be worth a try. – Terry Dactyl Nov 08 '18 at 13:16

3 Answers

0

What information is lost after applying mapPartitions so output cannot be processed without explicit encoder?

The information is hardly lost; it wasn't there from the beginning. Subclasses of Row or InternalRow are basically untyped, variable-shape containers that don't provide any useful type information from which an Encoder could be derived.

The schema in GenericRowWithSchema is inconsequential, as it describes the content in terms of metadata, not types.

How to avoid specifying it?

Sorry, you're out of luck. If you want to use dynamically typed constructs (a bag of Any) in a statically typed language you have to pay the price, which here is providing an Encoder.
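Even so, the encoder does not have to be a second hand-written schema. As a minimal sketch, assuming the same Spark version and `RowEncoder(schema)` call used in the question (newer Spark releases may expose this differently), the output schema can be derived once from `df.schema` and shared by the partition function and the encoder. The names `withResult` and `logic` are hypothetical, introduced only for illustration:

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

// Hypothetical helper (not part of Spark): appends an Integer "result" column.
// The output schema is built once from df.schema and reused in both places.
def withResult(df: DataFrame)(logic: Row => Any): DataFrame = {
  val outSchema = StructType(df.schema.fields :+ StructField("result", IntegerType))

  // Each output row carries outSchema, so its columns stay accessible by name.
  val addResult: Iterator[Row] => Iterator[Row] =
    _.map(r => new GenericRowWithSchema((r.toSeq :+ logic(r)).toArray, outSchema))

  // The encoder is still passed explicitly; it is just derived from the same schema.
  df.mapPartitions(addResult)(RowEncoder(outSchema))
}
```

With the question's data this could be called as `withResult(df.repartition($"part").sortWithinPartitions($"id"))(r => ...)`, where the function reads columns by name via `r.getAs`. The encoder is still required; only the duplicated schema definition goes away.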

  • I do not agree that "it wasn't there from the beginning". I can show the result after re-partitioning and ordering with `df.repartition($"part").sortWithinPartitions($"id").show`, but it's not possible after `mapPartitions`. – Dr Y Wit Nov 08 '18 at 12:35
  • And the function used in `mapPartitions` is `(func: (Iterator[T]) ⇒ Iterator[U])`. So why can't `show` be used if the function produces pretty much the same records (with new columns, to be precise) after iterating? – Dr Y Wit Nov 08 '18 at 12:38
0

OK - I have checked some of my spark code and using .mapPartitions with the Dataset API does not require me to explicitly build/pass an encoder.

You need something like:

case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)

import spark.implicits._

// Note column names/types must match case class constructor parameters.
val beforeDS = <however you obtain your input DF>.as[Before]

def f_row(it: Iterator[Before]): Iterator[After] = ???

beforeDS.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row).show
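To make the `???` placeholder concrete, here is one possible body for `f_row`. The rule used to fill `newCol` (remembering the previous `id` in the partition) is purely an assumption for illustration; the real logic would go in its place:

```scala
case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)

// Illustrative only: labels each row with the id that preceded it in the
// already sorted partition, or "first" for the first row.
def f_row(it: Iterator[Before]): Iterator[After] = {
  var prev: Option[Int] = None // state is local to one partition's iterator
  it.map { b =>
    val out = After(b.part, b.id, prev.fold("first")(p => s"after $p"))
    prev = Some(b.id)
    out
  }
}
```

Because `After` is a case class, `import spark.implicits._` can supply the `Encoder[After]` that `mapPartitions` needs, which is why no encoder is passed by hand here.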
Terry Dactyl
  • As I mentioned in the original post, "my strong preference is to do it in generic way so I do not have to list all columns of a row and do `df.as[my_record]` or `case Row(...) =>`". I want to avoid any changes if new columns added to data frame. See https://stackoverflow.com/questions/53159461/generic-iterator-over-dataframe-spark-scala – Dr Y Wit Nov 08 '18 at 15:50
0

I found the explanation below sufficient; maybe it will be useful for others.

mapPartitions requires an Encoder because otherwise it cannot construct a Dataset from an iterator of Rows. Even though each row has a schema, that schema cannot be derived (used) by the constructor of Dataset[U].

  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    new Dataset[U](
      sparkSession,
      MapPartitions[T, U](func, logicalPlan),
      implicitly[Encoder[U]])
  }

On the other hand, without calling mapPartitions, Spark can use the schema derived from the initial query, because the structure (metadata) of the original columns is not changed.
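The `[U : Encoder]` context bound in the signature above is what turns a missing encoder into a compile-time error. The following is a toy sketch in plain Scala, with no Spark involved: `Enc`, `UntypedRow`, `build` and `buildExplicit` are made-up stand-ins, not Spark classes. It shows that a context bound is sugar for an extra implicit parameter, and that a type without an implicit instance in scope needs one passed explicitly:

```scala
// Toy stand-in for Encoder; not a Spark class.
trait Enc[T] { def describe: String }

// An instance exists for Int, similar to what spark.implicits._ offers for simple types.
implicit val intEnc: Enc[Int] = new Enc[Int] { def describe = "int" }

// Toy stand-in for Row: the shape is known only at runtime, so no implicit Enc is defined.
final class UntypedRow(val values: Seq[Any])

// Context-bound form, mirroring mapPartitions[U : Encoder] ...
def build[U : Enc](it: Iterator[U]): String = implicitly[Enc[U]].describe

// ... which is equivalent to an explicit implicit parameter list.
def buildExplicit[U](it: Iterator[U])(implicit enc: Enc[U]): String = enc.describe

val fromImplicit = build(Iterator(1, 2)) // compiles: Enc[Int] is found implicitly

// build(Iterator(new UntypedRow(Seq(1)))) // would not compile: no implicit Enc[UntypedRow]

// Supplying the instance by hand, like passing RowEncoder(schema) to mapPartitions.
val rowEnc: Enc[UntypedRow] = new Enc[UntypedRow] { def describe = "row with runtime schema" }
val fromExplicit = build(Iterator(new UntypedRow(Seq(1))))(rowEnc)
```

The analogy is loose, but it matches the situation in the question: the per-row schema is a runtime value, while the encoder has to be resolved when the call is compiled.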

I described alternatives in this answer: https://stackoverflow.com/a/53177628/7869491.

Dr Y Wit