
I'm currently working on a Play application with a background job that should send emails periodically, and I want to use Akka for it. I should add that I'm really new to Scala, Play, and Akka.

Currently I have the following setup:

// JobModule.scala
bind(classOf[MailJobScheduler]).asEagerSingleton()

This should start the following piece of code, which does work every second:

// MailJobScheduler.scala
val mailActor = actorSystem.actorOf(MailActor.props, "mail-actor")

actorSystem.scheduler.schedule(0.seconds, 1.second) {
    // check how many mails have to be sent and send messages to the mailActor
}

Several new mails may need to be sent each second. I am wondering: if I send 10 messages to the mailActor every second, will a single actor really do all the work, or will multiple actors do the work concurrently?

If it is a single actor, how can I have multiple actors to assign the work to, and how many can or should I have?

Chris

2 Answers


How about using Akka streams instead?

import akka.Done
import akka.stream.{ActorMaterializer, KillSwitch, KillSwitches, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.Future

object BatchEmailSender {
  sealed trait Msg
  case object Tick extends Msg
  case class Email(toAddress: String, body: String) extends Msg

  def apply(sendEmail: Email => Future[Done], sendInterval: FiniteDuration = 10.seconds)(implicit mat: ActorMaterializer)
    : (Email => Unit, KillSwitch) = {
    val emailQueue = scala.collection.mutable.Queue[Email]()

    val (emailCmdQueue, killSwitch) = Source.queue[Msg](0, OverflowStrategy.backpressure)
      .merge(Source.tick(0.seconds, sendInterval, Tick))
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.foreach {
        case newEmail: Email =>
          emailQueue.enqueue(newEmail)
        case Tick =>
          emailQueue.dequeueAll(_ => true).foreach { email =>
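            // Log failed sends; `failed.foreach` replaces `onFailure`, which was removed in Scala 2.13.
            // The materializer's execution context runs the callback.
            sendEmail(email).failed.foreach { e =>
              println(s"Error sending email to ${email.toAddress}: $e")
            }(mat.executionContext)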
          }
      })(Keep.left)
      .run()

    (emailCmdQueue.offer(_), killSwitch)
  }
}

You need a sendEmail function, and then it would work like this:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContext.Implicits.global // TODO: remove me

object TestApp extends App {
  import BatchEmailSender._
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  def sendEmail(email: Email): Future[Done] = {
    println(s"Sending email $email") // TODO: insert real email sender code here
    Future.successful(Done)
  }

  val (sendEmailEvery10s, killSwitch) = BatchEmailSender(sendEmail)
  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive in 10s"))
  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive in same batch"))
  Thread.sleep(11000)
  sendEmailEvery10s(Email("foo@bar.com", "Email will arrive after another 10s"))
  Thread.sleep(11000)
  killSwitch.shutdown()
  system.terminate() // stop the actor system so the JVM can exit
}

I may have just complicated your life, but Akka Streams lets you do these things without worrying about which actor does what, provides back-pressure, and typically leads to much more robust code.
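To illustrate the point about not worrying which actor does what, here is a minimal sketch (not part of the code above) of a stream that sends emails with a bounded number of sends in flight. The names `ParallelEmailSender` and `start`, the buffer size of 100, and the default `parallelism` of 4 are assumptions chosen for illustration. It uses `ActorMaterializer` to match the code above; on Akka 2.6+ an implicit `ActorSystem` would normally be used instead.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import scala.concurrent.Future

object ParallelEmailSender {
  // Hypothetical message type, mirroring the Email class used above
  final case class Email(toAddress: String, body: String)

  // Returns a queue to offer emails to. At most `parallelism` calls to
  // `sendEmail` are in flight at once; when the buffer is full, the Future
  // returned by `offer` does not complete until there is room again.
  // Note: a failed Future from `sendEmail` fails the whole stream by default.
  def start(sendEmail: Email => Future[Done], parallelism: Int = 4)(
      implicit mat: ActorMaterializer): SourceQueueWithComplete[Email] =
    Source
      .queue[Email](100, OverflowStrategy.backpressure)
      .mapAsync(parallelism)(sendEmail)
      .toMat(Sink.ignore)(Keep.left)
      .run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("parallel-mail")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val queue = start { email =>
      println(s"Sending email $email") // placeholder for real sending code
      Future.successful(Done)
    }
    queue.offer(Email("foo@bar.com", "Hello"))

    Thread.sleep(1000)
    system.terminate()
  }
}
```

The degree of concurrency is then a single number to tune, rather than a count of actors to create and supervise.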

I would have used a single actor if Akka Streams did not exist: accumulate all messages in the actor and have it send a tick to itself periodically.
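A rough sketch of that single-actor alternative, assuming classic Akka actors and the `Timers` trait (available since Akka 2.5). The names `BatchingMailActor`, `Tick`, and `TickKey` are made up for this example, and `send` stands in for real email code. On Akka 2.6+, `startTimerWithFixedDelay` is the non-deprecated equivalent of `startPeriodicTimer`.

```scala
import akka.actor.{Actor, ActorSystem, Props, Timers}
import scala.collection.mutable
import scala.concurrent.duration._

object BatchingMailActor {
  // Hypothetical message types for this sketch
  final case class Email(toAddress: String, body: String)
  private case object Tick
  private case object TickKey

  def props(send: Email => Unit, interval: FiniteDuration): Props =
    Props(new BatchingMailActor(send, interval))
}

class BatchingMailActor(send: BatchingMailActor.Email => Unit, interval: FiniteDuration)
    extends Actor with Timers {
  import BatchingMailActor._

  // Only touched from `receive`, which processes one message at a time,
  // so no synchronization is needed.
  private val pending = mutable.Queue.empty[Email]

  // The actor sends itself a Tick at every interval.
  timers.startPeriodicTimer(TickKey, Tick, interval)

  def receive: Receive = {
    case email: Email =>
      pending.enqueue(email) // accumulate until the next tick
    case Tick =>
      pending.foreach(send) // flush the whole batch
      pending.clear()
  }
}

object BatchingMailDemo {
  def main(args: Array[String]): Unit = {
    import BatchingMailActor._
    val system = ActorSystem("mail-demo")
    val mailActor =
      system.actorOf(props(e => println(s"Sending $e"), 2.seconds), "mail-actor")

    mailActor ! Email("foo@bar.com", "Queued until the next tick")
    Thread.sleep(3000)
    system.terminate()
  }
}
```

Because an actor handles one message at a time, this single actor sends the batch sequentially; that is fine as long as `send` is fast or hands the work off asynchronously.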

Lodewijk Bogaards

Use the scheduler as you do in your example, but I don't see how the mailActor helps you here.

actorSystem.scheduler.schedule(0.seconds, 1.second) {
    // just call the code that checks for email
}

Don't assume that the block will always run on the same thread; be extra careful not to close over unstable references, such as mutable state.
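As a small sketch of what that caution can look like in practice: the scheduled block below runs on the dispatcher's thread pool, so it only touches thread-safe state. The names `SchedulerClosureSketch` and `startMailChecks` are hypothetical, and the counter merely stands in for whatever state the real mail check needs. On Akka 2.6+, `schedule` is deprecated in favor of `scheduleWithFixedDelay` and `scheduleAtFixedRate`.

```scala
import akka.actor.{ActorSystem, Cancellable}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._

object SchedulerClosureSketch {

  // Starts the periodic check and returns the run counter plus a handle to cancel it.
  def startMailChecks(system: ActorSystem, interval: FiniteDuration): (AtomicInteger, Cancellable) = {
    import system.dispatcher // execution context that runs the scheduled block

    // Risky alternative: a plain `var runs = 0` captured by the block could be
    // updated from different pool threads without any visibility guarantee.
    val runs = new AtomicInteger(0)

    val task = system.scheduler.schedule(0.seconds, interval) {
      val n = runs.incrementAndGet()
      println(s"mail check #$n on ${Thread.currentThread.getName}")
    }
    (runs, task)
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("scheduler-sketch")
    val (_, task) = startMailChecks(system, 1.second)

    Thread.sleep(3500)
    task.cancel()
    system.terminate()
  }
}
```

The printed thread name may differ between runs, which is exactly why the block should not rely on unsynchronized mutable fields.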

andyczerwonka