How about using Akka streams instead?
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitch, KillSwitches, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._
/**
  * Buffers e-mails and sends everything that has accumulated each time a periodic tick fires.
  *
  * Internally this runs a single stream that merges a command queue (new e-mails) with a
  * timer (ticks) and feeds both into one sink.
  */
object BatchEmailSender {

  /** Messages that flow through the internal stream. */
  sealed trait Msg

  /** Timer signal: flush every e-mail buffered so far. */
  case object Tick extends Msg

  /** An e-mail to be sent in the next batch. */
  final case class Email(toAddress: String, body: String) extends Msg

  /**
    * Starts the batching stream.
    *
    * @param sendEmail    function that actually delivers one e-mail; a failed `Future` is logged
    *                     via `println` and does not stop the stream
    * @param sendInterval time between two flushes (the first tick is scheduled with zero delay)
    * @param mat          materializer used to run the stream; its execution context is also used
    *                     for the logging callbacks
    * @return a pair of (function that submits an e-mail for the next batch,
    *         kill switch that shuts the stream down)
    */
  def apply(sendEmail: Email => Future[Done], sendInterval: FiniteDuration = 10.seconds)(implicit mat: ActorMaterializer)
  : (Email => Unit, KillSwitch) = {
    // Passed explicitly to every Future callback below, so no implicit ExecutionContext
    // (and in particular no global one) has to be in scope here.
    val ec = mat.executionContext

    // Only accessed from within the Sink.foreach callback below.
    val emailQueue = scala.collection.mutable.Queue[Email]()

    val (emailCmdQueue, killSwitch) = Source.queue[Msg](0, OverflowStrategy.backpressure)
      .merge(Source.tick(0.seconds, sendInterval, Tick))
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.foreach[Msg] {
        case newEmail: Email =>
          emailQueue.enqueue(newEmail)
        case Tick =>
          // Drain the whole buffer and fire off one send per e-mail.
          emailQueue.dequeueAll(_ => true).foreach { email =>
            // `failed.foreach` replaces `onFailure`, which is deprecated since Scala 2.12.
            sendEmail(email).failed.foreach { e =>
              println(s"Error sending email to ${email.toAddress}: $e")
            }(ec)
          }
      })(Keep.left)
      .run()

    // Submit an e-mail to the stream. The offer result is inspected so that an e-mail the
    // queue did not accept is reported instead of disappearing silently.
    val submit: Email => Unit = { email =>
      val offered = emailCmdQueue.offer(email)
      offered.foreach {
        case QueueOfferResult.Enqueued => ()
        case other => println(s"Email to ${email.toAddress} was not queued: $other")
      }(ec)
      offered.failed.foreach { e =>
        println(s"Error queueing email to ${email.toAddress}: $e")
      }(ec)
    }

    (submit, killSwitch)
  }
}
You need a sendEmail function, and then it would work like this:
import scala.concurrent.ExecutionContext.Implicits.global // TODO: remove me
/**
  * Small demo: submits a few e-mails to a [[BatchEmailSender]] running with its default
  * send interval, waits for the batches to be flushed, then shuts everything down.
  */
object TestApp extends App {
  import BatchEmailSender._

  // Explicit types on implicit vals keep implicit resolution predictable.
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Stand-in sender that only prints the e-mail and reports success. */
  def sendEmail(email: Email): Future[Done] = {
    println(s"Sending email $email") // TODO: insert real email sender code here
    Future.successful(Done)
  }

  val (sendEmailEvery10s, killSwitch) = BatchEmailSender(sendEmail)

  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive in 10s"))
  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive in same batch"))
  Thread.sleep(11000)
  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive after another 10s"))
  Thread.sleep(11000)

  killSwitch.shutdown()
  // Without this the actor system's threads keep the JVM alive after the demo is done.
  system.terminate()
}
I may have just complicated your life, but Akka Streams lets you do these things without worrying about which actor does what, gives you back-pressure, and typically results in much more robust code.
If Akka Streams did not exist, I would have used a single actor: accumulate all messages in the actor and have it send a tick to itself periodically.