Kafka(八)Python生产者和消费者API使用,kafkapython,单线程生产者#!/u


单线程生产者

#!/usr/bin/envpython#-*-coding:utf-8-*-importrandomimportsysfromkafkaimportKafkaProducerfromkafka.clientimportlogimporttimeimportjson__metaclass__=typeclassProducer:def__init__(self,KafkaServer='127.0.0.1',KafkaPort='9092',ClientId="Procucer01",Topic='Test'):"""用于设置生产者配置信息,这些配置项可以从源码中找到,下面为必要参数。:paramKafkaServer:kafka服务器IP:paramKafkaPort:kafka工作端口:paramClientId:生产者名称:paramTopic:主题"""self._bootstrap_server='{host}:{port}'.format(host=KafkaServer,port=KafkaPort)self._topic=Topicself._clientId=ClientId"""初始化一个生产者实例,生产者是线程安全的,多个线程共享一个生产者实例效率比每个线程都使用一个生产者实例要高acks:消费者只能消费被提交的,而只有消息在所有副本中都有了才算提交,生产者发送了消息是否要等待所有副本都同步了该消息呢?这个值就是控制这个的。默认是1,表示只要该分区的Leader副本成功写入日志就返回。0表示生产者无需等待,发送完就返回;all是所有副本都写入该消息才返回。all可靠性最高但是效率最低,0效率最高但是可靠性最低,所以一般用1。retries:表示请求重试次数,默认是0,上面的acks配置请求完成的标准,如果请求失败,生产者将会自动重试,如果配置为0则不重试。但是如果重试则有可能发生重复发送消息。key_serializer:键的序列化器,默认不设置,采用字节码value_serializer:值得序列化器,默认不设置,采用字节码,因为可以发送单一字符,也可以发送键值型消息"""try:self._producer=KafkaProducer(bootstrap_servers=self._bootstrap_server,client_id=self._clientId,acks=1,value_serializer=lambdam:json.dumps(m).encode('utf-8'))exceptExceptionaserr:printerr.messagedef_TIMESTAMP(self):t=time.time()returnint((round(t*1000)))#时间戳转换为普通时间defgetNormalTime(self,temp_timeStamp,timeSize=10):timeStamp=temp_timeStampiftimeSize==13:timeStamp=int(temp_timeStamp/1000)timeArray=time.localtime(timeStamp)otherStyleTime=time.strftime("%Y-%m-%d%H:%M:%S",timeArray)returnotherStyleTime#发送成功的回调函数def_on_send_success(self,record_metadata):print"Topic:%sPartition:%dOffset:%s"%(record_metadata.topic,record_metadata.partition,record_metadata.offset)#发送失败的回调函数def_on_send_error(self,excp):log.error('Iamanerrback',exc_info=excp)defsendMsg(self,msg,partition=None):"""发送消息:parammsg:消息:parampartition:分区也可以不指定:return:"""ifnotmsg:print"消息不能为空。"returnNone#发送的消息必须是序列化后的,或者是字节message=json.dumps(msg,encoding='utf-8',ensure_ascii=False)try:TIMESTAMP=self._TIMESTAMP()#发送数据,异步方式,调用之后立即返回,因为这里其实是发送到缓冲区,所以你可以多次调用,然后一起flush出去。self._producer.send(self._topic,partition=partition,key=self._clientId,value=message,timestamp_ms=TIMESTAMP).add_callback(self._on_send_success).add_errback(self._on_send_error)#下面的flush是阻塞的,只有flush才会真正通过网络把缓冲区的数据发送到对端,如果不调用flush,则等到时间或者缓冲区满了就会发送。self._producer.flush()printself.getNormalTime(TIMESTAMP,timeSize=13)+"sendmsg:"+messageexceptExceptionaserr:printerrdefmain():p=Producer(KafkaServer="172.16.48.171",KafkaPort="9092",Topic='AAA')foriinrange(10):time.sleep(1)closePrice=random.randint(1,500)msg={"股票代码":60000+i,"昨日收盘价":closePrice,"今日开盘价":0,"今日收盘价":0,}p.sendMsg(msg)if__name__=="__main__":try:main()finally:sys.exit()


消费者

#!/usr/bin/envpython#-*-coding:utf-8-*-importsysfromkafkaimportKafkaConsumerimportjson__metaclass__=typeclassConsumer:def__init__(self,KafkaServer='127.0.0.1',KafkaPort='9092',GroupID='TestGroup',ClientId="Test",Topic='Test'):"""用于设置消费者配置信息,这些配置项可以从源码中找到,下面为必要参数。:paramKafkaServer:kafka服务器IP:paramKafkaPort:kafka工作端口:paramGroupID:消费者组ID:paramClientId:消费者名称:paramTopic:主题"""self._bootstrap_server='{host}:{port}'.format(host=KafkaServer,port=KafkaPort)self._groupId=GroupIDself._topic=Topicself._clientId=ClientIddefconsumeMsg(self):try:"""初始化一个消费者实例,消费者不是线程安全的,所以建议一个线程实现一个消费者,而不是一个消费者让多个线程共享下面这些是可选参数,可以在初始化KafkaConsumer实例的时候传递进去enable_auto_commit是否自动提交,默认是trueauto_commit_interval_ms自动提交间隔毫秒数"""consumer=KafkaConsumer(self._topic,bootstrap_servers=self._bootstrap_server,group_id=self._groupId,client_id=self._clientId,enable_auto_commit=True,auto_commit_interval_ms=5000,value_deserializer=lambdam:json.loads(m.decode('utf-8')))"""这里不需要显示的调用订阅函数,在初始化KafkaConsumer对象的时候已经指定了主题,如果主题字段不为空则会自动调用订阅函数,至于这个线程消费哪个分区则是自动分配的。如果你希望手动指定分区则就需要使用assign()函数,并且在初始的时候不输入主题。"""#consumer.subscribe(self._topicList)#返回一个集合print"当前消费的分区为:",consumer.partitions_for_topic(self._topic)print"当前订阅的主题为:",consumer.subscription()whileTrue:formsginconsumer:ifmsg:print"Topic:%sPartition:%dOffset:%sKey:%sMessage:%s"%(msg.topic,msg.partition,msg.offset,msg.key,msg.value)exceptExceptionaserr:printerrdefmain():try:c=Consumer(KafkaServer='172.16.48.171',Topic='AAA')c.consumeMsg()exceptExceptionaserr:printerr.messageif__name__=="__main__":try:main()finally:sys.exit()

执行效果

技术分享图片

技术分享图片

Kafka(八)Python生产者和消费者API使用

评论关闭