13

I am trying to read the last 4 months of data from s3 using pyspark and process the data but am receiving the following exception.

org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3://path_to_clickstream/date=201508*

On the first day of each month due to there not being an entry in the s3 path (a separate job processes and uploads data onto the s3 path and my job runs before that one), the job fails. I was wondering if there was a way for me to catch this exception and allow the job to continue processing all the paths that exist?

zero323
  • 322,348
  • 103
  • 959
  • 935
anonuser0428
  • 11,789
  • 22
  • 63
  • 86

1 Answers1

22

You can simply try to trigger a cheap action just after the load and catch Py4JJavaError:

from py4j.protocol import Py4JJavaError

def try_load(path):
    rdd = sc.textFile(path)
    try:
        rdd.first()
        return rdd
    except Py4JJavaError as e:
        return sc.emptyRDD()

rdd = try_load(s3_path)
if not rdd.isEmpty():
    run_the_rest_of_your_code(rdd)

Edit:

If you want to handle multiple paths you can process each one separately and combine the results:

paths = [
    "s3://path_to_inputdir/month1*/",
    "s3://path_to_inputdir/month2*/",
    "s3://path_to_inpu‌​tdir/month3*/"]

rdds = sc.union([try_load(path) for path in paths])

If you want a better control you can list content and load known files.

If at least one of theses paths is non-empty you should be able to make things even simpler and use glob like this:

sc.textFile("s3://path_to_inputdir/month[1-3]*/")
Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
  • I am actually trying to do something like `"s3://path_to_inputdir/month1*/,s3://path_to_inputdir/month2*/,s3://path_to_inputdir/month3*/"` so in this case if `s3://path_to_inputdir/month3*/` throws an exception how should I handle that? Basically I am passing multiple s3 paths into the sc.textFile( ) command in which one path is throwing an exception? – anonuser0428 Aug 01 '15 at 18:28
  • Sure. I've made one more edit to make union even simpler. – zero323 Aug 01 '15 at 19:52
  • trigger a cheap action to catch the exception - helped me another similar scenario – jdprasad Nov 30 '15 at 08:25
  • @anonuser0428 Would consider accepting the answer? Thanks. – zero323 Jun 23 '16 at 00:21
  • I am getting import error on `from py4j.java_gateway import Py4JJavaError`. Path is correct and such. When I unzip this file (`/usr/local/Cellar/apache-spark/1.6.2/python/lib/py4j-0.9-src.zip`) and inspect `java_gateway.py`, there is no `Py4JJavaError`. Any idea on what I am doing wrong? – Gopala Jul 21 '16 at 15:28
  • @Gopala in 0.9 it is not longer imported in gateway. – zero323 Jul 21 '16 at 15:56