
I want to add a rate limiter to my Spark writer to cap the number of HTTP requests it makes to a downstream application, but I keep running into Spark serialization errors.

Sample code snippet:

import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}

import org.spark_project.guava.util.concurrent.RateLimiter

@transient
object Baz {
    @transient var maybeRateLimiter: Option[RateLimiter] = createRateLimiter()
    final val DEFAULT_RATELIMITER_ACQUIRE_WAIT_TIME_IN_MS = 1000

    def rateLimitedFetch(someKey: String,
                         fooClient: FooClient)(implicit executionContext: ExecutionContext): EitherT[Future, String, Foo] = {
        maybeRateLimiter.fold {
          logger.info("No rate limiter, not gating requests")
          EitherT(
            fooClient.fetchFoo(someKey)
              .wrapEither(t => s"Error fetching $someKey due to ${t.getMessage}")
          )
        }
        {
          rateLimiter =>
            while (!rateLimiter.tryAcquire(DEFAULT_RATELIMITER_ACQUIRE_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS)) {
              logger.info(s"Not enough permits, requested: 1, configured rate: ${rateLimiter.getRate}")
            }

            EitherT(
              fooClient.fetchFoo(someKey)
                .wrapEither(t => s"Error fetching $someKey due to ${t.getMessage}")
            )
        }
    }
}

Baz.rateLimitedFetch(someKey, fooClient)

Stack trace:

Caused by: java.io.NotSerializableException: org.spark_project.guava.util.concurrent.RateLimiter$Bursty
Serialization stack:
    - object not serializable (class: org.spark_project.guava.util.concurrent.RateLimiter$Bursty, value: RateLimiter[stableRate=500.0qps])

I'm not sure whether Guava's RateLimiter can be used in this context, or whether there is a better way to rate-limit downstream requests from a Spark application.

Karthik Raj
    Since you need one rate limiter per executor, a common approach is to use `foreachPartition` and initialize the limiter inside it, so it never needs to be serialized. See https://stackoverflow.com/questions/30484701/spark-foreach-vs-foreachpartitions-when-to-use-what – Roberto Congiu Nov 06 '18 at 21:28
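
A minimal sketch of what the comment suggests, under some assumptions: unshaded Guava (`com.google.common`) is on the classpath instead of the `org.spark_project.guava` copy used in the question, the data is an RDD of keys, and `ExecutorRateLimiter`, `RateLimitedWrite`, `callDownstream` and the 500 permits/second figure are hypothetical names and values chosen for illustration. The limiter lives in a singleton object and is looked up from inside `foreachPartition`, so the closure shipped to the executors never contains a `RateLimiter` instance.

```scala
import com.google.common.util.concurrent.RateLimiter
import org.apache.spark.sql.SparkSession

// Holder object: it is never serialized. Each executor JVM initializes its
// own copy the first time a task touches `limiter`.
object ExecutorRateLimiter {
  // Hypothetical rate: 500 permits/second per executor JVM, not cluster-wide.
  lazy val limiter: RateLimiter = RateLimiter.create(500.0)
}

object RateLimitedWrite {
  // Hypothetical stand-in for the real HTTP call (fooClient.fetchFoo in the question).
  def callDownstream(key: String): Unit = println(s"fetched $key")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rate-limited-write")
      .master("local[2]") // local mode, only so the sketch runs standalone
      .getOrCreate()

    val keys = spark.sparkContext.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)

    keys.foreachPartition { partition =>
      // Resolved on the executor at run time; nothing is captured by the closure.
      val limiter = ExecutorRateLimiter.limiter
      partition.foreach { key =>
        limiter.acquire() // blocks until a permit is available
        callDownstream(key)
      }
    }

    spark.stop()
  }
}
```

Because the limiter is per JVM, the aggregate request rate is roughly the configured rate times the number of executors, so the per-executor rate would have to be scaled down to hit a cluster-wide target. Creating the limiter as a local value inside `foreachPartition` would instead give one limiter per partition (task), which allows more total throughput when several tasks run concurrently on the same executor.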
