
Is there a way to print Kafka debug messages when running a PySpark job? I am thinking of log messages similar to librdkafka debug messages, or the output of kafkacat's -d option.

The issue is this: I used the following code in PySpark to connect to a Kafka cluster called A, and it works. It prints to the console every time a new message comes in. But when I switched to another cluster, B, set up the same way as cluster A, nothing is printed to the screen when new messages come in. Using the kafkacat tool, I can see that messages are flowing just fine on both clusters.

consumer.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()

hosts = "host1:9092,host2:9092,host3:9092"
topic = "myTopic"
securityProtocol = "SASL_PLAINTEXT"
saslMechanism = "PLAIN"

try:
    # Structured Streaming Kafka source; "kafka."-prefixed options are passed to the Kafka consumer
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", hosts) \
        .option("kafka.security.protocol", securityProtocol) \
        .option("kafka.sasl.mechanism", saslMechanism) \
        .option("startingOffsets", "earliest") \
        .option("subscribe", topic) \
        .load()

    # Print each record's key and value to the driver console as it arrives
    dss = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream.outputMode('append') \
        .format("console") \
        .start()

    dss.awaitTermination()
except KeyboardInterrupt:
    print('shutting down...')

kafka.jaas

KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="user1"
   password="sssshhhh"
   serviceName="kafka";
};

shell command:

spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
    --files "kafka.jaas" \
    --driver-java-options "-Djava.security.auth.login.config=kafka.jaas" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas" \
    "./consumer.py"

Kafka cluster B seems to be reachable, since I am able to get offset information from it, but the job just isn't reading the messages.
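One way to tell a query that is connected but simply idle apart from one whose micro-batches never finish is to poll the query handle returned by start() instead of blocking in awaitTermination(). A minimal sketch, assuming the dss object from consumer.py; watch_progress is a hypothetical helper name, while isActive, status and lastProgress are StreamingQuery properties in PySpark 2.x.

```python
import time


def watch_progress(query, interval_s=10.0, max_polls=None):
    """Poll a PySpark StreamingQuery and report whether micro-batches complete.

    Hypothetical helper: `query` is the object returned by writeStream.start().
    Returns the (message, isTriggerActive, batchId, numInputRows) tuples seen.
    """
    seen = []
    polls = 0
    while query.isActive and (max_polls is None or polls < max_polls):
        status = query.status          # dict with "message" and "isTriggerActive"
        progress = query.lastProgress  # None until the first batch has finished
        batch_id = progress["batchId"] if progress else None
        rows = progress["numInputRows"] if progress else None
        record = (status["message"], status["isTriggerActive"], batch_id, rows)
        print("status=%r trigger_active=%r batch=%r rows=%r" % record)
        seen.append(record)
        polls += 1
        time.sleep(interval_s)
    return seen
```

Calling watch_progress(dss) in place of dss.awaitTermination() prints one line per poll. If the trigger stays active while batchId never advances (or lastProgress stays None), the batch is likely stuck, for example on executors waiting for Kafka, rather than merely receiving no data.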

Chris Wijaya

1 Answer


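The issue was caused by the worker nodes' connections to the Kafka cluster: the worker nodes' IP addresses weren't on the Kafka cluster's firewall whitelist. With the code above, the worker nodes kept timing out and retrying the connection to the Kafka cluster until an interrupt signal was given. This is consistent with still being able to fetch offsets, because in Structured Streaming the driver queries Kafka for offsets, while the executors on the worker nodes fetch the actual records.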
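Because the problem here was worker nodes that could not reach the brokers, and logging in to the workers was not possible, one way to confirm this kind of firewall issue is to run a plain TCP connection test inside Spark tasks, so that it executes on the executors rather than on the driver. A minimal sketch, assuming Python 3 on the workers; parse_bootstrap, can_connect and probe_from_executors are hypothetical helper names. Spark does not guarantee that the tasks land on every worker, so num_tasks should comfortably exceed the number of executors. A successful check only shows that the bootstrap addresses are reachable; after bootstrapping, the Kafka client connects to the broker addresses returned in the cluster metadata (advertised.listeners), which may differ.

```python
import socket


def parse_bootstrap(hosts):
    """Split 'host1:9092,host2:9092' into [('host1', 9092), ('host2', 9092)]."""
    pairs = []
    for entry in hosts.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def can_connect(host, port, timeout=5.0):
    """Return True if a plain TCP connection to host:port succeeds in time."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError:  # refused, unreachable, DNS failure or timeout
        return False
    sock.close()
    return True


def probe_from_executors(sc, hosts, num_tasks=16):
    """Run can_connect inside Spark tasks so the check runs on worker nodes.

    Returns sorted (worker_hostname, broker_host, port, reachable) tuples.
    """
    targets = parse_bootstrap(hosts)

    def probe(_):
        worker = socket.gethostname()
        return [(worker, h, p, can_connect(h, p)) for h, p in targets]

    rdd = sc.parallelize(range(num_tasks), num_tasks)
    return sorted(set(rdd.flatMap(probe).collect()))
```

For example, printing each row of probe_from_executors(spark.sparkContext, hosts) from the driver would list, per worker hostname, which bootstrap servers answered; a False on every worker row would point at the firewall rather than at the Spark or SASL configuration.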

As for the error message itself, no error was reported on the master node, because the worker nodes were still attempting to connect to the Kafka cluster. Every now and then, though, a message was printed on the master console saying it had failed to communicate with a worker node (or something like 'gathering information').

NOTE: This is what I presume happened on the worker nodes (which I was unable to log in to, due to lack of admin rights), but there may be logs stored on the worker nodes. (If someone can confirm or refute this, it would be much appreciated.)

As for the Kafka debug messages themselves, it looks like they are already printed to the screen by default for ERROR, WARN, or INFO events, depending on the configured logger level. In odd cases like this one, however, the relevant log messages may not be directly visible on the screen, for example because they are written to the executor logs on the worker nodes rather than to the driver console.
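To see Kafka client DEBUG output on the driver console, the log level can be raised from PySpark. A minimal sketch, assuming Spark 2.x, which uses log4j 1.x: SparkContext.setLogLevel is the public API but raises the level for every logger, whereas narrowing it to the org.apache.kafka logger goes through sc._jvm, an internal py4j attribute rather than a public API. enable_kafka_debug is a hypothetical helper name. Either way, this only affects the driver JVM; executor logging is controlled by the log4j configuration on the worker nodes.

```python
def enable_kafka_debug(sc, logger_name="org.apache.kafka", level="DEBUG"):
    """Raise one log4j 1.x logger in the driver JVM (Spark 2.x assumption).

    Uses sc._jvm, PySpark's internal py4j view of the JVM, so this may not
    apply to Spark versions that ship log4j 2 (Spark 3.3 and later).
    """
    log4j = sc._jvm.org.apache.log4j
    logger = log4j.LogManager.getLogger(logger_name)
    # Level.toLevel("DEBUG") converts the name into a log4j Level object
    logger.setLevel(log4j.Level.toLevel(level))
    return logger
```

Calling enable_kafka_debug(spark.sparkContext) in consumer.py right after the session is created, before start(), should be enough to surface the driver-side Kafka consumer's debug lines; spark.sparkContext.setLogLevel("DEBUG") is the coarser public alternative.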

Chris Wijaya