I have written a Storm topology. I want to send Avro-encoded tuples, as byte arrays, to a Kafka topic.
This is how I set up the bolt:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));
And this is how I convert the Avro record to a byte array:
Schema schema = avroObject.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");
When I emit the tuple with the byte array as the message, as below, nothing shows up in the Kafka topic:
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
But if I convert the byte array to a string and send that to the Kafka topic instead, it works. Something like this:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
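I am also not sure the string workaround is safe. Avro binary output is not text, and decoding arbitrary bytes as UTF-8 replaces any invalid sequence with the U+FFFD replacement character, so the bytes that reach Kafka may differ from the original message. A minimal sketch of that effect using only the JDK (the payload bytes are made up for illustration and are not real Avro output):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BinaryStringRoundTrip {

    // Decodes the bytes as UTF-8 text, then encodes the text back to bytes.
    static byte[] roundTrip(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload: 0x80 and 0xFF are not valid UTF-8 on their own.
        byte[] raw = {0x02, (byte) 0x80, (byte) 0xFF, 0x10};
        byte[] back = roundTrip(raw);

        System.out.println("original:   " + Arrays.toString(raw));
        System.out.println("round trip: " + Arrays.toString(back));
        System.out.println("identical:  " + Arrays.equals(raw, back));
    }
}
```

If the round trip is not identical, a consumer that tries to decode the message as Avro would be reading corrupted data, which is why I would prefer to send the raw byte array.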
What am I doing wrong? How do I send a byte array to a Kafka topic using the Storm KafkaBolt?
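One thing I suspect, but have not confirmed, is the serializer configured for the underlying Kafka producer: the type parameters on KafkaBolt do not by themselves change how the value is serialized. The sketch below only builds a java.util.Properties object with the property names used by the newer Kafka producer API. The class name, method name, and broker address are placeholders of mine, and whether these are the right keys depends on the storm-kafka and Kafka versions in use.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Builds producer properties for String keys and byte[] values.
    // The property names are those of the newer Kafka producer API;
    // older producers use different keys, so treat this as an assumption.
    static Properties byteArrayValueProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = byteArrayValueProps("localhost:9092");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

How these properties are handed to the bolt also depends on the version. If a string serializer is currently configured for the value, that would at least be consistent with the string variant working and the byte array variant not.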