i've a Structured Streaming code which reads data from a Kafka Topic (on a VM) & writes to another Kafka Topic on GKE (i should be using a Mirror Maker for this, but have not implemented that yet). it suddenly stopped working (been working fine for many months) giving following error :
22/10/18 19:02:35 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.kafka.common.errors.TimeoutException: Topic syslog.ueba-us4.v1.versa.demo4 not present in metadata after 60000 ms.
22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry
at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:76)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$1(ContinuousWriteRDD.scala:53)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.compute(ContinuousWriteRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Suppressed: java.lang.NullPointerException
at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$7(ContinuousWriteRDD.scala:84)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1495)
... 11 more
Code is very simple, and has been working for many months now :
class ReadFromKafka:
def readAndWrite(self):
df = spark \
.readStream \
.format('kafka') \
.option("kafka.bootstrap.servers", kafkaBrokersSrc) \
.option("subscribe", srcTopic) \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load()
query = df.selectExpr("CAST(value AS STRING)", "cast(key AS String)") \
.writeStream \
.format("kafka") \
.option("checkpointLocation", checkpoint) \
.option("outputMode", "append") \
.option("truncate", "false") \
.option("kafka.security.protocol", security_protocol) \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location) \
.option("kafka.ssl.keystore.password", ssl_keystore_password) \
.option("kafka.bootstrap.servers", kafkaBrokersTgt) \
.option("topic", tgtTopic) \
.option("kafka.ssl.keystore.type", "PKCS12") \
.option("kafka.ssl.truststore.type", "PKCS12") \
.trigger(continuous='5 seconds') \
.start()
query.awaitTermination()
I'm running this on google dataproc
gcloud dataproc jobs submit pyspark /Users/karanalang/PycharmProjects/Kafka/versa-movedata2kafka/StructuredStreaming-readFromKafka-versa-sml-googl-v1.py --cluster stream2kafka --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true --files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12 --region us-east1
any ideas on what the issue might be & how to debug this ? tia!
Update : I'm able to read & write in to the Kafka Topic when i use python Kafka Producer/Consumer- but Structured Streaming code is failing
Update : Update :
I'm able to read the topic from GKE using spark-submit (batch & streaming mode), the SSL certs are stored on my local mac from where spark-submit is run So, it seems like Spark is behaving correctly.
However, I tried reading from the kafka topic on GKE using - google cloud submit, and it gives error saying broker is not found (shown below) .. the SSL certs are stored in the storage bucket, and i'm passing the certs as '--files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12'
In the pyspark code, i access them using the file names - this has been working earlier, however i suspect - this might be causing the issue. Question - is this the corect way to access the certs when I'm using Dataproc ?
commands :
gcloud dataproc jobs submit pyspark /Users/karanalang/PycharmProjects/Kafka/versa-movedata2kafka/StructuredStream-stream-readfrom-versa-sml-googl-certs-gs.py --cluster stream2kafka2 --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true --files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12 --region us-east1
Code :
kafkaBrokersTgt='IP:port'
tgtTopic = "syslog.ueba-us4.v1.versa.demo3"
checkpoint='gs://versa-move2syslogdemo3/'
security_protocol="SSL"
ssl_truststore_location="versa-kafka-gke-ca.p12"
ssl_truststore_password='xxxx'
ssl_keystore_location = 'syslog-vani-noacl.p12'
ssl_keystore_password ='yyyy'
print(" reading from Kafka topic syslog-demo3 on versa-sml-googl, certs on gs storage ")
df_reader = spark.readStream.format('kafka')\
.option("kafka.bootstrap.servers",kafkaBrokersTgt)\
.option("kafka.security.protocol",security_protocol) \
.option("kafka.ssl.truststore.location",ssl_truststore_location) \
.option("kafka.ssl.truststore.password",ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location)\
.option("kafka.ssl.keystore.password", ssl_keystore_password)\
.option("subscribe", tgtTopic) \
.option("startingOffsets", "earliest") \
.option("maxOffsetsPerTrigger", 20) \
.option("kafka.max.poll.records", 20) \
.option("kafka.ssl.keystore.type", "PKCS12") \
.option("kafka.ssl.truststore.type", "PKCS12") \
.load()
# .option("kafka.group.id", "ss.consumer1") \
query = df_reader.selectExpr("CAST(value AS STRING)", "cast(key AS String)") \
.writeStream \
.format("console") \
.option("numRows",500)\
.option("outputMode", "complete")\
.option("truncate", "false") \
.trigger(processingTime='3 minutes') \
.option("checkpointLocation", checkpoint) \
.start()
query.awaitTermination()
Error :
reading from Kafka topic syslog-demo3 on versa-sml-googl, certs on gs storage
22/10/20 04:37:48 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/10/20 04:37:50 INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [34.138.213.152:9094]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = syslog-vani-noacl.p12
ssl.keystore.password = [hidden]
ssl.keystore.type = PKCS12
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = versa-kafka-gke-ca.p12
ssl.truststore.password = [hidden]
ssl.truststore.type = PKCS12
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
22/10/20 04:37:50 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version: 2.6.0
22/10/20 04:37:50 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId: 62abe01bee039651
22/10/20 04:37:50 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka startTimeMs: 1666240670692
22/10/20 04:37:50 INFO org.apache.kafka.clients.consumer.KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Subscribed to topic(s): syslog.ueba-us4.v1.versa.demo3
22/10/20 04:40:01 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:40:01 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:42:12 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:42:12 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:44:23 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:44:23 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:46:34 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:46:34 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:48:45 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:48:45 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:50:56 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:50:56 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
Update :
per comment from @Daganag, when i use SparkFiles.get(filename) .. here is the error i get :
d-9fe7bb774985/syslog-vani-noacl.p12
java.nio.file.NoSuchFileException: /hadoop/spark/tmp/spark-19943d8b-d8c7-4406-b5cf-c352837ad71e/userFiles-32e5ebe3-7013-44f2-a0bd-9fe7bb774985/syslog-vani-noacl.p12
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
at sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
at java.nio.file.Files.readAttributes(Files.java:1737)
at java.nio.file.Files.getLastModifiedTime(Files.java:2266)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.lastModifiedMs(DefaultSslEngineFactory.java:312)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:284)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
@Dagang, @OneCrickteer - i logged onto the worker nodes & i see the SSL certs uploaded (when i pass the certs as --files gs:// in google cloud submit.
How do i access them in code .. SparkFiles.get('cert') is not working, since the path SparkFiles gets is not the same
SSL certs on The Worker Node :
------------------------------
root@stream2kafka2-w-0:/# find . -name versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1666127856693_0016/container_e01_1666127856693_0016_01_000002/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/67/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/39/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/165/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/194/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/109/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/81/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/53/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/208/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/179/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/151/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/137/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/23/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/95/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/123/versa-kafka-gke-ca.p12