
In Samza and Kafka Streams, data stream processing is performed in a sequence/graph (called "dataflow graph" in Samza and "topology" in Kafka Streams) of processing steps (called "job" in Samza and "processor" in Kafka Streams). I will refer to these two concepts as workflow and worker in the remainder of this question.

Let's assume that we have a very simple workflow, consisting of a worker A that consumes sensor measurements and filters out all values below 50, followed by a worker B that receives the remaining measurements and filters out all values above 80.

Input (Kafka topic X) --> (Worker A) --> (Worker B) --> Output (Kafka topic Y)

If I have understood correctly, both Samza and Kafka Streams use the topic partitioning concept for replicating the workflow/workers and thus parallelizing the processing for scalability purposes.

But:

  • Samza replicates each worker (i.e., job) separately to multiple tasks (one for each partition in the input stream). That is, a task is a replica of a worker of the workflow.

  • Kafka Streams replicates the whole workflow (i.e., topology) at once to multiple tasks (one for each partition in the input stream). That is, a task is a replica of the whole workflow.

This brings me to my questions:

  1. Assume that there is only one partition: Is it correct that it is not possible to deploy workers (A) and (B) on two different machines in Kafka Streams, while this is possible in Samza? (Or in other words: is it impossible in Kafka Streams to split a single task (i.e., a topology replica) across two machines, regardless of whether there are multiple partitions?)

  2. How do two subsequent processors in a Kafka Streams topology (within the same task) communicate? (I know that in Samza all communication between two subsequent workers (i.e., jobs) is done via Kafka topics, but since in Kafka Streams one has to explicitly "mark" in the code which streams are to be published as Kafka topics, this can't be the case here.)

  3. Is it correct that Samza also publishes all intermediate streams automatically as Kafka topics (and thus makes them available to potential clients), while Kafka Streams only publishes those intermediate and final streams that one marks explicitly (with addSink in the low-level API and to or through in the DSL)?

(I'm aware of the fact that Samza can also use message queues other than Kafka, but this is not really relevant to my questions.)

Lukas Probst
  • I have taken a look at the code and figured out that processors in a Kafka Streams topology "communicate" by recursive calls of the `process(...)` method. Hence, there is no buffering or messaging between two subsequent processors in a topology. The first processor just calls the `process(...)` method of the subsequent processor(s) and so on (see `forward(...)` in [ProcessorContextImpl](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java)). However, this is only true if my assumption in question 1 is correct. – Lukas Probst Dec 14 '16 at 13:38
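For illustration, here is a minimal sketch of that mechanism using the low-level Processor API (roughly the 0.10.x-era API; the class and topic names are made up for the sensor example): `context.forward(...)` synchronously invokes the downstream processor's `process(...)`, so a record traverses all connected processors within a single call chain.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopologyBuilder;

// Worker A: keeps only measurements >= 50 and hands them to the next processor.
public class FilterA implements Processor<String, Integer> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) { this.context = context; }

    @Override
    public void process(String key, Integer value) {
        if (value >= 50) {
            context.forward(key, value);   // directly invokes Worker B's process(...)
        }
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}

// Wiring (inside the application's setup code): X --> A --> B --> Y in one topology,
// i.e. one task per input partition. FilterB is assumed analogous (keeps values <= 80).
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "topicX")
       .addProcessor("A", FilterA::new, "SOURCE")
       .addProcessor("B", FilterB::new, "A")
       .addSink("SINK", "topicY", "B");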

1 Answer


First of all, in both Samza and Kafka Streams you can choose whether or not to have an intermediate topic between these two tasks (processors), i.e. the topology can be either:

Input (Kafka topic X) --> (Worker A) --> (Worker B) --> Output (Kafka topic Y)

or:

Input (Kafka topic X) --> (Worker A) --> Intermediate (Kafka topic Z) --> (Worker B) --> Output (Kafka topic Y)

In either Samza or Kafka Streams, in the former case you have to deploy Worker A and B together, while in the latter case you can deploy Worker A and B separately (e.g., on different machines), since in either framework tasks communicate only through intermediate topics; there are no TCP-based communication channels.

In Samza, for the former case you need to code both filters in a single task; for the latter case you need to specify the input and output topic for each of the tasks, e.g. for Worker A the input is X and the output is Z, and for Worker B the input is Z and the output is Y. You can then start / stop the deployed workers independently.
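As a rough sketch of the latter (split) case with Samza's low-level API (class, system, and topic names are illustrative, not taken from the thread): Worker A becomes its own job that consumes topic X and emits the surviving measurements to topic Z; Worker B would be an analogous task reading Z and writing Y, packaged and deployed as a separate job.

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Worker A as an independent Samza job: reads topic X (declared in the job config),
// writes surviving measurements to topic Z.
public class FilterBelow50Task implements StreamTask {
    private static final SystemStream OUTPUT = new SystemStream("kafka", "topicZ");

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        int value = (Integer) envelope.getMessage();   // assumes an integer-deserialized payload
        if (value >= 50) {
            collector.send(new OutgoingMessageEnvelope(OUTPUT, value));
        }
    }
}

The input topic would be declared in the job's properties file (e.g. task.inputs=kafka.topicX), and Worker B's job would declare task.inputs=kafka.topicZ; this separation is what lets the two jobs be started, stopped, and placed independently.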

In Kafka Streams, for the former case you can just "concatenate" these processors like

stream1.filter(..).filter(..)

and as a result, as Lukas mentioned, each result from the first filter will be immediately passed to the second filter (you can think of each input record from topic X as traversing the topology in depth-first order, with no buffering between any directly connected processors).
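Filled in with the thresholds from the question (topic names and serdes are assumptions, using roughly the 0.10.x-era DSL), the former case could look like this, with both filters fused into one topology:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// (inside the application's setup code)
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Integer> stream1 =
        builder.stream(Serdes.String(), Serdes.Integer(), "topicX");

// Worker A and Worker B end up in the same (sub-)topology: each record passes
// through both filters in one call chain, without any intermediate topic.
stream1.filter((key, value) -> value >= 50)    // drop measurements below 50
       .filter((key, value) -> value <= 80)    // drop measurements above 80
       .to(Serdes.String(), Serdes.Integer(), "topicY");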

And for the latter case, you can indicate that the intermediate stream should be "materialized" to another topic, i.e.:

stream1.filter(..).through("topicZ").filter(..)

and each result of the first filter will be sent to topic Z, which will then be pipelined to the second filter processor. In this case these two filters can potentially be deployed on different hosts, or on different threads within the same host.
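Continuing the sketch above (same assumed names), the latter case only differs by the added `through(...)` call; topic Z splits the topology into two sub-topologies that can then run in different threads, processes, or hosts:

// Same stream1 as in the previous sketch; in the 0.10.x-era DSL the intermediate
// topic "topicZ" typically has to be created beforehand.
stream1.filter((key, value) -> value >= 50)
       .through(Serdes.String(), Serdes.Integer(), "topicZ")
       .filter((key, value) -> value <= 80)
       .to(Serdes.String(), Serdes.Integer(), "topicY");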

Guozhang Wang
  • (1/3) I agree with you regarding Samza (w.r.t. both variants) and regarding the first variant in Kafka Streams. However, if I understood the documentation and the code correctly, the workers (i.e., processors) of a Kafka Streams topology always communicate via method calls (see my comment on my initial question) and never via Kafka topics. Thus, even with `stream1.filter(..).through("topicZ").filter(..)` the workers (A and B) representing the two filters would communicate directly and not via _topicZ_. – Lukas Probst Jan 09 '17 at 14:29
  • (2/3) `through("topicZ")` only makes the intermediate stream accessible as a Kafka topic for external clients (or other topologies). Moreover, `through("topicZ")` does not introduce the possibility to deploy worker A and worker B of the same topology on different hosts (from where did you get this information?). – Lukas Probst Jan 09 '17 at 14:30
  • (3/3) As far as I know, the only possibility would be to split the topology into two topologies, i.e., `stream1.filter(...).to("topicZ")` and `stream2 = builder.stream("topicZ").filter(...)` (see [documentation](http://docs.confluent.io/3.1.1/streams/architecture.html#backpressure)). In this case the two topologies would communicate via _topicZ_ and the two topologies could be deployed on different hosts. But the workers of a single topology would still communicate only via method calls and would not be deployable on different hosts. – Lukas Probst Jan 09 '17 at 14:31
  • Note that `stream.filter().through().filter()` is equivalent to `stream.filter().to("topic"); builder.stream("topic").filter()`, and hence breaks the topology into sub-topologies that can be deployed separately; the workers hosting these sub-topologies can execute independently at their own pace, since the intermediate topic is used as a "persistent buffer". – Guozhang Wang Jan 12 '17 at 04:39
  • (1/3) Ah, you are absolutely right (see [documentation](http://docs.confluent.io/3.0.0/streams/developer-guide.html#writing-streams-back-to-kafka)). Thank you very much for this information! However, the code `stream.filter().through().filter()` still creates two (sub-)topologies instead of making the workers of a **single** topology deployable on multiple machines. Or more precisely it is still impossible to distribute the workers of a single topology onto multiple machines. This makes the system inflexible. – Lukas Probst Jan 12 '17 at 16:10
  • (2/3) You cannot take the code that creates a single topology, originally deployed on a single powerful machine, and distribute that topology across 100 weaker machines. Instead, you have to split the topology by adding `through()` calls to the code. In Samza this is not necessary. Hence, in Samza you can use the same code for distributing the workflow on a YARN cluster with 1 machine or on a YARN cluster with 100 machines. – Lukas Probst Jan 12 '17 at 16:12
  • (3/3) Moreover, the way `through()` works implies that the Kafka Streams high-level DSL is less expressive than its low-level Processor API, since in the low-level API it is possible (via `addSink()`) to publish an intermediate stream (for external clients) to a Kafka topic without splitting the topology into two sub-topologies, while this is impossible in the high-level DSL. – Lukas Probst Jan 12 '17 at 16:13
  • In Samza this scenario is the same: say you have a single job that performs two consecutive filter operations; this job has to be deployed as a whole on YARN containers. If you want to break the two filters into separate jobs so that you can deploy them on multiple containers, you have to break the code into separate jobs that communicate via, for example, an intermediate Kafka topic. – Guozhang Wang Jan 13 '17 at 16:43
  • And similarly, just as the two single-filter jobs that result from breaking down the two-filter job can still be deployed on the same machine, in Streams multiple sub-topologies can be deployed in the same JVM, and when you add more JVMs they can automatically be redistributed across them. – Guozhang Wang Jan 13 '17 at 16:45
  • Regarding the high-level DSL: it is also possible to publish to intermediate topics without breaking the topology, for example: – Guozhang Wang Jan 13 '17 at 16:45
  • stream1 = builder.stream().filter(); // this is an intermediate stream
    stream2 = stream1.filter(); // this is the result stream
    stream2.to("topic1"); // write result to topic1
    stream1.to("topic2"); // write intermediate result to topic 2 without breaking the topology
    – Guozhang Wang Jan 13 '17 at 16:51
  • (1/2) Thank you for your nice DSL example. This refutes my assumption. However, I still disagree with you regarding the distribution flexibility: in Samza it is intended to implement any worker (e.g., a filter) as an independent job and to combine these jobs into a dataflow graph. That is, a Samza job is the counterpart to a processor (low-level API) or an operator (high-level DSL) and not to a topology in Kafka Streams. So basically in Samza you get the distribution flexibility of a workflow (i.e., a dataflow graph) for free. – Lukas Probst Jan 17 '17 at 08:14
  • (2/2) In Kafka Streams, on the other hand, you additionally have to split a workflow (i.e., a topology). Often this might be very easy. Nevertheless, it has to be done, in contrast to Samza. I do not want to disparage Kafka Streams. Kafka Streams definitely has its benefits, such as the nice DSL, which Samza does not provide. But regarding the distribution flexibility, in my opinion Samza is superior. – Lukas Probst Jan 17 '17 at 08:14
  • Regarding distribution flexibility, I would look at it a different way: 1) in Kafka Streams you can define each worker as a separate Streams app and connect them with Kafka topics, just as in Samza, where you define one job per worker and connect them explicitly with Kafka topics; 2) in Kafka Streams, however, you can easily "merge" multiple operators into a single processor, e.g. for two light-weight filter processors you may just want to deploy them together, while in Samza you have to change your code in order to do so. – Guozhang Wang Jan 18 '17 at 17:33
  • In fact, Samza recently has proposed some ideas similar to Kafka Streams for topology management, see http://www.slideshare.net/YiPan7?utm_campaign=profiletracking&utm_medium=sssite&utm_source=ssslideview (slide 47) – Guozhang Wang Jan 18 '17 at 17:34
  • (1/2) First, thank you very much for the link. SamzaSQL sounds like a very promising high-level/declarative language for Samza. However, for various reasons I plan to base my research on a low-level API so I will not be able to use the SamzaSQL pipeline approach. (But, discussing my research would be completely off topic. If you are interested we can discuss this somewhere else.) – Lukas Probst Jan 19 '17 at 13:56
  • (2/2) Regarding the distribution flexibility: I think we agree on the fact that merging and distributing are possible in both systems. You already mentioned that merging jobs in Samza requires changing the code. To me it also seems a little like working around the idea of Kafka Streams if you split everything from the beginning into sub-topologies (or Kafka Streams apps) and connect them with Kafka topics. I think, in summary, one can say that both systems have different use cases where their distribution/parallelism concept fits better than the other. Do you agree with that? – Lukas Probst Jan 19 '17 at 13:56
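For completeness, here is roughly what the pattern from Guozhang's DSL comment above (Jan 13 '17 at 16:51) looks like when spelled out for the sensor example; topic names, serdes, and types are, again, assumptions rather than anything from the original thread:

KStreamBuilder builder = new KStreamBuilder();

// Intermediate stream (after Worker A's filter).
KStream<String, Integer> stream1 =
        builder.stream(Serdes.String(), Serdes.Integer(), "topicX")
               .filter((key, value) -> value >= 50);

// Result stream (after Worker B's filter).
KStream<String, Integer> stream2 =
        stream1.filter((key, value) -> value <= 80);

stream2.to(Serdes.String(), Serdes.Integer(), "topicY");   // publish the final result
stream1.to(Serdes.String(), Serdes.Integer(), "topicZ");   // also publish the intermediate stream,
                                                            // without breaking the topology into sub-topologies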