2

I am trying my hand at scala+Akka and I am trying to figure out fault tolerance. I have an actor which receives message from a supervisor and inserts data into the DB. The Supervisor restarts the actor when it experiences a fault.

I am changing the connection string in the postRestart() in case there are connectivity issues to the DB. Now whenever there is a connectivity issue with one DB, the actor restarts and starts inserting data into another DB.

Is this a good enough approach? What is the recommended approach?

Supervisor:

class SocialSupervisor extends Actor {

    override val supervisorStrategy=OneForOneStrategy(loggingEnabled = false){
    case (e:Exception)=>Restart
    }

    val post_ref=context.actorOf(Props[Post])
    def receive={
            case Get_Feed(feed)=>{
                //get data from feed
                post_ref!Post_Message(posted_by,post)
            }
    }
}

Actor:

class Post extends Actor{
  val config1=ConfigFactory.load()
    var config=config1.getConfig("MyApp.db")

    override def postRestart(reason: Throwable) {
        config=config1.getConfig("MyApp.backup_db")
        super.postRestart(reason)
    }

    def insert_data(commented_by:String,comment:String){
            val connection_string=config.getString("url")
                val username=config.getString("username")
                val password=config.getString("password")
                //DB operations
    }

    def receive={
      case Post_Message(posted_by,message)=>{
        insert_data(posted_by, message)
      }
    }
}
codingsplash
  • 4,785
  • 12
  • 51
  • 90

1 Answers1

1

I think there are several improvements you could make to your code to make it more "fault tolerant".

Modularity

You should probably separate your insert_data function from the rest of the Actor so that it can be used & tested independent of any ActorSystem. Your Actors should have very little code in them and the receive method should basically be a dispatcher to external functions:

object Post {
  def insert_data(conn : Connection)(commented_by : String, comment : String) = {
    ...
  }
}

You could even go one step further and remove the Connection dependency. From your Actor's perspective an insertion is nothing more than a function that takes in a PostMessage and returns the number of valid row updates:

object Post {
  //returns an Int because Statement.executeUpdate returns an Int
  type DBInserter : Post_Message => Int

You can now insert into a database Connection as before:

  def insertIntoLiveDB(connFactory : () => Connection) : DBInserter = 
    (postMessage : Post_Message) => {
      val sqlStr = s"INSERT INTO .."
      connFactory().createStatement() executeUpdate sqlStr
    }
  }

Or write a function that never does insertions for testing purposes:

  //does no inserting
  val neverInsert : DBInserter = (postMessage : Post_Message) => 0
}

Now your Actor has virtually no logic:

class Post(inserter : Post.DBInserter) extends Actor {

  def receive = {
    case pm : Post_Message => inserter(pm)
  }

}

Fault Tolerance

By far the greatest source of "fault" within an application is the network, manifested in your case by a Connection to a database. We need some way for Connections to automatically refresh in the case of failure. We can use a factory function to do so:

def basicConnFactory(timeoutInSecs : Int = 10) = {

  //setup initial connection, not specified in question
  var conn : Connection = ???  

  () => {
     if(conn isValid timeoutInSecs)
       conn
     else {
       conn = ??? //setup backup connection
       conn
     }
  }
}

Now the Connection's validity is tested on each insertion and re-established if there is a problem. This factory can then be used to create the Actor:

import Post.queryLiveDB
val post_ref = 
  context actorOf (Props[Post], insertIntoLiveDB(basicConnFactory()))

As your production requirements get more strict, you can ammend the factory to utilize a connection pool...

Community
  • 1
  • 1
Ramón J Romero y Vigil
  • 17,373
  • 7
  • 77
  • 125