2

I have a spark streaming job with a batch interval of 2 mins(configurable).
This job reads from a Kafka topic and creates a Dataset and applies a schema on top of it and inserts these records into the Hive table.

The Spark Job creates one file per batch interval in the Hive partition like below:

dataset.coalesce(1).write().mode(SaveMode.Append).insertInto(targetEntityName);

Now the data that comes in is not that big, and if I increase the batch duration to maybe 10mins or so, then even I might end up getting only 2-3mb of data, which is way less than the block size.

This is the expected behaviour in Spark Streaming.
I am looking for efficient ways to do a post processing to merge all these small files and create one big file.
If anyone's done it before, please share your ideas.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
suhas
  • 71
  • 2
  • 3

2 Answers2

6

I would encourage you to not use Spark to stream data from Kafka to HDFS.

Kafka Connect HDFS Plugin by Confluent (or Apache Gobblin by LinkedIn) exist for this very purpose. Both offer Hive integration.

Find my comments about compaction of small files in this Github issue

If you need to write Spark code to process Kafka data into a schema, then you can still do that, and write into another topic in (preferably) Avro format, which Hive can easily read without a predefined table schema

I personally have written a "compaction" process that actually grabs a bunch of hourly Avro data partitions from a Hive table, then converts into daily Parquet partitioned table for analytics. It's been working great so far.

If you want to batch the records before they land on HDFS, that's where Kafka Connect or Apache Nifi (mentioned in the link) can help, given that you have enough memory to store records before they are flushed to HDFS

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
1

I have exactly the same situation as you. I solved it by:

Lets assume that your new coming data are stored in a dataset: dataset1

1- Partition the table with a good partition key, in my case I have found that I can partition using a combination of keys to have around 100MB per partition.

2- Save using spark core not using spark sql:

a- load the whole partition in you memory (inside a dataset: dataset2) when you want to save

b- Then apply dataset union function: dataset3 = dataset1.union(dataset2)

c- make sure that the resulted dataset is partitioned as you wish e.g: dataset3.repartition(1)

d - save the resulting dataset in "OverWrite" mode to replace the existing file

If you need more details about any step please reach out.

Abdulhafeth Sartawi
  • 1,086
  • 1
  • 11
  • 20