2

I'm trying to send a JSON payload using the librdkafka C API. This is what I have so far:

#include <jansson.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

typedef struct my_data {
    char *id;
    char *value;
    unsigned long timestamp;
} my_data;

/* Encodes *ev as a compact JSON string. Returns the buffer produced by
 * json_dumps() (main() releases it with free()), or NULL if encoding failed. */
char * my_data_to_json(const my_data *ev);
/* Delivery-report callback installed through rd_kafka_conf_set_dr_msg_cb();
 * prints whether the message was delivered or why it failed. */
void rk_dr_callback(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque);

int main(int argc, char * argv[])
{
    // configure producer
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    char errstr[512];

    rd_kafka_conf_set(conf, "bootstrap.servers", "k1.example.com:9093", errstr, sizeof(errstr));    
    
    rd_kafka_conf_set_dr_msg_cb(conf, rk_dr_callback);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));    

    if (!rk) {
        fprintf(stderr, "failed to create kafka producer: %s\n", errstr);
        return -1;
    }

    // create json
    my_data ev = {
        .id = "test-id",
        .value = "test-value",
        .timestamp = (unsigned long) time(NULL)
    };

    char *json = my_data_to_json(&ev);
    printf("json dump: %s\n", json);

    // publish data
    rd_kafka_resp_err_t err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("topic"),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(json, strlen(json)),
        RD_KAFKA_V_KEY("key-1", strlen("key-1")), RD_KAFKA_V_OPAQUE(NULL),
        RD_KAFKA_V_END);

    // cleanup
    rd_kafka_flush(rk, 5 * 1000);
    
    if (rd_kafka_outq_len(rk) > 0) {
        fprintf(stderr, "%d message(s) were not delivered\n", rd_kafka_outq_len(rk));
    }

    rd_kafka_destroy(rk);
    free(json);

    return 0;
}

/*
 * Delivery-report callback: prints the outcome of one produced message.
 *
 * rk and opaque are part of the callback signature but are not used here.
 * msg->err selects between the failure and the success report.
 */
void rk_dr_callback(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
{
    (void) rk;
    (void) opaque;

    if (msg->err) {
        printf("failed to send message: %s\n", rd_kafka_err2str(msg->err));
    }
    else {
        // len is a size_t, so it needs %zu; partition is an int32_t, hence the cast for %d
        printf("delivered %zu bytes to partition %d\n", msg->len, (int) msg->partition);
    }
}

char * my_data_to_json(const my_data *ev)
{
    /* build the JSON object {"id": "id", "value": "value", "timestamp": 12345678} */
    json_t *json = json_pack("{sssssi}", "id", ev->id, "value", ev->value, "timestamp", ev->timestamp);

    if (!json) {
        fprintf(stderr, "failed to construct json from data\n");
    }

    char *str = json_dumps(json, JSON_COMPACT);

    if (!str) {
        fprintf(stderr, "failed to encode json object\n");
    }

    return str;
}

Using the above code I manage to get the bytes to the broker, but the JSON payload seems to be malformed. The consumer (a C# client that uses the Newtonsoft JSON library to deserialise) throws the following error:

Newtonsoft.Json.JsonReaderException: Unexpected character encountered while parsing value: . Path '', line 0, position 0.\n at Newtonsoft.Json.JsonTextReader.ParseValue()\n at ...

I can't quite figure out whether my mistake is in the way I construct the json object, encode it to a string or the way I publish the json string using librdkafka.

kovac
  • 4,945
  • 9
  • 47
  • 90
  • *"I can't quite figure out whether my mistake is in the way I construct the json object, encode it to a string or the way I publish the json string using librdkafka"* - Then you should isolate each of the three and see which one goes wrong. – StoryTeller - Unslander Monica May 30 '21 at 08:42
  • If you get an error in creating or encoding the JSON, you are just logging the error. I think you should return a `NULL` string if any of those steps fail and check in the calling function `main()` whether `my_data_to_json` returns `NULL`. – kiner_shah May 30 '21 at 09:54
  • Also, are you able to see any error messages being logged at the publisher end (C side)? – kiner_shah May 30 '21 at 09:55
  • @kiner_shah I can see a non-zero number of bytes being sent to the broker and there is no error on publisher side. I'm wondering if there's an issue with encoding of the json since it's expecting utf-8 and I'm not doing anything to make sure it's utf-8. I'm reading up on it right now. – kovac May 30 '21 at 09:58

0 Answers0