
I have a method that makes multiple calls to the DB. Because I have not implemented any concurrent processing, the 2nd DB call has to wait until the 1st DB call completes, the 3rd has to wait until the 2nd completes, and so on.

All DB calls are independent of each other, so I want all of them to run concurrently.

I am new to the Akka framework.

Can someone please help me with a small sample? References would also help. The application is developed in Scala.

user3666197
Bharath Kumar
  • How many DB calls are you going to make in sequence? I mean, a DB call is basically a Future, so you can use map to handle everything in sequence. – zenwraight Jun 12 '18 at 18:19
  • Can you give more information on which DB interface you are using? It is not clear why Akka is relevant here, so a bit more explanation would help. – Tim Jun 12 '18 at 22:07
  • Cassandra is the DB we are using. As we have multiple DB calls, I want them to run in parallel instead of sequentially. – Bharath Kumar Jun 13 '18 at 01:51
  • @zenwraight, what do you mean by "each DB call is a Future"? Is it implicit, or do we need to explicitly define a Future for each DB call? – Bharath Kumar Jun 13 '18 at 01:54
  • @BharathKumar It is difficult to help without seeing an example of your sequential DB code. Please add some code to your question. – Tim Jun 13 '18 at 06:51
  • @BharathKumar Your question is contradictory. You state that "a 2nd db call has to wait until the 1st db call gets completed" but then you also say "all db calls are independent of each other". Both of those statements cannot be true at the same time. – Ramón J Romero y Vigil Jun 13 '18 at 11:05
  • @Ramon, what I mean is that the DB calls are executed sequentially. As they are independent, I want them to run in parallel. – Bharath Kumar Jun 13 '18 at 11:16
  • Below is the sample code: def method1() { val value1 = dbcall1(); val value2 = dbcall2(); val value3 = dbcall3() } – Bharath Kumar Jun 13 '18 at 11:24

2 Answers


There are three primary ways you could achieve concurrency for the given example.

Futures

For the particular use case in the question, I would recommend Futures over any Akka construct.

Suppose we are given the database calls as functions:

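// Placeholders: substitute your real result type and your real Cassandra calls
type Data = String

val dbcall1 : () => Data = () => ???

val dbcall2 : () => Data = () => ???

val dbcall3 : () => Data = () => ???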

Futures make it easy to run the calls concurrently and then collect the results:

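import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Each Future starts running as soon as it is created, so the three calls run concurrently
val f1 = Future { dbcall1() }
val f2 = Future { dbcall2() }
val f3 = Future { dbcall3() }

// The for-comprehension only waits on the already-running Futures and combines their results
for {
  v1 <- f1
  v2 <- f2
  v3 <- f3
} {
  println(s"All data collected: ${v1}, ${v2}, ${v3}")
}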
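To check that the calls really overlap, here is a minimal self-contained sketch. `fakeDbCall` is a hypothetical stand-in for a blocking Cassandra query (it only sleeps). The sketch uses `Future.sequence`, which also works for any number of calls, and wraps each call in `blocking` because real driver calls tie up a thread:

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a blocking DB query: sleeps, then returns a value
def fakeDbCall(name: String, millis: Long): String = {
  Thread.sleep(millis)
  s"$name-result"
}

val start = System.nanoTime()

// All three Futures start immediately; `blocking` lets the global pool add threads if needed
val calls: List[Future[String]] = List(
  Future(blocking(fakeDbCall("users", 400))),
  Future(blocking(fakeDbCall("orders", 400))),
  Future(blocking(fakeDbCall("items", 400)))
)

// Future.sequence turns List[Future[String]] into Future[List[String]], keeping the order
val all: Future[List[String]] = Future.sequence(calls)

// Await is used only to end the demo; in real code, keep composing with map/flatMap
val results = Await.result(all, 5.seconds)
val elapsedMs = (System.nanoTime() - start) / 1000000
println(s"$results in ~$elapsedMs ms")
```

With three 400 ms calls, the elapsed time should be close to 400 ms rather than the 1200 ms a sequential version would need. Exact timings depend on the machine.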

Akka Streams

There is a similar Stack Overflow answer that demonstrates how to use the akka-stream library for concurrent DB querying.
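As a rough sketch of that approach, assuming Akka 2.6's akka-stream module (the two `//> using` lines are scala-cli directives; with sbt, add the `akka-stream` dependency instead). `fakeDbCall` and the query names are hypothetical stand-ins for real Cassandra calls. `mapAsync` runs up to `parallelism` Futures at once and emits their results in input order:

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.duration._

// In Akka 2.6 an implicit ActorSystem also provides the stream Materializer
implicit val system: ActorSystem = ActorSystem("db-queries")
import system.dispatcher // ExecutionContext for the Futures below

// Hypothetical stand-in for a blocking DB query
def fakeDbCall(name: String): String = {
  Thread.sleep(200)
  s"$name-result"
}

val queries = List("users", "orders", "items")

// Up to 3 queries in flight at once; results come out in the same order as `queries`
val done: Future[Seq[String]] =
  Source(queries)
    .mapAsync(parallelism = 3)(q => Future(blocking(fakeDbCall(q))))
    .runWith(Sink.seq)

val results = Await.result(done, 5.seconds) // demo only; prefer composing the Future
println(results)
system.terminate()
```

Bounding `parallelism` keeps the number of concurrent queries under control, which matters when the list of queries is long. In production, blocking driver calls would normally run on a dedicated dispatcher rather than the default one.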

Akka Actors

It is also possible to write an Actor to do the querying:

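import akka.actor.{Actor, ActorSystem, Props}

case object MakeQuery

class DBActor(dbCall : () => Data) extends Actor {
  override def receive: Receive = {
    // Run the query and reply with its result to whoever asked
    case MakeQuery => sender() ! dbCall()
  }
}

val system = ActorSystem("db")
val dbcall1ActorRef = system.actorOf(Props(classOf[DBActor], dbcall1))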

However, in this use case Actors are less helpful because you still need to collect all of the data together.

You can either use the same technique as in the "Futures" section:

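import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout: Timeout = Timeout(5.seconds) // required by `?` (ask)
val f1 : Future[Data] = (dbcall1ActorRef ? MakeQuery).mapTo[Data]

for {
  v1 <- f1
  // ... remaining Futures and result handling, as in the "Futures" section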

Or, you would have to wire the Actors together by hand through the constructor and handle all of the callback logic for waiting on the other Actor:

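class WaitingDBActor(dbCall : () => Data, previousActor : ActorRef) extends Actor {
  // Idle: ask the previous actor for its data and remember who asked us
  override def receive: Receive = {
    case MakeQuery =>
      previousActor ! MakeQuery
      context.become(waiting(sender()))
  }
  // Previous data arrived: run our own query and reply to the original requester
  def waiting(requester : ActorRef): Receive = {
    case previousData : Data =>
      requester ! ((dbCall(), previousData))
      context.become(receive)
  }
}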
Ramón J Romero y Vigil
  • Thanks for the response. – Bharath Kumar Jun 15 '18 at 14:48
  • Can you please help me with the code below? def func1(seq: Seq[Object1]): Seq[Object1Line] = { seq.groupBy(aa => Object1Line(aa.forecastId, aa.managementAreaCode, aa.year, aa.month,aatype = Nil)).map { case (line: Object1Line, aaSeq: Seq[Object1]) => val Object1 = aaSeq.groupBy(aa => aa.aatype).map { case (aatype, aaSeq) => val value1 = aaSeq.head.attr1 val value2 = aaSeq.head.attr2 aatypePoints(up(attt), attr1, attr2) }.toList line.copy(aatype = Object1) }.toSeq.sortBy(line => YearMonth.of(line.year, line.month)) } – Bharath Kumar Jun 16 '18 at 06:44
  • The above code is slow. Can you please take a look and help me tune it? – Bharath Kumar Jun 16 '18 at 06:48

If you want to query a database, you should use something like Slick, which is a modern database query and access library for Scala.

A quick example of Slick:

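// Slick profile matching the PostgreSQL data source configured below
import slick.jdbc.PostgresProfile.api._

case class User(id: Option[Int], first: String, last: String)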

class Users(tag: Tag) extends Table[User](tag, "users") {
  def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
  def first = column[String]("first")
  def last = column[String]("last")
  def * = (id.?, first, last) <> (User.tupled, User.unapply)
}
val users = TableQuery[Users]

Then create a configuration for your database, e.g. in application.conf:

mydb = {
  dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
  properties = {
    databaseName = "mydb"
    user = "myuser"
    password = "secret"
  }
  numThreads = 10
}

and load that configuration in your code:

val db = Database.forConfig("mydb")

Then run a query with the db.run method, which returns a Future. For example, you can get all rows by calling the result method on the table query:

val allRows: Future[Seq[User]] = db.run(users.result)

This query runs without blocking the current thread.

If you have a task that takes a long time to execute, or that calls another service, you should use Futures.

An example of that is a simple HTTP call to an external service; you can find an example here.
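A minimal sketch of that idea, where `callExternalService` is a hypothetical placeholder that sleeps instead of making a real HTTP request. The call runs in a Future off the current thread, `recover` supplies a fallback value if it fails, and `zip` combines two independent calls that run concurrently:

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical slow remote call: sleeps, and fails for negative ids
def callExternalService(id: Int): String = {
  Thread.sleep(100)
  if (id < 0) throw new IllegalArgumentException(s"bad id $id")
  s"response-$id"
}

// Run the call on the global pool; a failure becomes a fallback value instead of an error
def fetch(id: Int): Future[String] =
  Future(blocking(callExternalService(id)))
    .recover { case _: IllegalArgumentException => s"fallback for $id" }

// Both calls start right away; zip waits for both and pairs their results
val combined: Future[(String, String)] = fetch(1).zip(fetch(-1))

val (ok, fallback) = Await.result(combined, 5.seconds) // demo only
println(s"$ok / $fallback")
```

The caller's thread is only blocked by the final `Await`, which is there just to end the demo; in an application you would keep transforming `combined` instead.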

If you have a task that takes a long time to execute and needs to keep mutable state, the best option is Akka Actors: an actor encapsulates its state and processes one message at a time, which solves the concurrency and thread-safety problems as simply as possible. An example of such a task:

import akka.actor.Actor

import scala.concurrent.Future

case class RegisterEndpoint(endpoint: String)

case class NewUpdate(update: String)

class UpdateConsumer extends Actor {
  // Mutable state is safe here: only this actor touches it, one message at a time
  val endpoints = scala.collection.mutable.Set.empty[String]

  override def receive: Receive = {

    case RegisterEndpoint(endpoint) =>
      endpoints += endpoint

    case NewUpdate(update) =>
      endpoints.foreach { endpoint =>
        deliverUpdate(endpoint, update)
      }
  }

  // Stub: replace with a real, non-blocking delivery (e.g. an HTTP call)
  def deliverUpdate(endpoint: String, update: String): Future[Unit] = {
    Future.successful(())
  }

}

If you want to process a large amount of live data, such as a WebSocket connection or a CSV file that keeps growing over time, the best option is Akka Streams. For example, you can read data from a Kafka topic using the Alpakka Kafka connector.

marc_s