
I have a system using Akka which currently handles incoming streaming data over message queues. When a record arrives, it is processed, the MQ message is acked, and the record is passed on for further handling within the system.

Now I would like to add support for using DBs as an input source.
What would be a good way to implement an input source that can handle a DB? It should stream in more than 100M records at the pace the receiver can handle, so I presume reactive / Akka Streams?

Ramón J Romero y Vigil
Evan M.

2 Answers


Slick Library

Slick streaming is how this is usually done.

Extending the Slick documentation a bit to include Akka Streams:

// assumes a Slick profile and the coffees table query are in scope, e.g.:
// import slick.jdbc.PostgresProfile.api._
import slick.basic.DatabasePublisher

//SELECT Name from Coffees
val q = for (c <- coffees) yield c.name

val action = q.result

type Name = String

val databasePublisher : DatabasePublisher[Name] = db stream action

import akka.stream.scaladsl.Source

val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher

Now akkaSourceFromSlick can be used like any other Akka Streams Source.
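As a quick sketch of the downstream wiring (using an in-memory stand-in where akkaSourceFromSlick would go, so the snippet runs without a database; the system name and sample values are illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("coffees")

// Stand-in for akkaSourceFromSlick; swap in the Slick-backed Source in real code
val names: Source[String, _] = Source(List("Colombian", "Espresso"))

// Ordinary stream stages apply: map, filter, grouped, throttle, etc.
val upperCased = names.map(_.toUpperCase).runWith(Sink.seq)

println(Await.result(upperCased, 5.seconds)) // Vector(COLOMBIAN, ESPRESSO)
system.terminate()
```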

"Old School" ResultSet

It is also possible to use a plain ResultSet, without Slick, as the "engine" for an Akka stream. We will use the fact that a stream Source can be instantiated from an Iterator.

First, create the ResultSet using standard JDBC techniques:

import java.sql._

import scala.util.Try

val resultSetGenerator : () => Try[ResultSet] = () => Try {
  val statement : Statement = ???
  statement executeQuery "SELECT Name from Coffees"
}
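The ??? above is left for your environment. As one illustrative way to fill it in (the URL, credentials, and fetch size below are assumptions, not part of the original answer):

```scala
import java.sql.{Connection, DriverManager, ResultSet, Statement}
import scala.util.Try

// Hypothetical connection details - replace with your own
val jdbcUrl = "jdbc:postgresql://localhost:5432/coffeeshop"

val pgResultSetGenerator: () => Try[ResultSet] = () => Try {
  val connection: Connection = DriverManager.getConnection(jdbcUrl, "user", "password")
  // A forward-only, read-only cursor is the cheapest option for pure streaming
  val statement: Statement = connection.createStatement(
    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
  // Ask the driver to fetch rows in batches instead of materializing everything
  statement.setFetchSize(1000)
  statement executeQuery "SELECT Name from Coffees"
}
```

Because the connection is only opened inside the Try thunk, nothing touches the database until the Source actually starts pulling.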

A freshly created ResultSet is already positioned before the first row; if a scrollable ResultSet may have been read before, rewind it first (note that beforeFirst throws on TYPE_FORWARD_ONLY cursors, so only call it on scrollable ones):

val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] = 
  (resultSet) => Try {
    if (resultSet.getType != ResultSet.TYPE_FORWARD_ONLY) resultSet.beforeFirst()
    resultSet
  }

Once we start iterating through rows we'll have to pull the value from the correct column:

val getNameFromResultSet : ResultSet => Name = _ getString "Name"

And now we can implement the Iterator interface to create an Iterator[Name] from a ResultSet:

val convertResultSetToNameIterator : ResultSet => Iterator[Name] = 
  (resultSet) =>
    Iterator
      .continually(resultSet)
      // takeWhile caches its test result, so ResultSet#next (which advances
      // the cursor) is called exactly once per row even if hasNext is
      // checked repeatedly
      .takeWhile(_.next())
      .map(rs => Try(getNameFromResultSet(rs)))
      .flatMap(_.toOption) // drop rows whose column read failed
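The Try-then-flatten trick can be seen in isolation with a plain in-memory iterator standing in for the cursor (the sample values are made up); elements whose conversion throws are silently dropped:

```scala
import scala.util.Try

// One "row" (the null) will fail conversion and be discarded
val rawRows = Iterator("Colombian", "Espresso", null, "French_Roast")

val upperNames: List[String] =
  rawRows
    .map(row => Try(row.toUpperCase)) // a NullPointerException becomes a Failure
    .flatMap(_.toOption)              // keep only the successes
    .toList

println(upperNames) // List(COLOMBIAN, ESPRESSO, FRENCH_ROAST)
```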

And finally, glue all the pieces together to create the function we'll need to pass to Source.fromIterator:

val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] = 
  (generator) => () =>
    generator()
      .flatMap(adjustResultSetBeforeFirst)
      .map(convertResultSetToNameIterator)
      .getOrElse(Iterator.empty)

This Iterator can now feed a Source:

val akkaSourceFromResultSet : Source[Name, _] = 
  Source fromIterator resultSetGenToNameIterator(resultSetGenerator)

This implementation is reactive all the way down to the database: because the JDBC driver pre-fetches only a limited number of rows at a time (tunable via Statement#setFetchSize), rows only come off the database as the stream's Sink signals demand.
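The laziness this relies on is easy to demonstrate with a plain iterator: nothing is produced until something downstream pulls, which is exactly how Source.fromIterator translates sink demand into ResultSet.next calls. A minimal stand-alone sketch:

```scala
// Counts how many "rows" have actually been produced
var pulled = 0
val lazyRows: Iterator[String] = Iterator.continually { pulled += 1; s"row-$pulled" }

// Merely defining the iterator produces nothing
assert(pulled == 0)

// Demand for three elements produces exactly three rows, no more
val firstThree = lazyRows.take(3).toList
println(firstThree) // List(row-1, row-2, row-3)
println(pulled)     // 3
```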

Ramón J Romero y Vigil
  • How should we make some cleanup action, e.g. if we want to close the `ResultSet` and `Statement`? – user6502167 Nov 26 '19 at 15:18
  • @user6502167 You can just pre-construct the `ResultSet` before `resultSetGenerator`, instead of inside of it. then on the close hook for the `Source` call close. – Ramón J Romero y Vigil Nov 26 '19 at 16:02
  • Thanks, @Ramon J Romero y Vigil. So I'll need to make the SQL query to create the `ResultSet` outside of the `Source`, and turn it into a `Source`, right? And where's the close hook for the `Source`? I suppose you don't mean the `onComplete` hook of `Future`. – user6502167 Nov 26 '19 at 16:15
  • @user6502167 usually once a `Source` is materialized it returns a hook. For example: if you do something like `val hook : Future[Done] = mySource.runForeach(println)` you can then do `hook.foreach(_ => resultSet.close())`. This ensures that the ResultSet is only closed after the stream has completed processing all of the rows. – Ramón J Romero y Vigil Nov 26 '19 at 17:08

I find the Alpakka documentation to be excellent, and a much easier way to work with Reactive Streams than the raw Java Publisher interface.

The Alpakka project is an open source initiative to implement stream-aware, reactive, integration pipelines for Java and Scala. It is built on top of Akka Streams, and has been designed from the ground up to understand streaming natively and provide a DSL for reactive and stream-oriented programming, with built-in support for backpressure.

Documentation for Alpakka with Slick: https://doc.akka.io/docs/alpakka/current/slick.html

Alpakka Github: https://github.com/akka/alpakka
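A minimal sketch of what the Alpakka Slick connector looks like in use, based on the linked documentation (the "slick-h2" config key and the table/column names are assumptions for illustration):

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem("alpakka-slick")
// Reads connection settings from application.conf under the "slick-h2" key
implicit val session: SlickSession = SlickSession.forConfig("slick-h2")
import session.profile.api._

val done = Slick
  .source(sql"SELECT name FROM coffees".as[String]) // streams rows with backpressure
  .runWith(Sink.foreach(println))

// Close the session and actor system once the stream completes
done.onComplete { _ =>
  session.close()
  system.terminate()
}(system.dispatcher)
```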

Sarin Madarasmi
  • Now what happens if the Akka Streams web application goes down for 30 minutes and comes back up: does it stream all the new data changes from the 30-minute gap window? Also, does this library use change-data-capture technology behind the scenes, or does it use a SQL query with a where condition on a timestamp column or primary key and keep polling the database? – sri hari kali charan Tummala Oct 10 '20 at 16:11
  • The answer to both your questions is that this library is merely an interaction with the database through Slick. You can look at the source code; it's really not that much: https://github.com/akka/alpakka/blob/v2.0.2/slick/src/main/scala/akka/stream/alpakka/slick/scaladsl/Slick.scala Slick, itself, is just a convenient database-access layer. What you want to achieve depends on your database; you'd want something like a DB trigger (probably better than polling the DB). For a MySQL example, check this out: https://hevodata.com/learn/mysql-cdc/ – Sarin Madarasmi Oct 12 '20 at 04:10