1

We have a use case to prepare a spark job that'll read data from multiple providers, containing info about users present in some arbitrary order and write them back to files in S3. Now, the condition is, all of a user's data must be present in a single file. There are roughly about 1 million unique users, and each one of them has about 10KB of data, at max. We thought of creating at most 1000 files, and let each file contain about 1000 users' records.

We're using java dataframe apis for creating the job against spark 2.4.0. I can't wrap my head around what would be the most logical way of doing this? Should I do a group by operation on the user-id and then somehow collect the rows unless I reach 1000 users, and then roll over (if that's even possible) or there's some better way. Any help or a hint in the right direction is much appreciated.

Update:

After following the suggestion from the answer I went ahead with the following code snippet, still I saw 200 files being written, instead of 1000.

Properties props = PropLoader.getProps("PrepareData.properties");
SparkSession spark = SparkSession.builder().appName("prepareData").master("local[*]")
    .config("fs.s3n.awsAccessKeyId", props.getProperty(Constants.S3_KEY_ID_KEY))
    .config("fs.s3n.awsSecretAccessKey", props.getProperty(Constants.S3_SECERET_ACCESS_KEY)).getOrCreate();

Dataset<Row> dataSet = spark.read().option("header", true).csv(pathToRead);
dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.close();

But instead of 1000 if I use 100, then I see 100 files. Then I followed the link shared by @Alexandros, and the following code snippet generated more than 20000 files within their individual directories, and also the execution time shot up like crazy.

dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);
Bitswazsky
  • 4,242
  • 3
  • 29
  • 58
  • 1
    Hello @Bitswazsky you have the options mentioned here: https://stackoverflow.com/questions/50775870/pyspark-efficiently-have-partitionby-write-to-same-number-of-total-partitions-a – abiratsis Apr 08 '19 at 00:49

1 Answers1

1

You can use repartition and then coalesce function.

 Df.repartion(user_id).coalese(1000)

 Df.repartion(user_id,1000)

First one guarantees there will not be any empty partitions while in second solution some partitions might be empty.

Refer : Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?

https://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/DataFrame.html#coalesce(int)

Update:

To make this work

dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.sql.shuffle.partitions (default: 200). Due to that it don't give 1000 files but works for 100 files. To make it work you will have to first repatriation to 1000 partitions which will be same as approach 2.

dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);

I think above code will create one million files or more instead of 1000.

dataSet.repartition(1000, dataSet.col("idvalue")).write().parquet(pathToWrite);

It will create 1000 files but you will have to create mapping between ids and files by reading each file once you complete writing the files.

jaimin03
  • 176
  • 9
  • I guess by default we have 200 partitions. Using the first approach you mentioned, when we're performing a coalesce to 1000, i.e. increasing the partition count, would there be any chance of some user's data getting split across partitions? – Bitswazsky Apr 08 '19 at 04:20
  • We are increasing partitions to 1 million then reduce it to 1000. I think coalesce with shuffle=false which default it just combines existing partitions to get desired number of partitions. In your case coalesce will create new partition by merging 1000 existing partitions. Refer: https://www.linkedin.com/pulse/repartition-vs-coalesce-apache-spark-bhanuvardhan-nune Also once you write data frame using df.write(path) you will lose track of which values are in which file you will need to create one more data structure to keep track of it once you complete writing file. – jaimin03 Apr 08 '19 at 07:51
  • Hi, can you please look at the update in the question, because the suggestion isn't working. Am I missing something here? – Bitswazsky Apr 09 '19 at 11:00
  • The 1st option for the updated section worked, but with `--conf spark.sql.shuffle.partitions=1000` option – Bitswazsky Apr 10 '19 at 07:33