I am attempting to asynchronously write messages to Amazon Kinesis using Scala Futures so I can load test an application.

This code works, and I can see data moving down my pipeline as well as the output printing to the console.

import com.amazonaws.services.kinesis.AmazonKinesisClient
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}    

object KinesisDummyDataProducer extends App {

  val kinesis = new AmazonKinesisClient(PipelineConfig.awsCredentials)
  println("Connected")

  lazy val encoder = Charset.forName("UTF-8").newEncoder()
  lazy val tz = TimeZone.getTimeZone("UTC")
  lazy val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'Z")
  df.setTimeZone(tz)

  (1 to args(0).toInt).map(int => send(int)).map(msg => println(msg))

  private def send(int: Int) = {
    val msg = "{\"event_name\":\"test\",\"timestamp\":\"%s\",\"int\":%s}".format(df.format(new Date()), int.toString)
    val bytes = encoder.encode(CharBuffer.wrap(msg))
    encoder.flush(bytes)
    kinesis.putRecord("PrimaryEventStream", bytes, "123")
    msg
  }
}

This code works with Scala Futures.

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

def doIt(x: Int) = {Thread.sleep(1000); x + 1}
(1 to 10).map(x => future{doIt(x)}).map(y => y.onSuccess({case x => println(x)}))

You'll note that the syntax for mapping over the sequence is nearly identical. However, the following does not work (i.e., it neither prints to the console nor sends data down my pipeline).

import com.amazonaws.services.kinesis.AmazonKinesisClient
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global


object KinesisDummyDataProducer extends App {

  val kinesis = new AmazonKinesisClient(PipelineConfig.awsCredentials)
  println("Connected")

  lazy val encoder = Charset.forName("UTF-8").newEncoder()
  lazy val tz = TimeZone.getTimeZone("UTC")
  lazy val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'Z")
  df.setTimeZone(tz)

  (1 to args(0).toInt).map(int => future {send(int)}).map(f => f.onSuccess({case msg => println(msg)}))

  private def send(int: Int) = {
    val msg = "{\"event_name\":\"test\",\"timestamp\":\"%s\",\"int\":%s}".format(df.format(new Date()), int.toString)
    val bytes = encoder.encode(CharBuffer.wrap(msg))
    encoder.flush(bytes)
    kinesis.putRecord("PrimaryEventStream", bytes, "123")
    msg
  }
}

Some more notes about this project. I am using Maven to do the build (from the command line), and running all of the above code (also from the command line) works just dandy.

My question is: why, given the same syntax, does my function 'send' appear not to execute?

  • possible duplicate of [Possible bug in Scala 2.10: Futures do not run](http://stackoverflow.com/questions/10565475/possible-bug-in-scala-2-10-futures-do-not-run) – Régis Jean-Gilles Feb 27 '14 at 15:39
  • This does not appear to be a duplicate because wrapping the entire function in a future then calling: f.onSuccess({case x => println("Success")}); Await.ready(f, Duration.Inf); does not solve it like the other solutions. –  Feb 28 '14 at 09:20
  • But AFAIK by doing this your app can still quit before the callback that prints "Success" gets a chance to execute. Try calling `Await.result` and then, just after that (in sequence, in the same thread), call `println`. If "Success" is printed, you know the future completed successfully. If there was an error, you'll see the stack trace, and if the future never completed, your app will just hang. – Régis Jean-Gilles Feb 28 '14 at 10:07
  • Also note that you are spawning several futures in a `map`. The result is a collection of futures, which is not in itself a future. To be able to wait for all the futures to complete, you'll first want to convert your collection of futures into a single future, which can be done using `Future.sequence`. Wrapping up, this would give: `val aggregateFuture = Future.sequence((1 to args(0).toInt).map(int => future {send(int)})); Await.result(aggregateFuture, Duration.Inf); println("Success")` – Régis Jean-Gilles Feb 28 '14 at 10:11
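
The suggestion in the last two comments can be sketched as a small standalone program. This is only an illustration under stated assumptions, not a confirmed fix for the question: `AwaitAllSketch`, `fakeSend`, and `sendAll` are hypothetical names, `fakeSend` stands in for `send` and does not call Kinesis, and the 30-second timeout is an arbitrary choice. It uses `Future { ... }` (the `Future.apply` form) instead of the lowercase `future` helper, since the latter is not available in newer Scala versions.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitAllSketch {

  // Hypothetical stand-in for `send`: builds a message but does not call Kinesis.
  def fakeSend(int: Int): String =
    "{\"event_name\":\"test\",\"int\":%d}".format(int)

  // Start one future per message, combine them into a single future with
  // Future.sequence, and block until all of them complete or the timeout expires.
  def sendAll(count: Int): Seq[String] = {
    val futures: Seq[Future[String]] = (1 to count).map(i => Future(fakeSend(i)))
    val aggregate: Future[Seq[String]] = Future.sequence(futures)
    Await.result(aggregate, 30.seconds) // arbitrary timeout chosen for this sketch
  }

  def main(args: Array[String]): Unit = {
    // Printing happens on the main thread, after every future has finished,
    // so the program cannot exit before the output appears.
    sendAll(3).foreach(println)
  }
}
```

Because `Await.result` blocks the main thread until the aggregate future completes, a failure inside any future surfaces as an exception there rather than being lost when the program exits.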

0 Answers