Celery+Django 如何动态修改已被接收或已在队列中的任务? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Raul7
V2EX    Python

Celery+Django 如何动态修改已被接收或已在队列中的任务?

  •  
  •   Raul7 2022-11-21 16:45:02 +08:00 2984 次点击
    这是一个创建于 1060 天前的主题,其中的信息可能已经有所发展或是发生改变。

    项目遇到两种场景:

      1. 任务已被 Celery 接收;
      1. 任务已在队列中排队;

    队列配置了默认任务优先级,我想在任务被接收或者在队列中,动态修改任务的优先级,有什么比较好的方法吗?

    希望各位大佬不吝赐教。

    8 条回复    2022-12-07 15:34:44 +08:00
    crysislinux
        1
    crysislinux  
       2022-11-21 16:49:23 +08:00 via Android
    把任务处理做成幂等的,然后要改优先级就同样的参数但是更高的优先级再发一次。
    Raul7
        2
    Raul7  
    OP
       2022-11-21 16:51:05 +08:00
    @crysislinux 再发一次的话,之前的那个任务要清掉吗?应该不会覆盖掉吧?队列就需要执行两个任务了
    westoy
        3
    westoy  
       2022-11-21 17:01:10 +08:00
    不能修改
    revoke 之前那个 task id, 再重新创建一个,celery 运行到的时候会判断状态
    如果你本身要处理的数据会保存状态, 那也不用取消了, 执行的时候判断一下就行了
    celery 的优先级比较玄学, 也只有基于 rabbitmq 才支持
    Raul7
        4
    Raul7  
    OP
       2022-11-21 17:20:29 +08:00
    @westoy 现在新版本的 celery ,Redis 也支持的
    crysislinux
        5
    crysislinux  
       2022-11-21 17:24:27 +08:00 via Android
    @Raul7 不会覆盖,但是后面允许的那个处理的时候你的代码能判断出来处理过了就可以跳过
    akaHenry
        6
    akaHenry  
       2022-11-22 12:28:31 +08:00
    应该没有什么队列产品, 满足你这奇怪的需求. 不只是 celery.

    队列, 只是管道, 是无状态的.

    你想改 task 状态, 是业务需求. 需要自己单独设计方案.

    给个思路:

    1. task 数据包内, 定义 task_type 字段.
    2. redis, 根据 task_type, 动态调整优先级 level 值.
    3. 定义第一级 MQ 管道 + task worker, 此 worker 主要做调度器. 收到 task 时, 根据 task_type, 查询 redis, 判断实时优先级, 进而决定:
    - 立即执行
    - 转发到另一个队列中, 等待其他 task worker 处理.
    - 忽略丢弃
    4. 次一级的 MQ + task worker, 无脑执行.


    celery, 可以是个管道链. 其他消息队列, 都可按照此思路设计业务.
    akaHenry
        7
    akaHenry  
       2022-11-22 12:35:28 +08:00
    另外不要滥用 celery, 这东西, 本身是个调度器, 做定时任务用还行.

    常规的 MQ 需求, 踢开 celery, 直接写业务代码, 操作 rabbitmq/kafka 就好.

    当然, 如果是维护的不值钱代码, 当我没说.
    kelvin_fly
        8
    kelvin_fly  
       2022-12-07 15:34:44 +08:00
    队列是无状态的管道。已经进入后,不能再做修改。这应该是个业务上的问题,需要记录队里的 ID ,然后在任务分配那,和 worker 里做业务联动处理 i
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     3133 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 24ms UTC 12:05 PVG 20:05 LAX 05:05 JFK 08:05
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86