Kafka Producer(Python threading),,import thr
Kafka Producer(Python threading),,import thr
import threading
import time
import random
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=‘192.168.1.10:9092‘)
threads = []
class MyThread(threading.Thread):
def __init__(self, threadName, delay):
threading.Thread.__init__(self)
self.threadName=threadName
self.delay=delay
def run(self):
sendinfo(self.threadName, self.delay)
def sendinfo( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
data = "".join(random.sample(
[‘a‘, ‘b‘, ‘c‘, ‘d‘, ‘e‘, ‘f‘, ‘g‘, ‘h‘, ‘i‘, ‘j‘, ‘k‘, ‘l‘, ‘m‘, ‘n‘, ‘o‘, ‘p‘, ‘q‘, ‘r‘, ‘s‘, ‘t‘, ‘u‘, ‘v‘,
‘w‘, ‘x‘, ‘y‘, ‘z‘], 10)).replace(" ", "")
word=("%s, %s, %s, %s" % (threadName, count, data, time.ctime(time.time())))
producer.send(‘test‘, key=threadName, value=word)
print (word)
try:
t1=MyThread("Thread-1",0)
threads.append(t1)
t2=MyThread("Thread-2",0)
threads.append(t2)
t3=MyThread("Thread-3",0)
threads.append(t3)
for t in threads:
t.start()
for t in threads:
t.join()
producer.send(‘test‘, key="Thread-1", value="exit")
producer.send(‘test‘, key="Thread-2", value="exit")
producer.send(‘test‘, key="Thread-3", value="exit")
print ("exit program with 0")
except:
print ("Error: failed to run producer program")
Kafka Producer(Python threading)
相关内容
- python循环导入的问题,,1、问题循环导入,代
- python 给定日期 返回属于当年第几周,,1、返回当前时间
- python对影评进行评论分析,形成词云图,,1 # -*- co
- Python3中// 和/区别,," / "表示浮点数
- python 抽象类、抽象方法、接口、依赖注入、SOLIP,,1、
- python笔记1-用python解决小学生数学题,,前几天有人在群
- Python调用OpenCV实现摄像头的运动检测,,[硬件环境]Win
- Python工作日类库Busines Holiday介绍,,引言: 在日常工作
- python模拟博客园登录-基础版,,mport tim
- python多进程共享内存,,from multi
评论关闭