
Kafka Connect source and sink connectors provide a practically ideal feature set for configuring a data pipeline without writing any code. In my case, I wanted to use it to integrate data from several DB servers (producers) located on the public Internet.

However, some producers don't have direct access to the Kafka brokers because their network/firewall configuration allows traffic only to a single specific host on port 443. And unfortunately I cannot change these settings.

My first thought was to use the Confluent REST Proxy, but I learned that Kafka Connect uses the KafkaProducer API, so it needs direct access to the brokers.

I found a couple of possible workarounds, but none is perfect:

  1. SSH tunnel - as described in: Consume from a Kafka Cluster through SSH Tunnel
  2. Use the REST Proxy but replace Kafka Connect with custom producers, as mentioned in How can we configure kafka producer behind a firewall/proxy?
  3. Use the sslh demultiplexer to route the traffic to a broker (but only to a single broker)

Has anyone faced similar challenge? How did you solve it?

kgr

2 Answers


Sink Connectors (ones that write to external systems) do not use the Producer API.

That being said, you could use some HTTP Sink Connector that issues POST requests to the REST Proxy endpoint. It's not ideal, but it would address the problem. Note: This means you have two clusters - one that you are consuming from in order to issue HTTP requests via Connect, and the other behind the proxy.
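For context, the REST Proxy's v2 produce endpoint expects a specific JSON envelope and content type. This is a minimal sketch (stdlib only; the `rest:8082` host and `test` topic are placeholders for your deployment) of the request an HTTP sink would have to issue:

```python
import json
import urllib.request

# Hypothetical REST Proxy endpoint and topic -- adjust to your deployment.
url = "http://rest:8082/topics/test"

# The v2 produce API expects a body of the form {"records": [{"value": ...}, ...]}.
body = json.dumps({"records": [{"value": {"id": 1}}]}).encode("utf-8")

req = urllib.request.Request(
    url,
    data=body,
    headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    method="POST",
)
# urllib.request.urlopen(req) would actually send it; the point is that the
# HTTP sink must produce exactly this body shape and content type.
```

Whatever sink connector you pick has to be able to reshape each record into that envelope, which is what the configurations in the answer below do with prefix/suffix and template settings.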


Overall, I don't see how the question is unique to Connect, since you'd have similar issues with any other attempt to write the data to Kafka through the only open HTTPS port.

OneCricketeer
  • Well, that's an interesting approach with two clusters. A bit overcomplicated, but it satisfies my constraint while still giving me the flexibility that SSH tunnels don't. Thanks a lot. – kgr Jun 29 '21 at 11:31
  • There's another project on github called `kafkaproxy` that is a pure TCP proxy, if that is something you might want to look into – OneCricketeer Jun 29 '21 at 14:48
  • A note: the HTTP Sink Connector is published under a commercial license (https://www.confluent.io/hub/confluentinc/kafka-connect-http). This might be a limiting factor in some cases. – kgr Jul 01 '21 at 06:23
  • I found configuring the Confluent HTTP Sink connector with the REST Proxy a bit tricky. It seems that the connector sends a JSON structure that is not compliant with what the REST Proxy expects: https://docs.confluent.io/platform/current/kafka-rest/api.html#post--topics-(string-topic_name) Do you have an example of an HTTP Sink connector configuration made to work with the REST Proxy? – kgr Jul 13 '21 at 21:36
  • I never said to use the Confluent one, I said use _some HTTP sink_. I.e. you could write one of your own that sends the format you need – OneCricketeer Jul 14 '21 at 14:55
  • @kgr You can post a new answer rather than a comment if you've found a solution – OneCricketeer Jul 14 '21 at 15:04
  • I managed to configure Confluent HTTP Sink connector as well as alternative one https://github.com/llofberg/kafka-connect-rest to work with REST Proxy. I will try to add connector configuration here. – kgr Jul 14 '21 at 15:06

As @OneCricketeer recommended, I tried the HTTP Sink Connector with REST Proxy approach. I managed to configure both the Confluent HTTP Sink connector and an alternative one (github.com/llofberg/kafka-connect-rest) to work with the Confluent REST Proxy.

I'm adding the connector configurations here in case they save some time for anyone trying this approach.

Confluent HTTP Sink connector

    {
        "name": "connector-sink-rest",
        "config": {
            "topics": "test",
            "tasks.max": "1",
            "connector.class": "io.confluent.connect.http.HttpSinkConnector",
            "headers": "Content-Type:application/vnd.kafka.json.v2+json",
            "http.api.url": "http://rest:8082/topics/test",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter.schemas.enable": "false",
            "batch.prefix": "{\"records\":[",
            "batch.suffix": "]}",
            "batch.max.size": "1",
            "regex.patterns": "^~$",
            "regex.replacements": "{\"value\":~}",
            "regex.separator": "~",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1"
        }
    }
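The `batch.prefix`/`batch.suffix` and regex settings above wrap each record value in the envelope the REST Proxy expects. This is a rough stdlib-only simulation of the resulting transformation (not the connector's actual code), assuming `batch.max.size` of 1 so each request carries a single record:

```python
import json

def wrap_for_rest_proxy(record_value: str) -> str:
    # batch.prefix / batch.suffix supply the {"records":[ ... ]} envelope;
    # the regex replacement turns each record value V into {"value":V}.
    return '{"records":[' + '{"value":' + record_value + '}' + ']}'

payload = wrap_for_rest_proxy('{"id": 1, "name": "test"}')
print(payload)
```

The rendered payload is valid JSON in the shape the REST Proxy's v2 produce endpoint accepts.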

Kafka Connect REST connector

    {
        "name": "connector-sink-rest-v2",
        "config": {
            "connector.class": "com.tm.kafka.connect.rest.RestSinkConnector",
            "tasks.max": "1",
            "topics": "test",
            "rest.sink.url": "http://rest:8082/topics/test",
            "rest.sink.method": "POST",
            "rest.sink.headers": "Content-Type:application/vnd.kafka.json.v2+json",
            "transforms": "velocityEval",
            "transforms.velocityEval.type": "org.apache.kafka.connect.transforms.VelocityEval$Value",
            "transforms.velocityEval.template": "{\"records\":[{\"value\":$value}]}",
            "transforms.velocityEval.context": "{}"
        }
    }
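Here the `velocityEval` transform renders each record through the template, substituting `$value` with the record's value. A rough stdlib-only simulation of that substitution (the real transform uses the Velocity template engine):

```python
import json

# Rough simulation of the Velocity template in the config above:
# "$value" is replaced by the record's value at render time.
template = '{"records":[{"value":$value}]}'
rendered = template.replace("$value", '{"id": 1}')
print(rendered)
```

Either way, the sink ends up POSTing the same `{"records":[{"value":...}]}` envelope that the REST Proxy requires.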
kgr