
I'm running a Spark job that scans a large file and splits it into smaller files. The file is in JSON Lines format and I'm trying to partition it by a certain column (id) and save each partition as a separate file to S3. The file size is about 12 GB, but there are about 500,000 distinct values of id. The query is taking almost 15 hours. What can I do to improve performance? Is Spark a poor choice for such a task? Please note that I do have the liberty to make sure that the source has a fixed number of rows per id.

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.sql.functions import udf, substring, instr, locate
from datetime import datetime, timedelta

    
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Get parameters that were passed to the job
args = getResolvedOptions(sys.argv, ['INPUT_FOLDER', 'OUTPUT_FOLDER', 'ID_TYPE', 'DATASET_DATE'])

id_type = args["ID_TYPE"]
output_folder = "{}/{}/{}".format(args["OUTPUT_FOLDER"], id_type, args["DATASET_DATE"])
input_folder = "{}/{}/{}".format(args["INPUT_FOLDER"], id_type, args["DATASET_DATE"])


INS_SCHEMA = StructType([
    StructField("camera_capture_timestamp", StringType(), True),
    StructField(id_type, StringType(), True),
    StructField("image_uri", StringType(), True)
])


data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA)

data = data.withColumn("fnsku_1", F.col("fnsku"))

data.coalesce(1).write.partitionBy(["fnsku_1"]).mode('append').json(output_folder)   

I have tried repartition instead of coalesce too.

I'm using AWS Glue.

lalatnayak
  • why do you need `coalesce(1)`? Any reason? - Ah... mode append, I see. – Lamanus Aug 01 '20 at 04:37
  • Spark uses a distributed file system, so funneling everything through a single file/task is really bad and slows the job down. – Lamanus Aug 01 '20 at 04:40
  • So are you stating that if I have multiple files as source it will be faster? I thought Spark will use distributed processing even if the source is a single file. – lalatnayak Aug 01 '20 at 04:55
  • I mean when you write, not reading. – Lamanus Aug 01 '20 at 04:56
  • I'm writing to multiple files. Just that I need one file per partition. – lalatnayak Aug 01 '20 at 05:22
  • So what happened when you tried `repartition("fnsku_1")` instead of `coalesce(1)`? The difference between the two is that `repartition` creates a stage boundary, whereas `coalesce` can be "optimized" forward (https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce _...However if you're doing a drastic coalesce..._) – mazaneicha Aug 03 '20 at 16:18
  • Could you please add the Spark UI and Ganglia charts to the question, and please mention how you are running it (EMR or Glue)? – vaquar khan Aug 03 '20 at 16:47
  • @lalatnayak Have you tried reading this via a Glue dynamic frame? Also, how many columns does your source data have? It would be good to update your question with sample records. And is there any reason why you are using coalesce(1) with 1 as the value instead of repartition(total file size / n, not less than 128 MB)? – Prabhakar Reddy Aug 08 '20 at 16:43

2 Answers


Please consider the following as one possible option. It would be awesome to hear whether it helped :)

First, as @Lamanus said in the comments, if you coalesce(1) you reduce the number of partitions, and hence the number of writer tasks, to one, so all the data is shuffled to a single task. That is the first factor to improve.

To overcome the issue, i.e. write one file per partition while keeping the parallelization level, you can change the logic to the following:

object TestSoAnswer extends App {

  private val testSparkSession = SparkSession.builder()
    .appName("Demo groupBy and partitionBy").master("local[*]")
    .getOrCreate()
  import testSparkSession.implicits._

  // Input dataset with 5 partitions
  val dataset = testSparkSession.sparkContext.parallelize(Seq(
    TestData("a", 0), TestData("a", 1), TestData("b", 0), TestData("b", 1),
    TestData("c", 1), TestData("c", 2)
  ), 5).toDF("letter", "number")

  // Group all rows with the same letter into the same task, then let
  // partitionBy write each group's rows into its own directory
  dataset.as[TestData].groupByKey(row => row.letter)
    .flatMapGroups {
      case (_, values) => values
    }.write.partitionBy("letter").mode("append").json("/tmp/test-parallel-write")

}

case class TestData(letter: String, number: Int)

How does it work?

First, the code performs a shuffle to collect all rows related to a specific key (the same key as used for the partitioning) into the same partitions, so the write for all the rows belonging to a key happens at once. Some time ago I wrote a blog post about the partitionBy method. Roughly, internally it will sort the records of the given partition and later write them one-by-one into the file.

That way we get a plan like the following, where only one shuffle, i.e. the resource-consuming operation, is present:

== Physical Plan ==
*(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).letter, true, false) AS letter#22, knownnotnull(assertnotnull(input[0, TestData, true])).number AS number#23]
+- MapGroups TestSoAnswer$$$Lambda$1236/295519299@55c50f52, value#18.toString, newInstance(class TestData), [value#18], [letter#3, number#4], obj#21: TestData
   +- *(1) Sort [value#18 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#18, 200), true, [id=#15]
         +- AppendColumnsWithObject TestSoAnswer$$$Lambda$1234/1747367695@6df11e91, [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).letter, true, false) AS letter#3, knownnotnull(assertnotnull(input[0, TestData, true])).number AS number#4], [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#18]
            +- Scan[obj#2]

The output of TestSoAnswer executed twice looks like this:

test-parallel-write % ls
_SUCCESS letter=a letter=b letter=c
test-parallel-write % ls letter=a
part-00170-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00170-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

test-parallel-write % ls letter=b
part-00161-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00161-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

test-parallel-write % ls letter=c
part-00122-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00122-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

You can also control the number of records written per file with this configuration.
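The link to that configuration is not preserved here, but the setting it most likely refers to is the maxRecordsPerFile writer option (available since Spark 2.2). A minimal PySpark sketch, where df stands for any DataFrame (e.g. the dataset from the example above) and 10000 is an arbitrary cap:

# Cap the number of records per output file; 10000 is just an example value.
df.write.option("maxRecordsPerFile", 10000).partitionBy("letter").mode("append").json("/tmp/test-parallel-write")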

Edit: I didn't see @mazaneicha's comment, but indeed, you can try repartition("partitioning column")! It's even clearer than the grouping expression.
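Applied to the PySpark job from the question, that would look roughly like this (a sketch reusing the variables defined in the question's script, not something I have run against the real data):

data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA)
data = data.withColumn("fnsku_1", F.col("fnsku"))

# One shuffle that puts every row of a given fnsku_1 value into the same task,
# so partitionBy writes each key's rows from a single writer while the
# write itself stays parallel across keys.
(data
 .repartition("fnsku_1")
 .write
 .partitionBy("fnsku_1")
 .mode("append")
 .json(output_folder))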

Best,

Bartosz.

Bartosz Konieczny
  • Thanks. One of my team mates is going to try this and respond in the comments section – lalatnayak Aug 05 '20 at 20:24
  • So with this approach the run time shortened from 50 hours to 20 hours! Quite an improvement, but I need to bring it down to under 5 hours. I was processing a 50 GB file and the partition column has 1.7 million distinct values. Is there anything else you'd suggest at this point? – lalatnayak Aug 11 '20 at 17:02
  • What is the format of the file, compressed or not compressed? If you want to give something a try in the meantime, maybe you can split the big job into smaller ones, meaning that each job filters on a different partition-column range (see the sketch after this comment thread). You run these jobs in parallel, and unless you have some contention on the reading side, it should reduce the total time. I don't see the logs, but I suppose that for 1.7 million partitions the I/O part of the write takes the time, and with a single process I don't see a way to accelerate it. – Bartosz Konieczny Aug 12 '20 at 04:13
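A rough PySpark sketch of that splitting idea, purely illustrative: each run handles one hash bucket of the partition column (the bucket number would be passed as a job argument), and the runs are launched as independent, parallel jobs. Variable names follow the question's script; NUM_BUCKETS and bucket are made up for the example.

from pyspark.sql import functions as F

NUM_BUCKETS = 10   # how many independent jobs to split the work into (illustrative)
bucket = 3         # which bucket this particular run handles, e.g. from a job argument

# Keep only the ids whose hash falls into this run's bucket, then write them
# exactly as before; the other buckets are handled by the other parallel runs.
subset = (data
          .withColumn("bucket", F.abs(F.hash(F.col("fnsku_1"))) % NUM_BUCKETS)
          .filter(F.col("bucket") == bucket)
          .drop("bucket"))

(subset
 .repartition("fnsku_1")
 .write
 .partitionBy("fnsku_1")
 .mode("append")
 .json(output_folder))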

If you're not going to use Spark for anything other than to split the file into smaller versions of itself, then I would say Spark is a poor choice. You'd be better off doing this within AWS following an approach such as the one given in this Stack Overflow post.

Assuming you have an EC2 instance available, you'd run something like this:

aws s3 cp s3://input_folder/12GB.json - | split -l 1000 - output.
# upload all the resulting output.* chunks back to S3
aws s3 cp . s3://output_folder/ --recursive --exclude "*" --include "output.*"

If you're looking to do some further processing of the data in Spark, you're going to want to repartition the data into chunks of between 128 MB and 1 GB. With the default (snappy) compression, you typically end up with about 20% of the original file size, i.e. roughly 2.4 GB in your case. That means between 2.4 GB / 1 GB ≈ 3 and 2.4 GB / 128 MB ≈ 20 partitions, so for example:

data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA) 

dataPart = data.repartition(12)

This is not actually a particularly large data set for Spark and should not be this cumbersome to deal with.

Saving as parquet gives you a good recovery point, and re-reading the data will be very fast. The total file size will be about 2.5 GB.
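For example (a sketch; the intermediate path is just a placeholder):

# Persist an intermediate, snappy-compressed Parquet copy as a recovery point;
# re-reading this is far cheaper than re-parsing the 12 GB JSON source.
dataPart.write.mode("overwrite").parquet("s3://your-bucket/intermediate-parquet/")  # placeholder path

dataPart = spark.read.parquet("s3://your-bucket/intermediate-parquet/")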

Lars Skaug
  • This will work only if there are an equal number of rows per partition column. – lalatnayak Aug 05 '20 at 20:21
  • data.repartition(12) will reshuffle the data in the RDD randomly to fit 12 partitions. No partition column given, none used. Ref: http://spark.apache.org/docs/latest/rdd-programming-guide.html – Lars Skaug Aug 05 '20 at 20:32