Is there a way to print Kafka debug messages (I am thinking of log output similar to librdkafka's debug messages, or kafkacat's -d option) when running a PySpark job?
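Spark's Kafka source uses the Java Kafka client, not librdkafka, so there is no librdkafka-style debug property to set. The Java client logs through Spark's log4j (log4j 1.x in Spark 2.3). The sketch below is one way to get that output, assuming you want DEBUG only for the Kafka client while Spark itself stays at WARN. It generates a log4j.properties that raises just the org.apache.kafka logger. The appender lines mirror Spark's conf/log4j.properties.template, and the helper name kafka_debug_log4j is hypothetical.

```python
# Hypothetical helper: builds a log4j 1.x config that keeps Spark at WARN
# but raises the Kafka Java client (org.apache.kafka.*) to the given level.
def kafka_debug_log4j(level="DEBUG"):
    lines = [
        "log4j.rootCategory=WARN, console",
        "log4j.appender.console=org.apache.log4j.ConsoleAppender",
        "log4j.appender.console.target=System.err",
        "log4j.appender.console.layout=org.apache.log4j.PatternLayout",
        "log4j.appender.console.layout.ConversionPattern="
        "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n",
        # Only the Kafka client packages get the more verbose level.
        "log4j.logger.org.apache.kafka={}".format(level),
    ]
    return "\n".join(lines) + "\n"
```

Write the returned text to a file named log4j.properties next to consumer.py. The file still has to be shipped to the executors and referenced from the driver and executor JVM options, because the Kafka consumers that actually fetch records run on the executors.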
The issue: I used the code below in PySpark to connect to a Kafka cluster called A. It works, printing to the console every time a new message comes in. But when I switched to another cluster, called B, set up the same way as cluster A, nothing is printed to the screen when new messages come in. Using the kafkacat tool, I can see that messages are going through just fine on both clusters.
consumer.py
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)

hosts = "host1:9092,host2:9092,host3:9092"
topic = "myTopic"
securityProtocol = "SASL_PLAINTEXT"
saslMechanism = "PLAIN"

try:
    # Structured Streaming Kafka source; options prefixed with "kafka."
    # are passed through to the Kafka consumer
    df = sqlc \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", hosts) \
        .option("kafka.security.protocol", securityProtocol) \
        .option("kafka.sasl.mechanism", saslMechanism) \
        .option("startingOffsets", "earliest") \
        .option("subscribe", topic) \
        .load()

    # Print each new key/value pair to the console as it arrives
    dss = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream.outputMode('append') \
        .format("console") \
        .start()

    dss.awaitTermination()
except KeyboardInterrupt:
    print('shutting down...')
kafka.jaas
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="user1"
    password="sssshhhh"
    serviceName="kafka";
};
shell command:
spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
--files "kafka.jaas" \
--driver-java-options "-Djava.security.auth.login.config=kafka.jaas" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas" \
"./consumer.py"
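If you also want the Kafka client's debug output, keep in mind that spark.executor.extraJavaOptions is a single string. A second --conf for the same key replaces the JAAS setting instead of adding to it, so both -D flags have to go in one string. The sketch below builds such a command. It assumes a log4j.properties file sits next to kafka.jaas and uses the file: prefix that Spark's YARN docs recommend for -Dlog4j.configuration. The helper name build_submit_cmd is hypothetical.

```python
import shlex

# Hypothetical helper: the original spark-submit call plus a log4j config,
# with both -D flags merged into one Java-options string per JVM.
def build_submit_cmd(script="./consumer.py",
                     jaas="kafka.jaas",
                     log4j="log4j.properties",
                     package="org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1"):
    java_opts = ("-Djava.security.auth.login.config={} "
                 "-Dlog4j.configuration=file:{}").format(jaas, log4j)
    return [
        "spark-submit",
        "--packages", package,
        # Ship both files into each executor's working directory.
        "--files", "{},{}".format(jaas, log4j),
        "--driver-java-options", java_opts,
        "--conf", "spark.executor.extraJavaOptions=" + java_opts,
        script,
    ]

# Print a shell-quoted version that can be pasted into a terminal.
print(" ".join(shlex.quote(arg) for arg in build_submit_cmd()))
```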
Cluster B seems to be reachable, since I am able to get offset information from it, but the job is just not reading the messages.
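In the Spark 2.x Kafka source, offsets are resolved on the driver, while the records are fetched by consumers running on the executors. Those consumers connect to each broker at the address the broker advertises. So the driver seeing offsets does not prove that the executors can reach the brokers. One quick check, assuming the problem is at the network level, is a plain TCP connect to every broker, run from an executor host rather than only from the machine running spark-submit. This stdlib sketch does not test SASL authentication, and the name check_brokers is hypothetical.

```python
import socket

def check_brokers(hosts, timeout=3.0):
    """Try a TCP connect to each host:port in a comma-separated list."""
    results = {}
    for entry in hosts.split(","):
        entry = entry.strip()
        host, port = entry.rsplit(":", 1)
        try:
            # Resolves the name and opens a TCP connection, then closes it.
            with socket.create_connection((host, int(port)), timeout=timeout):
                results[entry] = True
        except OSError:
            # DNS failures, refused connections and timeouts all land here.
            results[entry] = False
    return results
```

For example, check_brokers("host1:9092,host2:9092,host3:9092") returns a dict mapping each broker to True or False. Run it against the advertised broker addresses of cluster B as well, since those can differ from the bootstrap list.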