
I have a collection of files specified as a comma-separated list, like:

hdfs://user/cloudera/date=2018-01-15,hdfs://user/cloudera/date=2018-01-16,hdfs://user/cloudera/date=2018-01-17,hdfs://user/cloudera/date=2018-01-18,hdfs://user/cloudera/date=2018-01-19,hdfs://user/cloudera/date=2018-01-20,hdfs://user/cloudera/date=2018-01-21,hdfs://user/cloudera/date=2018-01-22

and I'm loading the files with Apache Spark, all at once, with:

val input = sc.textFile(files)

I also have additional information associated with each file: a unique ID. For example:

File                                  | ID
--------------------------------------+---------
hdfs://user/cloudera/date=2018-01-15  | 12345
hdfs://user/cloudera/date=2018-01-16  | 09245
hdfs://user/cloudera/date=2018-01-17  | 345hqw4
and so on

As output, I need a DataFrame in which each row carries the ID of the file that the line was read from.

Is there a way to pass this information to Spark so that it can be associated with the lines?

alexanoid

2 Answers

A core Spark SQL approach using a UDF (you can achieve the same thing with a join if you represent the File -> ID mapping as a DataFrame):

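import org.apache.spark.sql.functions

// Read every line and record the URI of the file it came from.
// `sparkSession` is assumed to be an existing SparkSession.
val inputDf = sparkSession.read.text(".../src/test/resources/test")
    .withColumn("fileName", functions.input_file_name())

// Build a UDF that looks up the ID for a file name.
// Files missing from the mapping get a null id (Option maps to a nullable column).
def withId(mapping: Map[String, String]) = functions.udf(
  (file: String) => mapping.get(file)
)

// Keys must match exactly what input_file_name() returns.
val mapping = Map(
  "file:///.../src/test/resources/test/test1.txt" -> "id1",
  "file:///.../src/test/resources/test/test2.txt" -> "id2"
)

val resultDf = inputDf.withColumn("id", withId(mapping)(inputDf("fileName")))
resultDf.show(false)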

Result:

+-----+---------------------------------------------+---+
|value|fileName                                     |id |
+-----+---------------------------------------------+---+
|row1 |file:///.../src/test/resources/test/test1.txt|id1|
|row11|file:///.../src/test/resources/test/test1.txt|id1|
|row2 |file:///.../src/test/resources/test/test2.txt|id2|
|row22|file:///.../src/test/resources/test/test2.txt|id2|
+-----+---------------------------------------------+---+

test1.txt:

row1
row11

test2.txt:

row2
row22
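
The join alternative mentioned at the top of this answer could look roughly like the sketch below. It is untested against the asker's data; the helper name `withIdByJoin` and its parameters are hypothetical, and the `broadcast` hint assumes the File -> ID mapping is small. As with the UDF, the mapping keys have to match exactly what `input_file_name()` returns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, input_file_name}

// Hypothetical helper: tags each line with the ID of its source file via a join.
def withIdByJoin(spark: SparkSession, inputPath: String, mapping: Map[String, String]): DataFrame = {
  import spark.implicits._

  // File -> ID mapping as a two-column DataFrame
  val mappingDf = mapping.toSeq.toDF("fileName", "id")

  // One row per line, plus the URI of the file the line was read from
  val inputDf = spark.read.text(inputPath)
    .withColumn("fileName", input_file_name())

  // Left join keeps lines from files missing in the mapping (their id is null).
  // broadcast is only a hint and assumes the mapping is small.
  inputDf.join(broadcast(mappingDf), Seq("fileName"), "left")
}
```

A left join is used so that lines from unmapped files are kept with a null id, which mirrors the behavior of the UDF version; switch to an inner join if such lines should be dropped.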

morsik

This could help (not tested):

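import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// read single text file into DataFrame and add 'id' column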
def readOneFile(filePath: String, fileId: String)(implicit spark: SparkSession): DataFrame = {
  val dfOriginal: DataFrame = spark.read.text(filePath)
  val dfWithIdColumn: DataFrame = dfOriginal.withColumn("id", lit(fileId))

  dfWithIdColumn
}

// read all text files into DataFrame
def readAllFiles(filePathIdsSeq: Seq[(String, String)])(implicit spark: SparkSession): DataFrame = {
  // create empty DataFrame with expected schema
  val emptyDfSchema: StructType = StructType(List(
    StructField("value", StringType, false),
    StructField("id", StringType, false)
  ))
  val emptyDf: DataFrame = spark.createDataFrame(
    rowRDD = spark.sparkContext.emptyRDD[Row],
    schema = emptyDfSchema
  )

  val unionDf: DataFrame = filePathIdsSeq.foldLeft(emptyDf) { (intermediateDf: DataFrame, filePathIdTuple: (String, String)) =>
    intermediateDf.union(readOneFile(filePathIdTuple._1, filePathIdTuple._2))
  }
  unionDf
}
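
As a possible simplification of the fold above, the per-file DataFrames can be combined with `reduce`, which avoids building an empty seed DataFrame and its schema. This is a sketch, not tested on a cluster; the name `readAllFilesReduce` is hypothetical, and it assumes at least one (path, id) pair is supplied.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Hypothetical variant of readAllFiles: reads each path separately,
// tags its rows with the given id, then unions the results.
def readAllFilesReduce(filePathIds: Seq[(String, String)])(implicit spark: SparkSession): DataFrame = {
  // reduce fails on an empty collection, so reject that case with a clear message
  require(filePathIds.nonEmpty, "at least one (path, id) pair is required")

  filePathIds
    .map { case (path, id) => spark.read.text(path).withColumn("id", lit(id)) }
    .reduce(_ union _)
}

// Usage with paths and IDs taken from the question:
// implicit val spark: SparkSession = SparkSession.builder().getOrCreate()
// val df = readAllFilesReduce(Seq(
//   "hdfs://user/cloudera/date=2018-01-15" -> "12345",
//   "hdfs://user/cloudera/date=2018-01-16" -> "09245"
// ))
```

Both variants issue one read per path, so with a very large number of paths the query plan grows accordingly; in that case a single read combined with `input_file_name()` may be the better fit.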

y2k-shubham