RabbitMQ 增加多消费者导致生产者推送消息阻塞(10s~30s) 设备上传数据到系统 A(netty),系统 A 根据数据类型推送消息到不同的队列,因为设备量增多的原因,之前单消费者开始处理的不及时,就想着多增加个消费者(和之前的消费者代码一样),然后系统 A 推送消息开始出现卡顿,数据帧应答的很慢,感觉不像是流控的事,管理端看着也没问题 相关代码: 系统 A:
channelRead(ChannelHandlerContext ctx, Object msg){ .... sendAck(ctx,ack); switch (data.getClass().getName()) { case "realTimeData": RabbitUtil.getInstance().publish(realTimeData); } }
publish(RealTimeData realTimeData){ ....... Map<String, Object> header = new HashMap<String, Object>(); header.put("DataType", "RealTimeData"); BasicProperties props = new BasicProperties().builder().headers(header).build(); channel.basicPublish(exchangeName, routeKey_CollectedData, props, CollectedRealTimeDataPackageTransform.toBytes(data)); }
channel init: private Channel channel; private ConnectionFactory factory = new ConnectionFactory(); @PostConstruct public void init() { instance = this; factory.setUsername(mqUserName); factory.setPassword(mqPassword); factory.setHost(mqHost); factory.setVirtualHost(mqVirtualHost); factory.setPort(mqPort); } channel = factory.newConnection().createChannel(); }
消费者代码:
@Autowired DataProcessor processor; @Autowired @Qualifier("threadpool") ThreadPoolExecutor threadPool; @RabbitListener(queues = "${mq.queue.Original.CollectedData}", ackMode = "MANUAL") public void process(Message msg, Channel channel) { MessageProperties mp = msg.getMessageProperties(); Map<String, Object> headers = mp.getHeaders(); String dataType = (String) headers.get("DataType"); switch (dataType) { case "RealTimeData": CompletableFuture.runAsync(() -> { try { channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); CollectedRealTimeData crtd = CollectedRealTimeDataPackageTransform.fromBytes(msg.getBody()); processor.process(crtd); } catch (Exception e) { try { channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException ioException) { ioException.printStackTrace(); } e.printStackTrace(); } }, threadPool); break; } }
![]() | 1 qW7bo2FbzbC0 2022-06-21 09:51:09 +08:00 用 rabbit 经常会遇到 server no response 的情况,只能手动 kill 进程重新启动,改用 kafka 后未出现同样问题。我们的场景比较简单,切换起来很快。我们这边观测 rabbit client 很多或消息大的话,对 server 有压力。和 rabbit 的性能宣传语不同,可能使我们的使用方式有问题吧 |
![]() | 2 wupher 2022-06-21 10:27:15 +08:00 没碰到类似的情况,可能是量级未到? 之前写的一个系统,使用 RabbitMQ 进行多端通讯。日常大约在 5000 ~ 8000 个客户端进行数据交换,同步消息和异步消息都有。 消费者一直都是多节点通过 RabbitListener 连接 RabbitMQ 。刚才又看了一下 application.yml 批量获取一次 10 条,concurrency 5 max 10 之前未碰到过 publish 超时的情况。 你用的版本是? |
3 withBruce OP concurrency 这个属性开多线程不是消费的同一批数据把 concurrency 5 max 10 配置好这个问题解决了 还是自己对于 mq 没弄明白 谢谢了! |
5 withBruce OP @qW7bo2FbzbC0 谢谢 |