golang 推送系统怎么做消息合并优化? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
laravel
V2EX    程序员

golang 推送系统怎么做消息合并优化?

  •  1
     
  •   laravel 2020-04-08 21:15:55 +08:00 4598 次点击
    这是一个创建于 2078 天前的主题,其中的信息可能已经有所发展或是发生改变。

    比如服务端有 5000 人在线,每秒消息数 100 条,如果不优化,每秒的广播将是 50 万, 现在我想把 1s 之内的消息合并成 1 条,这样就只有 5000 次广播了,聊天这种 1s 延迟可以忍受。

    现在的问题是 这种消息怎么保存?方便消费

    语言 golang

    25 条回复    2020-04-22 18:43:49 +08:00
    takeoffyoung
        1
    takeoffyoung  
       2020-04-08 21:22:50 +08:00
    方便消费的话,消息都扔分布式队列,然后 consumer 控制消费速率,做 batch 等等。
    至于保存,可以写队列之前持久化消息或者在队列上做 dump 均可,看具体需求了。
    dbskcnc
        2
    dbskcnc  
       2020-04-08 22:06:18 +08:00
    消息队列是比较方便的解决方案,pulsar 做这个好像挺合适的
    matrix67
        3
    matrix67  
       2020-04-08 22:48:33 +08:00
    类似 telegram ? 他们有万人群。 或者限制群大小?和微信一样?
    laravel
        4
    laravel  
    OP
       2020-04-08 23:04:37 +08:00
    @dbskcnc 看了下,好像不错,准备学一下,我对消息系统非常感兴趣。
    laravel
        5
    laravel  
    OP
       2020-04-08 23:05:04 +08:00
    @matrix67 直播的聊天,不能限制啊,就是要这种刷屏的感觉
    laravel
        6
    laravel  
    OP
       2020-04-08 23:06:44 +08:00
    @takeoffyoung 我在 golang 里加了个 queue 类型是 map[int64][]byte 然后启动另一个 goroutine 去合并相同时间戳的消息,可以实现,但是推送一多就挂了,我只是个业余写 go 的,还找不到为什么报错,好难调试啊
    gy123
        7
    gy123  
       2020-04-08 23:22:16 +08:00 via iPhone
    思路是,每次来消息启动一个延时一秒执行异步任务,并且执行的任务和任务信息绑定到一个全局变量,后续来的消息相当于合并到这个变量,没有新建任务。因为你说需求没有什么需要区分不同种类,那么全局变量相当于一直绑定一套异步任务,这个变量可以是个 map 或固定对象
    gy123
        8
    gy123  
       2020-04-08 23:23:58 +08:00 via iPhone
    @gy123 接上一条,执行任务同时销毁任务数据,下次来新消息新建任务。同时保证并发安全
    ljpCN
        9
    ljpCN  
       2020-04-08 23:32:46 +08:00 via Android
    生产者挨个扔进队列,消费者每次消费 n 个
    gy123
        10
    gy123  
       2020-04-08 23:57:34 +08:00 via iPhone
    @ljpCN 对合并内容的时间窗口要求不严格,可以用这种方式,比较简单
    laravel
        11
    laravel  
    OP
       2020-04-09 00:03:34 +08:00
    @dbskcnc pulsar 的 function 能不能实现消息合并?
    gy123
        12
    gy123  
       2020-04-09 00:06:53 +08:00 via iPhone
    打个比方,当某一时刻第一次连续发送了 1 秒的信息到队列,当轮训时间到达消费取消息的时间是在发消息的 0.5 秒,那么消费这些消息需要处理两次
    NUT
        13
    NUT  
       2020-04-09 09:04:03 +08:00
    我认为这不是一个服务端的优化,而是客户端的优化。
    服务端提供策略,客户端自动切换策略。

    对于客户端网络好的发送效率 ok 的可以使用 批量拉的方式,
    对于客户端网络差的情况,得需要提供基于 http 的批量拉的情况。
    合并包的核心思维就是 减少 单包的应答,以便减少在链路上的损耗。
    可以采用批量确认的方案。
    比如 s-》 123 c--》确认连续收到最大 3 。
    不过这个要改你们消息存储。

    消息系统核心一个就是路由、一个是消息存储 最后一个是 策略。
    消息系统优化点很多。 不能独立的之咱在服务端的角度考虑问题而是要根据整个链路进行优化。
    希望对你有用。
    dbskcnc
        14
    dbskcnc  
       2020-04-09 10:12:28 +08:00
    @laravel 看你的处理似乎没有不丢失的需求,也不考虑内存,不用 Pulsar 也没什么问题

    fuction 是个 filter,个人觉得用来做消息合并似乎不太合适,毕竟总是要把消息从 broker 读出来的,比较好控制的是什么时候去读和一次读取多少
    hst001
        15
    hst001  
       2020-04-09 13:49:56 +08:00
    首先明确的一下问题,楼主想问的应该是消息存储和读取时产生的广播,不是服务端广播给客户端的问题。

    消息合并的思路走歪了。

    消息合并在某些情形下可能得不尝试,比如大量的聊天窗口,聊天消息间隔大于 1 秒,这个时候等待合并就是浪费 CPU 内存,因为这些消息不适合合并,老老实实往消息队列推就好。

    读扩散和写扩散其实是 IM 系统里面一定会遇到的存储问题,建议楼主通过搜索引擎深入了解一下。两种方法在不同情形(单聊、小群群聊和大群群聊)下各有利弊,通常的做法是两种方式结合。
    laravel
        16
    laravel  
    OP
       2020-04-09 15:39:55 +08:00
    @hst001 我主要想解决的是推送时候 cpu100%的问题,同时也要保证消息及时到达。怎么做才能让推送的压力不大啊?我能想到的就是排队了,但是排队我怎么知道系统负载情况?
    laravel
        17
    laravel  
    OP
       2020-04-09 15:46:28 +08:00
    就是怎么才能充分利用 cpu,而又不让他卡死呢?
    我觉得肯定是需要一个队列排队的,那我怎么知道每次取多少消息能充分利用 cpu ?
    只能通过经验反复测试了吧?
    hst001
        18
    hst001  
       2020-04-09 16:02:34 +08:00
    @laravel #16 那你到底是推送的问题还是存储的问题,我理解错了吗。。。

    推送端的话,消息队列本来就帮两端减轻了负载,如果你使用了队列,推送服务还是满负载运行,而队列又在不断堆积,我觉得你应该考虑扩容服务了。
    hst001
        19
    hst001  
       2020-04-09 16:06:01 +08:00
    @laravel #17 你不需要考虑每次取多少条消息,我感觉你对消息队列的理解有误区
    useben
        20
    useben  
       2020-04-09 18:40:35 +08:00
    @laravel cpu 100%? 比如使用消息队列用 kafka, 消费者 pull 模式, 消费速度是视消费者的消费速度来拉取消息的. 这样起一个定时器, 定时拉取就行了吧, 怎么会打满 cpu
    lewinlan
        21
    lewinlan  
       2020-04-10 09:44:04 +08:00 via Android
    在发送出口做个限流器呗。1 秒内的消息都 append 进一个 slice 里,用一个 Ticker 计时发送并清空 slice,很简单吧
    vvmint233
        22
    vvmint233  
       2020-04-10 10:24:20 +08:00
    或许在广播的前面加上一层缓冲区, 达到一定大小就清空缓冲区然后分发出去
    zz554952942
        23
    zz554952942  
       2020-04-10 10:39:17 +08:00
    聊天数据发送到 kafka,或者 RabbitMQ(确保数据不丢失)
    然后启一个携程, time.tick 定时 1s 从消息队列取即可
    zz554952942
        24
    zz554952942  
       2020-04-10 10:43:44 +08:00
    亦或者可以用 Redis, 以用户 ID 做哈希键 然后当接收到消息的时候则把需要推送的用户 ID 的信息写到 Redis 上(这样子就把当个用户所需要的消息进行了合并)
    然后携程定时从 Redis 里取所有数据 然后发送(取到的是一个哈希数组就是每个用户应该接收到的消息)
    laravel
        25
    laravel  
    OP
       2020-04-22 18:43:49 +08:00
    我已经打算 redis 或者用 go 实现 redis 的 list 也行,一次取 100-1000 条合并之后再广播
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1041 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 23:06 PVG 07:06 LAX 15:06 JFK 18:06
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86