
I have a Siddhi app whose definition is as follows:

@App:name('UMBAlarm')

@sink(type='log')
define stream logStream (OCName string);

@source(type='kafka',
        topic.list='TEST2',
        partition.no.list='0',
        threading.option='single.thread',
        group.id='group',
        bootstrap.servers='bt1svpff:9092',
        use.avro.deserializer='true',
        @map(type='avro',
             schema.def = """{
                 "type": "record",
                 "name": "AvroTemipAlarm",
                 "namespace": "com.hp.ossa.fault.avro",
                 "fields": [
                     {"name": "OCName", "type": "string"},
                     {"name": "Identifier", "type": "long"},
                     {"name": "AttributeList", "type":
                         {"type": "array", "items":
                             {"type": "record",
                              "name": "AttributeRecord",
                              "fields": [
                                  {"name": "AttributeId", "type": "long"},
                                  {"name": "AttributeName", "type": "string"},
                                  {"name": "AttributeType", "type": "int"},
                                  {"name": "IntValue", "type": {"type": "array", "items": "int"}},
                                  {"name": "LongValue", "type": {"type": "array", "items": "long"}},
                                  {"name": "StringValue", "type": {"type": "array", "items": "string"}},
                                  {"name": "BooleanValue", "type": {"type": "array", "items": "boolean"}},
                                  {"name": "DoubleValue", "type": {"type": "array", "items": "double"}}]}}}]}""",
             @attributes(OCName='OCName')))
define stream hbStream (OCName string);

from hbStream select * insert into logStream;

However, when I run the app and inject messages into the Kafka topic, I get the following error in the Streaming Integrator Tooling console:

[2021-11-15_18-22-21_731] ERROR {io.siddhi.extension.map.avro.sourcemapper.AvroSourceMapper} - Event object is invalid. Expected byte Array or ByteBuffer, but found java.lang.String 
[2021-11-15_18-22-21_731] ERROR {io.siddhi.core.stream.input.source.SourceMapper} - Error while processing ',qualif5_ns:.ocv8.test2� B  Managed Objectn   vnetworkPLMN qualif5_ns:.OSS2.MED_U rnc RNC0538 nodeb N22595DnetworkPLMN qualif5_ns:.OSS2.MED_U*networkPLMN.rnc.nodeb networkPLMN     Alarm Type       EquipmentAlarm    $Perceived Severity      
Major     Additional Text    BSRIKANTH IS PERFORMING A TEST MOE     Probable Cause       Unknown   ��  EMR Alias     N22595   ܜ 
User1     OMCENM02   � 
user2    
User2   � 
user3    
User3   � "Specific ProblemsF  �
                        �&      �� (�   #                               �    (�   �  (�   ', for the input Mapping 'avro' for the stream 'hbStream'. (Encoded) 
io.siddhi.core.exception.MappingFailedException
    at io.siddhi.extension.map.avro.sourcemapper.AvroSourceMapper.mapAndProcess(AvroSourceMapper.java:250)
    at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:200)
    at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:153)
    at io.siddhi.extension.io.kafka.source.KafkaConsumerThread.run(KafkaConsumerThread.java:240)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
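The first ERROR line is the key: the Avro source mapper received a java.lang.String where it expected a byte array or ByteBuffer, i.e. the record value was deserialized as a string before it reached the mapper. Avro's binary encoding is byte-oriented; for instance, long fields such as Identifier are zig-zag varint encoded, so the payload is only decodable as raw bytes, and a string round-trip corrupts it. A stdlib-only sketch of that zig-zag varint encoding (purely illustrative; this is not Siddhi or Kafka code):

```python
import io

def encode_long(n: int) -> bytes:
    """Avro binary encoding of a long: zig-zag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)           # zig-zag: small magnitudes -> few bytes
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # emit low 7 bits with continuation bit
        z >>= 7
    out.append(z)
    return bytes(out)

def decode_long(buf: io.BytesIO) -> int:
    """Inverse of encode_long, consuming bytes from a stream."""
    z = shift = 0
    while True:
        b = buf.read(1)[0]
        z |= (b & 0x7F) << shift
        shift += 7
        if not (b & 0x80):             # continuation bit clear: last byte
            break
    return (z >> 1) ^ -(z & 1)         # undo zig-zag
```

This only works over `bytes`; handing it a decoded `str` (what the mapper is being given here) cannot succeed.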

Does anyone know how to solve this issue? I wrote a decoder for these messages in Python and, as a standalone application, it works perfectly.

The Python code is below:

#!/opt/appl/automaton/bin/python3

import io

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
import avro
import logging
logging.basicConfig(level=logging.INFO)
from tabulate import tabulate


schema = avro.schema.Parse(open("/appl/oss2/users/temip/alarm.avsc").read())
reader = DatumReader(schema)



def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict


conf = {'bootstrap.servers': 'bt1svpff:9092', 'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'}

c = Consumer(conf)
c.subscribe(['TEST2'])
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()        # raw bytes of the record value
        event_dict = decode(msg_value)
        print(event_dict)
        print(type(event_dict))
        print(event_dict["OCName"])
        print(event_dict["Identifier"])
  • What is the deserialiser class that you use in your python application? – Dilini Nov 16 '21 at 09:59
  • The "value.deserializer" used in the Siddhi Kafka source is "io.confluent.kafka.serializers.KafkaAvroDeserializer" – Dilini Nov 16 '21 at 10:02
  • First of all, thanks for your reply. I used the following deserializer in my Python code: schema = avro.schema.Parse(open("/appl/oss2/users/srikanth/alarm.avsc").read()); reader = DatumReader(schema); def decode(msg_value): message_bytes = io.BytesIO(msg_value); decoder = BinaryDecoder(message_bytes); event_dict = reader.read(decoder); return event_dict; then c = Consumer(conf); c.subscribe(['TEST2']); while True: msg = c.poll(); msg_value = msg.value(); event_dict = decode(msg_value); print(event_dict) – SrikanthR Nov 17 '21 at 10:02
  • Please refer https://stackoverflow.com/a/65805815/1033469 – Dilini Nov 22 '21 at 11:40
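One thing worth noting about the KafkaAvroDeserializer mentioned in the comments: Confluent's deserializer expects every record value to start with a 5-byte wire-format header (magic byte 0x00 plus a 4-byte big-endian schema-registry ID) before the Avro payload, whereas the Python consumer above reads plain Avro binary from offset 0. If the producer does not write that header, the two consumers cannot both succeed on the same messages. A small hypothetical helper to inspect a raw record value (the function name is my own, not from any library, and the magic-byte check is only a heuristic since a plain Avro payload can also start with 0x00):

```python
import struct

def split_confluent_frame(value: bytes):
    """Return (schema_id, avro_payload) if `value` looks like the Confluent
    wire format (magic byte 0x00 + 4-byte big-endian schema ID), else None."""
    if len(value) >= 5 and value[0] == 0:
        schema_id = struct.unpack(">I", value[1:5])[0]
        return schema_id, value[5:]
    return None

# Example: a frame for schema ID 42 wrapping a dummy payload
frame = b"\x00" + (42).to_bytes(4, "big") + b"avro-bytes"
```

If `split_confluent_frame(msg.value())` returns None for these messages, they are plain Avro, and a registry-aware deserializer would be the wrong fit.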

0 Answers