
To apply a function to a stream in Trident (Storm), we pass a newly created instance to the `each` method, which is called on the stream like this:

stream.each(inputFields, new SomeFunc(), outputFields)

where `SomeFunc` is a descendant of `BaseFunction`.

Suppose I want to have some state variable in SomeFunc:

import storm.trident.operation.{BaseFunction, TridentCollector}
import storm.trident.tuple.TridentTuple

class SomeFunc extends BaseFunction {

  // mutable per-instance state
  var someState: String = _

  override def execute(tuple: TridentTuple, collector: TridentCollector): Unit = ???
}

If I set the parallelism hint to a value greater than 1 for the `SomeFunc` component, will Storm create multiple instances of `SomeFunc`? Is accessing/updating `someState` in `SomeFunc` a thread-safe operation? If I define `SomeFunc` as an `object` instead of a `class`, will anything change?
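
For concreteness, this is roughly how I wire the stream and set the hint (the spout and field names below are just placeholders, not my real code):

import storm.trident.TridentTopology
import storm.trident.testing.FixedBatchSpout
import backtype.storm.tuple.{Fields, Values}

// Placeholder spout and field names, only to show where the hint goes.
val spout = new FixedBatchSpout(new Fields("in"), 3, new Values("a"), new Values("b"))
val topology = new TridentTopology()
topology.newStream("someSpoutId", spout)
  .each(new Fields("in"), new SomeFunc(), new Fields("out"))
  .parallelismHint(4)   // ask Storm for 4 executors for this section of the stream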

EDIT Ok, with the help of user @Shaw in the comments to his answer, I learned that Storm creates one instance of a Storm component (spout/bolt/function/aggregator etc.) per executor. The question is how does it do this? I want to know the mechanism behind this behaviour.

maks

1 Answer


I don't know exactly how Trident works, but in Storm, if you define a parallelism hint > 1 you create multiple executors for that component, which are threads spawned by the worker process.

Each executor will create X (the number of tasks, 1 by default) "instances" of SomeFunc, and they don't share the someState variable between them. Because of how Storm works, someState is thread-safe: each instance executes tuples sequentially in its own thread as they arrive at the component.

I am almost sure it is the same in Trident, because Trident is simply a micro-batching abstraction over Storm.
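
As an illustration (the class below is my own invention, not from the question), per-instance state like a counter needs no synchronization, because Storm calls execute sequentially on the single thread that owns that instance:

import storm.trident.operation.{BaseFunction, TridentCollector}
import storm.trident.tuple.TridentTuple
import backtype.storm.tuple.Values

// Each executor works with its own copy of this function, so `count`
// is confined to one thread and needs no locking.
class CountingFunc extends BaseFunction {
  private var count: Long = 0L

  override def execute(tuple: TridentTuple, collector: TridentCollector): Unit = {
    count += 1                                   // safe: only this executor's thread touches it
    collector.emit(new Values(Long.box(count)))  // emit the running count for this instance
  }
}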

I'm sure you have already read it, but if not, I strongly recommend this great article about parallelism in Storm.
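
If it helps, this is roughly how those knobs look when you wire a plain Storm topology (MySpout and MyBolt are hypothetical stand-ins for real spout/bolt classes):

import backtype.storm.Config
import backtype.storm.topology.TopologyBuilder

val builder = new TopologyBuilder()
builder.setSpout("spout", new MySpout(), 2)   // 2 executors for the spout (MySpout is hypothetical)
builder.setBolt("bolt", new MyBolt(), 2)      // 2 executors for the bolt (MyBolt is hypothetical)...
  .setNumTasks(4)                             // ...running 4 task "instances" between them
  .shuffleGrouping("spout")

val conf = new Config()
conf.setNumWorkers(2)                         // 2 worker JVM processes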

gasparms
  • How does it create X instances if you don't pass a factory method but a concrete instance to the `each` method? – maks Sep 21 '14 at 20:48
  • You create a new instance, but the Storm framework creates multiple "threads" underneath by calling the prepare, execute, cleanup... methods. That's one of the greatest things about Storm. – gasparms Sep 22 '14 at 07:21
  • `stream.each(inputFields, new SomeFunc(), outputFields)` is the same as creating the instance of someFunc outside and passing the reference to `each`: `stream.each(inputFields, someFunc, outputFields)`. Multiple instances could only exist if the topology is submitted to a cluster which has many workers. In that case those instances would be created in multiple different JVM instances. But how would a few instances be created in a single JVM? – maks Sep 22 '14 at 10:41
  • But in one worker you can have multiple executor threads, so you will have one instance per executor. Each worker runs in its own JVM. Each executor (parallelism hint) is a thread spawned by a worker, and then you have tasks (the numTasks option) that run serially in the same executor thread, so numTasks does not increase the parallelism. Read this answer: http://stackoverflow.com/questions/17257448/what-is-the-task-in-twitter-storm-parallelism – gasparms Sep 22 '14 at 13:18
  • Imagine the situation: you've created an instance of SomeFunc in the main method of the application; you've also created a Storm topology in that main method and submitted it. Then later in your main method (say, after 10 minutes) you change the `someState` variable. The Storm topology will see this change, as it has the same reference to the someFunc instance as your main method. – maks Sep 22 '14 at 21:25
  • In my opinion the scenario you propose will only hold if Storm serializes each component (spout, bolt, function etc.) and, for each executor, restores (deserializes) the state observed in the component before serialization happened. Do things happen this way? – maks Sep 22 '14 at 21:31
  • I don't know how it would work, but I think you wouldn't see that change in the variable. Storm serializes the component once and each executor then works with its own local copy; for this reason you can use non-serializable objects in the prepare and execute methods. (A sketch of that principle is below.) – gasparms Sep 23 '14 at 07:02
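
To illustrate that principle (this is only a sketch, not Storm's actual code), a Java-serialization round trip produces distinct copies, which is why a change made later in the submitting JVM is never seen by the executors:

import java.io._

// Each deserialization yields a separate instance, so per-executor copies
// share no mutable state with the original object in the submitting JVM.
def roundTrip[T <: java.io.Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}

val original = new SomeFunc()            // the instance passed to each(...)
val executorCopy1 = roundTrip(original)  // what one executor would see
val executorCopy2 = roundTrip(original)  // what another executor would see
original.someState = "changed later"     // not visible in executorCopy1 or executorCopy2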