
I'm essentially trying to do the opposite of what is being asked in this question; that is to say, use a Source[A] to push elements into an InputDStream[A].

So far, I've managed to cobble together an implementation that uses a Feeder actor and a Receiver actor, similar to the ActorWordCount example, but this seems a bit complex, so I'm curious whether there is a simpler way.
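
For concreteness, this is roughly the shape of what I have now. It's a trimmed-down sketch rather than my actual code: it assumes Spark 1.x's ssc.actorStream / ActorHelper and a StreamingContext ssc in scope, the FeederActor / ReceiverActor / feederPath names are made up for illustration, and the feeder actor has to be reachable from the executors via Akka remoting.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.spark.streaming.receiver.ActorHelper

case class SubscribeReceiver(ref: ActorRef)

// Driver-side feeder: remembers subscribed receiver actors and forwards
// every element that comes out of the Source to them.
class FeederActor extends Actor {
  private var subscribers = Set.empty[ActorRef]
  def receive = {
    case SubscribeReceiver(ref) => subscribers += ref
    case element: Int           => subscribers.foreach(_ ! element)
  }
}

// Executor-side receiver: subscribes itself to the feeder on startup and
// stores every element it receives into the DStream.
class ReceiverActor(feederPath: String) extends Actor with ActorHelper {
  override def preStart(): Unit =
    context.actorSelection(feederPath) ! SubscribeReceiver(self)
  def receive = {
    case element: Int => store(element)
  }
}

implicit val system = ActorSystem("driver")
implicit val materializer = ActorMaterializer()

// Run the existing Source into the feeder on the driver...
val feeder = system.actorOf(Props[FeederActor], "feeder")
Source(1 to 10).runWith(Sink.actorRef(feeder, "done"))

// ...and build the InputDStream from receiver actors on the executors.
val feederPath = "akka.tcp://driver@<host>:<port>/user/feeder" // placeholder path
val inputDStream = ssc.actorStream[Int](Props(new ReceiverActor(feederPath)), "ReceiverActor")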

– lloydmeta

2 Answers


EDIT: Self-accepting after 5 days since there have been no good answers.

I've extracted the Actor-based implementation into a lib, Sparkka-streams, and it's been working for me thus far. If a better solution to this question shows up, I'll either update or deprecate the lib.

Its usage is as follows:

// InputDStream can then be used to build elements of the graph that require integration with Spark
val (inputDStream, feedDInput) = Streaming.connection[Int]()
val source = Source.fromGraph(GraphDSL.create() { implicit builder =>

  import GraphDSL.Implicits._

  val source = Source(1 to 10)

  val bCast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val add1 = Flow[Int].map(_ + 1)
  val times3 = Flow[Int].map(_ * 3)
  source ~> bCast ~> add1 ~> merge
            bCast ~> times3 ~> feedDInput ~> merge

  SourceShape(merge.out)
})
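// For reference: the add1 branch sums to 2 + 3 + ... + 11 = 65 and the times3
// branch sums to 3 + 6 + ... + 30 = 165, so the merged fold below should be 230.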

val reducedFlow = source.runWith(Sink.fold(0)(_ + _))
whenReady(reducedFlow)(_ shouldBe 230)
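// Only the times3 branch is fed into the InputDStream (through feedDInput),
// so the Spark-side accumulator below should end up at 165.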

val sharedVar = ssc.sparkContext.accumulator(0)
inputDStream.foreachRDD { rdd =>
  rdd.foreach { i =>
    sharedVar += i
  }
}
ssc.start()
eventually(sharedVar.value shouldBe 165)
– lloydmeta
  • has anything come up? Your library hasn't been updated in a while.. thanks! – ticofab Jan 27 '17 at 07:33
  • no, nothing out of the ordinary :) I wrote it for a couple of small projects I was doing at the time but no longer need to touch. If you're interested, please feel free to send a PR ! – lloydmeta Jan 27 '17 at 14:36

Ref: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

You can do it like this:

class StreamStopped extends RuntimeException("Stream stopped")

// Serializable factory class
case class SourceFactory(start: Int, end: Int) {
  def source = Source(start to end).map(_.toString)
}

class CustomReceiver(sourceFactory: SourceFactory)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  implicit val materializer = ....
  import scala.concurrent.ExecutionContext.Implicits.global // for the onFailure callback below

  def onStart() {
    sourceFactory.source.runForeach { e =>
      if (isStopped) {
        // Stop the source
        throw new StreamStopped
      } else {
        store(e)
      }
    } onFailure {
      case _: StreamStopped => // ignore
      case ex: Throwable => reportError("Source exception", ex)
    }
  }

  def onStop() {}
}

val customReceiverStream = ssc.receiverStream(new CustomReceiver(SourceFactory(1, 100)))
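
The resulting stream can then be consumed like any other DStream, for example (a minimal sketch that just prints each element, assuming the ssc from above):

customReceiverStream.foreachRDD { rdd =>
  rdd.foreach(println) // replace with whatever processing you need
}
ssc.start()
ssc.awaitTermination()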
– PH88
  • I tried something like this before using the aforementioned Actor variant, but at runtime, Spark tries to serialise the `Source` and dies. – lloydmeta Feb 28 '16 at 03:17
  • Sorry, I'm no Spark expert and didn't notice that `Receiver` must be serializable. In that case one choice could be, instead of passing the source instance, to pass the info for creating the source (in the form of a factory object) and create the Source in `Receiver#onStart`. See my updated example. – PH88 Feb 28 '16 at 05:06
  • Fair enough, that could be one way of doing it in cases where it is acceptable to start a flow from within a receiver, but I'm specifically looking to use an existing `Source[A]`, as opposed to constructing the source from within the receiver. If we can use an existing `Source`, we can easily generalise that to any existing Akka flow (or a transformation of a flow), which is nice. – lloydmeta Feb 28 '16 at 06:12
  • In akka-stream, a `Source[A]`, or a `Graph` in general, is just a blueprint of an execution flow. If your intent is to share the 'blueprint', then whether you pass the Source[A] as a param or have it created within the Receiver makes little difference at runtime, because in both cases the materialization happens within the Receiver. However, if what you want is to feed a `materialized` data flow into a dstream, then your original approach of Feeder / Receiver actors actually makes sense. – PH88 Feb 28 '16 at 08:03
  • If I have a `Source[A]` that I need to split, broadcast, or otherwise reuse in other flows, and the physical resource that pushes things into the source is constrained to only 1 connection at a time (e.g. a webcam, depending on the driver), I think the difference between creating and running the stream from within the receiver vs passing in an existing `Flow`/`Graph`/`Source` is quite big. Also, efficiency-wise, even without that constraint, being able to reuse an already-defined source is nicer. – lloydmeta Feb 28 '16 at 08:36
  • Let me clarify: say you have a `Source[Photo]` that connects to some webcam; you can code your receiver like: **Case A1**: `class CustomReceiver(src: Source[Photo])`; **Case A2**: `class CustomReceiver(factory: WebcamSourceFactory)`; **Case B**: `class CustomReceiver(feeder: ActorRef)`. In both **Case A1** and **Case A2** the materialization of the source, i.e. connecting to the webcam, happens within the Receiver at the call to `runForeach`, and thus at runtime they make little difference... – PH88 Feb 28 '16 at 09:49
  • Given your webcam scenario, **Case B** might be more appropriate and necessary, i.e. materialize and connect the `Source[Photo]` only once, to a single Feeder actor, and share the actor instance; that is, your original approach. – PH88 Feb 28 '16 at 09:50
  • Your **Case A1** actually will not work because you are holding a reference to a non-serialisable `Source`. **Case A2** will use a bunch of arguments to materialise a *non-sharable* source within the receiver (in which case, why even use a `Source` to begin with?). Comparing A1 and A2 is pointless because one will not run and the other will run but be wasteful. **Case B** is what I already have, and in asking this question, I'm trying to figure out whether it is (1) the best way and (2) the only way. – lloydmeta Feb 28 '16 at 11:18
  • Well, not all `Source`s are the same; some maintain their own shared resources internally and thus are not 'wasteful' to materialise multiple times, e.g. Http().cachedHostConnectionPool(...) from http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/http/client-side/host-level.html. In cases like this, it's actually nicer to just let Spark materialize the `Source` in the `Receiver` and avoid the complexity of sharing the materialized flow (which gave rise to your question in the first place). – PH88 Feb 28 '16 at 15:02
  • Putting aside the fact that your example code demonstrates creating yet another new `Source`, your current scenario depends on having a Flow that is cached by some kind of stateful singleton, which is not always available and which, if you were to implement it yourself, is itself a source of complexity. Not only that, but if a `Source` already exists in the driver application (highly likely), it still creates a wasted resource because it allocates a Source on a Spark node. In addition, forcing the Source to be in the receiver severely limits the composability of a given graph. In short, it's far from ideal. – lloydmeta Feb 28 '16 at 16:23
  • @lloydmeta, allocating a local resource (e.g. the resource pool in the case of Http().cachedHostConnectionPool()) on a Spark node and sharing it among the hundreds/thousands of jobs running on the node is not a waste. In this case it's actually good for performance and scalability (to have jobs make use of the local pool on the node). Spark is about cluster/performance/parallel processing after all. No offense, I'm just making a point that there's no silver bullet. I don't think there's a single ideal way; it all depends on your use case. – PH88 Feb 29 '16 at 12:53
  • No need to convince me there's no silver bullet; I'm the one critiquing yours. It fully relies on being able to create new Sources from Spark nodes, which has huge drawbacks in terms of possibility (webcam,websockets..) and infosec. Also, your solution would not be wasteful only if you don't have an existing `Source` in your driver, your allocation of a Source is cached by a *something*, and your Spark jobs all run in the same executor+node (suboptimal use of a cluster). This is ifs upon ifs: I hope to find a more general solution that answers the SO question by using *an existing source* – lloydmeta Feb 29 '16 at 15:32