
The actual problem is this: I open a User Stream to populate a cache of mine, and sometimes the stream fails with a 420 exception (Too many login attempts in a short period of time).

How long should I wait before trying to reestablish the connection?

    override def onException(ex: Exception): Unit = {
      Logger.info("Exception:::" + ex.getMessage + ":::" + ex.getCause)
      // getMessage may be null, so wrap it in an Option before testing the prefix
      if (Option(ex.getMessage).exists(_.startsWith("420"))) {
        // Can't authenticate for now, thus has to fill up cache hole in next start
        // Wait some time (How long?) Thread.sleep(5000L)
        // Connect via restApi and fill up the holes in the cache
        // Continue listening
      }
    }
danielrvt

1 Answer


I suppose you would need some backoff strategy here. I also wouldn't use sleep; I would keep the application asynchronous.

This is probably not strictly a solution to your problem, since it's close to pseudocode, but it could be a start. First, I borrow the timeout future definition from Play!:

import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future, Promise => SPromise}
import scala.util.Try
import play.api.libs.concurrent.Akka
// Assumption: Play 2.x, where Akka.system needs an implicit Application in scope
import play.api.Play.current

def timeout[A](message: => A, duration: Long, unit: TimeUnit = TimeUnit.MILLISECONDS)(implicit ec: ExecutionContext): Future[A] = {
  val p = SPromise[A]()
  Akka.system.scheduler.scheduleOnce(FiniteDuration(duration, unit)) {
    p.complete(Try(message))
  }
  p.future
}

This uses Akka to schedule a delayed execution and, combined with a promise, returns a future. You can then chain further work with flatMap on the timeout future:

val timeoutFuture: Future[String] = 
  timeout("timeout", duration, TimeUnit.SECONDS)

timeoutFuture.flatMap(timeoutMessage => connectToStream())
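
If Akka is not available, the same kind of delayed future can be sketched with a plain ScheduledExecutorService from the JDK. This is only a minimal sketch: the names `scheduler` and `delayed` are hypothetical stand-ins for the Akka scheduler and the `timeout` function above, not part of Play or Akka.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Single scheduler thread shared by all delays (hypothetical name).
// It is not a daemon thread, so call scheduler.shutdown() on application exit.
val scheduler = Executors.newSingleThreadScheduledExecutor()

// Hypothetical stand-in for timeout(): the returned future completes with
// `message` once the delay has elapsed, without blocking the calling thread.
def delayed[A](message: => A, duration: Long, unit: TimeUnit = TimeUnit.MILLISECONDS): Future[A] = {
  val p = Promise[A]()
  scheduler.schedule(new Runnable {
    def run(): Unit = { p.complete(Try(message)); () }
  }, duration, unit)
  p.future
}
```

It chains the same way as the Akka version, for example `delayed("timeout", 5, TimeUnit.SECONDS).flatMap(_ => connectToStream())`, given an implicit ExecutionContext in scope.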

At this point the connection is made only after the timeout has expired, but we still need some kind of reconnection mechanism. For that we can use recoverWith (rather than recover, since the handler itself returns a future):

def twitterStream(duration: Long = 0, retry: Int = 0): Future[Any] = {
  val timeoutFuture: Future[String] = 
    timeout("timeout", duration, TimeUnit.SECONDS)

  // check how many times we have tried, to implement some stop-trying strategy
  // check how long the duration is and reset it if it is too long

  timeoutFuture.flatMap(timeoutMessage => connectToStream())
    .recoverWith {
      case connectionLost: SomeConnectionExpiredException => 
        twitterStream(duration + 20, retry + 1) // try to reconnect
      case ex: Exception if Option(ex.getMessage).exists(_.startsWith("420")) =>
        twitterStream(duration + 120, retry + 1) // try to reconnect with a longer timer
      case _ => 
        someDefault() // placeholder; must return a Future as well
    }
}

def connectToStream(): Future[String] = {
  // connect to twitter
  // do some computation
  // return some future with some result
  Future("Tweets")
}

What happens here is that when the future fails with a 420 or a connection-lost exception, the recovery handler runs and the function calls itself again, restarting the connection after duration + 20 seconds for a lost connection or duration + 120 seconds for a 420.

A couple of notes: the code is untested (I could only compile it). The backoff time here is linear (x + y); you may want to look at an exponential backoff strategy. Lastly, you will need Akka for the scheduleOnce call used in the timeout future (Play already has Akka available); for other ways of using timeouts on futures, check this SO question.

Not sure whether all this is overkill; there are probably shorter and easier solutions.
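
As a rough illustration of the exponential variant, the delay can be computed from the retry count instead of adding a fixed amount each time. The helper name `backoffDelay` and its defaults are assumptions for this sketch; as far as I recall, Twitter's streaming guidelines suggested starting at about one minute for 420 responses and doubling on every attempt, but check the current documentation before relying on those numbers.

```scala
import scala.concurrent.duration._

// Hypothetical helper: delay to wait before retry number `attempt` (0-based).
// The delay doubles with every attempt, starting at `base` and capped at `max`.
// The defaults are assumptions, not values taken from the code above.
def backoffDelay(attempt: Int,
                 base: FiniteDuration = 1.minute,
                 max: FiniteDuration = 16.minutes): FiniteDuration = {
  require(attempt >= 0, "attempt must not be negative")
  // Clamp the exponent so the multiplication stays well inside Long range
  val factor = 1L << math.min(attempt, 20)
  val candidate = base * factor
  if (candidate > max) max else candidate
}
```

In twitterStream this could replace `duration + 120`, for instance by passing `backoffDelay(retry).toSeconds` as the next duration.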

Ende Neu