I have a siddhi app whose definition is as follows:
@App:name('UMBAlarm')
@sink(type='log')
define stream logStream(OCName string );
@source(type='kafka',
topic.list='TEST2',
partition.no.list='0',
threading.option='single.thread',
group.id="group",
bootstrap.servers='bt1svpff:9092',
@map(type='avro',
schema .def = """{ "type":"record", "name":"AvroTemipAlarm", "namespace":"com.hp.ossa.fault.avro", "fields": [ {"name":"OCName","type":"string"}, {"name":"Identifier","type":"long"}, {"name":"AttributeList","type": {"type":"array","items": {"type":"record", "name":"AttributeRecord", "fields": [ {"name":"AttributeId","type":"long"}, {"name":"AttributeName","type":"string"}, {"name":"AttributeType","type":"int"}, {"name":"IntValue","type":{"type":"array","items":"int"}}, {"name":"LongValue","type":{"type":"array","items":"long"}}, {"name":"StringValue","type":{"type":"array","items":"string"}}, {"name":"BooleanValue","type":{"type":"array","items":"boolean"}}, {"name":"DoubleValue","type":{"type":"array","items":"double"}}]}}}]}""",
@attributes(OCName="OCName") ), use.avro.deserializer="true")
define stream hbStream (OCName string);
from hbStream select * insert into logStream;
However, when I run the app and inject messages into the kafka topic, I get an error in the streaming integrator tooling console:
[2021-11-15_18-22-21_731] ERROR {io.siddhi.extension.map.avro.sourcemapper.AvroSourceMapper} - Event object is invalid. Expected byte Array or ByteBuffer, but found java.lang.String
[2021-11-15_18-22-21_731] ERROR {io.siddhi.core.stream.input.source.SourceMapper} - Error while processing ',qualif5_ns:.ocv8.test2� B Managed Objectn vnetworkPLMN qualif5_ns:.OSS2.MED_U rnc RNC0538 nodeb N22595DnetworkPLMN qualif5_ns:.OSS2.MED_U*networkPLMN.rnc.nodeb networkPLMN Alarm Type EquipmentAlarm $Perceived Severity
Major Additional Text BSRIKANTH IS PERFORMING A TEST MOE Probable Cause Unknown �� EMR Alias N22595 ܜ
User1 OMCENM02 �
user2
User2 �
user3
User3 � "Specific ProblemsF �
�& �� (� # � (� � (� ', for the input Mapping 'avro' for the stream 'hbStream'. (Encoded)
io.siddhi.core.exception.MappingFailedException
at io.siddhi.extension.map.avro.sourcemapper.AvroSourceMapper.mapAndProcess(AvroSourceMapper.java:250)
at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:200)
at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:153)
at io.siddhi.extension.io.kafka.source.KafkaConsumerThread.run(KafkaConsumerThread.java:240)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Does anyone know how to solve this issue? I wrote a decoder for these messages in python and it works perfectly (as a standalone application).
The python code is below:
#!/opt/appl/automaton/bin/python3
import io
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
import avro
import logging
logging.basicConfig(level=logging.INFO)
from tabulate import tabulate
schema = avro.schema.Parse(open("/appl/oss2/users/temip/alarm.avsc").read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
conf = {'bootstrap.servers': 'bt1svpff:9092', 'group.id': 'mygroup',
'auto.offset.reset': 'earliest'}
#emptyList
c = Consumer(conf)
c.subscribe(['TEST2'])
running = True
while running:
msg = c.poll()
if not msg.error():
AlarmObject=[]
event_dict = decode(msg_value)
print(event_dict)
print(type(event_dict))
print(event_dict["OCName"])
print(event_dict["Identifier"])