两个线程一起从表中取数据,表和 sql 如何设计呢? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
qviqvi
V2EX    数据库

两个线程一起从表中取数据,表和 sql 如何设计呢?

  •  
  •   qviqvi 2024-06-20 12:10:54 +08:00 2542 次点击
    这是一个创建于 478 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一个任务表,两个线程同时每次取一条数据执行业务逻辑,然后把这条数据标记为完成或暂时放弃,暂时放弃的后面会被再次取出

    要求同一条数据不能被多个线程同时取出

    求优秀的设计方案

    32 条回复    2024-06-21 13:39:17 +08:00
    rekulas
        1
    rekulas  
       2024-06-20 12:31:20 +08:00
    取数据的时候加行锁就行了
    tclm
        2
    tclm  
       2024-06-20 13:37:42 +08:00
    主键是数值吗?确定只有两个线程吗?
    如果都是的话,一个线程只处理单数,一个线程只处理双数。
    Vegetable
        4
    Vegetable  
       2024-06-20 13:40:43 +08:00
    1. 行内有字段能识别当前正在处理中
    2. 取的时候加锁
    gitdoit
        5
    gitdoit  
       2024-06-20 13:45:28 +08:00
    只有一个线程负责交互任务表, 其他线程再和这个线程交互
    coder001
        6
    coder001  
       2024-06-20 13:46:42 +08:00
    搞个并发队列,不管是进程内还是进程外,工作线程去队列取工作项
    主线程负责从数据库读出东西放进队列,工作线程拿到任务就先标记一下立即更新再开始执行,遇到放弃(需要再次执行)的状况主线程里再次拿出投进队列
    test0x01
        7
    test0x01  
       2024-06-20 14:03:54 +08:00 via Android
    一个单数 ID 一个双数 ID
    MoYi123
        8
    MoYi123  
       2024-06-20 14:09:39 +08:00   2
    update table set status = 'work' where id = 1 and stats = 'ready'

    然后看 update 的返回值是 0 还是 1 即可. 和 redis 的 setnx 差不多的做法.
    rainxt
        9
    rainxt  
       2024-06-20 14:11:22 +08:00   1
    正在做类似的需求,额外用了 redis 做消息队列,1 个任务往消息队列里写,几十个业务进程从消息队列里拉取任务,处理完回写数据库标记完成,消息队列里空的时候再去查数据库里还有没有要处理的往里加
    Karte
        10
    Karte  
       2024-06-20 14:14:42 +08:00
    1. 创建一个带有并发控制所的队列.
    2. 每个线程从该队列中获取数据.
    3. 在获取数据时, 如果队列为空, 则由当前线程请求数据库, 批量读取数据丢入队列.
    4. 数据暂时放弃时, 将数据丢入队列底部.

    这样就好了. 在读取数据时通过锁做到数据获取无冲突, 而且不会有重复的数据从数据库中读取; 另一方面也降低了 IO 请求.

    这个方案的缺点在获取数据时会有锁的存在, 如果数据处理比较快, 可能会遇到其他线程在等待其中一个线程读取数据库的返回.
    8355
        11
    8355  
       2024-06-20 14:16:11 +08:00
    1.最简单的单进程扫表推消息队列
    2.update task set status=1 (运行中) where id = xx 根据影响行数判断是否哪个线程可以抢占执行中状态。
    8355
        12
    8355  
       2024-06-20 14:17:01 +08:00
    @8355 同 8l 少了待执行状态 一个意思
    mifly
        13
    mifly  
       2024-06-20 14:17:14 +08:00 via Android
    可以利用数据库的 update 语句的锁来实现
    生成唯一的 Lockkey
    Update sometable ser lockkey=lockkey where ... Limit 10
    Select ... From sometable where lockkey=lockkey
    Karte
        14
    Karte  
       2024-06-20 14:21:51 +08:00
    方案二:

    1. 创建一个对象, 对象记录一个查询位置, 查询数量.
    2. 每个线程通过加锁读取该对象获取位置和数量, 并在获取后更新当前查询位置. 释放锁
    3. 通过之前获取到的查询位置, 查询数量从数据库获取一段范围的数据.
    4. 将数据丢入线程本地队列, 依次处理.
    5. 对暂时放弃的数据 id 存入 步骤一 对象中. 采用 CopyOnWrite 方法更新对象值.

    这个方案将查询 IO 放入到了每个线程, 线程只需要在获取当前处理位置时加锁就好了. 这样锁的时间十分短暂, 几乎可以做到 "无锁" 的情况. 但是需要注意线程数量, 查询数量参数; 尽可能的将线程数拉小, 查询数量拉大, 这样线程可以更专注处理数据, 而不是一直等待 IO 返回.
    Karte
        15
    Karte  
       2024-06-20 14:23:06 +08:00
    方案三:
    直接用现成的消息队列, 把要处理的消息丢到队列里. 消费者直接多线程从消息队列中获取数据处理. 这个开发方案是可以做到代码量最小.
    CEBBCAT
        16
    CEBBCAT  
       2024-06-20 14:26:06 +08:00
    加锁哪有串行得劲,模仿 Redis 一致性哈希,加一列 int 给任务打标签,你干一三五,我干二四六,加一个 worker 捡漏。或者用 MQ 传递 ID ,让已经成熟的 MQ 来干这事


    *然后发现让楼上抢答了,气!
    nothingistrue
        17
    nothingistrue  
       2024-06-20 14:36:55 +08:00   1
    楼主你要得,应该不只是「同一条数据不能被多个线程同时取出」,而且还要是,「一条数据被一个线程处理时,其他线程顺序取下一条数据处理」

    @rekulas #1
    @Vegetable #4
    取数逻辑是未处理数据中的第一个,行锁管不了(意味着非序列级别的事务也管不了),表锁和序列级别事务又管得太多了写回前都会阻塞另一个线程取数,这样的话无论多少线程都跟单线程没啥区别。

    @MoYi123 #8 你这个其实就是变相的表锁

    我这没有方案,不过可以确定单靠数据库是解决不了了。上面两个用队列的方案,看起来就星了。
    yjhatfdu2
        18
    yjhatfdu2  
       2024-06-20 14:46:29 +08:00   2
    begin;
    select * from tbl for update skip locked limit 1;
    # 处理逻辑
    update from tbl where xxx=xxx set processed=1;
    commit;
    yjhatfdu2
        19
    yjhatfdu2  
       2024-06-20 14:47:00 +08:00
    现在的程序员都不会开事务了吗
    Vegetable
        20
    Vegetable  
       2024-06-20 18:06:32 +08:00
    @nothingistrue 锁是为了修改状态标记占坑,不是处理任务的时候锁着。
    qviqvi
        21
    qviqvi  
    OP
       2024-06-20 18:39:17 +08:00
    @yjhatfdu2 感觉这个是正解
    rqxiao
        22
    rqxiao  
       2024-06-21 08:56:54 +08:00
    @yjhatfdu2 第一次知道
    csrocks
        23
    csrocks  
       2024-06-21 09:00:17 +08:00
    number of task = N

    task_0: select * from t where mod(id_hash, N)=0
    ...
    task_N: select * from t where mod(id_hash, N)=N-1
    nothingistrue
        24
    nothingistrue  
       2024-06-21 10:01:22 +08:00
    @Vegetable #20 看好取数的查询条件,不是 select * from t by id ,而是 select * from t where stauts= 未处理(以及未锁定) 中的第一条。两个线程之间的共享对象,不是一行数据,而是所有未处理数据,你要加锁或者占坑,也只能这么占。加锁之后的结果就是,多个线程只能轮流处理,跟单线程一个效果,无法并行。
    Vegetable
        25
    Vegetable  
       2024-06-21 10:15:42 +08:00
    @nothingistrue 我没看明白你说的是什么,锁只是确保状态只会被一个线程修改,和 MoYi123 说的判断影响行数的方案本质是一样的,锁的持续时间也就几毫秒而已,哪里和单线程一个效果
    nothingistrue
        26
    nothingistrue  
       2024-06-21 10:25:19 +08:00
    @yjhatfdu2 #18
    https://stackoverflow.com/questions/53288584/select-for-update-skip-locked-in-repetable-read-transactions
    看下 skip locked 的效果,可以重复加锁,但是事务提交的时候要判定数据有没有被更改过,如果已经更改,那么本事务要失败。这就是个乐观锁,先提交的成功,后提交的失败,使用场景是并发修改的机率不高的场景。如果你在多线程并发场景中用乐观锁,那没跑几步就会只剩一个线程活着,其他线程全部出错终止(加了出错之后重启机制,效果会更差,绝大部分性能将被浪费在失败重启上)。

    事务不是一句「开事务」就完事大吉的。什么时候开事务,开什么样的事务,都是有考究的。最常见的错误,就是认为开了事务就不怕并发数据冲突了。事务的隔离性是分级别的,序列化级别才能保证完全不出现并发数据冲突但同时也没并发了。常用的隔离级别是可重复读,只在并发数据是确定的单行/多行数据的时候才能保证无并发冲突,当并发数据是表,或者表中不确定的数据时,还是要加锁处理并发冲突的。

    而怎么加锁,也是有考究的。楼主的场景是没法加锁的,因为它的并发数据是「 stauts = 未处理」的表,不是「主键 = xxx 」的行。加常规悲观锁就让线程排队,等同于失去并发。加乐观锁,就是开玩笑。

    @qviqvi #21 不要只找简单答案,容易错。Karte 那三个方案才是正确的。
    nothingistrue
        27
    nothingistrue  
       2024-06-21 10:41:02 +08:00
    @Vegetable #25
    update table set status = 'work' where id = 1 and stats = 'ready'
    id = 1 怎么来的:是 select id from table where stauts= 未处理 limit 1 得到的
    那要是 select 语句跟 update 语句之间,其他线程已经提前做了 update 呢:这个 update 会返回 0 ,本线程处理失败
    接着呢:如果你当失败处理,那么这个线程没了;如果你重新 select ,那么大部分时间要消耗在 select -> update return 0 -> select 的死循环中了。

    我对 @MoYi123 第一个回复确实是错了,不是表锁,而是乐观锁。但乐观锁不解决问题,详见我上面对 yjhatfdu2 的回复。

    楼主这个场景要加锁,你只能对 「 where stauts= 未处理 」,或者「全表」加锁,加了之后多线程从并发变排队,形同单线程。
    glacer
        28
    glacer  
       2024-06-21 11:09:28 +08:00
    id mod 线程数
    MoYi123
        29
    MoYi123  
       2024-06-21 11:18:26 +08:00
    @nothingistrue
    id=1 怎么来的? 当然是 select * from t where stats = 'ready' limit 10 查出来的啊,
    每个 worker 先拉一个 todo list, 然后一个个去领任务, 成功领到了就开始干, 这有啥问题啊,

    难道你用消息队列就能凭空变出来这个 id 了? 不也是这样查的
    linyinma
        30
    linyinma  
       2024-06-21 11:59:45 +08:00
    明明在应用层可以处理的,偏偏要扯到数据库,: 一个线程批量取,多个线程消费, 中间加个锁消费完了再去取.....
    nothingistrue
        31
    nothingistrue  
       2024-06-21 12:37:21 +08:00
    @MoYi123 #29 消息队列读写分离了,可以单独只对读那一瞬间加锁。10 楼说得明明白白。
    MoYi123
        32
    MoYi123  
       2024-06-21 13:39:17 +08:00
    @nothingistrue 回答一个问题, 往队列里塞数据的进程是不是单点? 或者你要怎么加锁?
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5549 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 08:56 PVG 16:56 LAX 01:56 JFK 04:56
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86