
Spark version: 2.3.0

Python version: 3.7 (3.4 was tried as well).

While running the code below through spark-submit, with the script file name passed as the argument:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Local Spark context and a streaming context with a 2-second batch interval
sc = SparkContext("local[*]", "KafkaStreamingConsumer")
ssc = StreamingContext(sc, 2)

# Receiver-based Kafka stream: ZooKeeper at localhost:2181,
# consumer group "test-consumer-group", topic "test" with 1 receiver thread
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "test-consumer-group", {"test": 1})

# Each Kafka record arrives as a (key, message) pair; keep only the message
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()
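
For reference, the submit command is along these lines (a sketch: the --packages coordinate assumes the Kafka 0.8 connector for a Scala 2.11 build of Spark 2.3.0, and consumer.py is a placeholder file name):

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 consumer.py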

the following error is thrown:

2019-06-14 14:23:11 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1354, in takeUpToNumLeft
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
        at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-06-14 14:23:11 ERROR TaskSetManager:70 - Task 0 in stage 1.0 failed 1 times; aborting job
2019-06-14 14:23:11 INFO  TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool
2019-06-14 14:23:11 INFO  TaskSchedulerImpl:54 - Cancelling stage 1

Driver stacktrace:
2019-06-14 14:23:11 INFO  DAGScheduler:54 - Job 1 failed: runJob at PythonRDD.scala:141, took 1.225432 s
2019-06-14 14:23:11 INFO  JobScheduler:54 - Finished job streaming job 1560489790000 ms.0 from job set of time 1560489790000 ms
2019-06-14 14:23:11 ERROR JobScheduler:91 - Error running job streaming job 1560489790000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\util.py", line 65, in call
    r = self.func(t, *rdds)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1358, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\context.py", line 1001, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\nat\spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1354, in takeUpToNumLeft
StopIteration

2019-06-14 14:23:11 INFO  StreamingContext:54 - Invoking stop(stopGracefully=false) from shutdown hook
2019-06-14 14:23:11 INFO  ReceiverTracker:54 - Sent stop signal to all 1 receivers
2019-06-14 14:23:11 INFO  ReceiverSupervisorImpl:54 - Received stop signal
2019-06-14 14:23:11 INFO  ReceiverSupervisorImpl:54 - Stopping receiver with message: Stopped by driver:

When I submit the Spark job, the stream itself starts and runs fine. The StopIteration error appears only when I enter some input in the Kafka producer console.

I assume this is something related to Python; the same error is thrown whether I use Python 3.7 or 3.4.
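
For context, Python 3.7 enforces PEP 479: a StopIteration that escapes a generator body is re-raised as "RuntimeError: generator raised StopIteration", which is exactly what the executor traceback shows. Below is a minimal, Spark-free sketch that reproduces the same failure; the function loosely mirrors takeUpToNumLeft from pyspark/rdd.py but is otherwise hypothetical.

# Under Python <= 3.6 this generator just stops early (with a DeprecationWarning);
# under Python 3.7+ the escaping StopIteration becomes a RuntimeError.
def take_up_to(iterator, n):
    taken = 0
    while taken < n:
        yield next(iterator)  # StopIteration escapes here when the iterator runs dry
        taken += 1

list(take_up_to(iter(range(2)), 5))  # RuntimeError: generator raised StopIteration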

Please help. Thanks.

1 Answer


I had the same error when using pyspark to consume some Kafka topics. I found some clues in this helpful answer, which offers a solution for fixing StopIteration exceptions: https://stackoverflow.com/a/51701040/7781704

In my case, the error was thrown because Python 3.7 is not compatible with Spark 2.3.0.
After upgrading Spark to version 2.4.4, it worked fine.
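
If upgrading is not immediately possible, the underlying issue can also be worked around by making the offending generator PEP 479-compliant, i.e. ending it with return instead of letting StopIteration escape. A rough sketch of the pattern (not the exact upstream patch):

# PEP 479-compliant version: catch the StopIteration from next()
# and end the generator with a plain return.
def take_up_to(iterator, n):
    taken = 0
    while taken < n:
        try:
            yield next(iterator)
        except StopIteration:
            return  # clean generator exit on all Python versions
        taken += 1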
