A high-level overview of my goal: I need to find the file(s) (they are in JSON format) that contain a particular ID, and basically return a DF (or a list) of the ID and the file name(s) that contain it.
// Read in the data from S3, tagging each record with the file it came from
import org.apache.spark.sql.functions.input_file_name
import spark.implicits._
val dfLogs = spark.read.json("s3://some/path/to/data")
  .withColumn("fileSourceName", input_file_name())
// Filter for the ID and select the id and fileSourceName
val results = dfLogs.filter($"id" === "some-unique-id")
  .select($"id", $"fileSourceName")
// Return the results
results.show(false)
Sounds simple enough, right? However, the challenge is that the S3 directory I'm reading from contains roughly 5+ million files averaging about 10 KB each. Small file problem! To run this I've been spinning up a 5-node EMR cluster (m4.xlarge) and using Zeppelin to run the code above interactively.
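For what it's worth, one variation I've been considering (a sketch only, not tested at this scale) is passing an explicit schema to the read so Spark doesn't have to make a schema-inference pass over all ~5 million files. The single-field schema below is an assumption on my part; id is the only field I actually need:
// Supplying the schema up front should avoid the schema-inference scan of every file
import org.apache.spark.sql.types.{StructField, StructType, StringType}
import org.apache.spark.sql.functions.input_file_name
val logSchema = StructType(Seq(StructField("id", StringType, nullable = true)))
val dfLogsExplicit = spark.read
  .schema(logSchema)
  .json("s3://some/path/to/data")
  .withColumn("fileSourceName", input_file_name())
Even with the schema supplied, I assume Spark still has to list all of the objects, so this may not address the underlying issue.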
The immediate blocker, though, is that I keep getting the following error when running the first Spark statement (the read):
org.apache.thrift.transport.TTransportException at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
I'm having a hard time finding much about this error, but I suspect it has to do with the volume of requests my Spark job is making to S3.
Does anyone have suggestions on how to handle this many small files? Should I run s3-dist-cp to copy from S3 to HDFS on the EMR cluster and then run the query above against HDFS? Or some other option? This is a one-time activity, so is it worth creating a much larger cluster? Would that improve performance or make the error go away? I've also thought about grouping the files together into bigger ones, but I need to know exactly which original file(s) contain the ID.
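In case it helps frame suggestions, this is a rough sketch of what I've been toying with to keep the original file names without full JSON parsing (assuming each file is newline-delimited JSON with a top-level id field; paths are placeholders and I haven't tested this at scale): read the files as plain text, tag each line with its source file, and extract the id with get_json_object:
// Read raw lines and tag each with the file it came from
import org.apache.spark.sql.functions.{input_file_name, get_json_object}
import spark.implicits._
val dfRaw = spark.read.text("s3://some/path/to/data")
  .withColumn("fileSourceName", input_file_name())
// Pull just the id out of each line and keep the files that contain the one I'm after
val matches = dfRaw
  .withColumn("id", get_json_object($"value", "$.id"))
  .filter($"id" === "some-unique-id")
  .select($"id", $"fileSourceName")
  .distinct()
matches.show(false)
I'm not sure this sidesteps the error, though, since presumably Spark still has to list and open the same ~5 million objects.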
I would love to change the way in which these files are being aggregated in S3...but there is nothing I can do about it.
Note: I've seen a few similar posts around here, but they're quite old. There's another link as well, but I don't think it pertains to my situation.