求各位大佬们指导批量入队列的最佳姿势 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
jowan
V2EX    PHP

求各位大佬们指导批量入队列的最佳姿势

  •  
  •   jowan 2018-04-04 14:26:31 +08:00 3229 次点击
    这是一个创建于 2754 天前的主题,其中的信息可能已经有所发展或是发生改变。
    遇到个需求类似于余额宝的业务,每天按照用户的账户余额×年利率进行分红

    现在的做法是每天 cron 定时处理一次将所有会员主键存入队列,然后开启一个守护进程实时监听丢给 job 处理

    定时脚本里面的逻辑是从数据库取出所有会员 foreach 入队的,但是当会员数量达到百万千万越来越大的时候一次性取出的话因为内存问题不太现实,所以把定时任务从每天执行一次改成每分钟,然后每次入队一定数量的会员,但是这样没有达到理想的效果,因为每次入队一定量的数据,如果提前完成了那剩下的会员要等到下一分钟执行,而且每分钟入队最合适的数据量也需要自己估算,不能完全的实现排队执行任务

    请问各位大佬通常应该用什么方式解决这个问题?
    15 条回复    2018-04-14 22:43:26 +08:00
    Immortal
        1
    Immortal  
       2018-04-04 14:33:41 +08:00
    “所以把定时任务从每天执行一次改成每分钟,然后每次入队一定数量的会员”
    为什么不能把这一步改在脚本里面,定时启动这个脚本,在脚本里面批量处理呢。。钻牛角尖了吧
    jowan
        2
    jowan  
    OP
       2018-04-04 14:46:21 +08:00
    @Immortal 确实可以把处理的逻辑放在脚本里面 但是实际处理分红逻辑的时候,会频繁操作 DB (开启事务、更新会员表,写日志等),内存会有很大的消耗,从而不得不减少每次定时处理的数量,这样每次执行的数量更少 那全部执行完所有任务花费的时间更长。用队列的想法是可以开合适的多线程处理尽量减少执行时间而不用考虑内存问题

    理想方式是 :执行任务的总耗时 ≈ 每个任务的平均耗时 * 任务总量 这样会尽快处理掉业务
    Itoktsnhc
        3
    Itoktsnhc  
       2018-04-04 14:59:15 +08:00
    是否可以根据入队列的消息数量 动态的注入更多的任务?
    likuku
        4
    likuku  
       2018-04-04 15:05:03 +08:00
    分布式处理机,处理节点根据自身性能开出多进程(不要让系统超载)并行处理,
    假若待处理的任务太多,预计整体运行太久(超出任务最大可忍受等待时长),
    那么继续增加处理节点(能根据任务量自动增减处理节点更好)
    wjpdev
        5
    wjpdev  
       2018-04-04 15:07:34 +08:00   1
    走生产者-消费者的思路解决, 自已定好队列窗口合适大小(最小值:防止消费速度过慢,最大值:防止内存占完)。两个进程,一个进程 A 实时(或间隔 1s/3s/5s)监听队列是否为空,空了就从数据库去拉下一批数据的往队列里扔。另一个守护进程 B 实时监听丢给 job 处理。

    实时监控还是很占 CPU 资源的。更智能一点的,A 初始化的时候查一下当前用户总数多少,记下来。先丢 1000 个记录,第一次隔 10s 检查一下队列,如果队列空了,再丢 1000,但第二次是隔 9s 的时候检查,如果也空了,再丢 1000,隔 8s 检查。如果队列不为空,检查时间点+0.1 ..... 直到找到一个最优的检查时间点丢任务。

    1000 和 0.1 是可以调节的。 伪 AI .
    3IOhG7M0knRu5UlC
        6
    3IOhG7M0knRu5UlC  
       2018-04-04 16:19:22 +08:00 via Android
    千万压入队列也没多少占用,然后处理不就行了 不够就多来几个
    jowan
        7
    jowan  
    OP
       2018-04-04 16:38:34 +08:00
    @GooMS 入队占用的内存不是问题 问题是如何快速将 DB 中的千万条数据取出压入 你在脚本中一次获取千万条的数据 foreach 一下内存肯定崩了
    starmoon1994
        8
    starmoon1994  
       2018-04-04 16:54:20 +08:00
    前几天也遇到了这种问题 需要的数据可能是 1 个也可能是 1 万个
    我是做了个递归 在递归里每次去库里取 100 个 取够了就结束递归
    类似的 你可以每 N 分钟取一次数据库 假设队列容量是 M 那么每次就取 M-当前未消费量
    hisway
        9
    hisway  
       2018-04-04 17:07:21 +08:00
    队列任务里去查询下一批要处理的会员,生成下个执行队列
    slgz
        10
    slgz  
       2018-04-04 17:09:00 +08:00
    @wjpdev 我之前做导出功能的时候, 就是这么处理的, 每次丢 1000. 性能还可以
    jevonszmx
        11
    jevonszmx  
       2018-04-04 23:17:15 +08:00   1
    有几种办法,从多个角度解决:
    1、取数据太多:导致占用内存太大的问题,可以了解一下 yield,避免一次全读内存中处理;
    2、定时任务频率问题:可以使用 php pcntl 创建 daemon 守护进程,监控待处理数据。守护进程不像 crontab 那样执行频率受限( crontab 最低是每分钟一次,跑太多时间不够,太少有点浪费),所以只要设置一个合适的频率即可,不需要算每次数量那么痛苦;
    3IOhG7M0knRu5UlC
        12
    3IOhG7M0knRu5UlC  
       2018-04-04 23:44:40 +08:00 via Android   1
    @jowan 按照块取同志,一次取 100 条等等。然后我觉得可以让任务发布者只发布任务携带基础信息,其他业务数据交给 work 这样就拆分了压力
    3IOhG7M0knRu5UlC
        13
    3IOhG7M0knRu5UlC  
       2018-04-04 23:45:52 +08:00 via Android
    @jowan 如果优化好数据结构只需要几百 m 内存
    junan0708
        14
    juan0708  
       2018-04-14 22:38:18 +08:00 via Android
    mysql limit 不行啊?
    junan0708
        15
    junan0708  
       2018-04-14 22:43:26 +08:00 via Android
    主进程循环读取,丢到队列,然后 fork 子进程消费吧,
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1621 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 24ms UTC 16:21 PVG 00:21 LAX 09:21 JFK 12:21
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86