
I am working on a Scala (2.11) / Spark (1.6.1) streaming project and using mapWithState() to keep track of data seen in previous batches.

The state is split into 20 partitions, created with StateSpec.function(trackStateFunc _).numPartitions(20). I had hoped to distribute the state throughout the cluster, but it seems that each node holds the complete state and the execution is always performed on exactly one node.
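For reference, here is a simplified sketch of the setup (the String/Int/Long types, the body of trackStateFunc, and the keyedStream name are stand-ins, not the real code):

```scala
import org.apache.spark.streaming.{State, StateSpec}

// One possible shape for trackStateFunc; types are placeholders.
def trackStateFunc(key: String, value: Option[Int], state: State[Long]): (String, Long) = {
  val total = state.getOption.getOrElse(0L) + value.getOrElse(0)
  state.update(total)   // persist the running total for this key
  (key, total)          // emitted downstream for this batch
}

val spec = StateSpec.function(trackStateFunc _).numPartitions(20)
// keyedStream: DStream[(String, Int)] (placeholder for the real input stream)
val stateful = keyedStream.mapWithState(spec)
```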

The UI shows Locality Level Summary: Node local: 50 for each batch, and the complete batch is a shuffle read. Afterwards, I write to Kafka and the partitions are spread across the cluster again. I can't seem to find out why mapWithState() needs to be run on a single node. Doesn't this defeat the purpose of partitioning the state if it is limited to one node instead of spread across the whole cluster? Shouldn't it be possible to distribute the state by key?

Lawrence Benson

2 Answers


I can't seem to find out why mapWithState needs to be run on a single node

It doesn't. By default, Spark uses a HashPartitioner to partition your keys among the different worker nodes in your cluster. If for some reason you're seeing all of your data stored on a single node, check the distribution of your keys: if you're using a custom object as a key, make sure its hashCode method is implemented properly, since a skewed or constant hash will funnel everything into the same partition. If you'd like to test this, try using random numbers as your keys and watching the Spark UI to see whether the behavior changes.
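To make the hashCode pitfall concrete, here's a hedged sketch (the DeviceKey class is invented purely to demonstrate the failure mode):

```scala
import org.apache.spark.HashPartitioner

// HashPartitioner picks a partition from key.hashCode modulo numPartitions,
// so a key class with a degenerate hashCode pins every key to one partition
// no matter how many partitions you configure.
final class DeviceKey(val id: String) {
  override def hashCode: Int = 42                    // broken on purpose
  override def equals(o: Any): Boolean = o match {
    case d: DeviceKey => d.id == id
    case _            => false
  }
}

// Quick local check of how a sample of keys spreads over 20 partitions:
val partitioner = new HashPartitioner(20)
val sample      = (1 to 1000).map(i => new DeviceKey(s"device-$i"))
val used        = sample.map(partitioner.getPartition).distinct.size
println(s"keys land in $used of 20 partitions")      // prints 1 for the broken hashCode
```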

I'm running mapWithState myself: the incoming data is already partitioned by key, since I also call reduceByKey before holding the state, and when I look at the Storage tab in the Spark UI, I can see the different RDDs stored on different worker nodes in the cluster.
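In case it helps, a hedged sketch of that shape (countFunc and the stream types are placeholders, not production code):

```scala
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical per-key running count, pre-aggregated within each batch.
def countFunc(key: String, value: Option[Int], state: State[Long]): (String, Long) = {
  val total = state.getOption.getOrElse(0L) + value.getOrElse(0)
  state.update(total)
  (key, total)
}

def countWithState(events: DStream[(String, Int)]) =
  events
    .reduceByKey(_ + _)                              // combine duplicates within the batch
    .mapWithState(StateSpec.function(countFunc _))
```

Both reduceByKey and mapWithState hash-partition on the key unless told otherwise, so successive stages stay co-partitioned and the state spreads across the cluster.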

Yuval Itzchakov
  • My data comes into Spark Streaming partitioned by key via Kafka, but I have the same issue. Would you mind expanding your answer? How do I access/edit the hash partitioner, and how do you apply reduceByKey before writing to the database? – Andreas Sep 26 '16 at 16:57
  • @Andreas What issue are you experiencing? Partitioning bias? – Yuval Itzchakov Sep 26 '16 at 17:40
  • Thanks for responding. I'd like to learn how to control the partitioning by key for both Kafka and Spark, since in my problem the data doesn't need to be shuffled. In practice I observe shuffling, as the hash function applied to my compound key sends the data elsewhere. Details of my problem below. – Andreas Sep 26 '16 at 18:15
  • I have an impression-count problem. Kafka partitions by input class (a number, e.g. a URL ID#) and transmits a pair (URLID, timestamp). I round the timestamp to the nearest time interval and form a key "URLID;RoundedTimeStamp", then do an accumulating word count. The DAG shuffles for each RDD, so I think I have to provide a HashPartitioner that returns the same value for the URLID alone and for the compound key "URLID;RoundedTimeStamp", and ideally also partition the same way on the Kafka end. I keep finding updateStateByKey examples, but I'm looking to use mapWithState and am unsure about the syntax. – Andreas Sep 26 '16 at 18:18
  • @Andreas Spark defaults to use `HashPartitioner` for operations requiring a shuffle, so there's no need to explicitly provide it. Regarding `mapWithState` syntax, [see this blog post](http://asyncified.io/2016/07/31/exploring-stateful-streaming-with-apache-spark/) (disclaimer: I am the author). – Yuval Itzchakov Sep 26 '16 at 19:05
  • I was wondering how to control which partitioner is used in order to minimize shuffling of my data in the mapWithState step. Apparently I can supply one as StateSpec.partitioner(MyPartitioner) in the argument of mapWithState, with my partitioner defined in analogy to http://stackoverflow.com/questions/23127329 (a sketch follows this thread). – Andreas Sep 27 '16 at 04:19
  • @Andreas I see. Yes, that's definitely possible to do. – Yuval Itzchakov Sep 27 '16 at 05:45
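Following up on the thread above, a hedged sketch of a partitioner that keeps a plain URLID and the compound key "URLID;RoundedTimeStamp" in the same partition (the class name and implementation are assumptions; the key format is taken from the comments):

```scala
import org.apache.spark.Partitioner
import org.apache.spark.streaming.StateSpec

// Hypothetical partitioner: hashes only the URLID prefix of a
// "URLID;RoundedTimeStamp" key, so the compound key and the bare URLID
// always land in the same partition.
class UrlIdPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val urlId = key.toString.split(';')(0)
    val mod = urlId.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod   // hashCode may be negative
  }
}

// Plugged into the state spec (trackStateFunc as elsewhere on this page):
val spec = StateSpec.function(trackStateFunc _)
  .partitioner(new UrlIdPartitioner(20))
```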

Are you running Spark with --deploy-mode cluster? Please check that.

Also make sure you set --num-executors 20 --executor-cores 10, because unless you run with dynamic allocation, Spark will assign only 2 executors by default.
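For example (the class name and jar are placeholders):

```
spark-submit \
  --deploy-mode cluster \
  --num-executors 20 \
  --executor-cores 10 \
  --class com.example.StreamingJob \
  streaming-job.jar
```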

BalaramRaju