
I am experiencing a reproducible error while producing Avro messages with Reactive Kafka and avro4s. Once the identityMapCapacity of the client (CachedSchemaRegistryClient) is reached, serialization fails with

java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value

This is unexpected, since all messages should have the same schema - they are serializations of the same case class.

val avroProducerSettings: ProducerSettings[String, GenericRecord] =
  ProducerSettings(system, Serdes.String().serializer(), avroSerde.serializer())
    .withBootstrapServers(settings.bootstrapServer)

val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
                    ProducerMessage.Result[String, GenericRecord, String],
                    NotUsed] = Producer.flow(avroProducerSettings)

val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] = 
  Source.queue(bufferSize, overflowStrategy)
  .via(avroProdFlow)
  .map(logResult)
  .to(Sink.ignore)
  .run()

...
avroQueue.offer(msg)

The serializer is a KafkaAvroSerializer, instantiated with a new CachedSchemaRegistryClient(settings.schemaRegistry, 1000).
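Sketched out, the wiring looks roughly like this (the property map is illustrative):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer
import scala.collection.JavaConverters._

val props  = Map("schema.registry.url" -> settings.schemaRegistry)
val client = new CachedSchemaRegistryClient(settings.schemaRegistry, 1000)
val serializer: KafkaAvroSerializer = new KafkaAvroSerializer(client, props.asJava)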

Generating the GenericRecord:

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = { (edge, topic) =>
  val edgeAvro: GenericRecord = toAvro(edge)
  val record   = new ProducerRecord[String, GenericRecord](topic, edge.id, edgeAvro)
  ProducerMessage.Message(record, edge.id)
}

The schema is created deep inside the code (io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema, invoked by io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl), where I have no influence on it, so I see no way to fix the leak. It looks to me like the two Confluent projects do not work well together.

The issues I have found here, here and here do not seem to address my use case.

My two current workarounds are:

  • not using the schema registry - obviously not a long-term solution
  • creating a custom SchemaRegistryClient that does not rely on object identity - doable, but I would like to avoid creating more problems than I solve by reimplementing it

Is there a way to generate or cache a consistent schema depending on message/record type and use it with my setup?


1 Answer


edit 2017.11.20

The issue in my case was that each instance of GenericRecord carrying my message was serialized by a different instance of RecordFormat, each containing a different instance of the Schema. The implicit resolution here generated a new instance every time.

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord = recordFormat.to(a)

The solution was to pin the RecordFormat instance to a val and reuse it explicitly. Many thanks to https://github.com/heliocentrist for explaining the details.
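A minimal sketch of the fix, using the Edge type from the question:

import com.sksamuel.avro4s.RecordFormat

// Materialized once: every call to toAvro reuses the same RecordFormat and
// therefore the same Schema instance, so the registry client's
// identity-based cache sees a single schema object per type.
implicit val edgeFormat: RecordFormat[Edge] = RecordFormat[Edge]

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)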

original response:

After waiting for a while (there was also no answer to the GitHub issue), I had to implement my own SchemaRegistryClient. Over 90% is copied from the original CachedSchemaRegistryClient, just translated into Scala. Using a Scala mutable.Map fixed the memory leak. I have not performed any comprehensive tests, so use at your own risk.

import java.util

import io.confluent.kafka.schemaregistry.client.rest.entities.{ Config, SchemaString }
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest
import io.confluent.kafka.schemaregistry.client.rest.{ RestService, entities }
import io.confluent.kafka.schemaregistry.client.{ SchemaMetadata, SchemaRegistryClient }
import org.apache.avro.Schema

import scala.collection.mutable

class CachingSchemaRegistryClient(val restService: RestService, val identityMapCapacity: Int)
    extends SchemaRegistryClient {

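  // Unlike the original CachedSchemaRegistryClient, which keys its caches on
  // object identity (IdentityHashMap), these mutable.Maps key Schema by
  // equals/hashCode, so equal schemas from distinct RecordFormat instances
  // share a single entry. idCache is pre-seeded with a null subject, which
  // getByID and register rely on.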
  val schemaCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()
  val idCache: mutable.Map[String, mutable.Map[Integer, Schema]] =
    mutable.Map(null.asInstanceOf[String] -> mutable.Map())
  val versionCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()

  def this(baseUrl: String, identityMapCapacity: Int) {
    this(new RestService(baseUrl), identityMapCapacity)
  }

  def this(baseUrls: util.List[String], identityMapCapacity: Int) {
    this(new RestService(baseUrls), identityMapCapacity)
  }

  def registerAndGetId(subject: String, schema: Schema): Int =
    restService.registerSchema(schema.toString, subject)

  def getSchemaByIdFromRegistry(id: Int): Schema = {
    val restSchema: SchemaString = restService.getId(id)
    (new Schema.Parser).parse(restSchema.getSchemaString)
  }

  def getVersionFromRegistry(subject: String, schema: Schema): Int = {
    val response: entities.Schema = restService.lookUpSubjectVersion(schema.toString, subject)
    response.getVersion.intValue
  }

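  // Return the cached version for this schema, or fetch and cache it. The
  // identityMapCapacity guard now counts distinct schemas by equality.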
  override def getVersion(subject: String, schema: Schema): Int = synchronized {
    val schemaVersionMap: mutable.Map[Schema, Integer] =
      versionCache.getOrElseUpdate(subject, mutable.Map())

    val version: Integer = schemaVersionMap.getOrElse(
      schema, {
        if (schemaVersionMap.size >= identityMapCapacity) {
          throw new IllegalStateException(s"Too many schema objects created for $subject!")
        }

        val version = new Integer(getVersionFromRegistry(subject, schema))
        schemaVersionMap.put(schema, version)
        version
      }
    )
    version.intValue()
  }

  override def getAllSubjects: util.List[String] = restService.getAllSubjects()

  override def getByID(id: Int): Schema = synchronized { getBySubjectAndID(null, id) }

  override def getBySubjectAndID(subject: String, id: Int): Schema = synchronized {
    val idSchemaMap: mutable.Map[Integer, Schema] = idCache.getOrElseUpdate(subject, mutable.Map())
    idSchemaMap.getOrElseUpdate(id, getSchemaByIdFromRegistry(id))
  }

  override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = {
    val response = restService.getVersion(subject, version)
    val id       = response.getId.intValue
    val schema   = response.getSchema
    new SchemaMetadata(id, version, schema)
  }

  override def getLatestSchemaMetadata(subject: String): SchemaMetadata = synchronized {
    val response = restService.getLatestVersion(subject)
    val id       = response.getId.intValue
    val version  = response.getVersion.intValue
    val schema   = response.getSchema
    new SchemaMetadata(id, version, schema)
  }

  override def updateCompatibility(subject: String, compatibility: String): String = {
    val response: ConfigUpdateRequest = restService.updateCompatibility(compatibility, subject)
    response.getCompatibilityLevel
  }

  override def getCompatibility(subject: String): String = {
    val response: Config = restService.getConfig(subject)
    response.getCompatibilityLevel
  }

  override def testCompatibility(subject: String, schema: Schema): Boolean =
    restService.testCompatibility(schema.toString(), subject, "latest")

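  // Same lookup-or-register pattern as getVersion; a newly registered schema
  // is also stored under the null subject in idCache so that getByID can
  // resolve it without another round trip.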
  override def register(subject: String, schema: Schema): Int = synchronized {
    val schemaIdMap: mutable.Map[Schema, Integer] =
      schemaCache.getOrElseUpdate(subject, mutable.Map())

    val id = schemaIdMap.getOrElse(
      schema, {
        if (schemaIdMap.size >= identityMapCapacity)
          throw new IllegalStateException(s"Too many schema objects created for $subject!")
        val id: Integer = new Integer(registerAndGetId(subject, schema))
        schemaIdMap.put(schema, id)
        idCache(null).put(id, schema)
        id
      }
    )
    id.intValue()
  }
}
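
Usage mirrors the stock client; construct the KafkaAvroSerializer with it (props being the usual serializer config map containing schema.registry.url):

val client     = new CachingSchemaRegistryClient(settings.schemaRegistry, 1000)
val serializer = new KafkaAvroSerializer(client, props.asJava)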
  • Nice that you managed to solve it. But implementing your own `SchemaRegistryClient` definitely should not be the solution. – Rick Nov 14 '17 at 21:26
  • this is exactly the reason why this is an open question at SO ;) I will gladly accept better solutions – kostja Nov 15 '17 at 08:24
  • Totally agree with you. I'm having exactly the same issue, and I'm willing to try your solution. How did you plug in your class with Kafka? Did you just change the property `properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ))`? – Rick Nov 15 '17 at 10:17
  • Or how do you actually use it? Sorry to bother you with questions, just facing the same issue here. :) – Rick Nov 15 '17 at 10:44
  • NP, I am working with it in 2 ways: `val props = Map("schema.registry.url" -> "mock") // irrelevant for the schema cache issue val client: CachingSchemaRegistryClient = new CachingSchemaRegistryClient(settings.schemaRegistry, 1000) val serializer: KafkaAvroSerializer = new KafkaAvroSerializer(client, props.asJava)` – kostja Nov 15 '17 at 12:00
  • I am also using a different approach, which is dirtier, but only requires overriding a single method of the KafkaAvroSerializer so it returns the same instance and is thus much faster: `val schema: Schema = SchemaFor[ComplexMsg]() val prepSerializer: KafkaAvroSerializer = new KafkaAvroSerializer(client) { override def getSchema(`object`: Object): Schema = schema } val prepProducerSettings: ProducerSettings[AnyRef, AnyRef] = ProducerSettings(system, serializer, prepSerializer) .withBootstrapServers(kafkaSettings.bootstrapServer)` – kostja Nov 15 '17 at 12:03
  • Thank you. The `KafkaAvroSerializer` method sounds easier and would better fit my code. This issue is quite a shame :( – Rick Nov 15 '17 at 12:38
  • @Rick you might be interested to know the proper solution ;) please see the edit. – kostja Nov 20 '17 at 07:36
  • Heliocentrist is actually my colleague at work :) I was discussing the problem with him when he had the idea that solved the problem. All credit goes to him. But many thanks for writing back to me in the comments. Cheers! :) – Rick Nov 20 '17 at 09:51