11

I am using the following code to read some json data from S3:

df = spark_sql_context.read.json("s3a://test_bucket/test.json")
df.show()

The above code throws the following exception:

py4j.protocol.Py4JJavaError: An error occurred while calling o64.json.
: java.lang.NumberFormatException: For input string: "100M"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1538)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I have read several other SO posts on this topic (like this one or this) and have tried everything they suggest, but nothing fixes my issue.

I am using spark-2.4.4-bin-without-hadoop and hadoop-3.1.2. As for the jar files, I've got:

  • aws-java-sdk-bundle-1.11.199.jar
  • hadoop-aws-3.0.0.jar
  • hadoop-common-3.0.0.jar

I am also using the following spark-submit command to run the code:

/opt/spark-2.4.4-bin-without-hadoop/bin/spark-submit \
  --conf spark.app.name=read_json --master yarn --deploy-mode client --num-executors 2 \
  --executor-cores 2 --executor-memory 2G --driver-cores 2 --driver-memory 1G \
  --jars /home/my_project/jars/aws-java-sdk-bundle-1.11.199.jar,/home/my_project/jars/hadoop-aws-3.0.0.jar,/home/my_project/jars/hadoop-common-3.0.0.jar \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.rpc.askTimeout=600s" \
  /home/my_project/read_json.py

Anything I might be missing here?

ahajib
  • 2
    The error seems pretty self-explanatory. `100M` is a string, not a number. – Andrew Feb 11 '20 at 16:08
  • 2
    @Andrew Well, this is a known issue: https://issues.apache.org/jira/browse/HADOOP-13680 and it should have been fixed in the Hadoop version that I'm using, which is why I'm asking this question in the first place – ahajib Feb 11 '20 at 18:23

2 Answers

1

The stack trace shows that the error is thrown while S3AFileSystem reads one of its configuration options through Configuration.getLong, so the issue is with one of the default configuration values ("100M" here), which the version of the class being loaded expects to be purely numeric.

In my case the error was resolved after I added the following configuration parameter to the spark-submit command:

--conf fs.s3a.multipart.size=104857600

See Tuning S3A Uploads.
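
As a rough illustration of where the number 104857600 comes from, the sketch below converts a suffixed size such as "100M" into bytes and formats a `--conf` argument. The helper names `size_to_bytes` and `spark_conf_arg` are hypothetical, and the binary suffix table is an assumption about how such size strings are meant to be read. The `spark.hadoop.` prefix is included because Spark generally forwards only properties with that prefix to the Hadoop configuration.

```python
# Binary multipliers for size suffixes such as "100M". This table is an
# assumption for illustration, not taken from the Hadoop source.
_SUFFIXES = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}


def size_to_bytes(value: str) -> int:
    """Convert a size string like "100M" or "104857600" to a byte count."""
    text = value.strip().lower()
    if text and text[-1] in _SUFFIXES:
        return int(text[:-1]) * _SUFFIXES[text[-1]]
    return int(text)


def spark_conf_arg(hadoop_key: str, value: str) -> str:
    """Build a spark-submit --conf argument for a Hadoop size property."""
    # Hypothetical helper: prefixes the key so Spark passes it on to Hadoop.
    return f"--conf spark.hadoop.{hadoop_key}={size_to_bytes(value)}"


print(spark_conf_arg("fs.s3a.multipart.size", "100M"))
# prints: --conf spark.hadoop.fs.s3a.multipart.size=104857600
```

Passing a plain byte count this way avoids the suffix entirely, which is what a parser that only accepts a long value needs.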

dk-na
  • 1
    This occurs when it's parsing a .xml file (core-default.xml to be more exact) and has nothing to do with the configurations. I tried your solution anyways and that did not solve the issue. – ahajib Feb 11 '20 at 18:24
  • 2
    @ahajib `core-default.xml` is a Hadoop configuration file that provides default values. One of the parameters contains `100M`, which is expected to be numeric in the version of the S3AFileSystem class you are loading. Either make it numeric (override it with a --conf parameter) or load the version that matches your EMR cluster. – dk-na Feb 11 '20 at 19:19
  • Setting a Hadoop property via `spark-defaults.conf` (or `--conf` on the command line) usually requires a `spark.hadoop.` prefix, otherwise the prop is not pushed to the Hadoop libs (and is usually ignored by Spark itself). You should try `--conf spark.hadoop.fs.s3a.multipart.size=104857600` – Samson Scharfrichter Feb 16 '20 at 15:30
0

I am posting what I ended up doing to fix the issue for anyone who might see the same exception:

I added hadoop-aws to HADOOP_OPTIONAL_TOOLS in hadoop-env.sh. I also removed all the s3a configurations in Spark except the access and secret keys, and everything worked. My code before the changes:

# Setup the Spark Process
conf = SparkConf() \
       .setAppName(app_name) \
       .set("spark.hadoop.mapred.output.compress", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
       .set("spark.hadoop.mapred.output.compression.type", "BLOCK") \
       .set("spark.speculation", "false")\
       .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")\
       .set("com.amazonaws.services.s3.enableV4", "true")

# Some other configs

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.access.key", s3_key
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.secret.key", s3_secret
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.multipart.size", "104857600"
)

And after:

# Setup the Spark Process
conf = SparkConf() \
       .setAppName(app_name) \
       .set("spark.hadoop.mapred.output.compress", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
       .set("spark.hadoop.mapred.output.compression.type", "BLOCK") \
       .set("spark.speculation", "false")

# Some other configs

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.access.key", s3_key
)

spark_context._jsc.hadoopConfiguration().set(
            "fs.s3a.secret.key", s3_secret
)

That probably means it was a classpath issue. hadoop-aws wasn't being added to the classpath, so under the covers Spark was falling back to some other implementation of S3AFileSystem. Hadoop and Spark are a huge pain in this area because there are so many different places and ways to load things, and Java is sensitive to the order as well: it silently uses whichever copy of a class it picks up from the classpath, so the wrong order gives you the wrong version. Hope this helps others facing the same issue.
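
One cheap sanity check for this kind of classpath problem is to compare the versions in the Hadoop jar names against the installed Hadoop version. The sketch below is only that, a filename check. The function name `mismatched_hadoop_jars` is hypothetical, it assumes jars follow the `hadoop-<module>-<version>.jar` naming convention, and it does not inspect what the JVM actually loads.

```python
import re
from typing import Dict, List

# Matches names like "hadoop-aws-3.0.0.jar" -> ("hadoop-aws", "3.0.0").
# Assumes the conventional hadoop-<module>-<version>.jar naming.
_JAR_PATTERN = re.compile(r"^(hadoop-[a-z\-]+?)-(\d+(?:\.\d+)*)\.jar$")


def mismatched_hadoop_jars(jar_names: List[str], expected_version: str) -> Dict[str, str]:
    """Return {module: version} for Hadoop jars that differ from expected_version."""
    mismatches = {}
    for name in jar_names:
        match = _JAR_PATTERN.match(name)
        if match and match.group(2) != expected_version:
            mismatches[match.group(1)] = match.group(2)
    return mismatches


# Jar names and Hadoop version as listed in the question.
jars = [
    "aws-java-sdk-bundle-1.11.199.jar",
    "hadoop-aws-3.0.0.jar",
    "hadoop-common-3.0.0.jar",
]
print(mismatched_hadoop_jars(jars, "3.1.2"))
# prints: {'hadoop-aws': '3.0.0', 'hadoop-common': '3.0.0'}
```

A non-empty result does not prove that the wrong S3AFileSystem is being loaded, but it is a hint that jars from different Hadoop releases are being mixed.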

ahajib
  • 1
    Hadoop.mapred properties are deprecated, by the way, and you can put those in the xml files rather than code – OneCricketeer Feb 13 '20 at 02:34
  • @cricket_007 thanks for pointing that out. I'll make sure to fix that as well. – ahajib Feb 13 '20 at 03:25
  • `SparkConf().set("spark.hadoop.fs.s3a.multipart.size", "104857600")` should do the trick since the `spark.hadoop.` prefix means _"push that property to the Hadoop conf automatically (and override the generic value set statically in XML files)"_ – Samson Scharfrichter Feb 16 '20 at 15:35