python操作kafka,,mac启动zooke


mac启动zookeeper和kafka:zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
#py2.7版本使用kafka,py3.7版本使用kafka-python
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=‘localhost:9092‘)
# send函数传递topic类型为str,value的类型是bytes
future = producer.send(‘hello‘, json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime(‘%Y%m%d%H%M%S‘)),
"info": "demo{}".format(1)}).encode())
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime(‘%Y%m%d%H%M%S‘))


from kafka import KafkaConsumer
consumer = KafkaConsumer(‘hello‘, bootstrap_servers=[‘localhost:9092‘])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print( recv)

python操作kafka

评论关闭