0

(end goal) before trying out whether i could eventually read avro data, usng spark stream, out of the Confluent Platform like some described here: Integrating Spark Structured Streaming with the Confluent Schema Registry

I'd to verify whether I could use below command to read them:

$ kafka-avro-console-consumer \
> --topic my-topic-produced-using-file-pulse-xml \
> --from-beginning \
> --bootstrap-server localhost:9092 \
> --property schema.registry.url=http://localhost:8081

I receive this error message, Unknown magic byte

Processed a total of 1 messages
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

note, The message can be read like this (using console-consumer instead of avro-console-consumer):

kafka-console-consumer \
--bootstrap-server localhost:9092 --group my-group-console \
--from-beginning \
--topic my-topic-produced-using-file-pulse-xml

The message was produced using confluent connect file-pulse (1.5.2) reading xml file (streamthoughts/kafka-connect-file-pulse)

Please help here: Did I use the kafka-avro-console-consumer wrong? I tried "deserializer" properties options described here: https://stackoverflow.com/a/57703102/4582240, did not help

I did not want to be brave to start the spark streaming to read the data yet.

the file-pulse 1.5.2 properties i used are like below added 11/09/2020 for completion.

name=connect-file-pulse-xml
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic= my-topic-produced-using-file-pulse-xml
tasks.max=1

# File types
fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.xml$
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader
force.array.on.fields=sometagNameInXml

# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker

fs.scan.directory.path=/tmp/kafka-connect/xml/
fs.scan.interval.ms=10000

# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-xml
internal.kafka.reporter.topic=connect-file-pulse-status

# Track file by name
offset.strategy=name

1 Answers1

2

If you are getting Unknown Magic Byte with the consumer, then the producer didn't use the Confluent AvroSerializer, and might have pushed Avro data that doesn't use the Schema Registry.

Without seeing the Producer code or consuming and inspecting the data in binary format, it is difficult to know which is the case.

The message was produced using confluent connect file-pulse

Did you use value.converter with the AvroConverter class?

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • Hi @OneCriketeer, I did not setup explicitly. I just used the default properties file came with it https://stackoverflow.com/questions/63796593/kafka-connect-file-pulse-connector-standalone-could-not-startclass-not-found-ex myquestion has the content of what i used. I will see if i can paste my final one here. – soMuchToLearnAndShare Sep 11 '20 at 07:54
  • I've now added the properties used in the orignal question. @OneCricketeer. Please also note: I did not have problems using kafka-console-consumer to consume/read the message. I did have problem with ```avro-console``` consumer. – soMuchToLearnAndShare Sep 11 '20 at 08:01
  • Hi @OneCricketeer, when you say "did you use value.converter", do you mean in the file-pulse properties? or did you mean when i read via the avro-console? either case, I did not use it. I assume the connector use it by default. – soMuchToLearnAndShare Sep 11 '20 at 12:45
  • Hi @OneCricketeer, thanks for the tip. i checked my run instance again, and found ```key.converter = null value.converter = null ``` – soMuchToLearnAndShare Sep 11 '20 at 12:51
  • 1
    after OneCriketeer's tip. I found this article really helps: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/ – soMuchToLearnAndShare Sep 11 '20 at 15:12
  • 1
    @Minnie The default in `connect-standalone.properties` is JSONConverter, I believe. And they shouldn't be null since then Connect wouldn't know how to serialize data – OneCricketeer Sep 11 '20 at 16:40
  • Yes. The log I pasted is half, it is saying what the user specified In the file pulse properties file - nothing. So it did use the default. I checked the full log. – soMuchToLearnAndShare Sep 11 '20 at 23:38