
I want to understand a basic thing about Spark Streaming. I have 50 Kafka topic partitions and 5 executors. I am using the direct API, so the number of RDD partitions will be 50. How will these partitions be processed on the 5 executors? Will Spark process 1 partition at a time on each executor, or, if an executor has enough memory and cores, will it process more than 1 partition in parallel on that executor?

nilesh1212

1 Answer


Will Spark process 1 partition at a time on each executor, or, if an executor has enough memory and cores, will it process more than 1 partition in parallel on that executor?

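The number of partitions Spark processes concurrently depends on the total number of cores available to the job you're running. Each partition is handled by one task, and each task occupies spark.task.cpus cores while it runs.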

Let's say your streaming job has 10 executors, each one with 2 cores. This means that you'll be able to process 10 x 2 = 20 partitions concurrently, assuming spark.task.cpus is set to 1.
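The arithmetic above can be sketched in a few lines of plain Scala. This is only a back-of-the-envelope model, not Spark API code: the object and method names (`Concurrency`, `maxConcurrentTasks`, `waves`) are made up for illustration, and the "waves" figure assumes all tasks take about the same time, whereas Spark actually starts a new task as soon as a core frees up.

```scala
// Back-of-the-envelope model of task concurrency; hypothetical names, not Spark API code.
object Concurrency {
  /** Upper bound on the number of tasks (one per partition) that can run at the same time. */
  def maxConcurrentTasks(executors: Int, coresPerExecutor: Int, taskCpus: Int = 1): Int = {
    require(taskCpus > 0, "taskCpus must be positive")
    // Each executor offers floor(cores / taskCpus) task slots.
    executors * (coresPerExecutor / taskCpus)
  }

  /** Rough number of rounds needed to run one task per partition (assumes equal task durations). */
  def waves(partitions: Int, slots: Int): Int = {
    require(slots > 0, "need at least one task slot")
    (partitions + slots - 1) / slots // ceiling division
  }

  def main(args: Array[String]): Unit = {
    println(maxConcurrentTasks(10, 2))            // the example above: 20
    println(waves(50, maxConcurrentTasks(5, 1)))  // 50 partitions, 5 single-core executors: 10
    println(waves(50, maxConcurrentTasks(5, 10))) // 50 partitions, 5 ten-core executors: 1
  }
}
```

For the setup in the question (50 partitions, 5 executors), this suggests that all partitions run at once only if each executor has 10 cores; with fewer cores, the partitions of a batch are worked through in several rounds.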

If you really want the details, look at CoarseGrainedSchedulerBackend, the scheduler backend a Spark application uses to talk to cluster managers such as Spark Standalone. In particular, look at its makeOffers method:

private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

The key here is executorDataMap, which holds a mapping from executor id to an ExecutorData, which tracks how many cores each executor in the system has free. Based on that and the preferred locality of the partition, the scheduler makes an educated guess about which executor each task should run on.
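To make the idea of "offers" more concrete, here is a toy model of how free cores could be turned into task launches. It is a deliberately simplified sketch and not Spark's actual algorithm: `OfferSketch`, `Offer`, and `assign` are hypothetical names, and the sketch ignores data locality, which the real scheduler takes into account.

```scala
import scala.collection.mutable

// Toy model of resource offers; hypothetical names, not Spark's classes.
object OfferSketch {
  final case class Offer(executorId: String, freeCores: Int)

  /** Hands out partitions to executors round-robin while free cores remain.
    * Returns the (partition, executorId) pairs launched in this round; locality is ignored. */
  def assign(partitions: Seq[Int], offers: Seq[Offer], taskCpus: Int = 1): Seq[(Int, String)] = {
    require(taskCpus > 0, "taskCpus must be positive")
    val free = mutable.LinkedHashMap(offers.map(o => o.executorId -> o.freeCores): _*)
    val pending = mutable.Queue(partitions: _*)
    val launched = mutable.ArrayBuffer.empty[(Int, String)]
    var progressed = true
    // Keep cycling over the executors until no task can be placed anymore.
    while (pending.nonEmpty && progressed) {
      progressed = false
      for (id <- free.keys.toList) {
        if (pending.nonEmpty && free(id) >= taskCpus) {
          launched += ((pending.dequeue(), id))
          free(id) = free(id) - taskCpus // this executor now has fewer free cores
          progressed = true
        }
      }
    }
    launched.toList
  }
}
```

Calling `OfferSketch.assign(0 until 5, Seq(Offer("e1", 3), Offer("e2", 3), Offer("e3", 3)))` places all 5 partitions in one round, whereas 50 partitions offered to 5 single-core executors would yield only 5 launches; the remaining partitions wait until cores are offered again.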

Here is an example from a live Spark Streaming app consuming from Kafka:

[Image: Spark Tasks]

There are 5 partitions and 3 executors running. Each executor has more than 2 cores, so the streaming job has enough cores to process all 5 partitions concurrently.

Yuval Itzchakov
  • Thank you so much for such a precise and elaborate answer. That means each partition is processed by 1 core (1 thread) if `spark.task.cpus` is set to 1. So in my case, if I have 5 executors and I set `--executor-cores 10`, all partitions will be processed concurrently. – nilesh1212 Dec 18 '16 at 14:54
  • @nilesh1212 The number of tasks will depend on the number of partitions in your `DirectKafkaInputDStream`, but basically one task will be requested for each partition in the RDD of the underlying DStream (see [this question for more](http://stackoverflow.com/q/37528047/1870803)). You can verify this yourself by going to the first transformation that reads the data from Kafka and looking at where each partition is being processed. – Yuval Itzchakov Dec 18 '16 at 14:59
  • If I am not wrong, tasks are nothing but the transformations/actions executed on the RDD at the partition level, right? – nilesh1212 Dec 18 '16 at 15:07
  • I don't think "running Spark Standalone which uses CoarseGrainedSchedulerBackend" holds. It's the opposite that's true, i.e. CGSB is used to talk to Spark Standalone and Mesos. – Jacek Laskowski Dec 18 '16 at 18:52
  • @Jacek Maybe "uses" isn't the right word. Perhaps "communicates with" is better. – Yuval Itzchakov Dec 18 '16 at 19:09
  • It's the opposite -- a Spark application requests resources from a cluster manager. – Jacek Laskowski Dec 18 '16 at 20:19
  • @JacekLaskowski Ok, fixed it. Let me know if it makes more sense now. – Yuval Itzchakov Dec 18 '16 at 20:39