I am running PySpark through an IPython notebook on EMR (Hadoop 2.4.0) with Spark 1.4.0 in YARN mode, launched with:

IPYTHON_OPTS="notebook --no-browser" nohup /usr/lib/spark/bin/pyspark --master yarn-client --num-executors 2 --executor-memory 512m --executor-cores 1 > /mnt/var/log/python_notebook.log 2> /mnt/var/log/python_notebook_err.log &

I have placed a simple CSV file in HDFS and am trying to read it with:

sc.textFile('/tmp/text.csv').first()

However, this fails with `Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found`.

In context:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-54-e39168c6841b> in <module>()
----> 1 sc.textFile('/tmp/text.csv').first()

/usr/lib/spark/python/pyspark/rdd.py in first(self)
   1293         ValueError: RDD is empty
   1294         """
-> 1295         rs = self.take(1)
   1296         if rs:
   1297             return rs[0]

/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1245         """
   1246         items = []
-> 1247         totalParts = self.getNumPartitions()
   1248         partsScanned = 0
   1249 

/usr/lib/spark/python/pyspark/rdd.py in getNumPartitions(self)
    353         2
    354         """
--> 355         return self._jrdd.partitions().size()
    356 
    357     def filter(self, f):

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o159.partitions.
: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:65)
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:47)
    at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 25 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
    at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
    at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
    ... 29 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
    ... 31 more

I have tried to follow the instructions here and set:

os.environ['SPARK_LIBRARY_PATH'] = "/usr/lib/hadoop-lzo/lib/native/"
os.environ['SPARK_CLASSPATH'] = "/usr/lib/hadoop-lzo/lib/"

However, that does not seem to help.

tchakravarty
  • Have you tried [this link](https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/examples/reading-lzo-files.md)? It looks like there's an issue reading LZO using the `textFile` method. – santon Aug 27 '15 at 03:58
  • @santon Well, the file that I am trying to read is not an LZO file and it is in local HDFS, so I am not sure why it is being interpreted as an LZO file. I have seen that link, but it seems overkill to invoke a Hadoop API call to read a text file. – tchakravarty Aug 27 '15 at 06:59
  • Got it. Sorry I misread the question. Maybe check your `core-site.xml` file to see if LZO is specified as the default compression codec? – santon Aug 27 '15 at 19:09
  • Did you ever resolve this? If so, can you post a solution? – dudemonkey Aug 16 '16 at 17:57
  • @dudemonkey We got someone from our tech team to help us with this. I don't know how they fixed it, but they gave us a working setup. Sorry, can't help more than that. – tchakravarty Aug 17 '16 at 04:31
  • The solution is to remove `com.hadoop.compression.lzo.LzoCodec` from `core-site.xml`: take it out of the `io.compression.codec.lzo.class` property and out of the `io.compression.codecs` property too. – Neethu Lalitha Apr 01 '22 at 19:47
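
The stack trace in the question shows why a plain text file trips over LZO: `TextInputFormat.configure` builds a `CompressionCodecFactory`, which tries to load every class listed in the configuration, whether or not the file being read is compressed. The following is a minimal sketch, not a tested fix, of how one might check a `core-site.xml` for LZO entries. The `SAMPLE` document, the function names, and the choice to drop every `com.hadoop.compression.lzo.*` class (including `LzopCodec`) are assumptions made for illustration; on a real cluster you would read the actual file instead.

```python
import xml.etree.ElementTree as ET

LZO_CLASS = "com.hadoop.compression.lzo.LzoCodec"
LZO_PACKAGE = "com.hadoop.compression.lzo."

# Hypothetical core-site.xml content, for illustration only.
SAMPLE = """<configuration>
  <property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
  </property>
  <property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
  </property>
</configuration>"""


def find_lzo_properties(core_site_xml):
    """Return the names of properties whose value mentions the LZO codec class."""
    root = ET.fromstring(core_site_xml)
    hits = []
    for prop in root.iter("property"):
        name = (prop.findtext("name") or "").strip()
        value = (prop.findtext("value") or "").strip()
        if LZO_CLASS in value:
            hits.append(name)
    return hits


def strip_lzo_from_codecs(value):
    """Drop LZO classes from a comma-separated codec list, keeping the rest in order."""
    codecs = [c.strip() for c in value.split(",") if c.strip()]
    return ",".join(c for c in codecs if not c.startswith(LZO_PACKAGE))


if __name__ == "__main__":
    print(find_lzo_properties(SAMPLE))
    print(strip_lzo_from_codecs(
        "org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzoCodec"
    ))
```

Removing the entries only makes sense if nothing on the cluster actually reads LZO data; otherwise putting the hadoop-lzo jar on the classpath is the safer route.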

1 Answer

I know this question is old, but I was dealing with this for the past week, so I figured I would post our solution in case others come across it. Our setup is one EC2 instance running as the driver outside of EMR, which can create EMR clusters and communicate with the master. The cluster is running Spark 2.2.0 and the EMR release is 5.9.0.

The solution was to clone the Twitter hadoop-lzo GitHub repo on the Spark driver and then add the path to the hadoop-lzo jar to the spark-submit arguments: `SUBMIT_ARGS='--jars /opt/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar'`. Just replace the path to the .jar with the location where you cloned the repo.
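
For a driver that is a Python process launching PySpark itself, the same idea can be sketched as follows. This assumes the arguments are passed through the `PYSPARK_SUBMIT_ARGS` environment variable, which PySpark reads when it starts the JVM and which is expected to end with `pyspark-shell`; the `SUBMIT_ARGS` name above may refer to something else in our scripts. The helper name `build_submit_args` is hypothetical, and the jar path is the one from this answer, so adjust it to your own build.

```python
import os


def build_submit_args(jar_paths, extra_args=()):
    """Build a spark-submit argument string that puts the given jars on the classpath."""
    parts = []
    if jar_paths:
        # --jars takes a single comma-separated list of jar paths.
        parts += ["--jars", ",".join(jar_paths)]
    parts += list(extra_args)
    # PySpark expects the string to end with the pyspark-shell resource.
    parts.append("pyspark-shell")
    return " ".join(parts)


# Path taken from the answer above; it must exist on the driver machine.
LZO_JAR = "/opt/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar"

# Must be set before the SparkContext (and its JVM) is created;
# changing it afterwards has no effect on the running JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args([LZO_JAR])
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

The environment variable has to be in place before the first `SparkContext` is created in that process, since the JVM classpath is fixed at launch.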

rdeg
  • But if I clone the repo I do not have any jar, I just have the source code. Do I also have to build the jar myself first on the EMR driver? – Alex Ortner Nov 26 '19 at 13:09
  • Good solution. Alternatively, add lzo to your dependencies and build an assembly with your own code. Thank you! – Dequn Apr 26 '20 at 03:39