In NiFi, I first convert JSON to Avro and then convert the Avro back to JSON. The Avro-to-JSON step (ConvertAvroToJSON) fails with the exception below:

2019-07-23 12:48:04,043 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.avro.ConvertAvroToJSON ConvertAvroToJSON[id=1db0939d-016c-1000-caa3-80d0993c3468] ConvertAvroToJSON[id=1db0939d-016c-1000-caa3-80d0993c3468] failed to process session due to org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40; Processor Administratively Yielded for 1 sec: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at org.apache.nifi.processors.avro.ConvertAvroToJSON$1.process(ConvertAvroToJSON.java:161)
    at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2887)
    at org.apache.nifi.processors.avro.ConvertAvroToJSON.onTrigger(ConvertAvroToJSON.java:148)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Below is the template file: https://community.hortonworks.com/storage/attachments/109978-avro-to-json-and-json-to-avro.xml

The flow that I have drawn is: [image not available]

The input JSON is:

{
    "name":"test",
    "company":{
        "exp":"1.5"
    }
}

The converted Avro data is:

Objavro.schema {"type":"record","name":"MyClass","namespace":"com.acme.avro","fields":[{"name":"name","type":"string"},{"name":"company","type":{"type":"record","name":"company","fields":[{"name":"exp","type":"string"}]}}]}avro.codecdeflate�s™ÍRól&D³DV`•ÔÃ6ã(I-.a3Ô3�s™ÍRól&D³DV`•ÔÃ6
ashok

1 Answer

In an Avro data file the schema is embedded, so if you want to write all the fields in CSV format you don't need to set up a schema registry. If you are writing only specific columns rather than all of them, or writing other formats such as JSON, then a schema registry is required. - Shu
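To check whether a piece of content really is an Avro data file with an embedded schema, one option is to parse its header by hand. The sketch below assumes the layout described in the Avro specification (the 4-byte magic `Obj\x01`, a metadata map, then a 16-byte sync marker). The function names are hypothetical helpers written for this illustration, not NiFi or Avro APIs, and the sample header is hand-built with a shortened schema.

```python
import io
from typing import Dict, Tuple

MAGIC = b"Obj\x01"  # signature of an Avro object container file


def encode_long(n: int) -> bytes:
    """Encode an int as an Avro long (zig-zag, then base-128 varint)."""
    raw = (n << 1) ^ (n >> 63)
    out = bytearray()
    while raw > 0x7F:
        out.append((raw & 0x7F) | 0x80)
        raw >>= 7
    out.append(raw)
    return bytes(out)


def read_long(stream) -> int:
    """Decode one Avro long from a binary stream."""
    raw, shift = 0, 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            raise EOFError("stream ended inside a varint")
        byte = chunk[0]
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)


def read_bytes(stream) -> bytes:
    """Read a length-prefixed Avro bytes/string value."""
    return stream.read(read_long(stream))


def read_container_header(stream) -> Tuple[Dict[str, bytes], bytes]:
    """Return (metadata, sync_marker); raise ValueError if the magic is missing."""
    if stream.read(4) != MAGIC:
        raise ValueError("no Avro container magic; content may be a bare datum")
    meta: Dict[str, bytes] = {}
    while True:
        count = read_long(stream)
        if count == 0:  # a zero count terminates the map
            break
        if count < 0:  # a negative count is followed by the block size in bytes
            read_long(stream)
            count = -count
        for _ in range(count):
            key = read_bytes(stream).decode("utf-8")
            meta[key] = read_bytes(stream)
    return meta, stream.read(16)


# Hand-built header for illustration only (schema shortened, sync marker zeroed).
schema = b'{"type":"record","name":"MyClass","fields":[{"name":"name","type":"string"}]}'
entries = [(b"avro.schema", schema), (b"avro.codec", b"deflate")]
header = MAGIC + encode_long(len(entries))
for key, value in entries:
    header += encode_long(len(key)) + key + encode_long(len(value)) + value
header += encode_long(0) + bytes(16)

meta, sync = read_container_header(io.BytesIO(header))
print(sorted(meta))  # ['avro.codec', 'avro.schema']
```

If `avro.schema` shows up in the metadata, the schema travels with the data and the reader does not need one supplied separately.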

NIFI Malformed Data.Length is negative

More about internals
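One possible reading of the `-40` in the error, offered as a sketch and not a confirmed diagnosis: Avro writes a string as a zig-zag varint length followed by the bytes, and an Avro data file starts with the magic bytes `Obj\x01`. If ConvertAvroToJSON is given an explicit schema and therefore reads the content as a bare datum, the first thing it decodes is the length of the first string field (`name`), which would be the byte `O` (0x4F). The function `read_long` below is a hypothetical helper written for this illustration, not a NiFi or Avro API.

```python
from typing import Tuple


def read_long(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode one Avro long (zig-zag varint) from buf starting at pos.

    Returns (value, next_position).
    """
    raw, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    # Undo zig-zag: even raw values are non-negative, odd ones are negative.
    return (raw >> 1) ^ -(raw & 1), pos


# First bytes of an Avro data file, misread as the length of a string field.
length, _ = read_long(b"Obj\x01")
print(length)  # -40

# A bare datum for the string "test" starts with the length 4 instead.
length, _ = read_long(b"\x08test")
print(length)  # 4
```

Under that assumption, the negative length comes from reading a container file with a datum-only reader, which would be consistent with the error going away once the schema property is cleared.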

Vikramsinh Shinde
Thank you. I removed the schema from ConvertAvroToJSON and now it is working, but I didn't understand the problem. If we use a schema registry, the schema won't be added to the Avro content that is published to Kafka; only the schema ID will be sent. Is my understanding correct? – ashok Jul 23 '19 at 11:27