关于 PHP Rdkafka 消费者性能讨论 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
yuandj
V2EX    Kafka

关于 PHP Rdkafka 消费者性能讨论

  •  
  •   yuandj 2021-05-11 11:19:04 +08:00 2439 次点击
    这是一个创建于 1628 天前的主题,其中的信息可能已经有所发展或是发生改变。

    当下遇到的问题:

    服务提供商: 1. 集群每个节点的吞吐量在 1.5 MB/s 左右,远小于服务的吞吐量 2. 3 个节点每个 topic 设置 90 个分区, 3 副本,这个使用方式不太合理,服务需要对每个 topic 维护 90x3 个 replica 进程,io process 也要维护 90x3 个,原来顺序的读写也会退化为随机读写,网络 process 需要维护 90 个 3. 看历史监控记录,副本延迟在过去是会频繁发生的 4. 之前有建议您修改分区到 6 ~ 9 个 您这边反馈分区数调低之后消费者有延迟,实际您这边的吞吐量远没有达到服务应该有的吞吐量,怀疑是客户端方面有问题,需要您在消费端打印每次 poll 的时间和 poll 下来的消息条数,确定消费者行为,这样我们可以进一步分析 现在我们这边的解决方案还是和之前的建议一样,topic 分区数调整到 6 ~ 9 个,消费延迟的问题需要从客户端出发解决 开发者: 调整为 6 个分区之后,不是消费延迟问题,是单个消费者的能力不足,跟不上生产的速度。之前已经试过了,10 来分钟就堆积了 100 万消息。 服务提供商: 6 个分区的话,可以使用 6 个消费者,6 个消费者的能力远不止这么差. max.poll.records,可以用于指定批量消费条数的 配合配置 max.partition.fetch.byte 和 fetch.max.wait.ms 两个参数 可以实现批量消费 kafka 的消息。您看看 php 的客户端是否有设置这些参数的地方,或者有其他地方可以设置消费者的批量消费的,因为一条条的消费,效率是极低的 开发者: rdkafka 扩展里,好像没这个相关的参数 

    当前是 1 个 topic,90 个分区,分区数太多引起 kafka 集群副本同步时的性能下降问题。服务商建议减少分区数,但是减少分区数会有大量的消息堆积,rdkafka 如何提升单消费者的性能呢?

    消费者大致代码如下:

    $this->RdKafkaCOnf= new RdKafka\Conf(); $this->RdKafkaConf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitiOns= null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: $kafka->assign(null); break; default: throw new \Exception($err); } }); $this->RdKafkaConf->set('group.id', $groupid); // Initial list of Kafka brokers $this->RdKafkaConf->set('metadata.broker.list', $configs); $this->RdKafkaConf->set('socket.keepalive.enable', 'true'); $this->RdKafkaConf->set('enable.auto.commit', 'true'); $this->RdKafkaConf->set('auto.commit.interval.ms', '100'); $this->RdKafkaConf->set('auto.offset.reset', 'smallest'); $topic = is_array($topic) ? $topic : [$topic]; $cOnsumer= new RdKafka\KafkaConsumer($this->RdKafkaConf); $consumer->subscribe($topic); while (true) { $message = $consumer->consume($timeout * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: call_user_func_array($callback, [$message]); // $consumer->commitAsync($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: // Log::get('consumer')->info("No more messages; will wait for more"); break; case RD_KAFKA_RESP_ERR__TIMED_OUT: // Log::get('consumer')->error("Timed out"); break; default: throw new \Exception($message->errstr(), $message->err); } } //callback function if (count(self::$queue) >= 10 || (time() - $this->lastWriteTimestamp) >= 1) { self::$queue[] = $msg; $queue = self::$queue; self::$queue = []; $this->lastWriteTimestamp = time(); $reportData = []; foreach ($queue as $message) { $data = json_decode($message->payload, true); // 入库 } } else { self::$queue[] = $msg; } 
    第 1 条附言    2021-05-15 19:37:09 +08:00

    问题已解决:

    1. 从代码可以看到,callback已经采用了批处理,想要消费的更快,调大批量条数就可以了。
    2. kafka集群性能的问题,是由于kafka生产者写入数据时,没有使用连接池,每次请求都会新建一个tcp连接,当请求量上来时就导致tcp连接数过多,kafka集群CPU性能受到影响导致的。

    使用连接池前,API服务器tcp time_wait个数保持在3w左右,大多是kafka集群的连接。服务器cpu在50%左右。 使用连接池后,同样的请求量,tcp time_wait个数一直在1000内,cpu使用率降到了17%左右。。。

    又踩了tcp的坑。说到底还是自己功底不扎实。继续努力吧

    踩坑记录:

    1. hyperf的连接池中max_connections参数,是针对单woker设置的,如果想要在单服务器配置500个连接的话,需要配置500/woker_num
    2. nginx做代理时,指定http版本为1.1,开启长链接(减少后端服务器对于nginx服务器的time_wait状态个数)
    server { ... location /http/ { proxy_pass http://http_backend; proxy_http_version 1.1; proxy_set_header Connection ""; ... } } 
    6 条回复    2021-05-11 14:51:59 +08:00
    iyaozhen
        1
    iyaozhen  
       2021-05-11 13:07:22 +08:00
    callback 慢呗,可以多进程( 1-2 倍分区数)同一个 group.id 并行消费。
    yuandj
        2
    yuandj  
    OP
       2021-05-11 13:17:05 +08:00
    @iyaozhen 用 swoole 的协程试过,多个协程之间会重复消费数据
    iyaozhen
        3
    iyaozhen  
       2021-05-11 13:40:02 +08:00
    @yuandj 不用携程,多进程最合适。确定是相同 group id ?
    yuandj
        4
    yuandj  
    OP
       2021-05-11 13:50:50 +08:00
    @iyaozhen 一个 topic 下的一个分区,在同一时间,不是只能被一个消费者消费吗?
    JKeita
        5
    JKeita  
       2021-05-11 14:16:14 +08:00
    一个 topic 每个分区只会被消费者组里的一个消费者消费。
    iyaozhen
        6
    iyaozhen  
       2021-05-11 14:51:59 +08:00 via Android
    @yuandj 是啊,但你可以 n 个主进程消费,然后扔给 task 进程入库
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     828 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 27ms UTC 21:07 PVG 05:07 LAX 14:07 JFK 17:07
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86