2

Just reading more details on Storm and came across its ability to do fields grouping. For example, if you were counting tweets per user and had two tasks with a fields grouping on user-id, tuples with the same user-id would always be sent to the same task.

So task 1 could have the following counts in memory: bob: 10, alice: 5

Task 2 could have the following counts in memory: jill: 10, joe: 4

If I added a new machine to the cluster to increase capacity and ran rebalance, what happens to my counts in memory? Will you start to get users with different counts?

James

4 Answers

5

Using fields grouping, we can direct tuples with a specific field value to a particular task.

Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.

These tasks are static for the lifetime of a topology. What you can alter with rebalance is the number of executors (threads). Adding a new node to the cluster lets you reconfigure the number of executors without shutting down the topology, but the number of tasks remains the same no matter what. Adding a node simply lets you improve performance by tuning the topology's parallelism.
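A minimal sketch of this point, not Storm's actual scheduler: the task chosen for a key depends only on the key and the fixed task count, while the executor hosting that task can change after a rebalance. The class `RebalanceSketch`, its helper names, and the round-robin placement of tasks on executors are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

class RebalanceSketch {
    // Task index for a key: hash of the grouping values modulo the task count.
    static int taskFor(String userId, int numTasks) {
        return Math.abs(Arrays.deepHashCode(new Object[] {userId})) % numTasks;
    }

    // Hypothetical placement: spread task indices over executors round-robin.
    static int executorFor(int taskIndex, int numExecutors) {
        return taskIndex % numExecutors;
    }

    public static void main(String[] args) {
        int numTasks = 4; // fixed when the topology is submitted
        List<String> users = Arrays.asList("bob", "alice", "jill", "joe");
        // Compare the layout before (2 executors) and after (4 executors) a rebalance.
        for (int numExecutors : new int[] {2, 4}) {
            for (String user : users) {
                int task = taskFor(user, numTasks);
                System.out.printf("executors=%d user=%s task=%d executor=%d%n",
                        numExecutors, user, task, executorFor(task, numExecutors));
            }
        }
    }
}
```

In this sketch the `task` column is identical for both executor counts; only the `executor` column can differ, which is why per-task counts kept in memory are not split by changing the executor count alone.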

user2720864
  • gotcha, so once bob goes to one task he'll always go to the same one until the topology is stopped. When I add new nodes that task just gets more threads to run concurrent operations so all your data structures need to be threadsafe in your bolts? – James Dec 08 '13 at 23:44
  • true, read more about this [Here](http://stackoverflow.com/questions/17257448/what-is-the-task-in-twitter-storm-parallelism) – user2720864 Dec 09 '13 at 07:34
2

To send a value to the same task every time, Storm takes the hash code of the grouping values modulo the number of tasks (hashcode(values) % #tasks). If you were to increase the number of tasks, your counts would no longer be accurate, because a value may not go to the same task/worker after a rebalance.

https://groups.google.com/forum/#!msg/storm-user/lCKnl8AmSVE/rVCH3uuUAcMJ
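To illustrate why a different task count would break in-memory counts, here is a small sketch that applies the modulo rule to a list of user-ids and reports which ones would be routed to a different task. `TaskCountChangeSketch`, `taskFor`, and `movedUsers` are hypothetical names, and the sketch hashes a single string rather than reproducing Storm's own hashing of the field list.

```java
import java.util.ArrayList;
import java.util.List;

class TaskCountChangeSketch {
    // Same idea as hashcode(values) % #tasks, applied to one user-id string.
    static int taskFor(String userId, int numTasks) {
        return Math.abs(userId.hashCode()) % numTasks;
    }

    // Returns the users whose target task differs between the two task counts.
    static List<String> movedUsers(List<String> users, int oldTasks, int newTasks) {
        List<String> moved = new ArrayList<>();
        for (String user : users) {
            if (taskFor(user, oldTasks) != taskFor(user, newTasks)) {
                moved.add(user);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> users = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            users.add("user-" + i);
        }
        List<String> moved = movedUsers(users, 2, 3);
        System.out.println(moved.size() + " of " + users.size()
                + " users would land on a different task");
    }
}
```

Any user in the `moved` list would have part of their count on the old task and the rest on the new one, which is the inaccuracy described above.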
Naresh
0

To fully understand it, you have to see the code:

Fields grouping depends only on the values of the grouping fields, not on which spout emitted the tuple, so a rebalance won't affect it. This is the function: https://github.com/apache/storm/blob/3b1ab3d8a7da7ed35adc448d24f1f1ccb6c5ff27/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java#L157-L161

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
    int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks;
    return Collections.singletonList(targetTasks.get(targetTaskIndex));
}

TupleUtils.listHashCode leads to

public static <T> int listHashCode(List<T> alist) {
  if (alist == null) {
      return 1;
  } else {
      return Arrays.deepHashCode(alist.toArray());
  }
}
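
The two snippets above can be combined into a standalone sketch that runs without Storm. `FieldsGrouperSketch` and its `select` helper are stand-ins written for this example (they are not Storm classes), and the task ids 7 and 8 are arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class FieldsGrouperSketch {
    private final List<String> outFields;    // all fields declared by the upstream component
    private final List<String> groupFields;  // fields named in the fields grouping
    private final List<Integer> targetTasks; // task ids of the downstream bolt

    FieldsGrouperSketch(List<String> outFields, List<String> groupFields, List<Integer> targetTasks) {
        this.outFields = outFields;
        this.groupFields = groupFields;
        this.targetTasks = targetTasks;
    }

    // Stand-in for outFields.select(groupFields, values): keep only the grouping values.
    private List<Object> select(List<Object> values) {
        List<Object> selected = new ArrayList<>();
        for (String field : groupFields) {
            selected.add(values.get(outFields.indexOf(field)));
        }
        return selected;
    }

    // Mirrors chooseTasks: hash the selected values, then index into the task list.
    List<Integer> chooseTasks(List<Object> values) {
        int hash = Arrays.deepHashCode(select(values).toArray());
        int index = Math.abs(hash) % targetTasks.size();
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        FieldsGrouperSketch grouper = new FieldsGrouperSketch(
                Arrays.asList("user-id", "text"),
                Arrays.asList("user-id"),
                Arrays.asList(7, 8));
        // Two tuples with the same user-id but different text go to the same task.
        System.out.println(grouper.chooseTasks(Arrays.<Object>asList("bob", "first tweet")));
        System.out.println(grouper.chooseTasks(Arrays.<Object>asList("bob", "second tweet")));
    }
}
```

Because only the selected grouping values feed the hash, the non-grouping `text` field has no influence on the chosen task.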
Nav
0

Fields grouping lets you control how tuples are routed to a bolt based on one or more fields of the tuple. It ensures that a given set of values for a combination of fields is always sent to the same task of that bolt.

VIJ