
# coding: utf-8
"""Rough throughput benchmark for producing JSON messages to Kafka via pykafka.

The original script used ``topic.get_sync_producer()``, which blocks on a
broker acknowledgement for every single message; that per-message round trip
is what limited it to ~13 msgs/s. By default this version uses pykafka's
asynchronous producer, which queues messages and sends them in batches, and
only blocks when the producer is stopped (flushed) at the end of ``with``.
"""
from pykafka.client import KafkaClient
import logging
import json
import time

logging.basicConfig(level=logging.WARNING)
produce_logger = logging.getLogger('producer')

BROKERS = '192.168.109.58:9092,192.168.109.70:9092,192.168.109.91:9092'
TOPIC_NAME = 'meteor_spider_article_dev'


def _build_message():
    """Return the benchmark payload as UTF-8 encoded JSON bytes."""
    msg_body = {
        'article_id': 1,
        "title": "标题",        # "title"
        "subtitle": "副标题",   # "subtitle"
    }
    # pykafka requires bytes payloads on Python 3; json.dumps returns str.
    return json.dumps(msg_body).encode('utf-8')


def kafka(use_rdkafka=False, n_messages=1000, sync=False):
    """Produce ``n_messages`` copies of the benchmark message and time it.

    Args:
        use_rdkafka: use pykafka's librdkafka-backed producer if True.
        n_messages: number of messages to produce.
        sync: if True, wait for a broker ack after each message (slow,
            reproduces the original behaviour); if False, use the batching
            asynchronous producer.

    Returns:
        Elapsed wall-clock seconds for creating the producer, producing all
        messages and flushing them (the ``with`` exit stops the producer).
    """
    client = KafkaClient(hosts=BROKERS)
    topic = client.topics[TOPIC_NAME]
    msg = _build_message()

    produce_start = time.time()
    # Leaving the ``with`` block calls producer.stop(), which flushes any
    # queued messages, so the timing includes actual delivery.
    with topic.get_producer(sync=sync, use_rdkafka=use_rdkafka) as producer:
        for _ in range(n_messages):
            producer.produce(msg)
    return time.time() - produce_start


def calculate_thoughput(timing, n_messages=1000, msg_size=5956):
    """Print and return throughput figures for a timed produce run.

    Args:
        timing: elapsed seconds for the run.
        n_messages: number of messages produced.
        msg_size: size of one message in bytes.

    Returns:
        Tuple ``(megabytes_per_second, messages_per_second)``.

    Raises:
        ValueError: if ``timing`` is not positive.
    """
    if timing <= 0:
        raise ValueError("timing must be positive, got {0!r}".format(timing))
    mb_per_s = (msg_size * n_messages) / float(timing) / (1024 * 1024)
    msgs_per_s = n_messages / float(timing)
    print("Processed {0} messages in {1:.2f} seconds".format(n_messages, timing))
    print("{0:.2f} MB/s".format(mb_per_s))
    print("{0:.2f} Msgs/s".format(msgs_per_s))
    return mb_per_s, msgs_per_s


if __name__ == '__main__':
    N_MESSAGES = 1000
    calculate_thoughput(
        kafka(n_messages=N_MESSAGES),
        n_messages=N_MESSAGES,
        # Use the real payload size instead of an unrelated constant.
        msg_size=len(_build_message()),
    )

# Output observed with the original synchronous producer:
#   Processed 1000 messsages in 76.68 seconds
#   0.07 MB/s
#   13.04 Msgs/s
这速度怎么回事?
1 sylecn 2016-09-07 14:41:58 +08:00 topic.get_sync_producer() 虽然还没有用过 kafka,但是这种压力测试应该都用 async 模式来发消息吧。如果用同步,起码开多线程一起发。 要不然一个一个等反馈多慢啊。 |
2 reAsOn 2016-09-07 14:47:30 +08:00 用过 kafka-python 的库,性能可以接受,需要用异步发送 + batch |
3 996635 OP |
4 est 2016-09-07 15:13:35 +08:00 kafka 至少有 3 个 py 库,各自实现都不同。需要仔细判别。 |
5 tongle 2016-09-07 15:50:51 +08:00 Using the librdkafka extension 试试这个 |