
I am using Scala 2.12 and have the required libraries to convert the message to Avro (the conversion is required), as well as the Kafka clients library.

I am running the code on a Linux host (dev) where another application (Apache NiFi) is running; NiFi is able to create a KafkaProducer and publish messages to the remote Kafka cluster.

Since it is dev for now, the protocol is PLAINTEXT.

Example of the KafkaProducer config in NiFi:

acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

NiFi is also started with a Java option pointing to a JAAS file, whose contents are:

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   principal="myUserName@myRealm"
   useKeyTab=true
   client=true
   keyTab="/path/myfile.keytab"
   serviceName="kafka";
};

A krb5.conf file is also available and is used.

Using the above config, NiFi is able to create a KafkaProducer and send messages.

Now I am trying to do the same from Scala code: a simple class that uses the following build.sbt and code to send the message.

build.sbt:

// https://mvnrepository.com/artifact/org.apache.avro/avro
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.4"

fork in run := true

javaOptions += "-Djava.security.auth.login.config=/path/to/jaas/kafka-jaas.conf"
javaOptions += "-Djava.security.krb5.conf=/path/to/krb/krb5.conf"

My code to send the message (unrelated lines removed for brevity). Note that creating the Avro data works fine; when the same message is given to NiFi, it is published correctly to the topic. What is not working is publishing to Kafka from Scala.

Code:

package example

import java.io.ByteArrayOutputStream
import java.util
import java.io.File
import java.util.{Properties, UUID}
import org.apache.avro.Schema.Parser

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.io.Source
import scala.io.StdIn


object Hello extends Greeting with App {

  // case classes for creating avro record
  // This part works fine.

  val schemaFile = "/path/Schema.avsc"

  val schema = new Schema.Parser().parse(new File(schemaFile))

  val reader = new GenericDatumReader[GenericRecord](schema)

  val avroRecord = new GenericData.Record(schema)
  // populate correctly the record.
  // works fine.

  val brokers = "server1.domain:9096,server2.domain:9096,server3.domain:9096"
  val topic = "myTopic"
  private def configuration: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("security.protocol", "PLAINTEXT")
    props.put("sasl.kerberos.service.name", "kafka")
    props.put("acks", "all")
    props.put("retries","0")
    props
  }


  val producer = new KafkaProducer[String, Array[Byte]](configuration)
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get.binaryEncoder(out, null)
  writer.write(avroRecord, encoder)
  encoder.flush()
  out.close()
  val serializedBytes: Array[Byte] = out.toByteArray()

  val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
  producer.send(recordToSend)


}

trait Greeting {
  lazy val greeting: String = "hello"
}

When I run it from the sbt command line:

sbt clean

sbt compile

sbt run

I get the following error/output. Nothing is published.

Output:

-bash-4.2$ sbt run
[warn] Executing in batch mode.
[warn]   For better performance, hit [ENTER] to switch to interactive mode, or
[warn]   consider launching sbt without any commands, or explicitly passing 'shell'
[info] Loading project definition from /path/Scala/hello-world/project
[info] Set current project to hello-world (in build file:/path/Scala/hello-world/)
[info] Running example.Hello
[info] hello
[info] 
[error] 9 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error]         acks = 1
[error]         batch.size = 16384
[error]         bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error]         buffer.memory = 33554432
[error]         client.dns.lookup = default
[error]         client.id =
[error]         compression.type = none
[error]         connections.max.idle.ms = 540000
[error]         delivery.timeout.ms = 120000
[error]         enable.idempotence = false
[error]         interceptor.classes = []
[error]         key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error]         linger.ms = 0
[error]         max.block.ms = 60000
[error]         max.in.flight.requests.per.connection = 5
[error]         max.request.size = 1048576
[error]         metadata.max.age.ms = 300000
[error]         metric.reporters = []
[error]         metrics.num.samples = 2
[error]         metrics.recording.level = INFO
[error]         metrics.sample.window.ms = 30000
[error]         partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error]         receive.buffer.bytes = 32768
[error]         reconnect.backoff.max.ms = 1000
[error]         reconnect.backoff.ms = 50
[error]         request.timeout.ms = 30000
[error]         retries = 0
[error]         retry.backoff.ms = 100
[error]         sasl.client.callback.handler.class = null
[error]         sasl.jaas.config = null
[error]         sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error]         sasl.kerberos.min.time.before.relogin = 60000
[error]         sasl.kerberos.service.name = kafka
[error]         sasl.kerberos.ticket.renew.jitter = 0.05
[error]         sasl.kerberos.ticket.renew.window.factor = 0.8
[error]         sasl.login.callback.handler.class = null
[error]         sasl.login.class = null
[error]         sasl.login.refresh.buffer.seconds = 300
[error]         sasl.login.refresh.min.period.seconds = 60
[error]         sasl.login.refresh.window.factor = 0.8
[error]         sasl.login.refresh.window.jitter = 0.05
[error]         sasl.mechanism = GSSAPI
[error]         security.protocol = PLAINTEXT
[error]         send.buffer.bytes = 131072
[error]         ssl.cipher.suites = null
[error]         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error]         ssl.endpoint.identification.algorithm =
[error]         ssl.key.password = null
[error]         ssl.keymanager.algorithm = SunX509
[error]         ssl.keystore.location = null
[error]         ssl.keystore.password = null
[error]         ssl.keystore.type = JKS
[error]         ssl.protocol = TLS
[error]         ssl.provider = null
[error]         ssl.secure.random.implementation = null
[error]         ssl.trustmanager.algorithm = PKIX
[error]         ssl.truststore.location = null
[error]         ssl.truststore.password = null
[error]         ssl.truststore.type = JKS
[error]         transaction.timeout.ms = 60000
[error]         transactional.id = null
[error]         value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 109 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 248 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[success] Total time: 1 s, completed Mar 6, 2019 1:38:14 PM

I am sure it has something to do with security or Kerberos, but other apps are able to push messages while my Scala code is not.

UPDATE:

Based on the response from @tgrez, I tried to block on the Future with get.

//producer.send(recordToSend)
val metaF: Future[RecordMetadata] = producer.send(recordToSend)
val meta = metaF.get() // blocking
val msgLog =
  s"""
     |offset = ${meta.offset()}
     |partition = ${meta.partition()}
     |topic = ${meta.topic()}
   """.stripMargin
println(msgLog)
producer.close()

However, I still see a similar error/output.

[error] 10 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
[error]         acks = 1
[error]         batch.size = 16384
[error]         bootstrap.servers = [server1.cloud.domain:9096, server2.cloud.domain:9096, server3.cloud.domain:9096]
[error]         buffer.memory = 33554432
[error]         client.dns.lookup = default
[error]         client.id =
[error]         compression.type = none
[error]         connections.max.idle.ms = 540000
[error]         delivery.timeout.ms = 120000
[error]         enable.idempotence = false
[error]         interceptor.classes = []
[error]         key.serializer = class org.apache.kafka.common.serialization.StringSerializer
[error]         linger.ms = 0
[error]         max.block.ms = 60000
[error]         max.in.flight.requests.per.connection = 5
[error]         max.request.size = 1048576
[error]         metadata.max.age.ms = 300000
[error]         metric.reporters = []
[error]         metrics.num.samples = 2
[error]         metrics.recording.level = INFO
[error]         metrics.sample.window.ms = 30000
[error]         partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
[error]         receive.buffer.bytes = 32768
[error]         reconnect.backoff.max.ms = 1000
[error]         reconnect.backoff.ms = 50
[error]         request.timeout.ms = 30000
[error]         retries = 0
[error]         retry.backoff.ms = 100
[error]         sasl.client.callback.handler.class = null
[error]         sasl.jaas.config = null
[error]         sasl.kerberos.kinit.cmd = /usr/bin/kinit
[error]         sasl.kerberos.min.time.before.relogin = 60000
[error]         sasl.kerberos.service.name = kafka
[error]         sasl.kerberos.ticket.renew.jitter = 0.05
[error]         sasl.kerberos.ticket.renew.window.factor = 0.8
[error]         sasl.login.callback.handler.class = null
[error]         sasl.login.class = null
[error]         sasl.login.refresh.buffer.seconds = 300
[error]         sasl.login.refresh.min.period.seconds = 60
[error]         sasl.login.refresh.window.factor = 0.8
[error]         sasl.login.refresh.window.jitter = 0.05
[error]         sasl.mechanism = GSSAPI
[error]         security.protocol = PLAINTEXT
[error]         send.buffer.bytes = 131072
[error]         ssl.cipher.suites = null
[error]         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
[error]         ssl.endpoint.identification.algorithm =
[error]         ssl.key.password = null
[error]         ssl.keymanager.algorithm = SunX509
[error]         ssl.keystore.location = null
[error]         ssl.keystore.password = null
[error]         ssl.keystore.type = JKS
[error]         ssl.protocol = TLS
[error]         ssl.provider = null
[error]         ssl.secure.random.implementation = null
[error]         ssl.trustmanager.algorithm = PKIX
[error]         ssl.truststore.location = null
[error]         ssl.truststore.password = null
[error]         ssl.truststore.type = JKS
[error]         transaction.timeout.ms = 60000
[error]         transactional.id = null
[error]         value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[error]
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
[error] 110 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[error] 249 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: 5NMDh7lDS-SxXpgprjR6oA
[info]
[info] offset = 8
[info] partition = 1
[info] topic = myTopic
[info]
[error] 323 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[success] Total time: 1 s, completed Mar 6, 2019 3:26:53 PM

Am I missing anything here?

UPDATE 2:

As mentioned below, I changed my code; however, it is not working either. I realized something is wrong in the serialization.

I already have avroRecord in GenericData.Record format. Can't I use that directly to publish the data to Kafka? Why do I have to use an Array of Bytes or any other serializer?

The only example I found uses the io.confluent Avro serializer, but I am unable to use it because sbt/Maven is failing to download it. In fact, the URL http://packages.confluent.io/maven/ is not working. For now I downloaded the jars manually and am using them as external libraries.
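For reference, the comment below notes that the repository still resolves through Maven/sbt even though it is not browsable, so a sketch of the build.sbt additions would look like this (the serializer version is my assumption and needs to match the Confluent platform in use):

resolvers += "Confluent" at "https://packages.confluent.io/maven/"

// provides io.confluent.kafka.serializers.KafkaAvroSerializer
// (5.1.x pairs with Kafka 2.1.x - an assumption, check your Confluent version)
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "5.1.2"

Note that KafkaAvroSerializer also expects a schema.registry.url property pointing at a running Schema Registry.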

Changed the code to:

props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

val producer = new KafkaProducer[String, GenericData.Record](configuration)

val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)

Now it is working fine.

However, I am still looking for another serializer class (available in Maven) that lets me send the message as GenericData instead of an Array of Bytes.
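In the meantime, a minimal sketch of a hand-rolled alternative (my own code, not an existing Maven artifact; the class name is made up): wrap Avro's GenericDatumWriter in Kafka's Serializer interface, so the producer can be typed as KafkaProducer[String, GenericRecord] without the Confluent jars.

import java.io.ByteArrayOutputStream
import java.util
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.kafka.common.serialization.Serializer

// Serializes any GenericRecord to plain Avro binary, using the schema carried by the record itself.
class GenericRecordSerializer extends Serializer[GenericRecord] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def serialize(topic: String, record: GenericRecord): Array[Byte] =
    if (record == null) null
    else {
      val out = new ByteArrayOutputStream()
      val writer = new GenericDatumWriter[GenericRecord](record.getSchema)
      val encoder = EncoderFactory.get().binaryEncoder(out, null)
      writer.write(record, encoder)
      encoder.flush()
      out.toByteArray
    }

  override def close(): Unit = ()
}

It would be registered with props.put("value.serializer", classOf[GenericRecordSerializer].getName). The consumer side would then need a matching deserializer that knows the schema, which is exactly the coordination problem the Confluent Schema Registry and KafkaAvroSerializer solve.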

UPDATE 3:

As suggested by user @KZapagol, I tried to use the same approach and am getting the following error.

Schema (it is complex, so I need help checking whether I am transforming the data correctly):

{"type": "record","name": "MyPnl","doc": "This schema contains the metadata fields wrapped in a header field which follows the official schema.","fields": [{"name":"header","type":{"type":"record","name":"header","fields":[{"name":"messageId","type":"string"},{"name":"businessId","type":"string"},{"name":"batchId","type":"string"},{"name":"sourceSystem","type":"string"},{"name":"secondarySourceSystem","type":[ "null", "string" ]},{"name":"sourceSystemCreationTimestamp","type":"long","logicalType": "timestamp-millis"},{"name":"sentBy","type":"string"},{"name":"sentTo","type":"string"},{"name":"messageType","type":"string"},{"name":"schemaVersion","type":"string"},{"name":"processing","type":"string"},{"name":"recordOffset","type":[ "null", "string" ]}]}},{"name":"pnlData","type":{"type":"record","name":"pnlData","fields":[{"name":"pnlHeader","type":{"type":"record","name":"pnlData","namespace":"pnlHeader","fields":[{"name":"granularity","type":"string"},{"name":"pnlType","type":"string"},{"name":"pnlSubType","type":"string"},{"name":"businessDate","type":"string","logicalType": "date"},{"name":"bookId","type":"string"},{"name":"bookDescription","type":"string"},{"name":"pnlStatus","type":"string"}]}},{"name":"pnlBreakDown","type":{"type":"array","items":{"type":"record","name":"pnlData","namespace":"pnlBreakDown","fields":[{"name":"category","type":[ "null", "string" ]},{"name":"subCategory","type":[ "null", "string" ]},{"name":"riskCategory","type":[ "null", "string" ]},{"name":"pnlCurrency","type":"string"},{"name":"pnlDetails", "type":{"type":"array","items": {"type":"record","name":"pnlData","namespace":"pnlDetails","fields":[{"name":"pnlLocalAmount","type":"double"},{"name":"pnlCDEAmount","type":"double"}]}}}]}}}]}}]}

I have corresponding case classes for the above. (Please suggest if I have missed anything here.)

case class MessageHeader( messageId: String,
                   businessId: String,
                   batchId: String,
                   sourceSystem: String,
                   secondarySourceSystem: String,
                   sourceSystemCreationTimestamp: Long,
                   sentBy: String,
                   sentTo: String,
                   messageType: String,
                   schemaVersion: String,
                   processing: String,
                   recordOffset: String
                 )

case class PnlHeader (  granularity: String,
                        pnlType: String,
                        pnlSubType: String,
                        businessDate: String,
                        bookId: String,
                        bookDescription: String,
                        pnlStatus: String
                       )

case class PnlDetails (  pnlLocalAmount: Double,
                         pnlCDEAmount: Double
                        )

case class PnlBreakdown (  category: String,
                           subCategory: String,
                           riskCategory: String,
                           pnlCurrency: String,
                           pnlDetails: List[PnlDetails]
                          )

case class PnlData ( pnlHeader: PnlHeader, pnlBreakdown: List[PnlBreakdown] )

case class PnlRecord (header: MessageHeader, pnlData: PnlData )

I have modeled my data in the above PnlRecord format, and I have a list of such records.

I iterate over that list and try to publish each record to Kafka.

// Create Producer
val producer = new KafkaProducer[String, Array[Byte]](properties)

// schemaFileName is the file where the above schema is saved
val avroJsonSchema = Source.fromFile(new File(schemaFileName)).getLines.mkString
val avroMessage = new AvroMessage(avroJsonSchema)
val avroRecord = new Record(avroMessage.schema)

// recordListToSend is of type List[PnlRecord]
for (record <- recordListToSend) {
  avroRecord.put("header", record.header)
  avroRecord.put("pnlData", record.pnlData)
  //logger.info(s"Record: ${avroRecord}\n")
  avroMessage.gdw.write(avroRecord, EncoderFactory.get().binaryEncoder(avroMessage.baos, null))
  avroMessage.dfw.append(avroRecord)
  avroMessage.dfw.close()
  val bytes = avroMessage.baos.toByteArray

  // send data
  producer.send(new ProducerRecord[String, Array[Byte]](topic, bytes), new ProducerCallback)

  // flush data
  producer.flush()
  // flush and close producer
  producer.close()
}

AvroMessage class (as suggested by the user):

import java.io.ByteArrayOutputStream

import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}


class AvroMessage(avroJsonSchema: String) {

  val parser = new Schema.Parser()
  val schema = parser.parse(avroJsonSchema)
  val baos = new ByteArrayOutputStream()
  val gdw = new GenericDatumWriter[GenericRecord](schema)
  val dfw = new avro.file.DataFileWriter[GenericRecord](gdw)
  val compressionLevel = 5
  dfw.setCodec(CodecFactory.deflateCodec(compressionLevel))
  dfw.create(schema, baos)

}

I am getting the below error:

2019-03-13 16:00:09.855 [application-akka.actor.default-dispatcher-11] ERROR controllers.SAController.$anonfun$publishToSA$2(34) - com.domain.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
java.lang.ClassCastException: ca.domain.my.sa.model.MessageHeader cannot be cast to org.apache.avro.generic.IndexedRecord
        at org.apache.avro.generic.GenericData.getField(GenericData.java:697)
        at org.apache.avro.generic.GenericData.getField(GenericData.java:712)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
        at ca.domain.my.sa.dao.myPnlDao$.$anonfun$publishAvroToKafka$1(myPnlDao.scala:95)

Are my original case classes right as per the schema?

My MessageHeader case class is shown above.

My Schema is shown above (updated).

My Record:

Record: {"header": Header(my_20190313180602_00000011,my_BookLevel_Daily_Regular_20181130_EMERGINGTRS,11_20181130_8259,my,null,65162584,my,SA,PnLMessage,test,RealTime,null), "pnlData": PnlData(PnlHeader(BookLevel,Daily,Regular,2018-11-30,8259,EMERGINGTRS,Locked),List(PnlBreakdown(null,null,null,eur,List(PnlDetails(0.0,0.0022547507286072))), PnlBreakdown(null,null,null,jpy,List(PnlDetails(0.0,0.0))), PnlBreakdown(null,null,null,usd,List(PnlDetails(0.19000003399301,0.642328574985149))), PnlBreakdown(null,null,null,brl,List(PnlDetails(2.65281414613128E-8,2.4107750505209E-5))), PnlBreakdown(null,null,null,gbp,List(PnlDetails(0.0,-5.05781173706088E-5))), PnlBreakdown(null,null,null,cad,List(PnlDetails(145.399999991953,145.399999991953)))))}
Mihir
  • confluent repo is available in maven, but is not browsable in browser, see: https://stackoverflow.com/questions/43488853/confluent-maven-repository-not-working – tgrez Mar 08 '19 at 14:34

2 Answers


It could be simpler than it seems. The send method is asynchronous; it returns a Future<RecordMetadata>. Your example exits before the message is actually sent.

The Kafka producer batches messages in the background, so to ensure the messages are sent you should either block with e.g. Future.get (which means waiting for the broker to respond with metadata) or ensure the buffers are flushed with kafkaProducer.flush().

In tests I recommend blocking on the Future.
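A minimal sketch of both options against the standard producer API (assuming org.apache.kafka.clients.producer.{Callback, RecordMetadata} are imported and producer/recordToSend are as in the question):

// Option 1: block until the broker acknowledges the record (fine for tests).
val meta: RecordMetadata = producer.send(recordToSend).get() // java.util.concurrent.Future#get
println(s"offset=${meta.offset()} partition=${meta.partition()}")

// Option 2: stay asynchronous, but flush and close so buffered records go out before the JVM exits.
producer.send(recordToSend, new Callback {
  override def onCompletion(m: RecordMetadata, e: Exception): Unit =
    if (e != null) e.printStackTrace()
})
producer.flush()
producer.close()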

tgrez
  • Thanks for the response. I am pretty new to Scala, so let me search about the Future. – Mihir Mar 06 '19 at 19:33
  • I tried the Future. See update in the original question, doesn't work. – Mihir Mar 06 '19 at 20:37
  • Was the message delivered to the Kafka topic? It should be now. – tgrez Mar 07 '19 at 10:23
  • As per my latest updates (see original question), blocking with Future is definitely helping. However the issue is not resolved, as messages were not delivered to Kafka. I had to change the serialization (see UPDATE 2 above) and only then does it work. So I am not sure what the issue is in serializing the Avro record. – Mihir Mar 08 '19 at 03:39
  • I usually implemented my own KafkaAvroSerializer in such cases, maybe you can try to change SpecificDatumWriter to GenericDatumWriter in your code? see also: https://www.sderosiaux.com/articles/2017/03/02/serializing-data-efficiently-with-apache-avro-and-dealing-with-a-schema-registry/#how-to-convert-avro-records-to-bytes-and-vice-versa – tgrez Mar 08 '19 at 10:31

Please update your code as below and try again. It looks like you have not flushed and closed the output stream, encoder, and producer properly.

val producer = new KafkaProducer[String, Array[Byte]](configuration)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder)

// flush the encoder before reading the bytes, then close the stream
encoder.flush()
out.close()

val serializedBytes: Array[Byte] = out.toByteArray()

val recordToSend = new ProducerRecord[String, Array[Byte]](topic, serializedBytes)
producer.send(recordToSend, new ProducerCallback)

// flush data
producer.flush()
// flush and close producer
producer.close()



// assumes an SLF4J Logger is available in implicit scope
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.slf4j.Logger

class ProducerCallback(implicit logger: Logger) extends Callback {

  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    //executes every time a record is successfully sent or exception thrown
    Option(metadata) match {
      case Some(_) =>
        logger.info("Received new metadata. \n" +
          "Topic: " + metadata.topic() + "\n" +
          "Partition: " + metadata.partition() + "\n" +
          "Offset: " + metadata.offset() + "\n" +
          "Timestamp: " + metadata.timestamp() + "\n" +
          "Checksum: " + metadata.checksum())
      case None => ;
    }
    Option(exception) match {
      case Some(_) =>
        logger.error("Exception thrown during processing of record... " + exception)
        throw exception
      case None => ;
    }
  }
}

Please refer to https://github.com/Zapagol/apache-kafka/tree/master/src/main/scala/com/org/apache for more Kafka producer and consumer examples. Hope it helps!

Update

I have added a KafkaProducer example for an Avro schema input. Please refer to https://github.com/Zapagol/apache-kafka/blob/master/src/main/scala/com/org/apache/producers/ProducerForAvroschema.scala.

I have used the Apache Avro jar and a sample avsc file as below. Please modify the schema file according to your requirements. I am able to produce records successfully.

{
   "type": "record",
   "name": "employee",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "id", "type": "int"},
      {"name": "mobileNumber", "type": ["string", "null"]},
      {"name": "salary", "type": ["int", "null"]}
  ]
}
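For completeness, a rough sketch of how a producer around that employee schema could look with a plain byte-array value serializer (my own reconstruction, not the exact code from the linked repository; the broker address and topic name are placeholders):

import java.io.ByteArrayOutputStream
import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object EmployeeProducerSketch extends App {
  val schema = new Schema.Parser().parse(
    """{"type":"record","name":"employee","fields":[
      |  {"name":"name","type":"string"},
      |  {"name":"id","type":"int"},
      |  {"name":"mobileNumber","type":["string","null"]},
      |  {"name":"salary","type":["int","null"]}
      |]}""".stripMargin)

  // Populate a GenericRecord according to the schema.
  val record: GenericRecord = new GenericData.Record(schema)
  record.put("name", "Jane Doe")
  record.put("id", 1)
  record.put("mobileNumber", "555-0100")
  record.put("salary", 1000)

  // Encode the record as raw Avro bytes.
  val out = new ByteArrayOutputStream()
  val writer = new GenericDatumWriter[GenericRecord](schema)
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(record, encoder)
  encoder.flush()

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder broker
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val producer = new KafkaProducer[String, Array[Byte]](props)
  producer.send(new ProducerRecord[String, Array[Byte]]("employee-topic", out.toByteArray)).get() // block for the ack
  producer.flush()
  producer.close()
}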
KZapagol
  • Thanks for the input. I tried the same, however it was still not able to send the message. I wanted to check whether my serialization is happening correctly. I tried to download the io.confluent JARs and used "io.confluent.kafka.serializers.KafkaAvroSerializer" as my value serializer. Basically my record: `val recordToSend = new ProducerRecord[String, GenericData.Record](topic, avroRecord)`. For this, the only serializer I found is from io.confluent. Is there any other way to do the above? i.e. I have an Avro record (GenericRecord) and want to send that format to Kafka, not a byte array. – Mihir Mar 08 '19 at 03:25
  • If you have an avsc file which defines the schema for your data then you can use `org.apache.avro.generic.GenericData` and `org.apache.avro.generic.GenericData.Record`, or else you can use the `avro4s` library. – KZapagol Mar 08 '19 at 05:17
  • Hi @KZapagol, thanks. Yes, I have the avsc file; my Avro record is created as per it: `val avroRecord = new GenericData.Record(schema)`. I wanted to know, after this, when we send the data to Kafka, what the value serializer should be (`props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")`). This works, but I have issues with the library in Maven (see my details above). If I use `org.apache.kafka.common.serialization.ByteArraySerializer`, this is where I think something is wrong. Basically, since I have the data converted to Avro, can I send it directly? – Mihir Mar 08 '19 at 13:33
  • Hi @Mihir I have created a Producer example with Avro schema input. I have provided the link above. Please update the avsc file according to your requirement. This would be the only change you need to make, I believe. – KZapagol Mar 08 '19 at 14:33
  • Hi @KZapagol: Thanks for the input, sorry to come back late, I was working on some other things. I am updating my original question with the issue I am facing when I try to use your example. – Mihir Mar 13 '19 at 20:09
  • Hi @KZapagol. Thanks, yes. I am able to use your example to send the sample message in Avro format to Kafka. I marked the solution as accepted. Thanks again. Now I tried to publish the original message, which is nested. Based on the AVSC file I created my case classes. Not sure what I am doing wrong there. – Mihir Mar 14 '19 at 13:06