
I have a few Samza jobs running, all reading messages from a Kafka topic and writing a new message to a new topic. To send the new messages, I am using Samza's built-in OutgoingMessageEnvelope together with a MessageCollector. It looks something like this:

    collector.send(new OutgoingMessageEnvelope(systemStream, newMessage));

Is there a way I can use this to add partitions to the Kafka topic, for example partitioning on a user ID?

Or if there is a better way I would love to hear it!

Ryan Wilson

1 Answer


You should be able to send messages with a partitioning key by using this constructor:

    public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object partitionKey,
                               java.lang.Object key,
                               java.lang.Object message)

Constructs a new OutgoingMessageEnvelope from specified components.

Parameters:
  • systemStream - Object representing the appropriate stream of which this envelope will be sent on.
  • partitionKey - A key representing which partition of the systemStream to send this envelope on.
  • key - A deserialized key to be used for the message.
  • message - A deserialized message to be sent in this envelope.
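To make the partition key's effect concrete, here is a minimal, self-contained Java sketch. It does not use Samza at all: `PartitionKeyDemo` and `partitionFor` are hypothetical names, and the code assumes the target partition is chosen by hashing the partition key and taking the result modulo the topic's partition count (the exact hash Samza's Kafka producer applies may differ). The property it illustrates is the one that matters: the same key always lands in the same partition.

```java
import java.util.List;

/**
 * Hypothetical model of keyed partitioning, NOT Samza's implementation.
 * Assumption: partition = hash(partitionKey) modulo the partition count.
 */
public class PartitionKeyDemo {

    // Map a partition key to a partition index in [0, numPartitions).
    // Math.floorMod keeps the result non-negative for negative hash codes.
    static int partitionFor(Object partitionKey, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed partition count of the output topic
        List<String> userIds = List.of("alice", "bob", "carol", "alice", "bob");

        // "alice" and "bob" appear twice and get the same partition both
        // times, so all of one user's messages stay together and in order.
        for (String userId : userIds) {
            System.out.println(userId + " -> partition " + partitionFor(userId, numPartitions));
        }
    }
}
```

In the actual task, the call from the question would become something like `collector.send(new OutgoingMessageEnvelope(systemStream, userId, key, newMessage))`, where `userId` and `key` are hypothetical values built from the incoming message. Under the hashing assumption above, the partition key should be a value with a stable `hashCode()`, such as a `String` or an `Integer`.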

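Using this constructor will partition your data: envelopes that share a partitionKey (for example, the same user ID) are sent to the same partition of the output topic. It does not add partitions, though; it only chooses among the partitions the topic already has. If you are looking to control the number of partitions programmatically, I think you should use the Kafka API to create or alter the topic as mentioned here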
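The partition count belongs to the Kafka topic itself (Kafka lets you increase it but not decrease it), so it is worth knowing what altering it does to keyed data. With a hash-modulo scheme, changing the partition count changes where existing keys are routed. The following sketch is hypothetical: `RepartitionDemo` and `movedKeys` are illustrative names, not Kafka or Samza APIs, and it reuses the same assumed hashing rule to count how many user IDs would move when a topic goes from 4 to 6 partitions.

```java
import java.util.List;

/**
 * Hypothetical illustration, NOT Kafka or Samza code. Assumes the same
 * hash-modulo partitioning rule and counts keys that change partition
 * when the topic's partition count changes.
 */
public class RepartitionDemo {

    static int partitionFor(Object partitionKey, int numPartitions) {
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    // Number of keys routed to a different partition after the partition
    // count changes from 'before' to 'after'.
    static int movedKeys(List<?> keys, int before, int after) {
        int moved = 0;
        for (Object key : keys) {
            if (partitionFor(key, before) != partitionFor(key, after)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // Integer user IDs; Integer.hashCode() is the value itself.
        List<Integer> userIds = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
        int moved = movedKeys(userIds, 4, 6);
        System.out.println(moved + " of " + userIds.size() + " user IDs change partition");
    }
}
```

With user IDs 0 to 11, IDs 4 through 11 move, so the program prints `8 of 12 user IDs change partition`. Messages for one user written before and after such a change can therefore end up in different partitions.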

Joseph
  • I saw this in the docs as well but wasn't sure whether the partitionKey needs any serialization/deserialization configuration, similar to the key and message. – Edi Bice Oct 01 '15 at 15:23
  • Did you find out whether any config was necessary? Will Samza create all the necessary partitions for you by just using a different partitioning key? Does it matter what object is passed as a partitioning key? – John Nov 03 '15 at 14:47