I have a requirement where I want to write each individual record in an RDD to an individual file in HDFS.

I did this for the normal (local) filesystem, but obviously it doesn't work for HDFS.

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.foreach { msg =>
      val value = msg._2                              // take the value from the (key, value) message pair
      println(value)
      val fname = java.util.UUID.randomUUID.toString  // unique file name per record
      val path = dir + fname
      write(path, value)
    }
  }
}

where write is a function that writes to the filesystem.
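
For illustration, here is a minimal sketch of what such a helper could look like when targeting HDFS through Hadoop's FileSystem API (this helper is an assumption for illustration, not the actual write from the question; it resolves to HDFS when fs.defaultFS points there):

import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical write helper: creates one file per call.
// The FileSystem handle is obtained inside the function so the closure
// stays serializable when this runs in rdd.foreach on the executors.
def write(path: String, value: String): Unit = {
  val fs = FileSystem.get(new Configuration())
  val out = fs.create(new Path(path))
  try out.write(value.getBytes(StandardCharsets.UTF_8))
  finally out.close()
}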

Is there a way to do this within Spark, so that for each record I can write natively to HDFS, without using any other tool like Kafka Connect or Flume?


EDIT: More Explanation

For example, if the RDD from my DStream has the following records,

  • abcd
  • efgh
  • ijkl
  • mnop

I need a different file for each record: one file for "abcd", another for "efgh", and so on.

I tried creating an RDD within the stream's RDD, but I learned that it's not allowed, as RDDs are not serializable.

Biplob Biswas
  • can you please post the working solution or accept the correct one? It helps other people who have a similar issue. – Explorer May 13 '17 at 13:58
  • @LiveAndLetLive I haven't found a solution to this problem yet, and as I mentioned in one of the previous comments, we moved from storing each record individually to storing the entire RDD with multiple records. So this question is still open. – Biplob Biswas May 15 '17 at 14:47
  • you may use your own MultipleTextOutputFormat; see this reply: https://stackoverflow.com/a/26051042/609597 (a sketch of that approach follows below) – softwarevamp Aug 18 '17 at 08:20
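
Based on that linked answer, a hedged sketch of the MultipleTextOutputFormat approach (the class name and the UUID keying are assumptions for illustration; dir is the output directory from the question):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Each pair's key becomes the output file name; only the value is written.
class PerRecordOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

// Key every record with a UUID so each one lands in its own file under dir.
rdd.map(value => (java.util.UUID.randomUUID.toString, value))
   .saveAsHadoopFile(dir, classOf[String], classOf[String], classOf[PerRecordOutputFormat])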

2 Answers


You can forcefully repartition the RDD so that the number of partitions equals the number of records, and then save it:

val rddCount = rdd.count()
rdd.repartition(rddCount.toInt).saveAsTextFile("your/hdfs/loc")  // repartition expects an Int, count() returns a Long
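
As a standalone illustration (the sample data and output path are assumptions), each partition is written as its own part file, so with as many partitions as records you get roughly one record per file; note that repartition performs a full shuffle, and an even one-record-per-partition spread is typical but not strictly guaranteed:

val records = sc.parallelize(Seq("abcd", "efgh", "ijkl", "mnop"))
val n = records.count().toInt  // count() returns a Long; repartition expects an Int

// Produces part-00000 .. part-00003 under the output directory
records.repartition(n).saveAsTextFile("hdfs:///tmp/one-record-per-file")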
Shasankar

You can do this in a couple of ways.

From the rdd you can get the SparkContext; once you have the SparkContext, you can use the parallelize method, passing the string as a single-element sequence.

For example:

val sc = rdd.sparkContext                                // only usable on the driver, not inside executor-side closures
sc.parallelize(Seq("some string")).saveAsTextFile(path)  // writes a directory of part files under path
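
Tying this back to the question, a hedged sketch of applying this idea per record (the collect() call and the dir variable are assumptions for illustration): a SparkContext is only usable on the driver, so the batch has to be brought back with collect() first, and each saveAsTextFile call produces a directory of part files rather than a single plain file.

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // collect() brings the whole batch to the driver; assumed acceptable for small batches only
    rdd.collect().foreach { msg =>
      val out = dir + java.util.UUID.randomUUID.toString
      rdd.sparkContext.parallelize(Seq(msg._2), 1).saveAsTextFile(out)
    }
  }
}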

Alternatively, you can use sqlContext to convert the string to a DataFrame and then write it to a file.

For example:

import sqlContext.implicits._
// A single-column DataFrame of strings; write.text saves it as text files under path
Seq(("some string")).toDF.write.text(path)
Shankar