I have been trying to introduce a rate limiter in our Spark writer to cap the number of HTTP requests made to a downstream application, but I keep running into Spark serialization errors.
Sample code snippet:
import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}

import cats.data.EitherT // assuming cats' EitherT here
import org.spark_project.guava.util.concurrent.RateLimiter

// createRateLimiter, FooClient, Foo, logger and wrapEither are our own helpers, elided for brevity.
@transient
object Baz {
  @transient var maybeRateLimiter: Option[RateLimiter] = createRateLimiter()
  final val DEFAULT_RATELIMITER_ACQUIRE_WAIT_TIME_IN_MS = 1000

  def rateLimitedFetch(someKey: String, fooClient: FooClient)(
      implicit executionContext: ExecutionContext): EitherT[Future, String, Foo] = {
    maybeRateLimiter.fold {
      // No limiter configured: fetch without gating
      logger.info("No rate limiter, not gating requests")
      EitherT(
        fooClient.fetchFoo(someKey)
          .wrapEither(t => s"Error fetching $someKey due to ${t.getMessage}")
      )
    } { rateLimiter =>
      // Block until a permit is available before calling downstream
      while (!rateLimiter.tryAcquire(DEFAULT_RATELIMITER_ACQUIRE_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS)) {
        logger.info(s"Not enough permits, requested: 1, current rate: ${rateLimiter.getRate}")
      }
      EitherT(
        fooClient.fetchFoo(someKey)
          .wrapEither(t => s"Error fetching $someKey due to ${t.getMessage}")
      )
    }
  }
}
Baz.rateLimitedFetch(someKey, fooClient)
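The fetch is invoked from inside a Spark task, roughly like the sketch below; the dataset name, the blocking Await and the timeout are placeholders for illustration, not the real writer code:

// Simplified call site; keysDs, the timeout and the blocking Await are placeholders
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

keysDs.foreachPartition { keys: Iterator[String] =>
  keys.foreach { someKey =>
    val result = Baz.rateLimitedFetch(someKey, fooClient)
    Await.result(result.value, 30.seconds) // Left(error) or Right(foo)
  }
}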
Stack trace:
Caused by: java.io.NotSerializableException: org.spark_project.guava.util.concurrent.RateLimiter$Bursty
Serialization stack:
- object not serializable (class: org.spark_project.guava.util.concurrent.RateLimiter$Bursty, value: RateLimiter[stableRate=500.0qps])
I'm not sure whether the Guava RateLimiter can be used in this context at all, or whether there is a better way to rate-limit downstream requests from a Spark application.
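One pattern that might sidestep this (though I haven't verified it in my setup) is to hold the limiter in a @transient lazy val, so the field is skipped during serialization and rebuilt on first access inside each executor JVM. A rough sketch of what I think that would look like here; only the field declaration changes, and the 500 permits/second rate is a placeholder:

import org.spark_project.guava.util.concurrent.RateLimiter

object Baz {
  // Not written when a closure referencing Baz is serialized; re-initialised
  // lazily on first access in each executor JVM, so the non-serializable
  // RateLimiter instance never has to travel over the wire.
  @transient private lazy val maybeRateLimiter: Option[RateLimiter] =
    Some(RateLimiter.create(500.0)) // placeholder: 500 permits per second

  // rateLimitedFetch stays as above
}

The other option I've seen is to construct the limiter (and the client) inside foreachPartition/mapPartitions so nothing non-serializable is ever referenced from the driver side, but I'd like to know which approach is considered idiomatic.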