
I'm trying to send data from a program to a secure Kafka cluster (Event Streams on IBM Cloud - Cloud Foundry services), and then in my consumer application (Spark Structured Streaming) I'm trying to read the data back from the same Kafka source.

Here are the Properties I'm setting inside the producer:

import java.util.Properties

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}

def getProperties: Properties = {
    val configs = new Properties()

    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-java-console-sample-producer")
    configs.put(ProducerConfig.ACKS_CONFIG, "1")
    configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<url:port for 5 brokers>")
    configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
    configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN")
    configs.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some apikey here>" + "\";")
    configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2")
    configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2")
    configs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")

    configs
}

And here is the code I use to send the data to the Kafka cluster:

val producer = new KafkaProducer[String, String](getProperties)

/** Reading the file line by line */

for (line <- file.getLines) {
    /** Sending each line to $topic inside the Kafka cluster initialized inside $producer */
    val data = new ProducerRecord[String, String](topic, "key", line)
    producer.send(data)
}

producer.close()  // flush any buffered records before the program exits

I can confirm that this does send the data to the Kafka cluster: the IBM Cloud-provided Grafana metrics show messages arriving.
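Delivery can also be confirmed on the client side by passing a callback to send; this is a sketch I haven't wired into the program above, and the ackMessage helper is only illustrative:

    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

    // Illustrative helper: turn the broker's ack (or the error) into a log line.
    def ackMessage(metadata: RecordMetadata, exception: Exception): String =
        if (exception != null) s"send failed: ${exception.getMessage}"
        else s"delivered to ${metadata.topic}-${metadata.partition}@${metadata.offset}"

    // send(record, callback) surfaces per-record failures instead of dropping them silently
    producer.send(data, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            println(ackMessage(metadata, exception))
    })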

Now in my Spark Structured Streaming code, here's how I'm trying to read from the Kafka source:

val df = spark.readStream
    .format("kafka")
    .option("subscribe", "raw_weather")
    .option("kafka.bootstrap.servers", "<url:port for the same 5 brokers>")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<that same password given above>" + "\";")
    .option("kafka.ssl.protocol", "TLSv1.2")
    .option("kafka.ssl.enabled.protocols", "TLSv1.2")
    .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]

followed by:

val query = df.writeStream
    .outputMode(OutputMode.Append())
    .foreachBatch((batch: Dataset[String], id: Long) => {  // df is a Dataset[String], so the batch is too
        println(batch.count() + " " + id)
    })
    .trigger(Trigger.ProcessingTime(1))  // 1 ms, i.e. effectively as often as possible
    .start()

query.awaitTermination()

I'm not sure why, but my Spark Streaming job is unable to read any data from the source. When I start it, the output shows:

19/05/19 04:22:28 DEBUG SparkEnv: Using serializer: class org.apache.spark.serializer.JavaSerializer
19/05/19 04:22:28 INFO SparkEnv: Registering MapOutputTracker
19/05/19 04:22:28 INFO SparkEnv: Registering BlockManagerMaster
19/05/19 04:22:28 INFO SparkEnv: Registering OutputCommitCoordinator
0 0

And even when I run my producer again, the streaming query stays at 0 0. I'm not sure what I've written wrong here.

EDIT: I kept the consumer running for over 7 hours; still no change.
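
One sanity check I could run (a sketch, untested): a one-shot batch read from the same topic. The batch Kafka source defaults to startingOffsets=earliest / endingOffsets=latest, so if the security options are right it should print whatever already sits on the topic:

    val batch = spark.read            // read, not readStream: a one-shot batch query
        .format("kafka")
        .option("subscribe", "raw_weather")
        .option("kafka.bootstrap.servers", "<url:port for the same 5 brokers>")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", "<same JAAS string as in the producer>")
        .option("kafka.ssl.protocol", "TLSv1.2")
        .option("kafka.ssl.enabled.protocols", "TLSv1.2")
        .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
        .load()
        .selectExpr("CAST(value AS STRING)")

    batch.show(10, truncate = false)  // non-empty output would rule out the security config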

  • I would try using a batch query (with read instead of readStream) to read a small batch and then print that batch (with show()) to see if anything goes through. If not, I would assume something is misconfigured in your security options – Panagiotis Fytas May 21 '19 at 15:13

0 Answers