I'm trying to unit test the following piece of code using pytest:
import json
from typing import Any, Dict

from confluent_kafka import Consumer


def get_message(config: Dict[str, Any]):
    consumer = Consumer(
        {
            "group.id": config["KAFKA_GROUP_ID"],
            "bootstrap.servers": config["KAFKA_BROKERS"],
            "default.topic.config": {"auto.offset.reset": "smallest"},
        }
    )
    consumer.subscribe([config["KAFKA_TOPIC"]])
    while True:
        collect = consumer.poll()
        if collect is None:
            continue
        try:
            message = json.loads(collect.value().decode("utf-8"))
        except json.JSONDecodeError:
            continue
        return message
However, I can't mock the subscribe method. I tried:
from unittest.mock import MagicMock

mock_subscribe = MagicMock(return_value='test')
monkeypatch.setattr('confluent_kafka.cimpl.Consumer.subscribe', mock_subscribe)
As a result, I got the following error:
TypeError: can't set attributes of built-in/extension type
How can I mock this method properly?
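
One direction that might work, given as a sketch under stated assumptions rather than a confirmed fix: the error suggests that Consumer is a C extension type whose attributes cannot be reassigned, so instead of patching subscribe on the class, the name Consumer could be replaced in the module that defines get_message. To stay runnable without Kafka or confluent_kafka installed, the sketch below uses a stand-in Consumer class and a trimmed copy of get_message; the helper run_with_fake_consumer and the config values are hypothetical.

```python
import json
from unittest.mock import MagicMock, patch


class Consumer:
    """Stand-in for confluent_kafka.Consumer so this sketch runs on its own."""

    def __init__(self, conf):
        raise RuntimeError("the real consumer must not be created in a unit test")


def get_message(config):
    # Trimmed copy of the function under test; it resolves the
    # module-level name Consumer at call time.
    consumer = Consumer({"group.id": config["KAFKA_GROUP_ID"]})
    consumer.subscribe([config["KAFKA_TOPIC"]])
    while True:
        collect = consumer.poll()
        if collect is None:
            continue
        try:
            return json.loads(collect.value().decode("utf-8"))
        except json.JSONDecodeError:
            continue


def run_with_fake_consumer():
    # Fake Kafka message whose value() returns JSON bytes.
    fake_message = MagicMock()
    fake_message.value.return_value = b'{"answer": 42}'

    # Fake class: calling it returns fake_consumer_cls.return_value.
    fake_consumer_cls = MagicMock(name="Consumer")
    fake_consumer = fake_consumer_cls.return_value
    # First poll returns None (skipped by the loop), second returns the message.
    fake_consumer.poll.side_effect = [None, fake_message]

    # Replace the name that get_message looks up, not a method on the C type.
    with patch.dict(get_message.__globals__, {"Consumer": fake_consumer_cls}):
        result = get_message({"KAFKA_GROUP_ID": "g", "KAFKA_TOPIC": "t"})

    fake_consumer.subscribe.assert_called_once_with(["t"])
    return result
```

In a pytest test, the equivalent of the patch.dict line would presumably be monkeypatch.setattr("my_module.Consumer", fake_consumer_cls), where my_module is a hypothetical name for the module that contains get_message and does the from confluent_kafka import Consumer.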