
I am trying to write some Scala helper functions for Flink map and filter operations that redirect their errors to a dead letter queue.

However, I'm struggling with the JVM's type erasure, which prevents me from making them generic. The implementation of mapWithDeadLetterQueue below does not compile.


sealed trait ProcessingResult[T]
case class ProcessingSuccess[T,U](result: U) extends ProcessingResult[T]
case class ProcessingError[T: TypeInformation](errorMessage: String, exceptionClass: String, stackTrace: String, sourceMessage: T) extends ProcessingResult[T]

object FlinkUtils {
    // https://stackoverflow.com/questions/1803036/how-to-write-asinstanceofoption-in-scala
    implicit class Castable(val obj: AnyRef) extends AnyVal {
        def asInstanceOfOpt[T <: AnyRef : ClassTag] = {
            obj match {
                case t: T => Some(t)
                case _ => None
            }
        }
    }

    def mapWithDeadLetterQueue[T: TypeInformation,U: TypeInformation](source: DataStream[T], func: (T => U)): (DataStream[U], DataStream[ProcessingError[T]]) = {
        val mapped = source.map(x => { 
            val result = Try(func(x)) 
            result match {
                case Success(value) => ProcessingSuccess(value)
                case Failure(exception) => ProcessingError(exception.getMessage, exception.getClass.getName, exception.getStackTrace.mkString("\n"), x)
            }
        } )
        val mappedSuccess = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingSuccess[T,U]]).map(x => x.result)
        val mappedFailure = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingError[T]])
        (mappedSuccess, mappedFailure)
    }
  
}

I get:

[error] FlinkUtils.scala:35:36: overloaded method value flatMap with alternatives:
[error]   [R](x$1: org.apache.flink.api.common.functions.FlatMapFunction[Product with Serializable with ProcessingResult[_ <: T],R], x$2: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[R] <and>
[error]   [R](x$1: org.apache.flink.api.common.functions.FlatMapFunction[Product with Serializable with ProcessingResult[_ <: T],R])org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[R]
[error]  cannot be applied to (ProcessingResult[T] => Option[ProcessingSuccess[T,U]])
[error]         val mappedSuccess = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingSuccess[T,U]]).map(x => x.result)

Is there a way to make this work?

1 Answer


OK, I'm going to answer my own question. I made a couple of mistakes:

  • First of all, I accidentally imported the Java DataStream class instead of the Scala DataStream class (this happens all the time). The Java variant obviously doesn't accept a Scala lambda for map/filter/flatMap.
  • Second, sealed traits are not supported by Flink's serialization. There is a project that should solve this, but I haven't tried it yet.
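For anyone hitting the first mistake: the distinction is just the import. The Scala API package also brings the implicit TypeInformation machinery into scope, while the Java class lives under a similarly named datastream package:

```scala
// Scala API: provides the Scala DataStream (which accepts Scala lambdas)
// and the createTypeInformation implicits.
import org.apache.flink.streaming.api.scala._

// Java API: easy to pick by accident from an IDE auto-import.
// Its map/filter/flatMap expect MapFunction/FilterFunction/FlatMapFunction
// instances, not Scala lambdas.
// import org.apache.flink.streaming.api.datastream.DataStream
```
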

Solution: first, instead of a sealed trait I used a simple case class with two Options (a bit less expressive, but it still works):

case class ProcessingError[T](errorMessage: String, exceptionClass: String, stackTrace: String, sourceMessage: T)
case class ProcessingResult[T: TypeInformation, U: TypeInformation](result: Option[U], error: Option[ProcessingError[T]])

Then I could get everything working like so:

import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

object FlinkUtils {
    def mapWithDeadLetterQueue[T: TypeInformation: ClassTag,U: TypeInformation: ClassTag]
       (source: DataStream[T], func: (T => U)): 
       (DataStream[U], DataStream[ProcessingError[T]]) = {
        implicit val typeInfo = TypeInformation.of(classOf[ProcessingResult[T,U]])

        val mapped = source.map((x: T) => { 
            val result = Try(func(x)) 
            result match {
                case Success(value) => ProcessingResult[T, U](Some(value), None)
                case Failure(exception) => ProcessingResult[T, U](None, Some(
                  ProcessingError(exception.getMessage, exception.getClass.getName, 
                           exception.getStackTrace.mkString("\n"), x)))
            }
        } )
        val mappedSuccess = mapped.flatMap((x: ProcessingResult[T,U]) => x.result)
        val mappedFailure = mapped.flatMap((x: ProcessingResult[T,U]) => x.error)
        (mappedSuccess, mappedFailure)
    }

}

The flatMap and filter functions look very similar, but they use ProcessingResult[T, List[T]] and ProcessingResult[T, T] respectively.
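For completeness, the filter variant might look like this. It is a sketch following the same pattern as mapWithDeadLetterQueue above, assuming the same ProcessingResult/ProcessingError case classes and imports; only the name filterWithDeadLetterQueue is taken from the usage example below:

```scala
def filterWithDeadLetterQueue[T: TypeInformation: ClassTag]
   (source: DataStream[T], pred: (T => Boolean)):
   (DataStream[T], DataStream[ProcessingError[T]]) = {
    implicit val typeInfo = TypeInformation.of(classOf[ProcessingResult[T, T]])

    val mapped = source.map((x: T) => {
        Try(pred(x)) match {
            // Keep the element when the predicate passes, drop it when it fails
            case Success(true)  => ProcessingResult[T, T](Some(x), None)
            case Success(false) => ProcessingResult[T, T](None, None)
            // A thrown exception goes to the dead letter stream with the source message
            case Failure(exception) => ProcessingResult[T, T](None, Some(
                ProcessingError(exception.getMessage, exception.getClass.getName,
                    exception.getStackTrace.mkString("\n"), x)))
        }
    })
    (mapped.flatMap(_.result), mapped.flatMap(_.error))
}
```
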

I use the functions like this:

val (result, errors) = FlinkUtils.filterWithDeadLetterQueue(input, (x: MyMessage) => {
          x.`type` match {
            case "something" => throw new Exception("how how how")
            case "something else" => false
            case _ => true
          }
})
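The errors stream can then be wired to whatever dead letter sink fits your setup. As a stand-in sketch (printing to stdout; in production this would be a real sink such as a Kafka producer):

```scala
// `errors` is a DataStream[ProcessingError[MyMessage]].
// Format each error and print it; replace .print() with a proper
// dead-letter sink (e.g. a Kafka sink) in a real job.
errors
  .map(e => s"${e.exceptionClass}: ${e.errorMessage}")
  .print()
```
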