
Kinesis Firehose manages the persistence of files, in this case time-series JSON, into a folder hierarchy that is partitioned by YYYY/MM/DD/HH (down to the hour, in 24-hour numbering)...great.
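For illustration, the resulting S3 prefixes end up looking something like this (bucket name and prefix are made up, not my real layout):

s3://my-firehose-bucket/some/prefix/2016/12/18/02/
s3://my-firehose-bucket/some/prefix/2016/12/18/03/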

How, using Spark 2.0, can I read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an 'option' for the DataFrame reader?

My next goal is for this to be a streaming DataFrame, where new files persisted by Firehose into S3 naturally become part of the streaming DataFrame using the new Structured Streaming in Spark 2.0. I know this is all experimental - hoping someone has used S3 as a streaming file source before, where the data is partitioned into folders as described above. Of course I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.

NB: I am using Databricks, which mounts S3 into DBFS, but this could easily be EMR or another Spark provider. It would be great to see a notebook too, if a shareable one exists that gives an example.

Cheers!

Kurt Maile

2 Answers


Can I read nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an option to the DataFrame reader?

Yes, since your directory structure is regular (YYYY/MM/DD/HH), you can give the path down to the leaf nodes with wildcard characters, like below:

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()

// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json
val jsonDf = spark.read.json("base/path/*/*/*/*/*.json")
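If you only need a subset of the data, you can fix some of the path segments instead of wildcarding everything (a small sketch; "base/path" is a placeholder):

// Only one day, or only one hour, of data
val oneDayDf  = spark.read.json("base/path/2016/12/18/*/*.json")
val oneHourDf = spark.read.json("base/path/2016/12/18/02/*.json")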

Of course, I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.

There is a library for Kinesis integration with Spark Streaming, so you can read the streaming data directly and perform SQL operations on it without reading from S3.

groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0
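If you use sbt, the same dependency would look roughly like this (a sketch; adjust the Scala and Spark versions to match your build):

libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.11" % "2.0.0"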

Sample code with Spark Streaming and SQL

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStream = KinesisUtils.createStream(
 streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
 [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

  // Kinesis records arrive as Array[Byte]; decode to String and parse the JSON
  val jsonDf = spark.read.json(rdd.map(bytes => new String(bytes, "UTF-8")))

  // Create a temporary view from the DataFrame
  jsonDf.createOrReplaceTempView("json_data_tbl")

  // With a DataFrame and a SparkSession available, we can perform most
  // of the Spark SQL operations here
}
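For completeness, a minimal sketch of wiring this up (the app name and batch interval below are placeholders, not from the original answer):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext

val conf = new SparkConf().setAppName("kinesis-json-demo")
val streamingContext = new StreamingContext(conf, Duration(2000))

// ... create kinesisStream and register the foreachRDD shown above ...

streamingContext.start()
streamingContext.awaitTermination()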
mrsrinivas
  • The nested subfolders answer here would be dreadfully slow. The way that it reads is to do a list on each subfolder recursively, which is about as bad as it gets for performance. – Jayson Minard Dec 19 '16 at 09:45
  • Yeah, that is only for the static S3 read. Can you give the 2nd approach (processing the `Kinesis` stream directly) a try? – mrsrinivas Dec 19 '16 at 10:46
  • I'm not the OP, I don't yet have the streaming need, I just wanted to see this more clearly documented in Stack Overflow. I'm using the static method you documented, but changing it to scan using better S3 calls that then create either a list of files to be processed in parallel (best) or the sequence of files to pass in. – Jayson Minard Dec 19 '16 at 11:30
  • for the s3 recursive file reading, see http://stackoverflow.com/questions/41062705/reading-multiple-files-from-s3-in-parallel-spark-java/41078601#41078601 – Jayson Minard Dec 19 '16 at 11:55
  • Maybe this is better: https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html – Jayson Minard Dec 19 '16 at 11:58
  • @JaysonMinard: The documentation is available but it assumes comfort with Scala syntax. I [refer to the code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala) when I need more insight than the documentation describes. – Myles Baker Dec 19 '16 at 14:46
  • What if I wanted to get the YYYY, MM, DD as columns in my resulting data frame? E.g. can Spark understand the partitioning? – HansHarhoff Nov 17 '17 at 17:58
  • Spark SQL has a property to read partition columns; it is enabled by default. – mrsrinivas Nov 18 '17 at 02:24

Full disclosure: I work for Databricks but I do not represent them on Stack Overflow.

How, using Spark 2.0, can I read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an 'option' for the DataFrame reader?

DataFrameReader supports loading a sequence of paths. See the documentation for def json(paths: String*): DataFrame. You can specify the sequence explicitly, use a globbing pattern, or build it programmatically (recommended):

// An explicit sequence of hourly paths
val inputPathSeq = Seq[String]("/mnt/myles/structured-streaming/2016/12/18/02", "/mnt/myles/structured-streaming/2016/12/18/03")
// A glob over every hour of one day
val inputPathGlob = "/mnt/myles/structured-streaming/2016/12/18/*"
// Paths built programmatically (hours 02 through 04)
val basePath = "/mnt/myles/structured-streaming/2016/12/18/0"
val inputPathList = (2 to 4).toList.map(basePath + _ + "/*.json")
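If you need to cover an arbitrary time range, you can also generate the hourly prefixes, for example with java.time (a sketch; the base path, start time, and range are assumptions):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")
val start = LocalDateTime.of(2016, 12, 18, 2, 0)
// Three consecutive hourly paths starting at `start`
val hourlyPaths = (0L until 3L).map(h => s"/mnt/myles/structured-streaming/${start.plusHours(h).format(fmt)}/*.json")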

I know this is all experimental - hoping someone has used S3 as a streaming file source before, where the data is partitioned into folders as described above. Of course, I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.

Since you're using DBFS, I'm going to assume the S3 buckets where data are streaming from Firehose are already mounted to DBFS. Check out the Databricks documentation if you need help mounting your S3 bucket to DBFS. Once you have one of the input paths described above, you can simply load the files into a static or streaming DataFrame:

Static

// jsonSchema is the StructType describing your JSON records, defined elsewhere in the notebook
val staticInputDF = 
  spark
    .read
    .schema(jsonSchema)
    .json(inputPathSeq : _*)

staticInputDF.isStreaming
res: Boolean = false
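As a quick usage check (a sketch; the view name is arbitrary and the columns depend on your JSON), you can register the static DataFrame and query it with SQL:

staticInputDF.createOrReplaceTempView("firehose_static")
spark.sql("SELECT count(*) FROM firehose_static").show()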

Streaming

val streamingInputDF = 
  spark
    .readStream                       // `readStream` instead of `read` for creating a streaming DataFrame
    .schema(jsonSchema)               // Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  // Treat a sequence of files as a stream by picking one file at a time
    .json(inputPathGlob)              // The streaming reader takes a single path, so use the glob here

streamingInputDF.isStreaming
res: Boolean = true
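A streaming DataFrame does nothing until a query is started against it. A minimal sketch of starting one (the sink format and query name are assumptions, not from the original answer):

// Write the stream to an in-memory table for quick interactive inspection
val query = streamingInputDF.writeStream
  .format("memory")
  .queryName("firehose_json")
  .outputMode("append")
  .start()

// Then query it with SQL, e.g.:
// spark.sql("SELECT count(*) FROM firehose_json").show()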

Most of this is taken straight from Databricks documentation on Structured Streaming. There is even a notebook example you can import into Databricks directly.

Myles Baker
  • If you use S3 mounted via DBFS, is it faster with nested globs than using them with the `s3a` file system? – Jayson Minard Dec 19 '16 at 10:00
  • I guess using the glob wouldn't necessarily maintain order, but using the file sequence would (for the streaming case, that seems important), yes? – Jayson Minard Dec 19 '16 at 10:01
  • Lastly, the stream wouldn't continue past the original list of sequence files, so having it pick up new files as they arrive and continue the streaming doesn't really work here. – Jayson Minard Dec 19 '16 at 10:02
  • Working around the slow glob listing of S3 buckets, read: https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html and http://stackoverflow.com/questions/41062705/reading-multiple-files-from-s3-in-parallel-spark-java/41078601#41078601 – Jayson Minard Dec 19 '16 at 11:59
  • @JaysonMinard: DBFS vs. s3a: DBFS provides proprietary, Databricks-optimized access to S3. If you're accessing S3 through Databricks, the best experience is with DBFS. – Myles Baker Dec 19 '16 at 14:30
  • @JaysonMinard: Globs do not maintain order. As the blog post from Chris Fregly on the Databricks forum discusses, the DataFrameReader lists a series of directories based on the paths you pass to it. This is less important for streaming. – Myles Baker Dec 19 '16 at 14:35
  • @JaysonMinard: That's right, if data files arrive to locations that are not specified in the list of paths, then you will not receive new data. I've seen people write Spark streaming jobs where they use checkpointing and path-building, filtering on a dataframe for constantly running jobs, and creating weekly or daily streaming jobs when managing multiple or sustained streaming workloads. – Myles Baker Dec 19 '16 at 14:41