
I have Structured Streaming code which reads data from a Kafka topic (on a VM) and writes to another Kafka topic on GKE (I should be using MirrorMaker for this, but have not implemented it yet). It suddenly stopped working (it had been working fine for many months), giving the following error:

22/10/18 19:02:35 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.kafka.common.errors.TimeoutException: Topic syslog.ueba-us4.v1.versa.demo4 not present in metadata after 60000 ms.

22/10/18 19:03:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) (stream2kafka2-w-1.c.versa-sml-googl.internal executor 2): org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:76)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$1(ContinuousWriteRDD.scala:53)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.compute(ContinuousWriteRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    Suppressed: java.lang.NullPointerException
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousWriteRDD.$anonfun$compute$7(ContinuousWriteRDD.scala:84)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1495)
        ... 11 more

The code is very simple and has been working for many months now:


class ReadFromKafka:

    def readAndWrite(self):
        df = spark \
            .readStream \
            .format('kafka') \
            .option("kafka.bootstrap.servers", kafkaBrokersSrc) \
            .option("subscribe", srcTopic) \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .load()

        query = df.selectExpr("CAST(value AS STRING)", "cast(key AS String)") \
            .writeStream \
            .format("kafka") \
            .option("checkpointLocation", checkpoint) \
            .option("outputMode", "append") \
            .option("truncate", "false") \
            .option("kafka.security.protocol", security_protocol) \
            .option("kafka.ssl.truststore.location", ssl_truststore_location) \
            .option("kafka.ssl.truststore.password", ssl_truststore_password) \
            .option("kafka.ssl.keystore.location", ssl_keystore_location) \
            .option("kafka.ssl.keystore.password", ssl_keystore_password) \
            .option("kafka.bootstrap.servers", kafkaBrokersTgt) \
            .option("topic", tgtTopic) \
            .option("kafka.ssl.keystore.type", "PKCS12") \
            .option("kafka.ssl.truststore.type", "PKCS12") \
            .trigger(continuous='5 seconds') \
            .start()
        query.awaitTermination()

I'm running this on Google Dataproc:

gcloud dataproc jobs submit pyspark /Users/karanalang/PycharmProjects/Kafka/versa-movedata2kafka/StructuredStreaming-readFromKafka-versa-sml-googl-v1.py  --cluster stream2kafka  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true  --files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12 --region us-east1

Any ideas on what the issue might be and how to debug this? TIA!

Update: I'm able to read from & write to the Kafka topic when I use a plain Python Kafka producer/consumer, but the Structured Streaming code is failing. A minimal sketch of that standalone check follows.
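For context, the standalone check was along these lines (a sketch, not the exact script - it uses confluent-kafka with hypothetical paths/passwords, and assumes the CA cert has been exported to PEM, since librdkafka's ssl.ca.location expects PEM):

from confluent_kafka import Consumer

# hypothetical values - the real ones mirror the Spark job's SSL config
consumer = Consumer({
    'bootstrap.servers': 'IP:port',
    'security.protocol': 'SSL',
    'ssl.ca.location': 'versa-kafka-gke-ca.pem',       # CA exported from the PKCS12 truststore
    'ssl.keystore.location': 'syslog-vani-noacl.p12',  # PKCS12 client keystore
    'ssl.keystore.password': 'yyyy',
    'group.id': 'connectivity-check',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['syslog.ueba-us4.v1.versa.demo3'])

msg = consumer.poll(10.0)  # None if nothing is fetched within the timeout
print(msg.value() if msg is not None and msg.error() is None else msg)
consumer.close()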

Update:

I'm able to read the topic on GKE using spark-submit (in both batch & streaming mode); the SSL certs are stored on my local Mac, from where spark-submit is run. So it seems Spark itself is behaving correctly.

However, when I try reading from the Kafka topic on GKE via gcloud dataproc jobs submit, it gives an error saying the broker is not found (shown below). The SSL certs are stored in a GCS bucket, and I'm passing them as '--files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12'.

In the PySpark code, I access them using just the file names. This has been working earlier; however, I suspect it might be causing the issue now. Question: is this the correct way to access the certs when I'm using Dataproc?
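For reference, this is roughly how I'm checking what path the driver resolves for the --files certs (a debugging sketch; it assumes the SparkSession `spark`/SparkContext already exists, as in the code below):

import os
from pyspark import SparkFiles

# Files passed via --files are exposed on the driver through SparkFiles.get()
for cert in ('versa-kafka-gke-ca.p12', 'syslog-vani-noacl.p12'):
    path = SparkFiles.get(cert)
    print(cert, '->', path, 'exists:', os.path.exists(path))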


Command:
gcloud dataproc jobs submit pyspark /Users/karanalang/PycharmProjects/Kafka/versa-movedata2kafka/StructuredStream-stream-readfrom-versa-sml-googl-certs-gs.py  --cluster stream2kafka2  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true  --files gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani-noacl.p12 --region us-east1

Code:

kafkaBrokersTgt='IP:port'
tgtTopic = "syslog.ueba-us4.v1.versa.demo3"
checkpoint='gs://versa-move2syslogdemo3/'
security_protocol="SSL"
ssl_truststore_location="versa-kafka-gke-ca.p12"
ssl_truststore_password='xxxx'
ssl_keystore_location = 'syslog-vani-noacl.p12'
ssl_keystore_password ='yyyy'

print(" reading from Kafka topic syslog-demo3 on versa-sml-googl, certs on gs storage ")

df_reader = spark.readStream.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokersTgt)\
    .option("kafka.security.protocol",security_protocol) \
    .option("kafka.ssl.truststore.location",ssl_truststore_location) \
    .option("kafka.ssl.truststore.password",ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location)\
    .option("kafka.ssl.keystore.password", ssl_keystore_password)\
    .option("subscribe", tgtTopic) \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 20) \
    .option("kafka.max.poll.records", 20) \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()

# .option("kafka.group.id", "ss.consumer1") \

query = df_reader.selectExpr("CAST(value AS STRING)", "cast(key AS String)") \
    .writeStream \
    .format("console") \
    .option("numRows",500)\
    .option("outputMode", "complete")\
    .option("truncate", "false") \
    .trigger(processingTime='3 minutes') \
    .option("checkpointLocation", checkpoint) \
    .start()

query.awaitTermination()


Error:

 reading from Kafka topic syslog-demo3 on versa-sml-googl, certs on gs storage 
22/10/20 04:37:48 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/10/20 04:37:50 INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [34.138.213.152:9094]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 1
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = syslog-vani-noacl.p12
    ssl.keystore.password = [hidden]
    ssl.keystore.type = PKCS12
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = versa-kafka-gke-ca.p12
    ssl.truststore.password = [hidden]
    ssl.truststore.type = PKCS12
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

22/10/20 04:37:50 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version: 2.6.0
22/10/20 04:37:50 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId: 62abe01bee039651
22/10/20 04:37:50 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka startTimeMs: 1666240670692
22/10/20 04:37:50 INFO org.apache.kafka.clients.consumer.KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Subscribed to topic(s): syslog.ueba-us4.v1.versa.demo3
22/10/20 04:40:01 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:40:01 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:42:12 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:42:12 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:44:23 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:44:23 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:46:34 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:46:34 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:48:45 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:48:45 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected
22/10/20 04:50:56 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Connection to node -1 (/34.138.213.152:9094) could not be established. Broker may not be available.
22/10/20 04:50:56 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0-1, groupId=spark-kafka-source-10bf0d29-761e-4b5a-95c6-308e036ca6f9-764682263-driver-0] Bootstrap broker 34.138.213.152:9094 (id: -1 rack: null) disconnected

Update:

Per the comment from @Dagang, when I use SparkFiles.get(filename), here is the error I get:

java.nio.file.NoSuchFileException: /hadoop/spark/tmp/spark-19943d8b-d8c7-4406-b5cf-c352837ad71e/userFiles-32e5ebe3-7013-44f2-a0bd-9fe7bb774985/syslog-vani-noacl.p12
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
    at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
    at sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
    at java.nio.file.Files.readAttributes(Files.java:1737)
    at java.nio.file.Files.getLastModifiedTime(Files.java:2266)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.lastModifiedMs(DefaultSslEngineFactory.java:312)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:284)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)

@Dagang, @OneCricketeer - I logged onto the worker nodes and I can see the SSL certs uploaded (when I pass the certs as --files gs://... in the gcloud dataproc submit).

How do I access them in the code? SparkFiles.get('cert') is not working, since the path SparkFiles returns is not where the files actually land (see the check sketched after the listing below).

SSL certs on the worker node:
------------------------------
root@stream2kafka2-w-0:/# find . -name versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1666127856693_0016/container_e01_1666127856693_0016_01_000002/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/67/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/39/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/165/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/194/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/109/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/81/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/53/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/208/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/179/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/151/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/137/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/23/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/95/versa-kafka-gke-ca.p12
./hadoop/yarn/nm-local-dir/usercache/root/filecache/123/versa-kafka-gke-ca.p12
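To see what the executor tasks themselves can resolve, I'm running a small check from inside the job (a sketch; assumes the SparkSession `spark` is already created). It prints each task's working directory and whether the bare cert file names resolve there:

import os

def check_certs(_):
    # On YARN, --files are localized into the container's working directory,
    # so a bare relative name should resolve here if localization worked.
    yield (os.getcwd(),
           os.path.exists('versa-kafka-gke-ca.p12'),
           os.path.exists('syslog-vani-noacl.p12'))

print(spark.sparkContext.parallelize(range(4), 4).mapPartitions(check_certs).collect())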

  • Code seems fine. Given it's an internal error to Spark code, I suggest you file a JIRA ticket and, yes, look into using MirrorMaker2 instead – OneCricketeer Oct 18 '22 at 12:36
  • @OneCricketeer - thnx, I'll create a JIRA ticket .. also, I've updated the logs, essentially there is a timeout exception causing the issue - org.apache.kafka.common.errors.TimeoutException: Topic syslog.ueba-us4.v1.versa.demo4 not present in metadata after 60000 ms.. i re-created the Dataproc cluster, and that started the initial load into the Kafka topic, and then stopped and is giving error again.. if you have any pointers on this, pls let me know .. thnx – Karan Alang Oct 18 '22 at 19:08
  • @OneCricketeer - created JIRA (https://issues.apache.org/jira/browse/SPARK-40837) – Karan Alang Oct 19 '22 at 00:29
  • also, checked if there is connectivity issue .. i'm able to telnet to the brokerAddress:port – Karan Alang Oct 19 '22 at 00:30
  • Is it a new cluster or old cluster? What's the Dataproc image version? – Dagang Oct 20 '22 at 03:25
  • hi @Dagang, Image version is - 2.0.47-ubuntu18 .. i've been using this for last few month and it has been running fine, in the last week it has been failing with the error shown in the description.. pls let me know if you have inputs on this .. thnx! – Karan Alang Oct 20 '22 at 04:32
  • Did you monitor node health and disk utilization? – Dagang Oct 20 '22 at 04:57
  • @Dagang - yes, the nodes & disk utilization are fine .. i did further testing on my local m/c - and spark is able to read the data from the topic, however - when i run the program on google dataproc, it seems to be not working .. can you pls check the updates in the description, is there any issue with the way i'm reading the certs stored in google storage ? – Karan Alang Oct 20 '22 at 05:21
  • I think you need to use `SparkFiles.get` to get the actual path of the files submitted through `--files` https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.SparkFiles.get.html – Dagang Oct 20 '22 at 05:31
  • @Dagang - that doesn't work .. updated the description with the error – Karan Alang Oct 20 '22 at 05:42
  • If it was working before then suddenly stopped working, is it possible that the certs expired? – Dagang Oct 20 '22 at 12:15
  • @Dagang - the certs are fine, when i use spark-submit(with certs on local m/c) .. i'm able to read the data .. infact, i'd raised a stackover in feb'22 regd this .. https://stackoverflow.com/questions/70964198/gcp-dataproc-failed-to-construct-kafka-consumer-failed-to-load-ssl-keystore-d – Karan Alang Oct 20 '22 at 18:13
  • @Dagang - i logged onto the worker nodes & see the SSL files on the worker node, i've updated the description .. how do i access this in the code ? – Karan Alang Oct 20 '22 at 20:09
  • `.../container_e01_1666127856693_0016_01_000002/versa-kafka-gke-ca.p12` means it is in the current working dir of the YARN container for Spark executor. – Dagang Oct 20 '22 at 20:48
  • @Dagang - yes, that is correct .. how do i access this location in python/pyspark code ? – Karan Alang Oct 20 '22 at 20:50
  • Did you try `./versa-kafka-gke-ca.p12`? – Dagang Oct 20 '22 at 20:53
  • @Dagang - just tried that, still the same error :( – Karan Alang Oct 20 '22 at 21:02
  • If you want to verify it is cert path related, you can use an init action to put the cert in a fixed location, then use the absolute path in your PySpark code. – Dagang Oct 20 '22 at 22:03
  • @Dagang - seems the cert path is not the issue, if i give the wrong cert name, it gives error saying - cert not found .. however, with the correct file - it is not able to connect to Kafka topic using SSL (but only when i use dataproc) .. any ideas on how to debug/fix ? – Karan Alang Oct 20 '22 at 22:58
  • What's the exact error message? Is it `reading from Kafka topic syslog-demo3 on versa-sml-googl, certs on gs storage `? – Dagang Oct 20 '22 at 23:19
  • yes, scenario is - reading from Kafka topic syslog-demo3 on versa-sml-googl, certs on gs storage.. error message -> 22/10/20 23:25:36 WARN org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-spark-kafka-source-832594db-245e-475b-a903-96a2989615c7-764682263-driver-0-1, groupId=spark-kafka-source-832594db-245e-475b-a903-96a2989615c7-764682263-driver-0] Connection to node -1 (/ip:port) could not be established. Broker may not be available. – Karan Alang Oct 20 '22 at 23:30
