
I have a Spark Streaming application written in Scala running on CDH. The application reads data from Kafka and writes it to HDFS. Before writing to HDFS I call partitionBy, so the data is written out partitioned by key. This is the code:

//Some code
val ssc = new StreamingContext(sc, Seconds(1))
val stream = KafkaUtils.createDirectStream[String, String](
             ssc,
             PreferConsistent,
             Subscribe[String,String](topics, kafkaParams))
val sparkExecutorsCount = sc.getConf.getInt("spark.executor.instances", 1)
//Some code
stream.foreachRDD { rdd =>
    if(!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val data = rdd.map(kafkaData => (getKey(kafkaData.value()), kafkaData.value()))
        val columns = Array("key", "value")
        val addOp = (record1: String, record2:String) => record1 + "\n" + record2
        val mergeOp = (record1: String, record2:String) => record1 + record2
        val initialValue = ""
        val out = data.aggregateByKey(initialValue)(addOp, mergeOp)
        out.toDF(columns: _*).coalesce(sparkExecutorsCount)
            .write.mode(SaveMode.Append)
            .partitionBy("key").text(MY_PATH)

        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    } else {
        //handle empty RDD
    }
}

My expectation is that this code generates the following directory layout (example output of an ls -l command):

> MY_PATH/key=1
> MY_PATH/key=1/file1.txt
> MY_PATH/key=1/file2.txt
> MY_PATH/key=1/file3.txt
> MY_PATH/key=2
> MY_PATH/key=2/file1.txt
> MY_PATH/key=2/file2.txt
> MY_PATH/key=2/file3.txt

and in each text file there will be entries from the DataFrame, line by line.

In fact, this is exactly what happens. The only problem is that initialValue always appears as the first line in each file, even though initialValue = "", so I always get an extra empty line in each file.
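
A minimal reproduction of the behaviour with hypothetical toy data (e.g. in spark-shell; this is not the real Kafka input):

// Hypothetical toy pairs standing in for (getKey(value), value)
val toyData = sc.parallelize(Seq(("1", "a"), ("1", "b"), ("2", "c")))

val addOp = (record1: String, record2: String) => record1 + "\n" + record2
val mergeOp = (record1: String, record2: String) => record1 + record2

// Every aggregated value starts with initialValue ("") followed by "\n",
// which becomes the empty first line of each written file
toyData.aggregateByKey("")(addOp, mergeOp)
       .collect()
       .foreach { case (k, v) => println(s"$k -> ${v.replace("\n", "\\n")}") }
// prints something like: 1 -> \na\nb   and   2 -> \nc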

This extra empty line is a huge problem for me and I have to avoid it. One option is to use groupByKey instead of aggregateByKey, but groupByKey causes more shuffling in the cluster and I would like to avoid that.

Please advise how to prevent the extra empty line in each written file.

1 Answer


TL;DR Just use groupByKey followed by mapValues(_.mkString("\n")).
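
A minimal sketch of that approach, reusing data, columns, sparkExecutorsCount and MY_PATH from the question:

// groupByKey collects all values per key; mkString joins them with "\n"
// without any leading separator, so no empty first line is written
val out = data.groupByKey().mapValues(_.mkString("\n"))

out.toDF(columns: _*).coalesce(sparkExecutorsCount)
    .write.mode(SaveMode.Append)
    .partitionBy("key").text(MY_PATH)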

Two things:

  • The initialValue can be added an arbitrary (in practice, up to #partitions) number of times. This means that every partition-level aggregate will start with an empty string followed by a newline character. You can check whether record1 or record2 is empty in addOp and mergeOp and skip the \n if it is (see the guarded sketch after this list).

  • Additionally, your statement:

    but groupByKey will cause more shuffling in the cluster and I would like to avoid it.

    is not really accurate. The code you have doesn't significantly (if at all) reduce the amount of data that gets shuffled. Depending on the key, it can actually increase it.

    See for example:
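
    A hypothetical illustration (three string records for one key, assuming they all land in one map-side partition): concatenating with "\n" adds one separator character per record instead of shrinking the shuffled payload.

    val records = Seq("record-1", "record-2", "record-3")

    // what groupByKey shuffles: the individual records
    val separate = records.map(_.length).sum     // 24 characters
    // what aggregateByKey shuffles after map-side combining: the concatenated string
    val combined = records.mkString("\n").length // 26 characters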

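A sketch of the guarded operators hinted at in the first point above (this keeps aggregateByKey; only the separator handling changes, and the names follow the question's code):

// addOp: skip the "\n" while the accumulator is still the empty initialValue
val addOp = (acc: String, record: String) =>
    if (acc.isEmpty) record else acc + "\n" + record

// mergeOp: join two non-empty partition aggregates with "\n", pass empty ones through
val mergeOp = (acc1: String, acc2: String) =>
    if (acc1.isEmpty) acc2
    else if (acc2.isEmpty) acc1
    else acc1 + "\n" + acc2

val out = data.aggregateByKey("")(addOp, mergeOp)
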
  • Actually, I tried the option of groupByKey. In some cases it causes very significant delays because of shuffling. In my case I have 1000 different values for `key` and only 20 executors, so the amount of shuffling is much higher than with aggregateByKey – sparker Oct 17 '18 at 11:27
  • Accepted this answer for this hint: "You check if record1 or record2 are empty for addOp and mergeOp and skip \n if it is.". The required validation was in addOp. Thank you. – sparker Oct 29 '18 at 07:28