
I run Spark in standalone mode in our cluster (100 machines, 16 CPU cores per machine, 32 GB RAM per machine). I specify SPARK_WORKER_MEMORY and SPARK_WORKER_CORES when running any application.
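For reference, these can be set in conf/spark-env.sh on each worker (or exported before starting the workers); the values below are only illustrative, not my exact configuration:

    # conf/spark-env.sh (illustrative values)
    SPARK_WORKER_CORES=16     # 4 in the run that succeeded
    SPARK_WORKER_MEMORY=30g   # out of 32 GB per machine, leaving some for the OS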

In Spark programming, I write the program as if it were a serial program, and then the Spark framework parallelizes the tasks automatically, right?

I encountered an OOM crash when I ran the program with SPARK_WORKER_CORES = 16. When I tried again with SPARK_WORKER_CORES = 4, the program completed successfully.

Surely, exploiting multiple threads through data parallelism requires more memory, but I don't know which function in my Spark program is parallelized across the multiple threads, so I don't know which function is responsible for the OOM.
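(Rough arithmetic with the numbers above, ignoring JVM and framework overhead:)

    32 GB per worker / 16 concurrent tasks ≈ 2 GB per task
    32 GB per worker /  4 concurrent tasks ≈ 8 GB per task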

I control the number of RDD partitions (degree of parallelism) by taking into account the total number of machines and the amount of memory per worker (per machine), so that each RDD partition of the data can fit in memory.
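For example (a minimal Scala sketch, since I haven't shown my actual code; the input path and partition count are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("my-app"))

    // e.g. 100 machines * 4 concurrent tasks per machine = 400 partitions (placeholder)
    val numPartitions = 100 * 4
    val data = sc.textFile("hdfs:///path/to/input", minPartitions = numPartitions)
    // or, for an already-created RDD:
    val repartitioned = data.repartition(numPartitions)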

After partitioning the RDDs, a worker on each machine invokes the user-defined functions on each RDD partition to process it.
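The per-partition processing looks roughly like this (continuing the sketch above; the body is a placeholder for my user-defined function):

    repartitioned.foreachPartition { iter =>
      // one task handles one partition; consuming the iterator lazily avoids
      // calling iter.toArray / iter.toList, which materialize the whole partition
      iter.foreach { line =>
        // placeholder for the user-defined per-record work
        println(line.length)
      }
    }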

Here is my question: how does Spark exploit the multi-core parallelism within each machine?

Which function is parallelized across the multiple threads? In which function should I take special care not to use too much memory?

Thanks

syko
  • I believe all of the functions that "do" anything are parallelized. It's really hard to say "which function" without getting more info - are you using scala? python? java? Spark evaluates lazily, so until you tell it to "do" something, such as count(), collect(), etc. nothing will be distributed. Does that help? – flyingmeatball Jan 23 '17 at 13:52
  • @flyingmeatball The program uses RDD's foreachPartition operation. How does Spark parallelize the computation of each RDD partition? – syko Jan 23 '17 at 14:51
  • Is there a particular reason you're using foreachPartition instead of a more standard syntax like .map, .flatmap, .reduce, etc.? (see here: http://stackoverflow.com/questions/25914789/how-do-i-iterate-rdds-in-apache-spark-scala). If you're running out of memory, it means cores * memory per core + other used memory > available memory - it seems like you already have the solution figured out... dial down the number of executors, or the memory allocated to an executor. – flyingmeatball Jan 23 '17 at 15:56

1 Answer


Spark runs your logic on every partition (the RDD is split and distributed across your cluster). Every executor has a predefined number of cores and amount of memory. Based on those resources, executors provide task slots, which are used to run the tasks sent to them by the driver. In the best case, a task slot is available on the executor that holds the partition and the task runs there; otherwise a slot on another executor on the same node is used; in the worst case, no local slot is available and the task is scheduled at the cluster (rack) level, transferring the partition over the network.

An OOM generally occurs when you gather all your data into one place, e.g. on the driver by calling toArray() or collect(), which pulls all partitions of your RDD onto one node. It can also occur on the executors, if the executor memory plus the executor memory overhead exceeds the container's overall memory while the partitions are being processed.
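A hedged sketch of the two failure modes described above (Scala; the function names are just for illustration):

    import org.apache.spark.rdd.RDD

    // 1) Driver-side OOM: collect() ships every partition to the driver JVM.
    def riskyOnDriver(rdd: RDD[String]): Array[String] =
      rdd.collect()

    // 2) Keeping the work distributed: only a small aggregated result reaches the driver.
    def stayDistributed(rdd: RDD[String]): Long =
      rdd.map(_.length.toLong).reduce(_ + _)

On the executor side, lowering the number of concurrent task slots (SPARK_WORKER_CORES in standalone mode, or spark.executor.cores) reduces how many partitions are held in memory at the same time, which matches the 16-core vs 4-core behaviour described in the question.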

FaigB