准备写个基于 kafka 的延迟队列, 有感兴趣的吗 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
petelin
V2EX    Kafka

准备写个基于 kafka 的延迟队列, 有感兴趣的吗

  •  
  •   petelin 2019-05-29 17:53:27 +08:00 10923 次点击
    这是一个创建于 2331 天前的主题,其中的信息可能已经有所发展或是发生改变。

    解决的问题: kafka 不支持延迟队列

    如何解决: 如果是延迟小时, push 之前先放到 redis 里, 然后 work 通过 lua 轮训拿到需要真的 push 到队列里的请求, 然后 push 到 kafka 里.

    整个功能其实和 Python 的 celery 或者 Go 的 machinery 很像.但是前者需要单独部署项目复杂, 后者不支持 kafka.

    有搞头吗?

    21 条回复    2021-03-29 20:35:34 +08:00
    jaylee77
        1
    jaylee77  
       2019-05-29 18:08:16 +08:00
    没有
    Takamine
        2
    Takamine  
       2019-05-29 20:13:15 +08:00 via Android
    这样的感觉……还不如就只用 redis,从 list 里面取出来直接拿到 redis 订阅发布_(:з」∠)_。
    Varobjs
        3
    Varobjs  
       2019-05-30 08:31:39 +08:00 via Android
    beanstalk 了解下
    petelin
        4
    petelin  
    OP
       2019-05-30 09:12:28 +08:00 via iPhone
    @Takamine 性能会有问题 分布式的东西还是应该让队列做
    petelin
        5
    petelin  
    OP
       2019-05-30 09:18:25 +08:00 via iPhone
    @Varobjs 多谢 跟 sqs 很像 不过看到基于内存的 可靠性可能要打折了 还没具体看 上班研究一下
    ebingtel
        6
    ebingtel  
       2019-05-30 09:21:26 +08:00
    有个疑问,真的用得上 kafka 的场景,为了延迟 N 分钟、塞进 redis,单机内存会爆的吧?
    petelin
        7
    petelin  
    OP
       2019-05-30 09:38:35 +08:00 via iPhone
    @ebingtel 还好 Redis 也可以不是单机 分库分表嘛 具体的消息不要存进 Redis 存个 ID 就行 主要是必须解耦 容灾 可扩展 消息还最好只消费一次 后面这些要求任何一个消息队列都比现在的 Redis 实现要好

    当然做玩具或者小公司不用考虑这些
    airfling
        8
    airfling  
       2019-05-30 09:42:54 +08:00
    延迟注定会有个问题是,如果并发足够大,你存储延迟时这些数据的地方会内存爆掉
    petelin
        9
    petelin  
    OP
       2019-05-30 11:41:43 +08:00
    @airfling 不一定存内存里啊, Redis 只是为了方便, 你存 MySQL 里, 加上索引, 不也可以吗
    nicoljiang
        10
    nicoljiang  
    PRO
       2019-05-30 15:14:59 +08:00
    感觉是为了用新式的方案,出了一个坑,为了埋这个坑,又要用老的方案来弥补。
    mooncakejs
        11
    mooncakejs  
       2019-05-30 16:22:32 +08:00
    kafka 的特点是高性能,搞了这玩意就没有高性能了,那为什么不用直接支持定时的队列系统呢。
    其实高性能的队列中间件往往都不支持定时,或者有限支持。
    kafka 也可以参考有限支持的,只要支持有限的定时时长,还是很简单的。
    Feedline
        12
    Feedline  
       2019-05-30 17:15:40 +08:00
    为啥不用 rabbitmq ?
    petelin
        13
    petelin  
    OP
       2019-05-30 17:34:47 +08:00 via iPhone
    @Feedline 想用 公司不支持这个
    guagusi
        14
    guagusi  
       2019-05-31 09:10:24 +08:00
    时间轮了解一下
    petelin
        15
    petelin  
    OP
       2019-05-31 13:23:05 +08:00
    @guagusi 这个只是 kafka 内部需要延迟然后搞的一个东西, 没有暴露给生产者吧
    Damnever
        16
    Damnever  
       2019-06-02 11:48:39 +08:00
    用 Redis/MySQL 加个中间层就已经能做了一个延时队列了,仅仅是为了使用 kafka 的接口?

    据说 kafka 已经支持基于 timestamp 的消费了;如果非要用困难模式也是可以的,在 topic 上做文章,对过期时间点根据需求进行分段,每个分段对应一个 topic,然后对 producer 和 consumer 搞层封装,并不觉得数据量大的情况下这么玩 kafka 能不出问题
    petelin
        17
    petelin  
    OP
       2019-06-02 12:22:17 +08:00 via iPhone
    @Damnever 中间层能支持的 QPS 肯定没有算好了直接甩给 kafk 高对吧
    Damnever
        18
    Damnever  
       2019-06-02 12:40:36 +08:00
    @petelin 丢给 kafka 性能就高是什么逻辑,性能是由短板决定的
    petelin
        19
    petelin  
    OP
       2019-06-02 14:49:40 +08:00 via iPhone
    @Damnever 比方说只用 Redis 的话 有多少个 worker 就得 poll 多少次 这对 Redis 是一个挑战 不如只开很少的 worker 只进行分发消息 消息处理丢给 kafka
    另外 kafka 等消息队列可以支持 只处理一次 或者最少处理一次
    Damnever
        20
    Damnever  
       2019-06-02 17:36:22 +08:00
    @petelin 其实我不太明白你的具体场景和限制条件,但从你的描述来看我觉得这些其实都不是问题,再者又是基于什么理由判断对 Redis 是挑战的东西但是对 Kafka 就不是挑战了呢?当然不想重新造轮子觉得成本太高将各种系统的部分功能组合变成自己想要的功能也是可以的,但不管怎样都得造点东西.. (就我个人看来最简单干净的方法是使用支持延时队列的消息系统)
    bthulu
        21
    bthulu  
       2021-03-29 20:35:34 +08:00
    可以做固定时间点的延时重发.
    比如说, 1 秒, 10 秒, 30 秒, 1 分钟, 5 分钟, 1 小时, 8 小时, 24 小时延时等.
    针对每个延时时间创建一个队列, 生产者按延时需求将数据(数据里包一层最终要去的队列名)发送到对应队列.
    然后每个队列起一个消费者, 轮询数据, 到点发送到目标队列即可.
    ```
    headers.put('finalTopic', topic);
    producer.send(new ProducerRecord(delayedTopic, key, value, headers));
    ```

    ```
    // 60 秒延时队列
    int delay = 60_000;
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    long timeLeft = record.timestamp() + delay - System.currentTimeMillis();
    if (timeLeft > 0) {
    Thread.sleep(timeLeft);
    }
    var topic = record.headers.lastHeader('finalTopic')
    record.headers.remove('finalTopic');
    producer.send(new ProducerRecord(topic, key, value, headers));
    }
    }
    ```
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5204 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 34ms UTC 09:14 PVG 17:14 LAX 02:14 JFK 05:14
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86