4

I'm trying to set up a producer to send messages, using node-rdkafka, to an Event Streams service in IBM Cloud; however, I've not been able to receive the 'ready' event back from the server.

I've had a day trying to figure this out and I'm sure it's going to come down to my configuration of the Producer, but as far as I can tell I've got it right.

I'm running this on Ubuntu 19.04, using node-rdkafka to create the producer, with Node 10.15.2.

Thanks for any suggestions you can give.



// Sample node-rdkafka producer: connects to a SASL_SSL-secured Kafka broker,
// produces `maxMessages` messages once the 'ready' event fires, waits for all
// delivery reports, then disconnects.
var Kafka = require('node-rdkafka');

var producer = new Kafka.Producer({
    'debug' : 'all',
    'metadata.broker.list': 'kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093',
    'dr_cb': true, //delivery report callback
    'security.protocol': 'sasl_ssl',
    'ssl.ca.location': '/etc/ssl/certs/',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<credential username>',
    'sasl.password': '<credential password>',
    'retries': 10,
    'retry.backoff.ms': 10000
});

var topicName = '<my-topic>';

//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
    console.log(log);
});

//logging all errors
producer.on('event.error', function(err) {
    console.error('Error from producer');
    console.error(err);
});

//counter to stop this sample after maxMessages are sent
var counter = 0;
var maxMessages = 10;

//one report arrives per produced message (requires 'dr_cb': true above)
producer.on('delivery-report', function(err, report) {
    if (err) {
        //surface failed deliveries instead of logging them as if they succeeded
        console.error('delivery-report error:');
        console.error(err);
    }
    console.log('delivery-report: ' + JSON.stringify(report));
    //count failures too, so the poll loop below still terminates
    counter++;
});

//Wait for the ready event before producing
producer.on('ready', function(arg) {
    console.log('producer ready.' + JSON.stringify(arg));

    for (var i = 0; i < maxMessages; i++) {
        var value = Buffer.from('value-' +i);
        var key = "key-"+i;
        // if partition is set to -1, librdkafka will use the default partitioner
        var partition = -1;
        var headers = [
            { header: "header value" }
        ];
        // Arguments: topic, partition, message, key, timestamp, opaque, headers.
        // The timestamp is passed as a number (ms since epoch), and the opaque
        // token and headers are separate arguments (previously `"". headers`
        // evaluated to undefined, so the headers were never sent).
        producer.produce(topicName, partition, value, key, Date.now(), "", headers);
    }

    //need to keep polling for a while to ensure the delivery reports are received
    var pollLoop = setInterval(function() {
        producer.poll();
        if (counter === maxMessages) {
            clearInterval(pollLoop);
            producer.disconnect();
        }
    }, 1000);

});

producer.on('disconnected', function(arg) {
    console.log('producer disconnected. ' + JSON.stringify(arg));
});

//starting the producer
producer.connect();

I've included some of the log extracts below from where it looks like it's failing.

{ severity: 7,
  fac: 'BROKERFAIL',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)' }
{ severity: 7,
  fac: 'STATE',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: Broker changed state CONNECT -> DOWN' }
{ severity: 7,
  fac: 'BROADCAST',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: Broadcasting state change' }
{ severity: 7,
  fac: 'BUFQ',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: Purging bufq with 0 buffers' }
{ severity: 7,
  fac: 'BUFQ',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: Updating 0 buffers on connection reset' }
{ severity: 7,
  fac: 'TERM',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: Received TERMINATE op in state DOWN: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs' }
{ severity: 7,
  fac: 'BROKERFAIL',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)' }
{ severity: 7,
  fac: 'FAIL',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: Client is terminating' }
{ severity: 7,
  fac: 'BUFQ',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: Purging bufq with 0 buffers' }
{ severity: 7,
  fac: 'BUFQ',
  message:
   '[thrd:sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net]: sasl_ssl://kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093/bootstrap: Updating 0 buffers on connection reset' }
{ severity: 7,
  fac: 'TERMINATE',
  message:
   '[thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x7f6d1402e4b0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf' }
{ severity: 7,
  fac: 'BROKERFAIL',
  message:
   '[thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Success)' }
{ severity: 7,
  fac: 'BUFQ',
  message:
   '[thrd::0/internal]: :0/internal: Purging bufq with 0 buffers' }
{ severity: 7,
  fac: 'BUFQ',
  message:
   '[thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset' }

Process finished with exit code 0

Neil
  • 89
  • 2
  • 8

1 Answer

0

It looks like you are missing the ssl.ca.location property in the client configuration.

This needs to be set to the location where CAs are stored on your system.

For example:

  • On macOS: /etc/ssl/cert.pem
  • Ubuntu: /etc/ssl/certs/
  • Red Hat: /etc/pki/tls/cert.pem

In case you've not seen it already, there is a sample application for Event Streams available on Github.com that demonstrates how to use node-rdkafka.

You can see all the required configurations in this snippet:

// librdkafka client settings for a SASL_SSL connection; the broker list,
// CA location and API key are read from the `opts` object.
var driver_options = {
    // Uncomment to enable verbose client logging:
    // 'debug': 'all',

    // Where to connect
    'metadata.broker.list': opts.brokers,

    // Transport security and trusted CA certificates
    'security.protocol': 'sasl_ssl',
    'ssl.ca.location': opts.calocation,

    // SASL PLAIN credentials
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'token',
    'sasl.password': opts.api_key,

    // still needed with librdkafka 0.11.6 to avoid fallback to 0.9.0
    'broker.version.fallback': '0.10.0',

    'log.connection.close': false
};
Mickael Maison
  • 25,067
  • 7
  • 71
  • 68
  • Thanks for that, Mickael. I've added the ssl.ca.location but I'm still getting the same issue; the only difference in the logging is that I've got another group of "TERMINATE, BROKERFAIL, BUFQ, BUFQ" messages. – Neil Jun 13 '19 at 16:57
  • Are you able to run the sample application I mentioned following these steps: https://github.com/ibm-messaging/event-streams-samples/blob/master/kafka-nodejs-console-sample/docs/Local.md. Also note that if this issue is impacting you, please open a ticket following https://cloud.ibm.com/docs/services/EventStreams?topic=eventstreams-report_problem – Mickael Maison Jun 13 '19 at 17:11
  • Thanks for the information. I've had a play around with the sample application and got it to work, so it looks like my configuration was the problem. – Neil Jul 22 '19 at 10:33