
I'm trying to run a job in YARN mode that processes a large amount of data (2TB) read from Google Cloud Storage. My pipeline works just fine with 10GB of data. The specs of my cluster and the beginning of my pipeline are detailed here: PySpark Yarn Application fails on groupBy

Here is the rest of the pipeline:

      (input.groupByKey()
      # [...] processing on sorted groups for each key shard
      .mapPartitions(sendPartition)
      .map(mergeShardsbyKey)
      .reduceByKey(lambda list1, list2: list1 + list2)
      .take(10))
      # [...] output

The function applied to each partition is the following:

def sendPartition(iterator):
    # One connection pool per partition, reused for every record in it.
    pool = external_service_connection_pool()
    return [make_request(record, pool) for record in iterator]

def make_request(record, pool):
    # [...] tags a record based on query results from the external service
    return key, taggedrecord
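One thing worth noting about `sendPartition` is that the list comprehension builds the whole tagged partition in the Python worker's memory before returning it. A minimal sketch of a lazy variant is below, written as a generator so that records are produced one at a time. The bodies of `external_service_connection_pool` and `make_request` here are hypothetical stand-ins (the real ones are elided above), and the name `send_partition_lazy` is made up for illustration. Whether this has any effect on the Java heap error is not established, since that error is raised on the JVM side.

```python
def external_service_connection_pool():
    # Hypothetical placeholder for the real pool constructor.
    return {"name": "pool"}

def make_request(record, pool):
    # Stand-in for the real call: the actual tagging logic is elided in the question.
    key, value = record
    return key, (value, pool["name"])

def send_partition_lazy(iterator):
    # Same pool-per-partition idea, but records are yielded one by one
    # instead of being collected into a list first.
    pool = external_service_connection_pool()
    for record in iterator:
        yield make_request(record, pool)

# Plain-Python check with made-up records; no Spark needed.
records = iter([("a", 1), ("b", 2)])
out = send_partition_lazy(records)
first = next(out)
rest = list(out)
```

With an RDD, the generator function could be passed in the same way as the original, for example `.mapPartitions(send_partition_lazy)`.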

On the whole dataset, the execution fails with:

java.lang.OutOfMemoryError: Java heap space

I tried to get a bit more information and saw that it fails on the reduceByKey. However, from the mapPartitions onward, the work runs on only one executor until it fails on the reduce (at least, only one executor shows up in the Spark web interface, and the job is not split into multiple tasks until the reduce).

My question is the following: why does it run on only one executor? The documentation describing the function (http://spark.apache.org/docs/latest/programming-guide.html) seems to match my understanding of mapPartitions. Is this a failure, or is it supposed to work this way after the groupByKey?

EDIT: I tried on a smaller cluster with a smaller dataset and, even though it succeeds, only one executor is used to process all the data after the groupByKey. Moreover, there are multiple partitions after each phase, and when I launch the stages one by one, the groupByKey stage is shown as "pending" for each stage after it in the interface.
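Having many partitions does not by itself show that the records are spread across them. One way to check is to count records per partition. The sketch below is only an assumption about how that could be done: `count_partition` is a hypothetical helper, and the nested list is made-up data standing in for an RDD's partitions.

```python
def count_partition(index, iterator):
    # Yield one (partition index, record count) pair without storing the records.
    yield index, sum(1 for _ in iterator)

# Plain-Python stand-in for an RDD's partitions (made-up data):
# every record shares one key, so everything sits in partition 0.
partitions = [[("k1", 1), ("k1", 2), ("k1", 3)], [], []]

sizes = [
    pair
    for i, part in enumerate(partitions)
    for pair in count_partition(i, iter(part))
]
```

On a real RDD the same function has the shape expected by `mapPartitionsWithIndex`, so something like `rdd.mapPartitionsWithIndex(count_partition).collect()` would return one small tuple per partition.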

Paul K.
  • Try specifying numTasks for reduceByKey – Karthik Nov 05 '15 at 15:49
  • Sorry, it was not very clear: the reduce is split into tasks and uses all the workers, but the mapPartitions is not split. I just thought they could be linked in some way. – Paul K. Nov 05 '15 at 16:00
  • Is the use of mapPartitions mostly to optimize reusing a connection pool as much as possible? – Dennis Huo Nov 05 '15 at 16:26
  • Yes, that is what it is for. I know I could coalesce partitions to have only one connection pool per worker, but I don't think that is the issue, since my external service doesn't get that much load considering the number of partitions (approx. 58000). The weird thing is that the reduceByKey is already really slow on a small dataset (15 times the duration of the group by on a small cluster). – Paul K. Nov 05 '15 at 16:33
  • What happens if you add a repartition(1000) before the mapPartitions? – Dennis Huo Nov 05 '15 at 16:41
  • I'm going to try right now. I tested calling getNumPartitions between each stage, and it is always 58000 even though there is only 1 task and 1 executor. Do you think the RDD gets shrunk to one partition each time? – Paul K. Nov 05 '15 at 16:47
  • One possibility is that the mapPartitions is getting unlucky with Spark dynamic allocation. Is there also only 1 registered executor during that step, or are there other executors that are just not being used? Either way, you might try setting spark.executor.instances=99999 via --conf (or via --properties in gcloud dataproc) and see if that matters – Dennis Huo Nov 05 '15 at 17:00
  • Stages are merged into 2 groups on the web interface (reduce and group) when I use the gcloud dataproc command line tool to submit, so I can't see the mapPartitions part. However, the reduce still seems to be as slow as it was on the small cluster. Tomorrow I will test on a larger cluster, connecting to the master to split the tasks, to see if the mapPartitions uses all the executors. – Paul K. Nov 05 '15 at 17:36
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/94409/discussion-between-paul-k-and-dennis-huo). – Paul K. Nov 06 '15 at 09:46

0 Answers