2

I am using Spark Streaming v2.0.0 to retrieve logs from Kafka and do some manipulation on them. I am using the mapWithState function to save and update some fields related to a device. I am wondering how this function works in a cluster. So far I have only used standalone mode, but I will try it later with a YARN cluster.

However, let's say I have a cluster with several nodes. If a node updates the state of a device, does it immediately notify all other nodes of this update? If not, the mapWithState function needs to be configured for a cluster. How can I do that?

ChrisF
Yassir S

3 Answers

5

However, let's say I have a cluster with several nodes. If a node updates the state of a device, does it immediately notify all other nodes of this update? If not, the mapWithState function needs to be configured for a cluster.

That's not how mapWithState works. mapWithState involves a shuffle, which means it causes data to move around in your cluster. How does that affect mapWithState? Each entry (key-value pair) is shuffled to a particular Executor. When the same key arrives again, no matter which Executor reads it from the input stream at that time, it is shuffled to the node holding the in-memory map with the state built from previous messages. By default this is done by the HashPartitioner, which hashes the key and sends the entry to the Executor holding the state. That's why you need to choose the key carefully.

This means that the state for a particular key isn't spread throughout the cluster. It is assigned to a particular Executor inside the cluster, and incoming data for that key keeps coming back to that same Executor, based on the hash of the key.
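To make the key-to-partition mapping concrete, here is a minimal sketch in plain Python (not Spark code). It assumes string keys hashed the way Java's `String.hashCode` does, followed by a non-negative modulo over the number of partitions, which is the general idea behind hash partitioning. The helper names (`java_string_hash`, `partition_for`) and the sample keys are made up for illustration.

```python
def java_string_hash(s: str) -> int:
    """Mimic Java's String.hashCode as a signed 32-bit int.

    Assumption: the key only contains characters in the Basic
    Multilingual Plane, so each Python character is one UTF-16 unit.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # keep 32 bits
    # Reinterpret the unsigned 32-bit value as signed
    return h - 0x100000000 if h >= 0x80000000 else h


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition index in [0, num_partitions).

    Python's % already returns a non-negative result for a positive
    modulus, so negative hash codes need no extra handling here.
    """
    return java_string_hash(key) % num_partitions


if __name__ == "__main__":
    # Hypothetical device ids; the same key always maps to the same partition.
    for device in ["device-1", "device-2", "device-1"]:
        print(device, "-> partition", partition_for(device, 4))
```

Because the mapping depends only on the key and the partition count, every record for a given device lands in the same partition, wherever it was first received.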

Yuval Itzchakov
  • Hi Yuval, do you have any documentation about the HashPartitioner? I only found this link, but I don't understand precisely how it works. http://spark.apache.org/docs/2.0.2/api/java/index.html?org/apache/spark/HashPartitioner.html – Yassir S Dec 07 '16 at 15:14
  • 1
    http://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work – Yuval Itzchakov Dec 07 '16 at 15:15
  • What happens if a spark executor crashes? How is the state rebuilt for the keys in that failed executor? – user1870400 Jun 30 '17 at 04:11
0

All stateful transformations shuffle data by key so all values for a specific key are processed on the same executor thread.

There is no need for additional synchronization, and the state for a particular key is always consistent.
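The idea can be simulated in a few lines of plain Python (again, not Spark code). The sketch assumes a fixed number of partitions, each with its own private state map, and uses CRC32 as a stand-in for the key hash. The running-sum update function, the partition count, and the device names are all hypothetical.

```python
import zlib
from typing import Dict, List, Tuple

NUM_PARTITIONS = 3  # hypothetical cluster size


def partition_for(key: str) -> int:
    # Stand-in deterministic hash (CRC32), used only for this simulation.
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def update_state(state: Dict[str, int], key: str, value: int) -> Tuple[str, int]:
    """Per-key update: keep a running sum of the values seen for the key."""
    state[key] = state.get(key, 0) + value
    return key, state[key]


def run_batches(batches: List[List[Tuple[str, int]]]):
    # One private state map per partition; nothing is shared between them.
    partition_states: List[Dict[str, int]] = [{} for _ in range(NUM_PARTITIONS)]
    outputs = []
    for batch in batches:
        for key, value in batch:
            p = partition_for(key)  # the "shuffle": route the record by key
            outputs.append(update_state(partition_states[p], key, value))
    return partition_states, outputs


if __name__ == "__main__":
    states, outputs = run_batches([[("dev-a", 1), ("dev-b", 2)], [("dev-a", 3)]])
    print(outputs)
    print(states)
```

Each key's state lives in exactly one partition's map, so an update never has to be announced to the other partitions.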

  • When you say "the same machine", do you mean "the same node"? If I understand what you wrote, there is already synchronisation between nodes for transformations? – Yassir S Dec 05 '16 at 13:23
-1

The checkpoint location is supplied as a directory, so it can be on the local file system, an NFS mount, HDFS, or S3.

Now consider the YARN + HDFS combination. Any data written to the checkpoint by mapWithState will be distributed across different HDFS nodes based on the state's key, and Spark will attempt to schedule the tasks that depend on it on the same node.

But consider YARN + NFS (perhaps not a sensible setup at all). Each node has to mount the NFS share at the same mount point, and every read/write request becomes an NFS request. This creates a perfect bottleneck.

Let's assume the state is used to manage user sessions. We might choose to keep a few bytes or many GBs of information per user. The key in the state should uniquely identify the user, and each time the mapWithState function is triggered, all the information saved in the state for that user will be accessible.

rakesh
  • I wouldn't checkpoint to S3; checkpointing relies on renames to commit the checkpoint, and on an object store that is very slow and not atomic. – stevel Dec 06 '16 at 12:14
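
The rename-to-commit pattern mentioned in the comment above can be illustrated on a local file system with a small Python sketch. This is not Spark's checkpoint code. The function name, file names, and JSON format are assumptions made for the example. The point is that readers only ever see the final file once it is complete, which relies on the rename being a single atomic step.

```python
import json
import os
import tempfile


def commit_checkpoint(directory: str, name: str, state: dict) -> str:
    """Write state to a temp file, then rename it into place.

    On a POSIX file system os.replace is atomic when source and
    destination are on the same file system. An object store has no
    such rename; it is typically emulated by copy plus delete.
    """
    os.makedirs(directory, exist_ok=True)
    final_path = os.path.join(directory, name)
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=name + ".", suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes are on disk before committing
    os.replace(tmp_path, final_path)  # the commit step
    return final_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        path = commit_checkpoint(d, "checkpoint-1", {"dev-a": 4})
        print(os.path.basename(path), os.listdir(d))
```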