
I have written a Storm topology. I want to send tuples, serialized with an Avro schema into a byte array, to a Kafka topic.

This is how I set up the bolt:

  builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
            .fieldsGrouping(BOLT1, new Fields("key"));

And this is how I convert the record to a byte array:

        Schema schema = avroObject.getSchema();

        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(ping, encoder);
        encoder.flush();
        byte[] message = out.toByteArray();
        String key = new String(message, "UTF-8");

When I emit the tuple as follows, sending the byte array to Kafka, nothing shows up in the Kafka topic:

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

But if I instead convert the byte array to a String and send that to the Kafka topic, it works, with something like this:

 builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
            .fieldsGrouping(BOLT1, new Fields("key"));

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

What am I doing wrong? How do I send a byte stream to a Kafka topic using the Storm KafkaBolt?

user2942227
  • Please show your Kafka producer. – Shams Mar 15 '15 at 11:28
  • I am using the KafkaBolt provided by Storm. See builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt()) .fieldsGrouping(BOLT1, new Fields("key")); in the code above. – user2942227 Mar 15 '15 at 17:29
  • You are converting your byte array into a Java String to produce your key; you will probably lose data, because Java Strings are not C strings. Did you check that your key value is correct? If it isn't, your hashMD5 will be wrong. Could that be why it does not work? – zenbeni Mar 16 '15 at 15:11
  • So how should I convert a byte array to a String in Java? – user2942227 Mar 16 '15 at 19:22

1 Answer


You have this problem because your MD5 hash is incorrect.

You say that it works if you convert your byte array to a Java String. That is because, in that case, the MD5 is computed on the same String you send:

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

As you can see, the MD5 is calculated on a String parameter and you send the String that the MD5 was computed from: everything is good!

But if you send a byte array, you need to calculate the MD5 on that byte array, and the result will be a byte array, not a String. Your code:

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

is incorrect because the MD5 is not computed on message but on message decoded as a UTF-8 String, and that conversion is lossy (see below).

Here is another question on SO about computing an MD5 correctly in byte-array form:

How can I generate an MD5 hash?
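Obj.hashMD5 is the asker's own helper and its signature is not shown, so the sketch below uses java.security.MessageDigest directly instead. It hashes the raw bytes of message without any charset conversion and returns a 16-byte digest; the class name Md5Bytes and the hex helper are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Bytes {
    // Computes MD5 directly over the raw bytes, so no String decoding can
    // alter the input. The result is 16 raw bytes, not a String.
    public static byte[] md5(byte[] data) {
        try {
            return MessageDigest.getInstance("MD5").digest(data);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }

    // Lowercase hex rendering, handy for logging the digest.
    public static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] message = "abc".getBytes(StandardCharsets.US_ASCII);
        byte[] digest = md5(message);
        System.out.println(digest.length + " " + toHex(digest));
    }
}
```

With a KafkaBolt<byte[], byte[]>, the byte[] digest could then be emitted as the key alongside the original message bytes.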

Converting a byte array to a String is lossy in Java (unlike in C, where a string is just a sequence of bytes): byte sequences that are not valid in the chosen charset, here UTF-8, are replaced with the replacement character U+FFFD, so data is lost in the process. Your binary Avro data apparently contains such sequences.
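A minimal sketch of that loss, assuming the same UTF-8 decoding as new String(message, "UTF-8") in the question: the sample bytes are made up for illustration (0xFF never occurs in valid UTF-8, and 0xC3 followed by 0x41 is a truncated sequence), and re-encoding the decoded String does not give the original bytes back.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LossyRoundTrip {
    // Decodes bytes as UTF-8 and encodes the result again, mirroring
    // new String(message, "UTF-8") followed by getBytes("UTF-8").
    public static byte[] roundTrip(byte[] data) {
        String s = new String(data, StandardCharsets.UTF_8);
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical "binary" payload containing invalid UTF-8 sequences.
        byte[] binary = {0x01, (byte) 0xFF, (byte) 0xC3, 0x41};
        byte[] back = roundTrip(binary);
        System.out.println(Arrays.toString(binary));
        System.out.println(Arrays.toString(back));
        System.out.println("identical: " + Arrays.equals(binary, back));
    }
}
```

Plain ASCII text survives the same round trip unchanged, which is why the String version appears to work for some data and not for binary Avro output.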

So your KafkaBolt should be

KafkaBolt<byte[], byte[]>

I don't know whether sending a byte-array MD5 along with your byte array is sufficient with the Storm KafkaBolt. If it is not, you will have to use an encoding that round-trips losslessly between byte arrays and Java Strings, such as Base64:

Base64 Encoding in Java

Convert your byte array to a Base64 string, use

KafkaBolt<String, String>

and then send the data as usual:

collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64));

This also means that when you consume the data from Kafka, you will get a Base64 String that you must decode to get the byte array back.
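A minimal sketch of both sides, assuming Java 8 or later for java.util.Base64 (older code would need a library such as Apache Commons Codec instead). keyInBase64 matches the name in the emit line; the class name and sample bytes are illustrative only.

```java
import java.util.Arrays;
import java.util.Base64;

public class Base64RoundTrip {
    // Producer side: turn arbitrary bytes (e.g. the Avro payload) into a
    // String that contains only ASCII characters, so nothing is lost.
    public static String encode(byte[] payload) {
        return Base64.getEncoder().encodeToString(payload);
    }

    // Consumer side: recover the exact original bytes from the Kafka message.
    public static byte[] decode(String text) {
        return Base64.getDecoder().decode(text);
    }

    public static void main(String[] args) {
        byte[] payload = {0x01, (byte) 0xFF, (byte) 0xC3, 0x41};
        String keyInBase64 = encode(payload);
        byte[] back = decode(keyInBase64);
        System.out.println(keyInBase64);
        System.out.println("identical: " + Arrays.equals(payload, back));
    }
}
```

The trade-off is size: Base64 output is about a third larger than the raw bytes, which is the price of keeping a String-based KafkaBolt.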

Hope that helps.

zenbeni
  • You also need to make sure the config sets "serializer.class" to "kafka.serializer.DefaultEncoder" in KafkaBolt.KAFKA_BROKER_PROPERTIES. It is the default in Kafka, but it is set to StringEncoder in the KafkaBolt example in the storm-kafka project! – MrE Mar 20 '15 at 15:20
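A sketch of the producer properties that comment describes, assuming the old (Kafka 0.8-era) producer wrapped by storm-kafka's KafkaBolt. "localhost:9092" is a placeholder broker list, and the explicit key.serializer.class line is an addition that fits KafkaBolt<byte[], byte[]>; with String keys (KafkaBolt<String, byte[]>) it would be kafka.serializer.StringEncoder instead.

```java
import java.util.Properties;

public class KafkaBoltByteConfig {
    // Producer properties for the old Kafka producer used by KafkaBolt.
    // In the topology they would be registered with
    // conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props).
    public static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // DefaultEncoder passes byte[] values through unchanged;
        // StringEncoder (as in the storm-kafka example) expects Strings.
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        // The key encoder falls back to serializer.class when unset;
        // set it explicitly here for byte[] keys.
        props.put("key.serializer.class", "kafka.serializer.DefaultEncoder");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address; replace with your own.
        Properties props = producerProps("localhost:9092");
        props.list(System.out);
    }
}
```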