
A high-level overview of my goal: I need to find the file(s) (they are in JSON format) that contain a particular ID. Basically, I need to return a DataFrame (or a list) of the ID and the name of the file that contains it.

// Needed for input_file_name(); the $-column syntax assumes spark.implicits._ is in scope
import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

// Read in the data from S3, tagging each row with the file it came from
val dfLogs = spark.read.json("s3://some/path/to/data")
  .withColumn("fileSourceName", input_file_name())

// Filter for the ID and select the id and fileSourceName
val results = dfLogs.filter($"id" === "some-unique-id")
  .select($"id", $"fileSourceName")

// Return the results
results.show(false)

Sounds simple enough, right? However, the challenge I'm facing is that the S3 directory I'm reading from contains millions of files (approximately 5+ million) that average about 10 KB in size. Small file problem! To do this I've been spinning up a 5-node cluster of m4.xlarge instances on EMR and using Zeppelin to run the above code interactively.
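One mitigation on the read itself, independent of where the files live: spark.read.json normally makes an extra pass over the data just to infer the schema, which is expensive across millions of objects. If the schema is known up front, it can be supplied explicitly. A minimal sketch, assuming the records carry a top-level string field named id (the second field is only an illustrative placeholder):

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Supplying the schema explicitly lets Spark skip the JSON schema-inference pass.
// "id" comes from the query above; "message" is a hypothetical placeholder field.
val logSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("message", StringType, nullable = true)
))

val dfLogs = spark.read
  .schema(logSchema)
  .json("s3://some/path/to/data")
  .withColumn("fileSourceName", input_file_name())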

However, I keep getting the following error when running the first Spark statement (the read):

org.apache.thrift.transport.TTransportException at  
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)

I'm having a hard time finding out more about the above error, but I suspect it has to do with the requests being made from my Spark job to S3.

Does anyone have any suggestions on how to handle so many small files? Should I do an s3-dist-cp from S3 to HDFS on the EMR cluster and then run the query above against HDFS instead? Or is there some other option? This is a one-time activity... is it worth creating a much larger cluster? Would that improve performance or resolve my error? I've thought about trying to group the files together into bigger ones... but I need to know the unique files which contain the ID.
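For reference, a sketch of what the s3-dist-cp route could look like, assuming it runs on the EMR cluster before the query; the HDFS destination path is a placeholder, and the copy is done file-for-file (no --groupBy) so that input_file_name() still returns files whose names match the original S3 objects:

// Run once on the EMR master node (or as an EMR step) before querying, e.g.:
//   s3-dist-cp --src s3://some/path/to/data --dest hdfs:///data/logs
// Copying file-for-file keeps one HDFS file per original S3 object, so the
// HDFS file names still identify which original file contained the ID.

import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

// Then read from HDFS instead of S3
val dfLogsHdfs = spark.read.json("hdfs:///data/logs")
  .withColumn("fileSourceName", input_file_name())

val resultsHdfs = dfLogsHdfs.filter($"id" === "some-unique-id")
  .select($"id", $"fileSourceName")

resultsHdfs.show(false)

The main thing this changes is that the per-file reads then hit HDFS inside the cluster rather than going out to S3.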

I would love to change the way in which these files are being aggregated in S3...but there is nothing I can do about it.

Note: I've seen a few posts around here, but they're quite old. I've also found another link, but I don't think it pertains to my situation.

fletchr
  • How did you go about this? In a similar situation. – activelearner Aug 15 '20 at 00:52
  • Could you solve it? I am having the same issue. Currently using s3-dist-cp on some amount of JSON files, and then reading them in Spark. But it is really inefficient. Is there a better way? – Camilo Velasquez Jan 26 '21 at 04:38
  • @fletchr Did this get solved? – Buzz Moschetti Feb 26 '21 at 15:03
  • I had exactly the same issue. You cannot change files which are on S3 (aggregation-wise). You can download, merge (concatenate), and re-upload the files; that is the only solution (if you want to read from S3). The bottom line is that you need to upload these files to S3 at a specific moment anyway (either from your localhost or from an EC2 instance), hence adding an intermediate step to merge them into larger files should not be an issue. E.g., using an EC2 instance with 12 cores, I am able to merge 35k files (approx. 65 GB) into 350 larger files in 18 minutes. This is the best approach. – cool Apr 26 '21 at 04:38

0 Answers