I suppose you would have to use some backoff strategy here, also I wouldn't use sleep
, I would keep my application asynchronous.
This probably is not strictly a solution to your problem since it's almost considerable pseudo code, but it could be a start. First I borrow from Play! the timeout future definition:
import scala.language.higherKinds
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future, Promise => SPromise}
import play.api.libs.concurrent.Akka
import util.Try
def timeout[A](message: => A, duration: Long, unit: TimeUnit = TimeUnit.MILLISECONDS)(implicit ec: ExecutionContext): Future[A] = {
val p = SPromise[A]()
Akka.system.scheduler.scheduleOnce(FiniteDuration(duration, unit)) {
p.complete(Try(message))
}
p.future
}
This uses Akka to schedule a future execution and combined with a promise returns a future. At this point you could chain future execution using flatMap
on the timeout future:
val timeoutFuture: Future[String] =
timeout("timeout", duration, TimeUnit.SECONDS)
timeoutFuture.flatMap(timeoutMessage => connectToStream())
At this point the connection is executed only after the timeout has expired but we still need to implement some kind of reconnection mechanism, for that we can use recover:
def twitterStream(duration: Long = 0, retry: Int = 0): Future[Any] = {
val timeoutFuture: Future[String] =
timeout("timeout", duration, TimeUnit.SECONDS)
// check how many time we tried to implement some stop trying strategy
// check how long is the duration and if too long reset.
timeoutFuture.flatMap(timeoutMessage => connectToStream())
.recover {
case connectionLost: SomeConnectionExpiredException =>
twitterStream(duration + 20, retry + 1) // try to reconnect
case ex: Exception if ex.getMessage.startsWith("420") =>
twitterStream(duration + 120, retry + 1) // try to reconect with a longer timer
case _ =>
someDefault()
}
}
def connectToStream(): Future[String] = {
// connect to twitter
// do some computation
// return some future with some result
Future("Tweets")
}
What happens here is that when an exception is catched from the future and if that exception is a 420 or some connection lost exception the recover is executed and the function is re-called restarting the connection after duration + 20
seconds.
A couple of notes, the code is untested (I could only compile it), also the backoff time here is linear (x + y
), you may want to have a look at some exponential backoff strategy and lastly you will need Akka to implement the schedule once used in the timeout future (Play has already Akka available), for other possibility of using timeout on futures check this SO question.
Not sure if all this is overkill, probably there are shorter and easier solutions.