
INPUT:

The input data set contains 10 million transactions spread across multiple Parquet files. The total size of the data set, including all files, ranges from 6 to 8 GB.

PROBLEM STATEMENT:

Partition the transactions by customer ID so that there is one folder per customer ID, with each folder containing all the transactions made by that particular customer.

HDFS has a hard limit of 6.4 million on the number of subdirectories that can be created within a single root directory. To stay under it, the last two digits of the customer ID (00, 01, 02 ... 99) are used to create the top-level directories, and each top-level directory contains all the customer IDs ending with that specific two-digit suffix.

Sample output directory structure:

00/cust_id=100900/part1.csv
00/cust_id=100800/part33.csv

01/cust_id=100801/part1.csv
03/cust_id=100803/part1.csv
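
To make the layout concrete, a tiny sketch (assuming customer IDs are numeric strings) of how a customer ID maps to its top-level folder:

// Illustration only: the last two digits of the customer id select the
// top-level directory, and the full id selects the per-customer folder.
val customerId  = "100803"
val topLevelDir = customerId.takeRight(2)                 // "03"
val outputPath  = s"$topLevelDir/cust_id=$customerId/"    // "03/cust_id=100803/"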

CODE:

// Required imports
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

// Read the input files and cache them in memory
val parquetReader = sparksession.read
  .parquet("/inputs")
  .persist(StorageLevel.MEMORY_ONLY) // No spill expected; there is enough memory

// Partitioning logic: one pass per two-digit customer-id suffix (00..99)
var customerIdEndingPattern = 0
while (customerIdEndingPattern < 100) {
  // Zero-pad single-digit suffixes so they match "00".."09"
  var idEndPattern = customerIdEndingPattern + ""
  if (customerIdEndingPattern < 10) {
    idEndPattern = "0" + customerIdEndingPattern
  }

  parquetReader
    .filter(col("customer_id").endsWith(idEndPattern))
    .repartition(945, col("customer_id"))
    .write
    .partitionBy("customer_id")
    .option("header", "true")
    .mode("append")
    .csv("/" + idEndPattern)
  customerIdEndingPattern = customerIdEndingPattern + 1
}

Spark Configuration: Amazon EMR 5.29.0 (Spark 2.4.4 & Hadoop 2.8.5)

1 master and 10 slaves, each with 96 vCores and 768 GB RAM (AWS R5.24xlarge instances). Hard disks are EBS volumes with a burst of 3000 IOPS for 30 minutes.

            'spark.hadoop.dfs.replication': '3',
            'spark.driver.cores':'5',
            'spark.driver.memory':'32g',
            'spark.executor.instances': '189',
            'spark.executor.memory': '32g',
            'spark.executor.cores': '5',
            'spark.executor.memoryOverhead':'8192',
            'spark.driver.memoryOverhead':'8192',
            'spark.default.parallelism':'945',
            'spark.sql.shuffle.partitions' :'945',
            'spark.serializer':'org.apache.spark.serializer.KryoSerializer',
            'spark.dynamicAllocation.enabled': 'false',
            'spark.memory.fraction':'0.8',
            'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version':'2',
            'spark.memory.storageFraction':'0.2',
            'spark.task.maxFailures': '6',
            'spark.driver.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"',
            'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"'
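
For reference, a minimal sketch of applying a subset of these settings when building the session in code (an assumption: in the actual job these values come from the EMR cluster configuration rather than application code, and the app name is hypothetical):

import org.apache.spark.sql.SparkSession

val sparksession = SparkSession.builder()
  .appName("partition-transactions-by-customer")  // hypothetical app name
  .config("spark.executor.instances", "189")
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "32g")
  .config("spark.sql.shuffle.partitions", "945")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.dynamicAllocation.enabled", "false")
  .getOrCreate()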

SCALING ISSUES:

  1. Experimented with anywhere from 10 all the way up to 40 slaves (adjusting the Spark configs accordingly), but the results are the same: the job takes more than 2 hours to complete (as shown in the first pic, each job takes more than a minute and the while loop runs 99 times). Also, reads from remote executors are almost non-existent (which is good); most are process-local.

  2. Partitioning seems to work fine (refer to the second pic): I get 5 RDD blocks per executor and 5 tasks running at all times (each executor has 5 cores and there are 19 executors per slave node). GC is tuned as well.

  3. Each partitionBy write in the while loop takes a minute or more to complete.

METRICS:


[Screenshot: duration of each of the 99 jobs; each takes more than a minute]

[Screenshot: partition distribution across executors; partitioning seems okay]

[Screenshot: summary of a single job, i.e. one partitionBy execution]

[Screenshot: summary of a few executors after full job completion, hence RDD blocks is zero; the first row is the driver]



So the questions are: how can this be optimized further, and why isn't it scaling? Is there a better way to go about it? Have I already reached maximum performance? Assuming I have access to more hardware, is there anything I could do better? Any suggestions are welcome.

user1613360
  • Use bucketing together with partitioning for optimization. Partitioning by customer id alone is not a good approach, since the number of distinct customer ids is huge. You should partition the data by country/state/age etc. and use bucketing on customer id. – Kishore Feb 20 '20 at 07:21
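
For illustration, a minimal sketch of the bucketing approach suggested in the comment above (the "country" partition column and the table name are hypothetical; note that bucketBy requires saveAsTable and cannot be used with a plain path-based csv/parquet write):

// A sketch of bucketing combined with partitioning, not part of the
// original question.
parquetReader
  .write
  .partitionBy("country")               // coarse, low-cardinality partition column (hypothetical)
  .bucketBy(100, "customer_id")         // hash customer_id into a fixed number of buckets
  .sortBy("customer_id")
  .format("parquet")
  .saveAsTable("transactions_bucketed") // hypothetical table name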

1 Answer


Touching every record 100 times is very inefficient, even if the data can be cached in memory and isn't evicted downstream. Not to mention that persisting alone is expensive.

Instead, you could add a virtual column:

import org.apache.spark.sql.functions.substring
import sparksession.implicits._  // for the $"..." column syntax

val df = sparksession.read
  .parquet("/inputs")
  .withColumn("partition_id", substring($"customer_id", -2, 2))  // last two digits

and use it later for partitioning

df
  .write
  .partitionBy("partition_id", "customer_id")
  .option("header", "true")
  .mode("append")
  .csv("/")

To avoid too many small files, you can repartition first using a longer suffix:

val nParts: Int = ???
val suffixLength: Int = ???  // >= suffix length used for write partitions

df
  .repartitionByRange(
    nParts,
    substring($"customer_id", -suffixLength, suffixLength))
  .write
  .partitionBy("partition_id", "customer_id")
  .option("header", "true")
  .mode("append")
  .csv("/")

Such changes will allow you to process all data in a single pass without any explicit caching.

user10938362
  • Thanks for the suggestion. It's more or less the same approach I tried before; the reason for looping was to split the work across different EMR clusters based on partition id, like 00-25 in one cluster, 25-50 in the next, and so on. Since nothing seemed to help, and just to double check, I ran the exact snippet you shared and it takes exactly the same amount of time; no difference in performance. – user1613360 Feb 18 '20 at 23:14
  • 1
    @user1613360 The reason that your code is not scaling as you are expecting is that jobs do not run in parallel in spark; stages(if not dependent) and tasks do. As you are running a while loop 99 times you are ending up with 99 jobs which runs serially and each job is run in two stages(from your first pic). code snipped shared should work. Can you share the job and stages that are being created from UI? – wypul Feb 19 '20 at 07:52
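
To make the comment above concrete, here is a rough sketch (not from the answer, and only relevant if the per-suffix loop is kept) of submitting the per-suffix writes from multiple driver threads; Spark runs jobs submitted from separate threads concurrently, subject to available executor cores:

// A sketch of concurrent job submission from the driver. Assumes the
// question's parquetReader and the col import are in scope; the answer's
// single-pass approach avoids the need for this entirely.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val writes = (0 until 100).map { i =>
  val idEndPattern = f"$i%02d"  // "00".."99"
  Future {
    parquetReader
      .filter(col("customer_id").endsWith(idEndPattern))
      .write
      .partitionBy("customer_id")
      .option("header", "true")
      .mode("append")
      .csv("/" + idEndPattern)
  }
}
// Block until all 100 writes finish; the global execution context limits
// concurrency to the number of driver cores, so a larger thread pool may be
// needed to keep more jobs in flight at once.
Await.result(Future.sequence(writes), Duration.Inf)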