1

I'm trying to create a structured streaming pipeline that will read N kafka topics, do some payload validation, explode the payload and write to:

  • n kafka topics
  • Amazon s3

I've followed this article to generate the pipeline. The shape of my piepline can either be:

 Subscription
      |
---process---
|  |  |  |  |

N outputs

or

N Subscriptions
 |  |  |  |  |
  N Processes
 |  |  |  |  |
   N outputs

This is the code I'm using:

spark = SparkSession \
    .builder \
    .appName(f"ingest") \
    .master("local[*]") \
    .getOrCreate()

def start_job(spark, topic):
   # pipeline logic here

for topic in pipeline_config.list_topics():
    thread = threading.Thread(target=start_job, args=(spark, topic))
    thread.start()

spark.streams.awaitAnyTermination()

Whenever I run this I get java.util.ConcurrentModificationException: Another instance of this query was just started by a concurrent session.:

Exception in thread Thread-2 (start_job):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
Exception in thread Thread-3 (start_job):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/threading.py", line 946, in run
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)    self._target(*self._args, **self._kwargs)
  File "/Users/USER/git/schema-tools/pipeline2.py", line 164, in start_job

  File "/Users/USER/git/schema-tools/pipeline2.py", line 164, in start_job
        parsed_with_metadata \parsed_with_metadata \
  File "/Users/USER/venvs/schema-tools-310/lib/python3.10/site-packages/pyspark/sql/streaming.py", line 1491, in start

  File "/Users/USER/venvs/schema-tools-310/lib/python3.10/site-packages/pyspark/sql/streaming.py", line 1491, in start
    return self._sq(self._jwrite.start())
  File "/Users/USER/venvs/schema-tools-310/lib/python3.10/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return self._sq(self._jwrite.start())
  File "/Users/USER/venvs/schema-tools-310/lib/python3.10/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
    return_value = get_return_value(  File "/Users/USER/venvs/schema-tools-310/lib/python3.10/site-packages/pyspark/sql/utils.py", line 111, in deco

  File "/Users/USER/venvs/schema-tools-310/lib/python3.10/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
    return f(*a, **kw)
  File "/Users/USER/venvs/schema-tools-310/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
  File "/Users/USER/venvs/schema-tools-310/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o314.start.
: java.util.ConcurrentModificationException: Another instance of this query was just started by a concurrent session.
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:411)
    at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:466)
    at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:456)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
: An error occurred while calling o312.start.
: java.util.ConcurrentModificationException: Another instance of this query was just started by a concurrent session.

Is this really not possible?

Running on:

  • py3.10
  • spark 3.1.2
  • packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2, org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2, org.apache.commons:commons-pool2:2.11.1
  • Mac M1 pro

PS: I can do something similar using foreach batch but I really don't like the approach.

Alberto C
  • 783
  • 8
  • 13
  • I don't know if this will help, but try putting the jobs in different scheduler pools, spark.sparkContext.setLocalProperty("spark.scheduler.pool", ...) before you start each query. – Carter Shanklin Apr 23 '22 at 03:58
  • Yeah, trued that, not helpful. Turns out is something to do with spark under Python 3.10 on arm64 machines. I have changed the code and run it under x86 python 3.8 and it works. I can even fdo this: ``` for topic in pipeline_config.list_topics(): exploded_data = assemble_pipeline(topics_and_data, topic) sink_to_s3(exploded_data, topic) sink_to_kafka(exploded_data, topic) spark.streams.awaitAnyTermination() ``` Which seems the ocrrect way to start multiple jobs – Alberto C Apr 26 '22 at 09:06
  • @AlbertoC: is the streaming query creation and start the streaming query part of `start_job` function? – Shankar May 09 '22 at 14:46
  • @Shankar yes, so I got this to work on an Intel version of Python 3.8. So the problem seems to be related to either M1 machines or python 3.10 – Alberto C May 13 '22 at 10:15

0 Answers0