
I have been going through the source code of the Log class in the core module of the Kafka project, but I am still new to Scala. I have encountered some syntax that is quite hard to understand. Here are the code snippets:

Snippet 1:


    // Now do a second pass and load all the log and index files.
    // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
    // this happens, restart loading segment files from scratch.
    retryOnOffsetOverflow {
      // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
      // loading of segments. In that case, we also need to close all segments that could have been left open in previous
      // call to loadSegmentFiles().
      logSegments.foreach(_.close())
      segments.clear()
      loadSegmentFiles()
    }

Snippet 2:

    private[log] def retryOnOffsetOverflow[T](fn: => T): T = {
      while (true) {
        try {
          return fn // What is fn here, in the context of snippet 1?
        } catch {
          case e: LogSegmentOffsetOverflowException =>
            info(s"Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
            splitOverflowedSegment(e.segment) // This returns a List[LogSegment], but where does that return value go?
        }
      }
      throw new IllegalStateException()
    }
  1. What I find hard to understand is how the method retryOnOffsetOverflow is called in snippet 1 and what is passed to it as the argument of its parameter. I know the parameter of retryOnOffsetOverflow is a function, but in this snippet, what is the argument passed to it?

  2. I am also not clear on what retryOnOffsetOverflow returns here. The return type is T, which is some kind of generic? Will the return value differ depending on whether the exception was caught or not? If so, what exactly is returned in each case?

Thanks a lot for the explanation, and please tell me if I have left out any code needed to answer the question.


Update: To correct myself, the parameter of retryOnOffsetOverflow is a by-name parameter, which won't be evaluated unless and until it is referenced somewhere in the body of the method.
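The by-name behaviour described in the update can be checked with a small experiment. This is a minimal sketch; `ByNameDemo`, `next`, `ignore` and `twice` are hypothetical names made up for illustration. It shows that a by-name argument does not run at the call site, runs only when the method body references it, and runs again on every reference.

```scala
// Minimal sketch of by-name evaluation; all names are hypothetical.
object ByNameDemo {
  // Counts how many times the argument expression has actually run.
  var evaluations = 0

  // Side-effecting expression that we pass as a by-name argument.
  def next(): Int = {
    evaluations += 1
    evaluations
  }

  // Never references fn, so the argument expression never runs.
  def ignore[T](fn: => T): String = "fn was not referenced"

  // References fn twice, so the argument expression runs twice
  // (left element first, then right element).
  def twice[T](fn: => T): (T, T) = (fn, fn)
}
```

Starting from zero, `ByNameDemo.ignore(ByNameDemo.next())` leaves the counter at 0, and a following `ByNameDemo.twice(ByNameDemo.next())` yields `(1, 2)`. retryOnOffsetOverflow relies on the re-evaluation part: each pass through its while loop references fn again, so the whole block from snippet 1 runs again.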

Boyu Zhang

3 Answers


Update: Slightly changed the last part, as it looks like the split files are loaded right in the next loop iteration.

  1. The argument to retryOnOffsetOverflow here is everything inside the curly braces, basically these three lines of code. That block is the (fn: => T) parameter it accepts.
  2. retryOnOffsetOverflow tries to execute the block that was passed and returns its result if the execution succeeds. One part that's a bit difficult to understand: when there's an exception, splitOverflowedSegment is called not for its return value, but because it mutates state in the replaceSegments function. These segments are then read on the next loop iteration, in the loadSegmentFiles function.
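The mechanism in point 2 can be simulated outside Kafka. This is a sketch under stated assumptions: `OverflowException`, `RetryDemo`, the string "segments" and the simplified `loadSegmentFiles`/`splitOverflowedSegment` bodies are hypothetical stand-ins, not Kafka's real code; only the control flow of `retryOnOffsetOverflow` follows snippet 2. The first attempt fails, the catch block splits the segment (mutating shared state), and the second attempt succeeds because it sees the updated state. The `List` returned by the splitter is simply discarded.

```scala
import scala.collection.mutable

// Hypothetical exception that names the segment needing a split.
final class OverflowException(val segment: String)
    extends RuntimeException(s"offset overflow in $segment")

object RetryDemo {
  // Hypothetical shared state standing in for Kafka's segment collection.
  val segments: mutable.ListBuffer[String] = mutable.ListBuffer("big")
  var attempts = 0

  // Hypothetical loader: fails while the unsplit "big" segment is present.
  def loadSegmentFiles(): Int = {
    attempts += 1
    if (segments.contains("big")) throw new OverflowException("big")
    segments.size
  }

  // Hypothetical splitter: replaces the segment with two parts (a state change)
  // and returns the parts, mirroring the List return type in snippet 2.
  def splitOverflowedSegment(segment: String): List[String] = {
    val parts = List(s"$segment-part1", s"$segment-part2")
    segments -= segment
    segments ++= parts
    parts
  }

  // Same control flow as retryOnOffsetOverflow in snippet 2.
  def retryOnOffsetOverflow[T](fn: => T): T = {
    while (true) {
      try {
        return fn // success: leave the loop and the method with fn's value
      } catch {
        case e: OverflowException =>
          // Called for its side effect on `segments`; the returned List is discarded.
          splitOverflowedSegment(e.segment)
      }
    }
    throw new IllegalStateException()
  }
}
```

Calling `RetryDemo.retryOnOffsetOverflow(RetryDemo.loadSegmentFiles())` makes two attempts and returns 2, the number of segments after the split.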
Rayan Ral
  • Yeah, thanks a lot, that is a clear explanation. I see that the split files are reloaded by `loadSegmentFiles()` in the next loop. But one more question: when does the loop in `retryOnOffsetOverflow()` end? When it reaches the exception on the last line of the method? – Boyu Zhang May 19 '20 at 08:30
  • When the `fn` function executes successfully. If it finishes, `return` is executed, breaking the loop. – Rayan Ral May 19 '20 at 08:34
  • All right, I finally see the logic of the `retryOnOffsetOverflow` method: it keeps executing `fn` in an infinite loop, and each time `fn` fails, it catches the exception and updates the state of the segments for the next round of the loop. Eventually `fn` runs without any exception, and that's when `retryOnOffsetOverflow` ends. Am I getting it right? – Boyu Zhang May 19 '20 at 08:39
  • Yep, I think you're correct (at least I'm understanding this code in the same way). – Rayan Ral May 19 '20 at 08:40
  • Also, I'm not sure what the final `IllegalStateException` is for; I don't see a path where it can be thrown. – Rayan Ral May 19 '20 at 08:40
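On the question in the comments about the final `IllegalStateException`: in Scala 2, which Kafka uses, a `while` loop is an expression of type `Unit`, and the compiler does not treat `while (true)` as non-terminating. Without a last expression, the method body would have type `Unit` instead of `T` and would not compile. `throw` has type `Nothing`, which conforms to any `T`, so that line most likely exists only to satisfy the type checker. For comparison, here is a sketch of an equivalent recursive formulation that needs no such line; `RecursiveRetry`, `retryOn`, `onRetry` and `RetryableException` are hypothetical names, and this is not how Kafka writes it.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical exception type that signals "repair state and try again".
final class RetryableException(msg: String) extends RuntimeException(msg)

object RecursiveRetry {
  // Alternative formulation (not Kafka's code). Every case either yields a T
  // or throws, so the method needs no trailing unreachable expression.
  @tailrec
  def retryOn[T](onRetry: RetryableException => Unit)(fn: => T): T =
    Try(fn) match {
      case Success(value) => value
      case Failure(e: RetryableException) =>
        onRetry(e)                // e.g. split a segment before the next attempt
        retryOn[T](onRetry)(fn)   // tail call: plays the role of the next loop iteration
      case Failure(other) => throw other // any other non-fatal exception propagates
    }
}
```

Note that `Try` only catches non-fatal exceptions; fatal errors escape immediately, just as they would from the original `catch`, which matches only `LogSegmentOffsetOverflowException`.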

What I find hard to understand is how the method retryOnOffsetOverflow is called in snippet 1 and what is passed to it as the argument of its parameter. I know the parameter of retryOnOffsetOverflow is a function, but in this snippet, what is the argument passed to it?

Consider the following simplified example:

def printTillSuccess[T](fn: => T): T = {
  while (true) {
    val result = fn
    println(result)
    return result
  }
  throw new Exception("dead end")
}

printTillSuccess("a")
printTillSuccess({ "a" })
printTillSuccess { "a" }

fn: => T is not a function but a by-name parameter. It is evaluated on each reference, i.e. at the line val result = fn. Functions in Scala have an apply method, and that is not the case here.
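To make the distinction concrete, here is a minimal sketch with hypothetical method names. A by-name parameter is referenced as a plain `fn`, whereas a parameter of function type `() => T` is a `Function0[T]` value that has to be invoked with `fn()` (sugar for `fn.apply()`), and the caller has to pass an explicit function literal.

```scala
object ByNameVsFunction {
  // By-name parameter: the body just writes `fn`; there is no apply to call.
  def byName[T](fn: => T): T = fn

  // Function-typed parameter: `fn` is a Function0[T] value, invoked via apply.
  def byFunction[T](fn: () => T): T = fn.apply()
}
```

At the call site, `ByNameVsFunction.byName { 1 + 1 }` passes a plain expression, while `ByNameVsFunction.byFunction(() => 1 + 1)` needs a function literal; both evaluate to 2.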

You can pass values into a method via (), as in the example printTillSuccess("a").

Scala allows wrapping any block of code in {}, and the last statement is used as the result of the block. Thus, {"a"} is the same as "a".

So you can also pass {"a"} into methods, which makes printTillSuccess({ "a" }) a valid call.

Finally, when a method takes a single argument, Scala allows that argument to be passed in braces {} instead of parentheses (), which enables the syntax printTillSuccess { "a" }.
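The same rules explain the call in snippet 1: the braces hold a block of several statements, the statements run in order each time the by-name parameter is evaluated, and the value of the last one becomes the argument's value, which fixes `T`. A sketch with hypothetical names (`BlockArgDemo`, `runOnce`, `steps`) standing in for the Kafka calls:

```scala
import scala.collection.mutable

object BlockArgDemo {
  // Hypothetical record of side effects, standing in for closing/clearing segments.
  val steps: mutable.ListBuffer[String] = mutable.ListBuffer.empty

  // Same parameter shape as printTillSuccess and retryOnOffsetOverflow.
  def runOnce[T](fn: => T): T = fn

  // The three call syntaxes from the answer; all give the same String.
  val viaParens: String = runOnce("a")
  val viaBlockInParens: String = runOnce({ "a" })
  val viaBraces: String = runOnce { "a" }

  // A multi-statement block, like snippet 1. The first two lines run for
  // their side effects; the last expression is the block's value, so T = Int.
  def multiLine(): Int = runOnce {
    steps += "close"
    steps += "clear"
    steps.size
  }
}
```

In snippet 1 the last statement is `loadSegmentFiles()`, so `T` there is whatever that method returns.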

I am also not clear on what retryOnOffsetOverflow returns here. The return type is T, which is some kind of generic? Will the return value differ depending on whether the exception was caught or not? If so, what exactly is returned in each case?

The return type is the type of the by-name parameter, T. The statement return fn is the only place where a value of type T is returned.

In case of a LogSegmentOffsetOverflowException, the catch block runs and splitOverflowedSegment is executed. This mutates some internal state, and the next iteration of while (true) evaluates the by-name parameter again. So the exception does not change the return type; it just allows the next iteration to happen.

The while loop can only exit when fn evaluates successfully or a different exception is thrown. In either case, the return, if it happens, will be of type T.
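The point that a different exception ends the loop can be checked with a sketch. `OffsetOverflow`, `PropagationDemo` and `failsTwoWays` are hypothetical names; the retry method has the same shape as snippet 2 but with the state repair left out. The first attempt throws the handled exception and is retried; the second throws an unrelated `java.io.IOException`, which the `catch` does not match, so it propagates to the caller and the loop stops without returning anything.

```scala
// Hypothetical stand-in for LogSegmentOffsetOverflowException.
final class OffsetOverflow extends RuntimeException("offset overflow")

object PropagationDemo {
  var attempts = 0

  // Same shape as retryOnOffsetOverflow; the state repair is omitted.
  def retryOnOverflow[T](fn: => T): T = {
    while (true) {
      try {
        return fn
      } catch {
        case _: OffsetOverflow => () // matched: go round the loop again
        // any other exception is not matched and propagates out of the method
      }
    }
    throw new IllegalStateException()
  }

  // Throws the handled exception first, then an unrelated one.
  def failsTwoWays(): Int = {
    attempts += 1
    if (attempts == 1) throw new OffsetOverflow
    throw new java.io.IOException("disk error")
  }
}
```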

Ivan Stanislavciuc
  • Thanks a lot for such an elaborate and easy-to-understand tutorial, lol. It's much clearer to me now. That's really helpful. Well, I must say Scala is so flexible. Sigh. – Boyu Zhang May 19 '20 at 08:44
def retryOnOffsetOverflow[T](fn: => T): T

retryOnOffsetOverflow() is a method that takes a single "by name" parameter, which means that the parameter won't be evaluated unless and until it is referenced somewhere in the body of the method.

So fn might be a single value or multiple lines of code, and it remains unevaluated (unexecuted) until it is reached here: try { return fn }, where it is executed inside a try.

Evaluating fn results in a value of some type. It doesn't really matter what that type is (we'll call it T), but retryOnOffsetOverflow() is required to return the same type.
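Because fn is only evaluated inside the try, an exception raised while computing the argument is thrown inside retryOnOffsetOverflow() and can be caught there. With an ordinary by-value parameter, the argument would be evaluated at the call site, before the method body (and its try) starts. A sketch contrasting the two; `WhereEvaluated`, `catchByName`, `catchByValue` and `boom` are hypothetical names:

```scala
object WhereEvaluated {
  // By-name: the argument is evaluated inside the try, so its exception is caught here.
  def catchByName[T](fn: => T): Either[String, T] =
    try Right(fn)
    catch { case e: IllegalArgumentException => Left(e.getMessage) }

  // By-value (for contrast): the argument is evaluated before this body runs,
  // so the try below never sees an exception thrown while computing it.
  def catchByValue[T](value: T): Either[String, T] =
    try Right(value)
    catch { case e: IllegalArgumentException => Left(e.getMessage) }

  // An argument expression that always fails.
  def boom(): Int = throw new IllegalArgumentException("bad input")
}
```

`WhereEvaluated.catchByName(WhereEvaluated.boom())` returns `Left("bad input")`, while `WhereEvaluated.catchByValue(WhereEvaluated.boom())` throws at the call site. This is why the retry logic in snippet 2 needs a by-name parameter.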

jwvh
  • Thanks for the answer. So you mean `retryOnOffsetOverflow()` is required to return the same type as its parameter, which is the result of the statements in the curly braces? `logSegments.foreach(_.close()) segments.clear() loadSegmentFiles()` – Boyu Zhang May 19 '20 at 08:22
  • Essentially yes. If `fn` returns a value (or evaluates to a value), that value, of type `T`, is returned from `retryOnOffsetOverflow()`. If `fn` throws, and it is caught (so it must have been a `LogSegmentOffsetOverflowException`) then the `splitOverflowedSegment()` must return a value of the same type `T` that `fn` was supposed to return. – jwvh May 19 '20 at 08:31