As we know, topic creation in Kafka is normally handled during server initialization, using the default script ./kafka-topics --zookeeper ... But what if we need to create a topic dynamically, at runtime?

Andrei Nechaev
  • So, what is your question, if there is a better way to do it? – Nick Vanderhoven Dec 04 '16 at 18:23
  • @NickVanderhoven it's more like a tip for those who are looking for the answer. I couldn't find it either in the documentation or here – Andrei Nechaev Dec 04 '16 at 18:25
  • @Andrey: please, edit this as a question, something like "how do I create Apache Kafka topics on runtime" and post your original post as new answer. Answering to your own posts is ok. – Filip Malczak Dec 04 '16 at 18:31
  • Well, good enough. Anyways: really nice job ;) I think I'm gonna be using Kafka soon, so I will probably use this. – Filip Malczak Dec 04 '16 at 18:32
  • @FilipMalczak yeah, I see the confusion. Split the question and the answer in two parts. – Andrei Nechaev Dec 04 '16 at 18:33
  • @FilipMalczak I've been working with Kafka for the last 4 months and it's a really nice tool, but it can sometimes be overkill. If scalability isn't a requirement and you're just looking for a queuing solution, I'd go with something simpler, like `ChronicleQueue`. Anyway, Kafka is really great at queuing large amounts of small messages, and I encourage you to find a way to use Kafka Streams, very impressive ;) Btw, check out Apache Avro. It works great along with Kafka – Andrei Nechaev Dec 04 '16 at 18:41

2 Answers

Fortunately, Kafka 0.10.1.0 brought us this ability. I saw this fascinating feature on the Confluence Jira board but couldn't find any documentation related to the topic. Ironic, isn't it?

So I went to the source code and found a way of creating topics on the fly. Hopefully it will be helpful for some of you. Of course, if you have a better solution, please do not hesitate to share it with us.

Ok, let's start.

/** Creates the topics listed in mTopics and returns the names of those created successfully. */
public List<String> propagateTopics(int partitions, short replication, int timeout) throws IOException {
    // The same partition count and replication factor are shared by every topic.
    CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication);
    Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream()
            .collect(Collectors.toMap(k -> k, v -> topicDetails)); // 1

    CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout); // 2

    try {
        CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG); // 3
        return response.errors().entrySet().stream()
                .filter(error -> error.getValue() == Errors.NONE)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList()); // 4
    } catch (IOException e) {
        log.error("Failed to create topics", e);
    }

    // Return an empty list rather than null so callers can iterate over the result safely.
    return Collections.emptyList();
}

1 We need an instance of TopicDetails. For simplicity, I'll share the same config among all topics. Assume that mTopics is your list of the names (Strings) of all the topics you want to create.
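One thing worth knowing about step 1: Collectors.toMap throws an IllegalStateException when the stream contains the same key twice, so a duplicated name in mTopics would fail before any request is sent. Below is a minimal sketch of one way to guard against that. TopicConfigMap and Details are hypothetical stand-ins (Details plays the role of CreateTopicsRequest.TopicDetails) so the example runs without the Kafka jar.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the Kafka client library.
class TopicConfigMap {

    // Stand-in for CreateTopicsRequest.TopicDetails (assumption: only these two fields matter here).
    static final class Details {
        final int partitions;
        final short replication;

        Details(int partitions, short replication) {
            this.partitions = partitions;
            this.replication = replication;
        }
    }

    // Maps every distinct topic name to the same shared Details instance.
    // distinct() removes repeated names, which would otherwise make toMap throw.
    static Map<String, Details> build(List<String> topics, Details shared) {
        return topics.stream()
                .distinct()
                .collect(Collectors.toMap(Function.identity(), name -> shared));
    }

    public static void main(String[] args) {
        Details shared = new Details(3, (short) 1);
        Map<String, Details> config = build(Arrays.asList("orders", "payments", "orders"), shared);
        System.out.println(config.keySet());
    }
}
```

Whether silently dropping duplicates is the right behaviour depends on your use case; failing fast on a duplicated name may be preferable.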

2 Basically, we want to send a request to our Kafka cluster. There is now a dedicated class for that, CreateTopicsRequest, whose constructor accepts the topic configuration map and a timeout.

3 Then we need to send the request and get the CreateTopicsResponse back:

private static final short apiKey = ApiKeys.CREATE_TOPICS.id;
private static final short version = 0;
private static final short correlationId = -1;

private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException {
    // Expect the broker address in "host:port" form.
    String[] comp = client.split(":");
    if (comp.length != 2) {
        throw new IllegalArgumentException("Wrong client directive");
    }
    String address = comp[0];
    int port = Integer.parseInt(comp[1]);

    // Serialize the request header followed by the request body into one buffer.
    RequestHeader header = new RequestHeader(apiKey, version, client, correlationId);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);

    byte[] byteBuf = buffer.array();

    byte[] resp = requestAndReceive(byteBuf, address, port);
    ByteBuffer respBuffer = ByteBuffer.wrap(resp);
    // Consume the response header so the buffer is positioned at the response body.
    ResponseHeader.parse(respBuffer);

    return CreateTopicsResponse.parse(respBuffer);
}

private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException {
    // An IOException is left to propagate to the caller: returning an empty array here
    // would only make the response parsing in createTopic fail later.
    try (Socket socket = new Socket(address, port);
         DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
         DataInputStream dis = new DataInputStream(socket.getInputStream())
    ) {
        // Write the 4-byte size prefix, then the payload.
        dos.writeInt(buffer.length);
        dos.write(buffer);
        dos.flush();

        // Read the 4-byte size prefix, then exactly that many bytes.
        byte[] resp = new byte[dis.readInt()];
        dis.readFully(resp);

        return resp;
    }
}

There is no magic here at all: we just send the request and then parse the returned byte stream into the response.
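The only wire-level detail in requestAndReceive is the size prefix: each message is preceded by a 4-byte big-endian length. Here is a minimal sketch of that framing in isolation, using in-memory buffers instead of a socket. SizePrefixedFraming is a hypothetical helper name, not something from the Kafka library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the Kafka client library.
class SizePrefixedFraming {

    // Prepends a 4-byte length to the payload. ByteBuffer is big-endian by default,
    // which matches what DataOutputStream.writeInt puts on the wire.
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(Integer.BYTES + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Reads the length prefix, then exactly that many payload bytes.
    static byte[] unframe(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        byte[] payload = new byte[buffer.getInt()];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(framed.length); // 4 prefix bytes + 5 payload bytes
        System.out.println(new String(unframe(framed), StandardCharsets.UTF_8));
    }
}
```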

4 CreateTopicsResponse has a property errors, which is just a Map<String, Errors> where the key is the name of a topic you requested. The tricky part is that it contains all the topics you requested, and those created without errors have the value Errors.NONE. That's why I filter the response and return only the successfully created topics.
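If you also want to know which topics failed and why, the same map can be split both ways. The following is a sketch only: Status is a hypothetical two-value stand-in for Kafka's Errors enum so the example runs on its own, and TopicResultFilter is an illustrative name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the Kafka client library.
class TopicResultFilter {

    // Stand-in for Kafka's Errors enum, reduced to two values for the example.
    enum Status { NONE, TOPIC_ALREADY_EXISTS }

    // Names of the topics whose entry carries no error.
    static List<String> created(Map<String, Status> errors) {
        return errors.entrySet().stream()
                .filter(entry -> entry.getValue() == Status.NONE)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    // Topics that were not created, together with the reported error.
    static Map<String, Status> failed(Map<String, Status> errors) {
        return errors.entrySet().stream()
                .filter(entry -> entry.getValue() != Status.NONE)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Status> errors = new LinkedHashMap<>();
        errors.put("orders", Status.NONE);
        errors.put("payments", Status.TOPIC_ALREADY_EXISTS);
        System.out.println("created: " + created(errors));
        System.out.println("failed: " + failed(errors));
    }
}
```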

Andrei Nechaev

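Extending Andrei Nechaev's answer:

With 0.10.2.0, the way to get an instance of CreateTopicsRequest has changed a bit. We now need to use the Builder inner class to build a CreateTopicsRequest instance. Here is a code sample; topicConfig and timeout are the same values as in the answer above, and the third argument is, as far as I can tell, the validateOnly flag.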

CreateTopicsRequest.Builder builder = new CreateTopicsRequest.Builder(topicConfig, timeout, false);
CreateTopicsRequest request = builder.build();