
When I run the following command with Kafka 0.9.0.1, I get the warnings shown below [1]. Can you please tell me what is wrong with my topic? (I'm talking to a Kafka broker that runs in EC2.)

./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC?

[1]

[2016-04-06 10:57:45,839] WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,066] WARN Error while fetching metadata with correlation id 3 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,188] WARN Error while fetching metadata with correlation id 5 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,311] WARN Error while fetching metadata with correlation id 7 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
Ratha

4 Answers


Your topic name is not valid because it contains the character '?', which is not a legal character for topic names.
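For reference, Kafka only accepts ASCII letters, digits, '.', '_' and '-' in topic names (plus a length limit). Below is a minimal sketch of that check in Java, assuming the same character class Kafka enforces internally; it is an illustration, not the actual Kafka source:

import java.util.regex.Pattern;

public class TopicNameCheck {
    // assumed legal character class for Kafka topic names: ASCII letters, digits, '.', '_', '-'
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    public static void main(String[] args) {
        String topic = "MY_TOPIC?";
        // prints: MY_TOPIC? -> valid? false
        System.out.println(topic + " -> valid? " + LEGAL.matcher(topic).matches());
    }
}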

avr
  • Now I tried without the "?" and it does not return any messages; it just hangs. But when I use the older consumer tool, it consumes messages. Can you tell me the reason? – Ratha Apr 06 '16 at 06:36
  • With the older consumer I ran: ./kafka-console-consumer.sh --zookeeper zookeper.xx.com:2181 --topic MY_TOPIC --from-beginning – Ratha Apr 06 '16 at 06:37
  • Add the **--from-beginning** flag at the end of your new-consumer command. It should work. – avr Apr 06 '16 at 06:40
  • No, it's not working; it just hangs. Please check the following threads on the Apache Kafka user list: //Is there any behavioural change to connect local server and remote server?// and //Consumer thread is waiting forever, not returning any objects// – Ratha Apr 06 '16 at 06:43
  • It would be great if you could tell me the reason. – Ratha Apr 06 '16 at 06:43
  • When you run the consumer command with the **--from-beginning** flag, it resets the consumer offset to the smallest (earliest) offset and consumes all messages available in the topic, whereas the command without that flag consumes from the largest (latest) offset (see the consumer sketch after these comments). – avr Apr 06 '16 at 06:54
  • Ok, so back to my original issue: poll() now gets stuck forever without raising any exception in my new consumer, but my EC2 instance settings look fine. – Ratha Apr 06 '16 at 06:57
  • Ratha, did you ever figure out why poll() was stuck? I'm seeing similar behavior on my local instance. – cacois Jun 13 '16 at 13:26
  • I figured I'd post a comment here since this is **still happening in KafkaConsumer 3.x** and is one of the top results for the `poll()` blocking issue. After a few hours of head-scratching, it appears that `poll(Duration timeout)` doesn't actually time out and waits for `max.poll.interval.ms` before calling back into its rebalancer logic. **Once the timeout has been exceeded**, the consumer logic is notified about topic partition changes, e.g. `onPartitionsRevoked(Collection partitions)`. – mohsenrezaeithe Dec 13 '22 at 00:59
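For anyone landing here from the comments above, here is a minimal sketch of a new-style consumer that reads from the earliest available offset, which is roughly what --from-beginning does for a group with no committed offsets (the broker address, group id and topic are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EarliestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.xx.com:9092");   // placeholder broker address
        props.put("group.id", "my-test-group");                // placeholder group id
        props.put("auto.offset.reset", "earliest");            // start from the beginning when the group has no committed offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("MY_TOPIC"));
            while (true) {
                // poll(Duration) is the modern signature; 0.9 clients use poll(500) instead
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}

If poll() still never returns records, the usual suspect is that the client cannot reach the address the broker advertises; see the answer about advertised listeners further down.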

I got the same error. In my case the problem was the spaces between the comma-separated topics in my code:

@source(type='kafka',
    topic.list="p1, p2, p3",
    partition.no.list='0',
    threading.option='single.thread',
    group.id="group",
    bootstrap.servers='kafka:9092',
    @map(type='json')
)

Finally I found the solution:

@source(type='kafka',
    topic.list="p1,p2,p3",
    partition.no.list='0',
    threading.option='single.thread',
    group.id="group",
    bootstrap.servers='kafka:9092',
    @map(type='json')
)
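If you build the topic list in your own client code, trimming whitespace around each name avoids the same problem, since a name like " p2" contains an illegal space. A minimal sketch in plain Java (the list value and the commented subscribe call are placeholders):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TrimTopicList {
    public static void main(String[] args) {
        String configured = "p1, p2, p3";                 // value as it might arrive from a config file
        List<String> topics = Arrays.stream(configured.split(","))
                .map(String::trim)                        // strip the stray spaces around each name
                .collect(Collectors.toList());
        System.out.println(topics);                       // [p1, p2, p3]
        // consumer.subscribe(topics);                    // subscribe with the cleaned names
    }
}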
mortezahosseini

This happens when the producer is not able to reach the advertised address. Check the value of advertised.listeners in /kafka/config/server.properties: if it is commented out, there are other issues; but if it is not, put your IP address in place of localhost and then restart both ZooKeeper and Kafka. Then try starting the console producer again; hopefully it will work.
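For example, the relevant lines in /kafka/config/server.properties might look like this (the IP address is a placeholder for an address your clients can actually reach):

# listen on all interfaces, but advertise an address that is reachable from the clients
listeners=PLAINTEXT://0.0.0.0:9092
# placeholder: replace with the broker's public/host IP or DNS name
advertised.listeners=PLAINTEXT://203.0.113.10:9092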

A.s

Just in case anyone is having this issue related to a comma "," in a Logstash output to Kafka, or to a calculated topic name:

In the topic_id of the Logstash output to Kafka, we tried to build the topic_id by appending a variable we calculated in the filter.

The problem is that this field was already present in the source document and we later added it "again" in the Logstash filter, converting the string field into an array (list).

So since we used this in the Logstash output:

topic_id => ["topicName_%{field}"]

we ended up with:

topic_id : "topicName_fieldItem1,FieldItem2"

which caused the following exception in the Logstash logs:

[WARN ][org.apache.kafka.clients.NetworkClient] [Producer clientId=logstash] Error while fetching metadata with correlation id 3605264 : {topicName_fieldItem1,FieldItem2=INVALID_TOPIC_EXCEPTION}
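The fix in our case was to make sure the interpolated field holds a single string, for example by only adding it when the source document does not already carry it. A rough sketch of such a Logstash filter guard (the field name and value are placeholders):

filter {
  # only add "field" when it is not already present in the incoming event,
  # so that topic_id => "topicName_%{field}" interpolates a single string value
  if ![field] {
    mutate {
      add_field => { "field" => "computedValue" }
    }
  }
}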