1

I have a simple stream processor (not consumer/producer) that looks like this (Kotlin)

@Bean
fun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
    return Function { input-> input.map { key, value ->
        println("\nPAYLOAD KEY: ${key.name}\n");
        println("\nPAYLOAD value: ${value.address}\n");
        val output = FooAddressPlus()
        output.address = value.address
        output.name = value.name
        output.plus = "$value.name-$value.address"
        KeyValue(key, output)
    }}
}

These classes FooName, FooAddress and FooAddressPlus are in the same package as the processor. Here’s my config file:

spring.cloud.stream.kafka.binder:
  brokers: localhost:9093

spring.cloud.stream.function.definition: processFoo

spring.cloud.stream.kafka.streams.binder.functions.processFoo.applicationId: foo-processor
spring.cloud.stream.bindings.processFoo-in-0:
  destination: foo.processor
spring.cloud.stream.bindings.processFoo-out-0:
  destination: foo.processor.out

spring.cloud.stream.kafka.streams.binder:
  deserializationExceptionHandler: logAndContinue
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 1000

I get this error when running the processor:

The class '<here_comes_package>.FooAddress' is not in the trusted packages: [java.util, java.lang]. 
If you believe this class is safe to deserialize, please provide its name. 
If the serialization is only done by a trusted source, you can also enable trust all (*).

What is the best way to set trusted packages to everything when using Kafka Streams Binder Stream Processor? (no consumer/producer but stream processor)

Thanks a lot!

  • Does this answer your question? [Spring Kafka The class is not in the trusted packages](https://stackoverflow.com/questions/51688924/spring-kafka-the-class-is-not-in-the-trusted-packages) – jokarls Oct 25 '20 at 06:14
  • @jokarls Thanks. That was one the things I have tried before but I have a stream processor and in that answer they are using a consumer and producer separately. In that case they either set the consumer/producer props in the config (spring.kafka.consumer.properties.spring.json.trusted.packages) or they have their consumer/producer factories where they set the deserializer. Since I have a stream processor, there's no consumer nor producer props or factories that I can set, but I might be wrong about this. – Gonzalo Borobio Oct 25 '20 at 07:18

2 Answers2

5

You can set arbitrary configuration properties (Kafka streams properties, properties used by the serdes, etc) under ...binder.configuration:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              spring.json.trusted.packages: '*'
Andras Hatvani
  • 4,346
  • 4
  • 29
  • 45
Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • 1
    That code is useful, but your answer would be better if you could add a description on how this code fragment addresses the problem – Cleptus Oct 26 '20 at 13:57
  • Thanks @gary-russel. As asked here https://stackoverflow.com/questions/64516217/spring-cloud-stream-with-kafka-stream-binder-using-topologytestdriver-i-get-the?noredirect=1#comment114116760_64516217 how can I set this property in tests when using `TopologyTestDriver`? I can't make it work when setting this porperty in TopologYTestDriver creation. Thanks – Gonzalo Borobio Oct 26 '20 at 14:51
  • @gary-russell this doesn't work (anymore), I see the following: `2021-04-12 13:46:55.767 [main] WARN o.a.k.c.consumer.ConsumerConfig - The configuration 'spring.json.trusted.packages' was supplied but isn't a known config.` – Andras Hatvani Apr 12 '21 at 11:47
  • That is just a bogus warning coming from the kafka Consumer about a property it doesn't recognize. In fact it proves that the property **is** being set correctly - the same properties are passed into the serde in its `configure()` method. If you are having problems, ask a new question showing code, configuration, versions, logs etc. – Gary Russell Apr 12 '21 at 13:12
0

I am using Spring Cloud Stream binder for Kafka, adding below properties has resolved deserialization issue with trusted packages

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=*

riaz7se
  • 91
  • 1
  • 3