
当下遇到的问题:
服务提供商: 1. 集群每个节点的吞吐量在 1.5 MB/s 左右,远小于服务的吞吐量 2. 3 个节点每个 topic 设置 90 个分区, 3 副本,这个使用方式不太合理,服务需要对每个 topic 维护 90x3 个 replica 进程,io process 也要维护 90x3 个,原来顺序的读写也会退化为随机读写,网络 process 需要维护 90 个 3. 看历史监控记录,副本延迟在过去是会频繁发生的 4. 之前有建议您修改分区到 6 ~ 9 个 您这边反馈分区数调低之后消费者有延迟,实际您这边的吞吐量远没有达到服务应该有的吞吐量,怀疑是客户端方面有问题,需要您在消费端打印每次 poll 的时间和 poll 下来的消息条数,确定消费者行为,这样我们可以进一步分析 现在我们这边的解决方案还是和之前的建议一样,topic 分区数调整到 6 ~ 9 个,消费延迟的问题需要从客户端出发解决 开发者: 调整为 6 个分区之后,不是消费延迟问题,是单个消费者的能力不足,跟不上生产的速度。之前已经试过了,10 来分钟就堆积了 100 万消息。 服务提供商: 6 个分区的话,可以使用 6 个消费者,6 个消费者的能力远不止这么差. max.poll.records,可以用于指定批量消费条数的 配合配置 max.partition.fetch.byte 和 fetch.max.wait.ms 两个参数 可以实现批量消费 kafka 的消息。您看看 php 的客户端是否有设置这些参数的地方,或者有其他地方可以设置消费者的批量消费的,因为一条条的消费,效率是极低的 开发者: rdkafka 扩展里,好像没这个相关的参数 当前是 1 个 topic,90 个分区,分区数太多引起 kafka 集群副本同步时的性能下降问题。服务商建议减少分区数,但是减少分区数会有大量的消息堆积,rdkafka 如何提升单消费者的性能呢?
消费者大致代码如下:
$this->RdKafkaCOnf= new RdKafka\Conf(); $this->RdKafkaConf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitiOns= null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: $kafka->assign(null); break; default: throw new \Exception($err); } }); $this->RdKafkaConf->set('group.id', $groupid); // Initial list of Kafka brokers $this->RdKafkaConf->set('metadata.broker.list', $configs); $this->RdKafkaConf->set('socket.keepalive.enable', 'true'); $this->RdKafkaConf->set('enable.auto.commit', 'true'); $this->RdKafkaConf->set('auto.commit.interval.ms', '100'); $this->RdKafkaConf->set('auto.offset.reset', 'smallest'); $topic = is_array($topic) ? $topic : [$topic]; $cOnsumer= new RdKafka\KafkaConsumer($this->RdKafkaConf); $consumer->subscribe($topic); while (true) { $message = $consumer->consume($timeout * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: call_user_func_array($callback, [$message]); // $consumer->commitAsync($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: // Log::get('consumer')->info("No more messages; will wait for more"); break; case RD_KAFKA_RESP_ERR__TIMED_OUT: // Log::get('consumer')->error("Timed out"); break; default: throw new \Exception($message->errstr(), $message->err); } } //callback function if (count(self::$queue) >= 10 || (time() - $this->lastWriteTimestamp) >= 1) { self::$queue[] = $msg; $queue = self::$queue; self::$queue = []; $this->lastWriteTimestamp = time(); $reportData = []; foreach ($queue as $message) { $data = json_decode($message->payload, true); // 入库 } } else { self::$queue[] = $msg; } 问题已解决:
使用连接池前,API服务器tcp time_wait个数保持在3w左右,大多是kafka集群的连接。服务器cpu在50%左右。 使用连接池后,同样的请求量,tcp time_wait个数一直在1000内,cpu使用率降到了17%左右。。。
又踩了tcp的坑。说到底还是自己功底不扎实。继续努力吧
踩坑记录:
server { ... location /http/ { proxy_pass http://http_backend; proxy_http_version 1.1; proxy_set_header Connection ""; ... } } 5 JKeita 2021-05-11 14:16:14 +08:00 一个 topic 每个分区只会被消费者组里的一个消费者消费。 |