I am working on a Scala (2.11) / Spark (1.6.1) streaming project and using mapWithState()
to keep track of seen data from previous batches.
The state is split into 20 partitions, created with StateSpec.function(trackStateFunc _).numPartitions(20). I had hoped to distribute the state throughout the cluster, but it seems that each node holds the complete state, and the execution always happens on exactly one node.
Locality Level Summary: Node local: 50
is shown in the UI for each batch, and the complete batch is read via shuffle. Afterwards, I write to Kafka and the partitions are spread across the cluster again. I can't figure out why mapWithState() needs to run on a single node. Doesn't this defeat the purpose of partitioning the state, if it is limited to one node instead of spread across the whole cluster? Shouldn't it be possible to distribute the state by key?
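For reference, this is a minimal sketch of what I would expect to work: StateSpec in Spark 1.6 also exposes a partitioner() hook, so in theory the state RDD could be hash-partitioned by key across executors. The trackStateFunc body and types here are illustrative placeholders, not my real logic, and the stream setup (Kafka input etc.) is omitted.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

// Placeholder state function: keeps a running sum per key.
// My real trackStateFunc has different types/logic; this is just a sketch.
def trackStateFunc(key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
  val sum = state.getOption.getOrElse(0L) + value.getOrElse(0)
  state.update(sum)
  Some((key, sum))
}

val spec = StateSpec
  .function(trackStateFunc _)
  .numPartitions(20)                    // 20 state partitions, as in my setup
  .partitioner(new HashPartitioner(20)) // explicitly hash keys across partitions

// stream.mapWithState(spec) would then, in theory, keep each key's state
// on the executor that owns that key's hash partition.
```

Even with the explicit HashPartitioner, I see the same single-node behavior, which is what prompted this question.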