kafka之跑不通
-- coding:utf-8 --
"""
Author:Yjx
Time:2019-8-17 16:26
"""
from kafka import KafkaProducer
from kafka import KafkaConsumer
from threading import Thread
import time
class kafkaProducer(object):
def __init__(self, kafkatopic, kafkahost, kafkaport): self.kafkatopic = kafkatopic self.kafkahost = kafkahost self.kafkaport = kafkaport bootstrap_server = [] for host in self.kafkahost: bootstrap_server.append('{kafka_host}:{kafka_port}'.format(kafka_host=host, kafka_port=self.kafkaport)) self.producer = KafkaProducer(bootstrap_servers=bootstrap_server) def sendMsg(self): try: for i in range(10): parmas_message = 'send_message_' + str(i+1) self.producer.send(self.kafkatopic, parmas_message.encode('utf-8')) self.producer.flush() print(parmas_message) self.producer.close() print('****over****') except KafkaError as e: print('KafkaError:', e)
class kafkaConsumer(object):
def __init__(self, kafkatopic, kafkahost, kafkaport): self.kafkatopic = kafkatopic self.kafkahost = kafkahost self.kafkaport = kafkaport bootstrap_server = [] for host in self.kafkahost: bootstrap_server.append('{kafka_host}:{kafka_port}'.format(kafka_host=host, kafka_port=self.kafkaport)) self.consumer = KafkaConsumer(self.kafkatopic, bootstrap_servers=bootstrap_server) def getMsg(self): n = 1 # print(time.time()) try: for msg in self.consumer: print('****get_message_%d****' % n, msg) n += 1 print('****done****') except KafkaError as e: print('KafkaError:', e)
def runGetMsg(para_host, para_port):
consumer = kafkaConsumer("topic-1", para_host, para_port) consumer.getMsg()
def runSendMsg(para_host, para_port):
producer = kafkaProducer("topic-1", para_host, para_port) producer.sendMsg()
if name == '__main__':
para_host = [] para_port = 9092 thread_1 = Thread(target=runSendMsg, args=(para_host, para_port)) thread_1.start() thread_2 = Thread(target=runGetMsg, args=(para_host, para_port)) thread_2.start()
相关推荐
Kafka 2020-09-18
yanghuashuiyue 2020-11-14
liuxingen 2020-11-13
wangying 2020-11-13
王谦 2020-11-03
huangwei00 2020-10-14
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
guicaizhou 2020-09-15
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01
weikaixxxxxx 2020-08-01
PoppyEvan 2020-08-01
guicaizhou 2020-08-01
PoppyEvan 2020-07-29