
I have a partitioned dataframe saved to HDFS. I am supposed to periodically load new data from a Kafka topic and update the HDFS data. The data is simple: it's just the number of tweets received during a certain period of time.

For example, the partition for Jan 18, 10 AM might hold the value 2, and I might receive late data from Kafka consisting of 3 tweets sent at Jan 18, 10 AM. I then need to update the Jan 18, 10 AM partition to the value 2+3=5.
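
Just to illustrate the merge semantics I want (this is not part of my job, only a minimal sketch, assuming a SparkSession named spark with spark.implicits._ imported):

import org.apache.spark.sql.functions.sum

val existing = Seq(("Jan 18, 10 AM", 2)).toDF("date", "cnt")
val late     = Seq(("Jan 18, 10 AM", 3)).toDF("date", "cnt")

existing.union(late)
  .groupBy("date")
  .agg(sum("cnt").alias("cnt"))
  .show()
// cnt for "Jan 18, 10 AM" is now 5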

My current solution is bad, because I

  • load everything from HDFS into RAM
  • delete everything from HDFS
  • read the new dataframe from Kafka
  • combine the two dataframes
  • write the new, combined dataframe back to HDFS.

(I provided comments in my code for each step.)

The problem is that the dataframe stored on HDFS might be 1 TB, so loading and rewriting all of it on every update is infeasible.

import com.jayway.jsonpath.JsonPath
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}

//scalastyle:off
object TopicIngester {
  val mySchema = new StructType(Array(
    StructField("date", StringType, nullable = true),
    StructField("key", StringType, nullable = true),
    StructField("cnt", IntegerType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // remove this later
      .appName("Ingester")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions.{count, sum}

    // read the old one and materialize it before the files are deleted,
    // otherwise the lazy read would fail once the directory is gone
    val old = spark.read
      .schema(mySchema)
      .format("csv")
      .load("/user/maria_dev/test")
      .cache()
    old.count()

    // remove it
    val outPutPath = "/user/maria_dev/test"
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    if (fs.exists(new Path(outPutPath))) {
      fs.delete(new Path(outPutPath), true)
    }

    // read the new one
    val _new = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "sandbox-hdp.hortonworks.com:6667")
      .option("subscribe", "test1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS String)")
      .as[String]
      .map(value => transformIntoObject(value))
      .map(tweet => (tweet.tweet, tweet.key, tweet.date))
      .toDF("tweet", "key", "date")
      .groupBy("date", "key")
      .agg(count("*").alias("cnt"))


    // combine the old one with the new one and write the merged counts to hdfs
    // (align columns explicitly for the positional union)
    _new.union(old.select("date", "key", "cnt"))
        .groupBy("date", "key")
        .agg(sum("cnt").alias("cnt"))
        .write.partitionBy("date", "key")
        .csv("/user/maria_dev/test")

    spark.stop()
  }

  def transformIntoObject(tweet: String): TweetWithKeys = {
    val date = extractDate(tweet)
    val hashTags = extractHashtags(tweet)

    val tagString = String.join(",", hashTags)

    TweetWithKeys(tweet, date, tagString)
  }

  def extractHashtags(str: String): java.util.List[String] = {
    JsonPath.parse(str).read("$.entities.hashtags[*].text")
  }

  def extractDate(str: String): String = {
    JsonPath.parse(str).read("$.created_at")
  }

  final case class TweetWithKeys(tweet: String, date: String, key: String)

}

How can I only load the necessary partitions and update them more efficiently?

pavel_orekhov
  • Possible duplicate of [Overwrite specific partitions in spark dataframe write method](https://stackoverflow.com/questions/38487667/overwrite-specific-partitions-in-spark-dataframe-write-method) – eakotelnikov Jan 15 '19 at 16:17
  • @eakotelnikov that talks about overwrite, not update. I want to be able to only load the necessary partitions into memory, without writing too much spaghetti code. – pavel_orekhov Jan 15 '19 at 16:17
  • This doesn't answer your question. Just a couple of alternative solutions: 1. Consider using HBase instead of HDFS. 2. Add a new field e.g. ts with timestamp at the moment of writing the row and just query for the latest data based on 'ts' field when querying data. You'll need to compact your files periodically if you take this route. – facha Jan 15 '19 at 17:00
  • What is wrong with 2+3 except for small files? You can sqoop merge. You could use KUDU if on Cloudera. – thebluephantom Jan 15 '19 at 18:38
  • I figured it out, I'll just read everything from kafka first, then see what existing partitions will need to be updated, read these partitions as described here: https://stackoverflow.com/questions/34659184/how-to-load-specific-hive-partition-in-dataframe-spark-1-6 and then overwrite them with merged data. – pavel_orekhov Jan 15 '19 at 18:41
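
Update: a minimal sketch of the approach from my last comment. It is only a sketch under a few assumptions: it needs Spark 2.3+ so that spark.sql.sources.partitionOverwriteMode=dynamic overwrites only the partitions present in the output; instead of building explicit partition paths as in the linked answer, it filters the old data on the partition column, which Spark prunes to the affected partitions; and parseDateAndKey is a hypothetical stand-in for the JsonPath extraction from my code above.

import com.jayway.jsonpath.JsonPath
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, count, sum}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object PartitionedUpdater {
  val mySchema = new StructType(Array(
    StructField("date", StringType, nullable = true),
    StructField("key", StringType, nullable = true),
    StructField("cnt", IntegerType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PartitionedUpdater").getOrCreate()
    import spark.implicits._

    // overwrite only the partitions present in the output, not the whole directory (Spark 2.3+)
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    val basePath = "/user/maria_dev/test"

    // 1. read and pre-aggregate the new data from Kafka
    val fresh = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "sandbox-hdp.hortonworks.com:6667")
      .option("subscribe", "test1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map(parseDateAndKey(_))
      .toDF("date", "key")
      .groupBy("date", "key")
      .agg(count("*").alias("cnt"))
      .cache()

    // 2. which date partitions does the new data touch?
    val touchedDates = fresh.select("date").distinct().as[String].collect()

    // 3. read back only those partitions: the filter on the partition column
    //    is pruned at planning time, so untouched partitions are never scanned
    val old = spark.read
      .schema(mySchema)
      .csv(basePath)
      .where(col("date").isin(touchedDates: _*))

    // 4. merge old and new counts; localCheckpoint() materializes the result and
    //    cuts the lineage, so we are no longer reading basePath while overwriting it
    val merged = fresh.union(old.select("date", "key", "cnt"))
      .groupBy("date", "key")
      .agg(sum("cnt").alias("cnt"))
      .localCheckpoint()

    // 5. rewrite only the touched partitions
    merged.write
      .mode(SaveMode.Overwrite)
      .partitionBy("date", "key")
      .csv(basePath)

    spark.stop()
  }

  // hypothetical stand-in for the JsonPath extraction in the question:
  // returns (created_at, comma-joined hashtags)
  def parseDateAndKey(tweet: String): (String, String) = {
    val ctx = JsonPath.parse(tweet)
    val date: String = ctx.read("$.created_at")
    val tags: java.util.List[String] = ctx.read("$.entities.hashtags[*].text")
    (date, String.join(",", tags))
  }
}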
