I am using Scala 2.12 and have the required libraries to convert the message to Avro (a conversion I need), as well as the Kafka clients.
I am running the code on a Linux host (dev) where another application (Apache NiFi) is running; NiFi is able to create a KafkaProducer and publish messages to the remote Kafka cluster.
Since this is dev for now, the protocol is PLAINTEXT.
Example of the KafkaProducer config in NiFi:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
NiFi also starts with a Java option pointing to a JAAS file, whose contents are:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  principal="myUserName@myRealm"
  useKeyTab=true
  client=true
  keyTab="/path/myfile.keytab"
  serviceName="kafka";
};
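(For reference, my understanding is that the same JAAS entry could alternatively be passed inline to the producer via the sasl.jaas.config property instead of the -Djava.security.auth.login.config file; this is only a sketch from my side, and only relevant if the protocol were SASL rather than PLAINTEXT:)

props.put("sasl.jaas.config",
  "com.sun.security.auth.module.Krb5LoginModule required " +
  "useKeyTab=true " +
  "keyTab=\"/path/myfile.keytab\" " +
  "principal=\"myUserName@myRealm\" " +
  "serviceName=\"kafka\";")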
The krb5.conf file is also available and is used.
With the above config, NiFi is able to create a KafkaProducer and send messages across.
Now I am trying to do the same from Scala: a simple class that uses the following build.sbt and code to send the message.
build.sbt:
// https://mvnrepository.com/artifact/org.apache.avro/avro
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4"
fork in run := true
javaOptions += "-Djava.security.auth.login.config=/path/to/jaas/kafka-jaas.conf"
javaOptions += "-Djava.security.krb5.conf=/path/to/krb/krb5.conf"
My code to send the message (unwanted lines removed for brevity). Please note that building the Avro data works fine; when the same message is handed to NiFi, it is published to the topic correctly. What does not work is publishing to Kafka from Scala.
Code:
package example
import java.io.ByteArrayOutputStream
import java.util
import java.io.File
import java.util.{Properties, UUID}
import org.apache.avro.Schema.Parser
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source
import scala.io.StdIn
object Hello extends Greeting with App {

  // case classes for creating the Avro record
  // This part works fine.
  val schemaFile = "/path/Schema.avsc"
  val schema = new Schema.Parser().parse(new File(schemaFile))
  val reader = new GenericDatumReader[GenericRecord](schema)
  val avroRecord = new GenericData.Record(schema)
  // the record is populated correctly here
  // works fine.

  val brokers = "server1.domain:9096,server2.domain:9096,server3.domain:9096"
  val topic = "myTopic"

  private def configuration: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("security.protocol", "PLAINTEXT")
    props.put("sasl.kerberos.service.name", "kafka")
    props.put("acks", "all")
    props.put("retries", "0")
    props
  }

  val producer = new KafkaProducer[String, Array[Byte]](configuration)

  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get.binaryEncoder(out, null)
  writer.write(avroRecord, encoder)
  encoder.flush()
  out.close()

  val serializedBytes: Array[Byte] = out.toByteArray()
  val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
  producer.send(recordToSend)
}

trait Greeting {
  lazy val greeting: String = "hello"
}
When I run it from the sbt command line:
sbt clean
sbt compile
sbt run
I get the following error/output, and nothing is published.
Output:
-bash-4.2$ sbt run
[warn] Executing in batch mode.
[warn] For better performance, hit [ENTER] to switch to interactive mode, or
[warn] consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading project definition from /path/Scala/hello-world/project
[info] Set current project to hello-world (in build file:/path/Scala/hello-world/)
[info] Running example.Hello
[info] hello
[info]
[error] 9 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 248 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[success] Total time: 1 s, completed Mar 6, 2019 1:38:14 PM
I am sure it has something to do with security or Kerberos, but other apps are able to push messages while my Scala code is not.
UPDATE:
Based on the response from @tgrez, I tried blocking on the returned Future with get (this needs java.util.concurrent.Future and RecordMetadata imported):

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

//producer.send(recordToSend)
val metaF: Future[RecordMetadata] = producer.send(recordToSend)
val meta = metaF.get() // blocking
val msgLog =
  s"""
     |offset = ${meta.offset()}
     |partition = ${meta.partition()}
     |topic = ${meta.topic()}
  """.stripMargin
println(msgLog)
producer.close()
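(Side note, sketch only: I understand errors can also be surfaced through the async callback overload instead of blocking; the run below still uses the blocking get.)

import org.apache.kafka.clients.producer.Callback

producer.send(recordToSend, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) exception.printStackTrace()
    else println(s"sent to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
})
producer.flush()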
However, I still see similar [error] output.
[error] 10 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error] acks = 1
[error] batch.size = 16384
[error] bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error] buffer.memory = 33554432
[error] client.dns.lookup = default
[error] client.id =
[error] compression.type = none
[error] connections.max.idle.ms = 540000
[error] delivery.timeout.ms = 120000
[error] enable.idempotence = false
[error] interceptor.classes = []
[error] key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error] linger.ms = 0
[error] max.block.ms = 60000
[error] max.in.flight.requests.per.connection = 5
[error] max.request.size = 1048576
[error] metadata.max.age.ms = 300000
[error] metric.reporters = []
[error] metrics.num.samples = 2
[error] metrics.recording.level = INFO
[error] metrics.sample.window.ms = 30000
[error] partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error] receive.buffer.bytes = 32768
[error] reconnect.backoff.max.ms = 1000
[error] reconnect.backoff.ms = 50
[error] request.timeout.ms = 30000
[error] retries = 0
[error] retry.backoff.ms = 100
[error] sasl.client.callback.handler.class = null
[error] sasl.jaas.config = null
[error] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error] sasl.kerberos.min.time.before.relogin = 60000
[error] sasl.kerberos.service.name = kafka
[error] sasl.kerberos.ticket.renew.jitter = 0.05
[error] sasl.kerberos.ticket.renew.window.factor = 0.8
[error] sasl.login.callback.handler.class = null
[error] sasl.login.class = null
[error] sasl.login.refresh.buffer.seconds = 300
[error] sasl.login.refresh.min.period.seconds = 60
[error] sasl.login.refresh.window.factor = 0.8
[error] sasl.login.refresh.window.jitter = 0.05
[error] sasl.mechanism = GSSAPI
[error] security.protocol = PLAINTEXT
[error] send.buffer.bytes = 131072
[error] ssl.cipher.suites = null
[error] ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error] ssl.endpoint.identification.algorithm =
[error] ssl.key.password = null
[error] ssl.keymanager.algorithm = SunX509
[error] ssl.keystore.location = null
[error] ssl.keystore.password = null
[error] ssl.keystore.type = JKS
[error] ssl.protocol = TLS
[error] ssl.provider = null
[error] ssl.secure.random.implementation = null
[error] ssl.trustmanager.algorithm = PKIX
[error] ssl.truststore.location = null
[error] ssl.truststore.password = null
[error] ssl.truststore.type = JKS
[error] transaction.timeout.ms = 60000
[error] transactional.id = null
[error] value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 249 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[info]
[info] offset = 8
[info] partition = 1
[info] topic = myTopic
[info]
[error] 323 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[success] Total time: 1 s, completed Mar 6, 2019 3:26:53 PM
Anything I am missing here?
UPDATE 2:
As mentioned below, I changed my code; however, it is not working either. I realized something is wrong with the serialization.
I already have avroRecord as a GenericData.Record. Can't I use that directly to publish the data to Kafka? Why do I have to convert it to an array of bytes or use another serializer at all?
The only example I found uses the io.confluent Avro serializer, but I am unable to use it because sbt/Maven fails to download it at the moment; in fact the URL http://packages.confluent.io/maven/ is not working for me. I downloaded the jars another way and am using them as external libraries.
I changed the code to:
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
val producer = new KafkaProducer[String, GenericData.Record](configuration)
val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)
Now it is working fine.
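For completeness: as far as I understand, the Confluent KafkaAvroSerializer also needs the schema registry endpoint configured, roughly like this (the URL here is just a placeholder for my environment):

props.put("schema.registry.url", "http://my-schema-registry:8081")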
However, I am still looking for another serializer class (one available from Maven) that lets me send the message as a GenericData.Record instead of an array of bytes.
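To make clearer what I am after: I imagine a plain Avro-based Serializer[GenericRecord] (my own sketch, no Confluent dependency, the class name is made up) would let me keep GenericData.Record as the value type, at the cost of the consumer having to know the schema out of band:

import java.io.ByteArrayOutputStream
import java.util
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.kafka.common.serialization.Serializer

class GenericRecordSerializer extends Serializer[GenericRecord] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def serialize(topic: String, record: GenericRecord): Array[Byte] = {
    // plain Avro binary encoding of the record; the schema is NOT embedded
    val out = new ByteArrayOutputStream()
    val writer = new GenericDatumWriter[GenericRecord](record.getSchema)
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  override def close(): Unit = ()
}

// usage:
// props.put("value.serializer", classOf[GenericRecordSerializer].getName)
// val producer = new KafkaProducer[String, GenericRecord](props)
// producer.send(new ProducerRecord[String, GenericRecord](topic, avroRecord))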
UPDATE 3:
As suggested by user @KZapagol, I tried that approach and am getting the following error.
Schema (it is complex, so I need help confirming that I am transforming the data correctly):
{"type": "record","name": "MyPnl","doc": "This schema contains the metadata fields wrapped in a header field which follows the official schema.","fields": [{"name":"header","type":{"type":"record","name":"header","fields":[{"name":"messageId","type":"string"},{"name":"businessId","type":"string"},{"name":"batchId","type":"string"},{"name":"sourceSystem","type":"string"},{"name":"secondarySourceSystem","type":[ "null", "string" ]},{"name":"sourceSystemCreationTimestamp","type":"long","logicalType": "timestamp-millis"},{"name":"sentBy","type":"string"},{"name":"sentTo","type":"string"},{"name":"messageType","type":"string"},{"name":"schemaVersion","type":"string"},{"name":"processing","type":"string"},{"name":"recordOffset","type":[ "null", "string" ]}]}},{"name":"pnlData","type":{"type":"record","name":"pnlData","fields":[{"name":"pnlHeader","type":{"type":"record","name":"pnlData","namespace":"pnlHeader","fields":[{"name":"granularity","type":"string"},{"name":"pnlType","type":"string"},{"name":"pnlSubType","type":"string"},{"name":"businessDate","type":"string","logicalType": "date"},{"name":"bookId","type":"string"},{"name":"bookDescription","type":"string"},{"name":"pnlStatus","type":"string"}]}},{"name":"pnlBreakDown","type":{"type":"array","items":{"type":"record","name":"pnlData","namespace":"pnlBreakDown","fields":[{"name":"category","type":[ "null", "string" ]},{"name":"subCategory","type":[ "null", "string" ]},{"name":"riskCategory","type":[ "null", "string" ]},{"name":"pnlCurrency","type":"string"},{"name":"pnlDetails", "type":{"type":"array","items": {"type":"record","name":"pnlData","namespace":"pnlDetails","fields":[{"name":"pnlLocalAmount","type":"double"},{"name":"pnlCDEAmount","type":"double"}]}}}]}}}]}}]}
I have the corresponding case classes for the above. (Please point out if I have missed anything here.)
case class MessageHeader( messageId: String,
businessId: String,
batchId: String,
sourceSystem: String,
secondarySourceSystem: String,
sourceSystemCreationTimestamp: Long,
sentBy: String,
sentTo: String,
messageType: String,
schemaVersion: String,
processing: String,
recordOffset: String
)
case class PnlHeader ( granularity: String,
pnlType: String,
pnlSubType: String,
businessDate: String,
bookId: String,
bookDescription: String,
pnlStatus: String
)
case class PnlDetails ( pnlLocalAmount: Double,
pnlCDEAmount: Double
)
case class PnlBreakdown ( category: String,
subCategory: String,
riskCategory: String,
pnlCurrency: String,
pnlDetails: List[PnlDetails]
)
case class PnlData ( pnlHeader: PnlHeader, pnlBreakdown: List[PnlBreakdown] )
case class PnlRecord (header: MessageHeader, pnlData: PnlData )
I have modeled my data in the PnlRecord format above and have a list of such records.
I iterate over this list and try to publish each record to Kafka.
// Create the producer
val producer = new KafkaProducer[String, Array[Byte]](properties)

// schemaFileName is the file where the above schema is saved.
val avroJsonSchema = Source.fromFile(new File(schemaFileName)).getLines.mkString
val avroMessage = new AvroMessage(avroJsonSchema)
val avroRecord = new Record(avroMessage.schema)

// recordListToSend is of type List[PnlRecord]
for (record <- recordListToSend) {
  avroRecord.put("header", record.header)
  avroRecord.put("pnlData", record.pnlData)
  //logger.info(s"Record: ${avroRecord}\n")

  avroMessage.gdw.write(avroRecord, EncoderFactory.get().binaryEncoder(avroMessage.baos, null))
  avroMessage.dfw.append(avroRecord)
  avroMessage.dfw.close()

  val bytes = avroMessage.baos.toByteArray

  // send data
  producer.send(new ProducerRecord[String, Array[Byte]](topic, bytes), new ProducerCallback)

  // flush data
  producer.flush()

  // flush and close producer
  producer.close()
}
AvroMessage class (as suggested by the user):
import java.io.ByteArrayOutputStream
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
class AvroMessage(avroJsonSchema: String) {
  val parser = new Schema.Parser()
  val schema = parser.parse(avroJsonSchema)
  val baos = new ByteArrayOutputStream()
  val gdw = new GenericDatumWriter[GenericRecord](schema)
  val dfw = new avro.file.DataFileWriter[GenericRecord](gdw)

  val compressionLevel = 5
  dfw.setCodec(CodecFactory.deflateCodec(compressionLevel))
  dfw.create(schema, baos)
}
I am getting the below error:
2019-03-13 16:00:09.855 [application-akka.actor.default-dispatcher-11] ERROR controllers.SAController.$anonfun$publishToSA$2(34) - com.domain.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
java.lang.ClassCastException: ca.domain.my.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
at org.apache.avro.generic.GenericData.getField(GenericData.java:712)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at ca.domain.my.sa.dao.myPnlDao$.$anonfun$publishAvroToKafka$1(myPnlDao.scala:95)
Are my original case classes correct as per the schema?
My MessageHeader case class is shown above.
My schema is shown above (updated).
My Record:
Record: {"header": Header(my_20190313180602_00000011,my_BookLevel_Daily_Regular_20181130_EMERGINGTRS,11_20181130_8259,my,null,65162584,my,SA,PnLMessage,test,RealTime,null), "pnlData": PnlData(PnlHeader(BookLevel,Daily,Regular,2018-11-30,8259,EMERGINGTRS,Locked),List(PnlBreakdown(null,null,null,eur,List(PnlDetails(0.0,0.0022547507286072))), PnlBreakdown(null,null,null,jpy,List(PnlDetails(0.0,0.0))), PnlBreakdown(null,null,null,usd,List(PnlDetails(0.19000003399301,0.642328574985149))), PnlBreakdown(null,null,null,brl,List(PnlDetails(2.65281414613128E-8,2.4107750505209E-5))), PnlBreakdown(null,null,null,gbp,List(PnlDetails(0.0,-5.05781173706088E-5))), PnlBreakdown(null,null,null,cad,List(PnlDetails(145.399999991953,145.399999991953)))))}
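For reference, my current understanding of the ClassCastException is that GenericDatumWriter can only walk GenericRecord/IndexedRecord values, so each nested case class would have to be converted into a nested GenericData.Record first. A sketch of what I think the header part would look like, inside the loop (field names assumed to match the schema above; not verified end to end):

// instead of avroRecord.put("header", record.header):
val headerSchema = avroMessage.schema.getField("header").schema()
val headerRecord = new GenericData.Record(headerSchema)
headerRecord.put("messageId", record.header.messageId)
headerRecord.put("businessId", record.header.businessId)
headerRecord.put("batchId", record.header.batchId)
headerRecord.put("sourceSystem", record.header.sourceSystem)
headerRecord.put("secondarySourceSystem", record.header.secondarySourceSystem)
headerRecord.put("sourceSystemCreationTimestamp", record.header.sourceSystemCreationTimestamp)
headerRecord.put("sentBy", record.header.sentBy)
headerRecord.put("sentTo", record.header.sentTo)
headerRecord.put("messageType", record.header.messageType)
headerRecord.put("schemaVersion", record.header.schemaVersion)
headerRecord.put("processing", record.header.processing)
headerRecord.put("recordOffset", record.header.recordOffset)
avroRecord.put("header", headerRecord)

// "pnlData" (and its nested arrays of records) would need the same kind of mapping,
// e.g. building a GenericData.Record per PnlBreakdown/PnlDetails and collecting them
// in a java.util.List before putting them into the parent record.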