I have a Spark Streaming application written in Scala running on CDH. The application reads data from Kafka and writes it to HDFS. Before writing the data to HDFS I call partitionBy, so the data is written out partitioned by key. This is the code:
//Some code
val ssc = new StreamingContext(sc, Seconds(1))
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))
val sparkExecutorsCount = sc.getConf.getInt("spark.executor.instances", 1)
//Some code
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // Key each record so the output can be partitioned by key
    val data = rdd.map(kafkaData => (getKey(kafkaData.value()), kafkaData.value()))
    val columns = Array("key", "value")
    // Concatenate all values of a key into one newline-separated string
    val addOp = (record1: String, record2: String) => record1 + "\n" + record2
    val mergeOp = (record1: String, record2: String) => record1 + record2
    val initialValue = ""
    val out = data.aggregateByKey(initialValue)(addOp, mergeOp)
    out.toDF(columns: _*).coalesce(sparkExecutorsCount)
      .write.mode(SaveMode.Append)
      .partitionBy("key").text(MY_PATH)
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } else {
    //handle empty RDD
  }
}
My expectation is that this code generates the following output (example ls -l listing):
> MY_PATH/key=1
> MY_PATH/key=1/file1.txt
> MY_PATH/key=1/file2.txt
> MY_PATH/key=1/file3.txt
> MY_PATH/key=2
> MY_PATH/key=2/file1.txt
> MY_PATH/key=2/file2.txt
> MY_PATH/key=2/file3.txt
and that each text file will contain the entries from the DataFrame, one per line.
And this is in fact what happens. The only problem is that initialValue always appears as the first line of each file, even though initialValue is "", so every file starts with an extra empty line.
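As far as I can tell, the empty line comes from the very first seqOp call, which combines the "" seed with the first record and therefore prepends a newline. A minimal sketch (spark-shell style, outside the streaming job) of what I mean:

// The first seqOp call combines the "" seed with the first record:
val addOp = (record1: String, record2: String) => record1 + "\n" + record2
addOp("", "record1")                              // "\nrecord1"
// The same effect through aggregateByKey (single partition for clarity):
sc.parallelize(Seq(("1", "a"), ("1", "b")), numSlices = 1)
  .aggregateByKey("")(addOp, _ + _)
  .collect()                                      // Array(("1", "\na\nb"))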
This extra empty line is a huge problem for me and I have to avoid it. One option is to use groupByKey instead of aggregateByKey, but groupByKey causes more shuffling in the cluster, which I would like to avoid.
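For reference, the groupByKey variant I have in mind would look roughly like this (a sketch, reusing the data RDD from the code above):

// groupByKey avoids the seed value entirely, so there is no leading newline,
// but every individual record goes through the shuffle instead of a
// map-side-combined string per key, which is the extra shuffling I worry about.
val out = data
  .groupByKey()
  .mapValues(_.mkString("\n"))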
Please advise how to prevent the extra empty line in each written file.