2

I'm trying to run some spark jobs, but routinely the executors run out of memory:

17/02/06 19:12:02 WARN TaskSetManager: Lost task 10.0 in stage 476.3 (TID 133250, 10.0.0.10): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486378087852_0006_01_000019 on host: 10.0.0.10. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486378087852_0006_01_000019
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)

Since I've already set spark.executor.memory=20480m, I feel that the job shouldn't really need more RAM to work, so the other option I see is to increase the number of partitions.

I've tried:

>>> sqlContext.setConf("spark.sql.shuffle.partitions", u"2001")
>>> sqlContext.getConf("spark.sql.shuffle.partitions")
u'2001'

and

>>> all_users.repartition(2001)

however when I start a job I still see the default 200 partitions:

>>> all_users.repartition(2001).show()
[Stage 526:(0 + 30) / 200][Stage 527:>(0 + 0) / 126][Stage 528:>(0 + 0) / 128]0]

I'm using PySpark 2.0.2 on Azure HDInsight. Can anyone point what I'm doing wrong?

EDIT

According to the answer below I tried:

sqlContext.setConf('spark.sql.shuffle.partitions', 2001)

at the start but it didn't work. However, this worked:

sqlContext.setConf('spark.sql.files.maxPartitionBytes', 100000000)

all_users is an sql dataframe. A specific example would be:

all_users = sqlContext.table('RoamPositions')\ 
    .withColumn('prev_district_id', F.lag('district_id', 1).over(user_window))\ 
    .withColumn('prev_district_name', F.lag('district_name', 1).over(user_window))\
    .filter('prev_district_id IS NOT NULL AND prev_district_id != district_id')\
    .select('timetag', 'imsi', 'prev_district_id', 'prev_district_name', 'district_id', 'district_name')
zero323
  • 322,348
  • 103
  • 959
  • 935
André Cruz
  • 500
  • 1
  • 5
  • 15

1 Answers1

1

Based on your comments it looks like you read data from an external source and use window functions before you call repartition. Window functions:

  • Repartition data to a single partition if there is no partitionBy clause provided.
  • Use standard shuffle mechanism if you provide partitionBy clause.

The latter one seems to be the case here. Since default value of spark.sql.shuffle.partition is 200 your data will be shuffled into 200 partitions before it is repartitioned. If you want 2001 all the way you should set it before loading the data

sqlContext.setConf("spark.sql.shuffle.partitions", u"2001")

all_users = ...

Also spark.sql.shuffle.partitions doesn't affect the number of initial partitions. These can be controlled using other properties: How to increase partitions of the sql result from HiveContext in spark sql

Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935