Здравствуйте.
захотелось написать клиента на python для kafka
для этого использовал
https://github.com/wurstmeister/kafka-docker.git
docker-compose.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.1.182
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test-topic:5:2"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
запустил в такой конфигурации
docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
21cac556d57e wurstmeister/kafka "start-kafka.sh" 30 hours ago Up 7 hours 0.0.0.0:32780->9092/tcp kafka-docker_kafka_3
53ef85ab1535 wurstmeister/kafka "start-kafka.sh" 30 hours ago Up 6 hours 0.0.0.0:32781->9092/tcp kafka-docker_kafka_2
a4ba60a5e74a wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 30 hours ago Up 7 hours 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafka-docker_zookeeper_1
9c2538130486 wurstmeister/kafka "start-kafka.sh" 30 hours ago Up 6 hours 0.0.0.0:32782->9092/tcp kafka-docker_kafka_1
захожу в docker
$ docker exec -it kafka-docker_kafka_3 bash
bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper 172.19.0.3:2181 --list
test-topic
bash-4.4# exit
exit
в итоге
kafka_2.12-2.4.0
ставлю библиотеку kafka-python
$ /opt/python/python37/bin/pip3 list
Package Version
----------------- ---------
backcall 0.1.0
bcrypt 3.1.7
cffi 1.13.2
cryptography 2.8
cypari 2.3.0
decorator 4.4.1
future 0.18.2
FXrays 1.3.3
geographiclib 1.50
geopy 1.20.0
get 2019.4.13
ipython 7.11.1
ipython-genutils 0.2.0
jedi 0.15.2
kafka-python 1.4.7
пытаюсь написать клиента
import os
from kafka import KafkaConsumer
producer = KafkaConsumer(security_protocol="PLAINTEXT", bootstrap_servers=os.environ.get('KAFKA_HOST', 'localhost:2181'))
при попытке запуска возвращается ошибка
Traceback (most recent call last):
File "/home/drno/IdeaProjects/prometheus_dirs_and_files_node_exporter/zookeeper_kafka.py", line 9, in <module>
producer = KafkaConsumer(security_protocol="PLAINTEXT", bootstrap_servers=os.environ.get('KAFKA_HOST', 'localhost:2181'))
File "/opt/python/python37/lib/python3.7/site-packages/kafka/consumer/group.py", line 354, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/opt/python/python37/lib/python3.7/site-packages/kafka/client_async.py", line 240, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/opt/python/python37/lib/python3.7/site-packages/kafka/client_async.py", line 908, in check_version
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
File "/opt/python/python37/lib/python3.7/site-packages/kafka/conn.py", line 1228, in check_version
raise Errors.UnrecognizedBrokerVersion()
kafka.errors.UnrecognizedBrokerVersion: UnrecognizedBrokerVersion
Как решить эту проблему?