
I have a hard task: reading millions of rows from a Cassandra table. The table actually contains around 40~50 million rows. The data consists of internal URLs for our system, and we need to fire all of them. To fire them we are using Akka Streams, and it has been working pretty well, applying back pressure as needed. But we still have not found a way to read everything effectively.

What we have tried so far:

  • Reading the data as a stream using Akka Streams. We are using phantom-dsl, which provides a publisher for a specific table, but it does not read everything, only a small portion; it actually stops reading after the first 1 million rows.

  • Reading using Spark, filtered by a specific date. Our table is modeled as a time-series table, with year, month, day, minutes... columns. Right now we are selecting by day, so Spark does not fetch too much data to process at once, but having to select all those days is a pain.

The code is the following:

val cassandraRdd =
      sc
        .cassandraTable("keyspace", "my_table")
        .select("id", "url")
        .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)

Unfortunately I can't iterate over the partitions to fetch less data at a time; I have to use collect because Spark complains that the actor is not serializable.

// Cached HTTP connection pool to the target host; the String element carries the URL as context.
val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] =
  Http().cachedHostConnectionPool[String](host, port).async

// Actor-backed source: rows are pushed in via `ref ! row` and turned into HTTP requests.
val source =
  Source
    .actorRef[CassandraRow](10000000, OverflowStrategy.fail)
    .map(row => makeUrl(row.getString("id"), row.getString("url")))
    .map(url => HttpRequest(uri = url) -> url)

// Materializes the source and returns its ActorRef; responses go to httpHandlerActor.
val ref = Flow[(HttpRequest, String)]
  .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
  .to(Sink.actorRef(httpHandlerActor, IsDone))
  .runWith(source)

// Spark brings every row to the driver, then feeds them one by one into the stream.
cassandraRdd.collect().foreach { row =>
  ref ! row
}

I would like to know if any of you has experience with reading millions of rows to do something other than aggregations and the like.

I have also thought about reading everything and sending it to a Kafka topic, where I would consume it with streaming (Spark or Akka), but the problem would be the same: how to load all that data effectively?
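Roughly what I have in mind for the Kafka side (just a sketch, not tested; the broker address and the urls-to-fire topic are made up, and the producer is created inside each partition so nothing non-serializable is captured in the closure):

import java.util.Properties
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }

cassandraRdd.foreachPartition { rows =>
  // One producer per partition, created on the executor (avoids the serialization problem).
  val props = new Properties()
  props.put("bootstrap.servers", "kafka:9092") // assumption: broker address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  rows.foreach { row =>
    val id  = row.getString("id")
    val url = row.getString("url")
    producer.send(new ProducerRecord[String, String]("urls-to-fire", id, url))
  }

  producer.flush()
  producer.close()
}

The consuming side would then be free to back-pressure the HTTP calls on its own.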

EDIT

For now, I'm running on a cluster with a reasonable amount of memory (100 GB), doing a collect and iterating over it.

Also, this is far different from loading big data with Spark and analyzing it using things like reduceByKey, aggregateByKey, etc.

I need to fetch and send everything over HTTP =/

So far it is working the way I did it, but I'm afraid this data will keep getting bigger and bigger, to the point where fetching everything into memory makes no sense.

Streaming this data, fetching it in chunks, would be the best solution, but I haven't found a good approach for it yet.
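One option I'm considering for the chunked reads is to rely on the driver's own paging instead of Spark (a sketch, assuming the DataStax Java driver 3.x and made-up host/keyspace names; a full unfiltered scan over 40~50 million rows would probably still need to be split by token ranges):

import com.datastax.driver.core.{ Cluster, SimpleStatement }
import scala.collection.JavaConverters._

val cluster = Cluster.builder().addContactPoint("cassandra-host").build() // assumption
val session = cluster.connect("keyspace")

val stmt = new SimpleStatement("SELECT id, url FROM my_table")
stmt.setFetchSize(1000) // rows per page pulled from Cassandra

// The ResultSet pages transparently: iterating triggers the next fetch when a page is exhausted.
val rows = session.execute(stmt).iterator().asScala

rows.foreach { row =>
  val url = makeUrl(row.getString("id"), row.getString("url"))
  // feed `url` into the Akka HTTP stream instead of printing
  println(url)
}

session.close()
cluster.close()

Wrapping that iterator in Source.fromIterator would let the existing Akka HTTP flow back-pressure the paging.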

In the end, I'm thinking of using Spark to get all that data, generate a CSV file, and use Akka Stream IO to process it; that way I would avoid keeping a lot of things in memory, since it takes hours to process every million rows.
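Something like this is what I have in mind for the CSV variant (a sketch, assuming a hypothetical urls.csv with one id,url pair per line and the same httpPool and makeUrl as above; FileIO reads the file in bounded chunks, so memory stays flat regardless of the file size):

import java.nio.file.Paths
import akka.stream.scaladsl.{ FileIO, Framing, Sink }
import akka.util.ByteString

val done =
  FileIO.fromPath(Paths.get("urls.csv")) // assumption: file produced by the Spark job
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true))
    .map(_.utf8String)
    .map { line =>
      val Array(id, url) = line.split(",", 2)
      HttpRequest(uri = makeUrl(id, url)) -> url
    }
    .via(httpPool) // same pooled flow as in the question
    .runWith(Sink.ignore)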

Thiago Pereira
  • This is a pretty big question to answer on Stack Overflow, and it's not particularly related to phantom from what I can tell. My strong feeling is that the whole culprit is having to go over HTTP; sharing over HDFS or some other method, such as fetching the HTTP bits first, might work better. It would be interesting to get a lot more insight into how phantom streaming is not giving you everything back. I suspect it's either overflowing or maybe oversubscribing, so it thinks it's done before it actually is. Sounds really strange. – flavian May 10 '16 at 14:58
  • Yes, indeed, this is not a pure phantom question; however, I added that because it was my first idea, but unfortunately it stops after 1 million or so. Also, the HTTP here is a bottleneck and it takes hours to process everything, and maybe reactive-phantom got in trouble: it's clear the HTTP back-pressures the phantom stream, so maybe after some hours it could not read any more and thought it was already done. – Thiago Pereira May 10 '16 at 22:26

1 Answer


Well, after spending some time reading, talking with other people, and doing tests, the result could be achieved with the following code sample:

import akka.actor.{ Actor, ActorSystem, Props }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import com.datastax.spark.connector._
import org.apache.spark.{ HashPartitioner, SparkContext }

val sc = new SparkContext(sparkConf)

// Read the table as (key, value) pairs and spread them over an explicit partitioner.
val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable")
  .select("key", "value")
  .as((key: String, value: String) => (key, value))
  .partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
  .cache()

cassandraRdd
  .groupByKey()
  .foreachPartition { partition =>
    partition.foreach { row =>

      // One actor system and materializer per (key, values) group, created on the executor.
      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()

      val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor")

      // Stream the grouped values; Count tallies each element, Finish shuts the system down.
      val source = Source.fromIterator { () => row._2.toIterator }
      source
        .map { str =>
          myActor ! Count
          str
        }
        .to(Sink.actorRef(myActor, Finish))
        .run()
    }
  }

sc.stop()


// Counts streamed elements; on Finish it prints the total and shuts the actor system down.
class MyActor(system: ActorSystem) extends Actor {

  var count = 0

  def receive = {

    case Count =>
      count = count + 1

    case Finish =>
      println(s"total: $count")
      system.shutdown()

  }
}

case object Count
case object Finish

What I'm doing is the following:

  • Try to achieve a good number of partitions and a partitioner using the partitionBy and groupByKey methods
  • Use cache to prevent data shuffles, which would make Spark move large amounts of data across nodes, with high IO, etc.
  • Create the whole actor system, with its dependencies, as well as the stream, inside the foreachPartition method. Here is a trade-off: you could have only one ActorSystem, but then you would have to make bad use of .collect, as I wrote in the question. By creating everything inside, you still have the ability to run things inside Spark, distributed across your cluster.
  • Finish each actor system at the end of the iterator using Sink.actorRef with a kill message (Finish)

Perhaps this code could be improved even further, but so far I'm happy not to have to use .collect anymore and to work only inside Spark.
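One improvement I can think of (just a sketch, not benchmarked): instead of creating a fresh ActorSystem per group, keep a single lazily initialized system per executor JVM and reuse it across partitions. The StreamEnv name and the Sink.foreach body are placeholders for the real per-row work:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Initialized once per executor JVM the first time it is touched, then reused.
object StreamEnv {
  lazy val system: ActorSystem = ActorSystem("per-executor")
  lazy val materializer: ActorMaterializer = ActorMaterializer()(system)
}

cassandraRdd
  .groupByKey()
  .foreachPartition { partition =>
    implicit val mat = StreamEnv.materializer
    partition.foreach { case (_, values) =>
      val done = Source.fromIterator(() => values.toIterator)
        .runWith(Sink.foreach(value => println(value))) // placeholder for the real per-row work
      Await.result(done, Duration.Inf) // keep the Spark task alive until the stream finishes
    }
  }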

Thiago Pereira
  • This is interesting. I am very curious about your performance in general and whether you have been able to benchmark it. Indeed, I find that creating an actor system per partition is a rather heavy operation. Also, depending on the number of partitions, you may find yourself with more than 8 actor systems on the same machine, in the same VM? How is that faster than simply making synchronous, albeit parallel, calls? – MaatDeamon Aug 29 '17 at 03:56
  • @MaatDeamon I remember it was quite fast, but unfortunately I haven't measured it. If I had to do it again today, I would go for the Alpakka Cassandra connector. – Thiago Pereira Aug 29 '17 at 11:14
  • I see. You would give away distribution? – MaatDeamon Aug 29 '17 at 14:03
  • @MaatDeamon at that time, I had to fetch and fire each row over HTTP, which is really bad when talking about millions of rows, so for that specific use case, yes. Now, if I were thinking of some computation, aggregating, mapping and reducing some data, I would keep that inside Spark for sure. – Thiago Pereira Aug 29 '17 at 14:40