Is it possible to achieve this in ballerina
- To create a new kafka topic in ballerina
- To list available topics in Ballerina
- Subscribe to a created topic in ballerina
Is it possible to achieve this in ballerina
You can subscribe to a topic using the following code:
import ballerina/log;
import wso2/kafka;
import ballerina/internal;
// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
bootstrapServers: "localhost:9092, localhost:9093",
// Consumer group ID
groupId: "test-group",
// Listen from topic 'test'
topics: ["test"],
// Poll every 1 second
pollingInterval:1000
};
// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
// Triggered whenever a message added to the subscribed topic
onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
// Dispatched set of Kafka records to service, We process each one by one.
foreach entry in records {
byte[] serializedMsg = entry.value;
// Convert the serialized message to string message
string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
log:printInfo("New message received from the product admin");
// log the retrieved Kafka record
log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
// Mock logic
// Update the database with the new price for the specified product
log:printInfo("Database updated with the new price of the product");
}
}
}
This Github repo might be quite useful for you. It contains various examples for both consumers and producers.
Regarding your questions for creating and listing topics, if you don't need to perform these actions from Ballerina, you can do so from your command line:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <number_of_replicas> --partitions <number_of_partitions> --topic test
EDIT: Update the sample codes to comply with the latest ballerina version (from V1.2.0 upwards).
You can
If you send data using a
Kafka producer
, it will publish data to that particular topic, and if the topic is not available, it will create the topic, and publish. (To support this, you have to setauto.create.topics.enable=true
in the broker properties).Consider you want to publish to the topic
test
from a producer. You can create a producer endpoint calledkafka:Producer
and send data to a particular topic usingsend()
function.
kafka:Producer sampleProducer = new ({
bootstrapServers: "localhost:9092",
acks: "all",
valueSerializerType: kafka:SER_STRING
});
string topic = "test";
string msg = "Your Message";
sampleProducer->send(messageToPublish, topic);`
Now if there is a topic called
test
is available for the Kafka broker hosted atlocalhost:9092
, it will publish the message to the topic, or it will create the topic, if it doesn't exist.
You can use
subscribe()
function ofKafka:Consumer
to subscribe to a topic.
listener kafka:Consumer sampleConsumer = new ({
bootstrapServers: "localhost:9090",
groupId: "test-consumers",
valueDeserializerType: kafka:DES_STRING
});
string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);
Please note that the
subscribe()
takesstring[]
as the input parameter, hence you should pass astring[]
to it.There are other functions such as
subscribeToPattern()
,subscribeWithPartitionRebalance()
which are also can be used to subscribe a consumer to a topic, you can find more about them in the API Documentation.
But to list the available topics, you need to get the list of topics from the Kafka broker itself. But you can get a list of topics, which is currently subscribed by a particular consumer, using ballerina.
string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
// Your logic for handling the error
} else {
subscribedTopics = result;
}
Make sure to handle the error here, as the
getSubscription()
can return either astring[]
or anerror
. Ballerina type guard can do the trick for you.