求 go 并发限制的最佳实现 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
monkeyWie
0.65D
V2EX    Go 编程语言

求 go 并发限制的最佳实现

  •  
  •   monkeyWie
    monkeyWie 2021-02-08 12:01:29 +08:00 7810 次点击
    这是一个创建于 1776 天前的主题,其中的信息可能已经有所发展或是发生改变。

    情况如下: 有 N 个任务,每个任务执行完都会返回结果或者 error,通过固定的(M)协程去执行,如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表。

    我自己写了一版,感觉有点复杂:https://play.golang.org/p/ono1S04XupK

    不知道各位有没有什么更简单的实现。

    第 1 条附言    2021-02-08 18:32:55 +08:00
    大家好好审下题啊,要在有一个任务发送错误时立即结束流程,errgroup 是不行的哦,它会等到所有任务执行完毕才返回。
    第 2 条附言    2021-02-10 12:33:58 +08:00
    感谢各位大佬的思路,目前基于最优实现封装了一个通用的方法: https://play.golang.org/p/Be7vNF4JH4-
    如果各位有更好的思路可以重写下面的 run 方法然后分享出来
    第 3 条附言    2021-02-10 13:27:32 +08:00
    再更新下,上面代码的没有实现发生错误立即返回: https://play.golang.org/p/B3fZZCWwuKT
    70 条回复    2021-02-19 10:21:51 +08:00
    ToPoGE
        1
    ToPoGE  
       2021-02-08 12:16:33 +08:00 via Android
    context 不就可以实现吗
    cloudfstrife
        2
    cloudfstrife  
       2021-02-08 12:26:53 +08:00 via Android
    errgroup 了解一下
    xylophone21
        3
    xylophone21  
       2021-02-08 12:30:21 +08:00
    一起探讨,如果 M 不是大到离谱,是否需要控制 M ?还是无脑 go 就好了,因为这样感觉就是 go 程池 /线程池了,有点像把 go 程当线程用了?
    sunorg
        4
    sunorg  
       2021-02-08 12:44:23 +08:00
    不是 channel 就可以实现了吗?
    monkeyWie
        5
    monkeyWie  
    OP
       2021-02-08 13:22:19 +08:00
    @ToPoGE context 实现不了等待任务全部执行完成吧
    monkeyWie
        6
    monkeyWie  
    OP
       2021-02-08 13:25:00 +08:00
    @xylophone21 现在就是 M 如果太大了服务器会顶不住,比如并发查询 sql
    monkeyWie
        7
    monkeyWie  
    OP
       2021-02-08 13:34:51 +08:00
    @cloudfstrife errgroup 我试了下,第一不能限制并发数,第二不能在发生错误时立即返回。
    baodaren8
        8
    baodaren8  
       2021-02-08 13:35:46 +08:00
    借楼问一下,我怎么不能发帖了。。
    coool
        9
    coool  
       2021-02-08 13:45:42 +08:00
    @baodaren8 我也不能发了
    ToPoGE
        10
    ToPoGE  
       2021-02-08 13:52:03 +08:00
    @monkeyWie context 配合 waitGRoup,就可以做到失败一个全干掉,全完成,返回
    MadbookPro
        11
    MadbookPro  
       2021-02-08 14:00:54 +08:00
    一个 err chan 用来通知 root goroutine 发生错误,然后 root context 退出; wait group 用来做全部完成检查。
    这样何如?
    fatedier
        12
    fatedier  
       2021-02-08 14:04:21 +08:00
    MadbookPro
        13
    MadbookPro  
       2021-02-08 14:04:47 +08:00
    哦还有并发控制。
    taskChannel + workerChannel,这样 wait group 也不需要了
    mogg
        14
    mogg  
       2021-02-08 14:23:54 +08:00
    线程池+channel,收到错误消息停止向队列里发送消息。
    想要立刻回收,主线程直接 stop 线程池里所有线程,否则等待运行中线程跑完
    monkeyWie
        15
    monkeyWie  
    OP
       2021-02-08 14:27:43 +08:00
    @ToPoGE @MadbookPro @mogg
    直接上代码吧,这样说不明白
    LoNeFong
        16
    LoNeFong  
       2021-02-08 14:29:07 +08:00
    errgroup + 1
    /table>
    Claar
        17
    Claar  
       2021-02-08 14:37:21 +08:00 via iPhone
    并发控制:协程池
    消息传递:channel
    还有等待协程结束再结束 main 函数:sync.
    甚至可以子任务报错直接 exit
    ToPoGE
        18
    ToPoGE  
       2021-02-08 14:38:59 +08:00
    @monkeyWie 那你用 errgroup 把,go 官方标准库中的,看文档直接用,就是 context 和 waitGroup 结合的
    ppphp
        19
    ppphp  
       2021-02-08 14:48:36 +08:00
    看不到代码。。。我的思路是,for 循环几个 goroutine,select 的时候,先从 errchannel 里看 err,然后 default 里从 taskchannel 里拿 task
    mogg
        20
    mogg  
       2021-02-08 15:48:18 +08:00
    ```go
    var wg sync.WaitGroup
    pool := makePool(pollSize)
    func() {
    for i := 0; i < runTimes; i++ {
    select {
    case <-errChan:
    fmt.Println("error")
    pool.Stop()
    return
    default:
    wg.Add(1)
    poll.Submit(task)
    }
    }
    }()
    if pool.IsRunning()
    wg.Wait()
    ```
    我觉得核心就是这样一个结构,不过 go 没用过线程池,可以看看有什么库
    KaynW
        21
    KaynW  
       2021-02-08 15:53:11 +08:00
    caiych
        22
    caiych  
       2021-02-08 16:05:58 +08:00
    一个比较简单的实现
    https://play.golang.org/p/MDU_x7C2npI

    如果有一个 routine 有 error,ctx 会 Done
    如果 ctx 已经 Done 了,semaphore 会 error
    wpf375516041
        23
    wpf375516041  
       2021-02-08 17:03:40 +08:00
    package main

    import (
    "fmt"
    "net/http"
    )

    /*有 N 个任务,每个任务都会返回结果或者 error,通过固定的并发数(M)去执行。
    如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表*/
    func main() {

    n := 10
    m := 5
    result := make([]string, n)
    limitCh := make(chan interface{}, m)
    errCh := make(chan error)
    doneCh := make(chan interface{},1)

    defer func() {
    close(limitCh)
    close(errCh)
    }()

    for state, i := true, 0; i < n; i++ {
    state = true
    for state {
    select {
    case limitCh <- nil:
    fmt.Printf("开始第%d 个任务\n", i)
    go func(i int) {
    var err error
    defer func() {
    if i == n-1 {
    close(doneCh)
    }
    if err != nil {
    errCh <- err
    }
    <-limitCh
    }()
    ret, err := doTask()
    if err != nil {
    return
    }
    result[i] = ret
    }(i)
    state = false
    case <-errCh:
    return
    default:
    }
    }
    }
    <- doneCh
    fmt.Println(result)

    }

    func doTask() (string, error) {
    // 模拟执行任务
    resp, err := http.Get("https://www.baidu.com")
    if err != nil {
    return "", err
    }
    defer resp.Body.Close()
    return resp.Status, nil
    }
    monkeyWie
        24
    monkeyWie  
    OP
       2021-02-08 18:17:31 +08:00
    @caiych #22 这个做不到有一个 task 发生 error 立即结束,例如: https://play.golang.org/p/sqlMbgW7z9Z
    monkeyWie
        25
    monkeyWie  
    OP
       2021-02-08 18:18:55 +08:00
    @caiych #22 比如第一个任务执行已经失败了,需要立即返回,而不是等到所有任务执行完
    monkeyWie
        26
    monkeyWie  
    OP
       2021-02-08 18:28:10 +08:00
    @wpf375516041 #23 这个好像也有点问题哦,就是判断任务全部执行完成的地方

    ```
    if i == n-1 {
    close(doneCh)
    }
    ```
    这里判断最后一个任务执行完成就结束,但是可能会存在还有正在执行的任务并且比最后一个任务执行还慢,就不对了。
    caiych
        27
    caiych  
       2021-02-08 18:54:42 +08:00
    @monkeyWie 你仔细看一下打出来的内容,第一批开始执行的任务里没有 1 ( 1 没有抢到信号量)。当 1 抢到信号量开始执行后整个程序都结束了(其他的任务都取消了,不然如果 i 那个循环大的话整个程序会运行很久)
    rrfeng
        28
    rrfeng  
       2021-02-08 19:36:10 +08:00
    1. 声明一个 buffered chan 当做 goroutine 池子 。
    2. 启动 goroutine 的时候传入 context 用做取消(前提是你的 goroutine 任务可以取消)。
    ginjedoad
        29
    ginjedoad  
       2021-02-08 20:29:53 +08:00
    兄弟,errgroup 就是为了你这个场景设计的。不要重复造轮子了
    ginjedoad
        30
    ginjedoad  
       2021-02-08 20:30:42 +08:00
    说只要一个错误不能马上中断返回的,自己好好审计一下 errgroup
    monkeyWie
        31
    monkeyWie  
    OP
       2021-02-08 22:29:21 +08:00
    @caiych #27 错误发生时不会立刻结束,而是会等正在执行的任务全部完成才返回,你可以跑这个试试: https://play.golang.org/p/66Me2TYbVoK

    错误发生了也要等 5 秒才结束。
    monkeyWie
        32
    monkeyWie  
    OP
       2021-02-08 22:32:04 +08:00
    @ginjedoad #30 老哥贴个代码我跑一下看看,我自己测的 errgroup 是不能发生错误立即中断的
    wpf375516041
        33
    wpf375516041  
       2021-02-08 22:55:03 +08:00
    @monkeyWie 是滴 用 waitgroup 大家说的都对 其实你把原来的封装下 搞个协程池 代码就清晰了
    wpf375516041
        34
    wpf375516041  
       2021-02-08 23:16:58 +08:00   1
    我觉得这是个很好的面试题,既有实际意义也考验基本功,大家可以试试不用三方库实现一下~
    talk is cheap, show me the code
    一起娱乐娱乐,新年快乐~!
    1. 控制并发
    2. 等待所有任务返回
    3. 一个任务错误,立刻结束

    如果不解耦,并发控制和结果处理的逻辑混杂确实屎

    go 实现的时候想当然了,只以最后提交的任务判断是否结束

    kotlin 协程实现的时候发现 io,计算任务无法退出,必须要手动捕捉中止信号

    java 须要手动捕捉中止信号 但是可以通过 thread.stop()强制停止,另外判断线程是否异常退出较难
    caiych
        35
    caiych  
       2021-02-09 00:06:49 +08:00   2
    @monkeyWie

    最开始的版本里注释有写 DoWork 里的操作需要支持 context cancellation (比如如果操作是 http.Get 的话,可以使用 http.NewRequestWithContext)

    这个里面实现了一个如果调用的操作不支持 context cancellation 的情况
    https://play.golang.org/p/Cot1FYgIKLd
    ooh
        36
    ooh  
       2021-02-09 00:11:53 +08:00
    monkeyWie
        37
    monkeyWie  
    OP
       2021-02-09 08:45:30 +08:00
    @wpf375516041 #34 哈哈,搞不好会加入大厂面试题库
    monkeyWie
        38
    monkeyWie  
    OP
       2021-02-09 08:48:38 +08:00
    @caiych #35 实际上很多 work 是不支持 cancel 的,而且也不一定要 cancel 掉,只要不阻塞主协程就行了,发送错误的时候主协程继续执行,其它正在执行的任务让它继续跑。
    monkeyWie
        39
    monkeyWie  
    OP
       2021-02-09 09:47:54 +08:00
    @caiych #35 不好意思前面没看仔细,这个确实可以,赞一个!
    guonaihong
        40
    guonaihong  
       2021-02-09 09:51:10 +08:00
    难道 slice+context.Context+errgroup 的组合不行?

    1.分配 M 容量的 slice 。放 M 个 go 程正常运行的结果。每个 go 程都有自己的 index,所以存结果这块都不要加锁,相当舒服的操作。

    2.context 当作异常终端点。每个 go 程都持有这个 context 变量。任意一个 go 程错误,cancel 。任意 go 程检查 ctx.Done()。所谓 “如果其中有一个任务返回 error 时立即结束” 完美实现。

    3.检查 errgroup 的返回,err != nil,就返回错误,else 部分没有错误,返回第 1 步声明的 slice
    crclz
        41
    crclz  
       2021-02-09 09:54:50 +08:00
    分解问题:
    1. 通过固定的 M 个携程执行。
    解决方案:信号量或者条件变量或者 channel,很多种方法实现。

    2. 只要出现一个任务返回 Error,就立即结束全部任务。
    解决方案:C#的 TAP 的 Task.WhenAny 和 CancellationToken 模式可以很好的解决这些问题。
    那么只需要用 go 来实现 Task.WhenAny 和 CancellationToken 模式即可。

    CancellationToken 很好实现,用 struct {IsCancellationRequired: bool}就可以实现。
    Task.WhenAny 可以用如下方式实现:有一个 chan ch,每个任务完成后,往 ch 里面写一个数字(或者写入任务信息),同时主线程阻塞读取 ch 。
    monkeyWie
        42
    monkeyWie  
    OP
       2021-02-09 09:57:02 +08:00
    @guonaihong #40 应该行的,但是对第一点有点疑问,用 slice 怎么实现 M 个协程的限制呢
    guonaihong
        43
    guonaihong  
       2021-02-09 10:01:11 +08:00
    //放结果伪代码
    // 每个 go 程的 id 已经固定下来,就是 for 循环启动的 index.大家操作自己的私有 index 。为啥会有竞争?
    for i :=0;i < M;i++ {
    i:=i
    go func(){
    slice[i] = result
    }
    }
    jackeliang
        44
    jackeliang  
       2021-02-09 10:09:47 +08:00
    @monkeyWie 发生错误不能立即返回的,需要等待其它协程结束才行,不然协程泄露了。
    aeli
        45
    aeli  
       2021-02-09 10:11:36 +08:00
    所有在并发里要求错误立即中断所有并发返回的,显然是脑抽抽
    dawniii
        46
    dawniii  
       2021-02-09 10:13:53 +08:00
    当其中有一个协程有错误,另一个协程在密集计算,应该是没法打断直接返回的吧
    SignLeung
        47
    SignLeung  
       2021-02-09 11:10:36 +08:00   1
    楼上说的没错,协程好像只能自己结束
    SethShi
        48
    SethShi  
       2021-02-09 11:11:06 +08:00
    N 个的容量的 success channel.
    再来一个 err channel

    然后主线程那里 select 这两个 channel 做事情就可以啦.

    ----------------------
    至于你纠结 err 之后协程能不能关闭, 那个不是你关心的事情了. 可以考虑传递一个 context 给 request, err 发生错误的时候进行 cancel context 即可.
    liyunlong41
        49
    liyunlong41  
       2021-02-09 11:24:29 +08:00
    https://play.golang.org/p/YYvynelzIHj
    感兴趣写了下,如果想出现错误后中断其他 goroutine 的处理,其他 goroutine 必须可以被 cancel 掉。
    asAnotherJack
        50
    asAnotherJack  
       2021-02-09 12:47:03 +08:00
    errgroup 可以实现出错时结束流程,前提是你的代码实现了 cancel 逻辑
    Aoang
        51
    Aoang  
       2021-02-09 14:47:03 +08:00 via Android
    并发限制用协程池,退出机制需要自己实现。

    如果懒得实现,就直接摘出来,用系统的实现。main 启动之后开一个协程监听退出信号,收到退出信号之后直接 os.Exit()

    其实还是应该自己在代码中实现出来,那个操作只能玩玩。看看源码,用 runtime.Goexit() 来终止协程。

    errgroup context 都是用来传递消息的,并不是来做终止的。你用这两个来做的话,只能每进行一步就检查一下是否有退出信号,不然就做不到及时退出。
    teawithlife
        52
    teawithlife  
       2021-02-09 14:48:32 +08:00
    @liyunlong41 #49 你好,请教一下,在 61 行的这一部分,为什么两个 channel 之间互相要写数据?
    ```
    select {
    case err := <-errCh:
    handleErr(err, result[1:])
    <-done
    case <-done:
    if len(errCh) > 0 {
    err := <-errCh
    handleErr(err, result[1:])
    return
    }
    fmt.Println("success handle all task:", result[1:])
    }
    ```

    改成这样是否可以?
    ```
    select {
    case err := <-errCh:
    handleErr(err, result[1:])
    case <-done:
    fmt.Println("success handle all task:", result[1:])
    }
    ```
    Aoang
        53
    Aoang  
       2021-02-09 15:01:42 +08:00 via Android
    @teawithlife #52

    因为他代码中 errCh 和 done 作用是一样的,一个 chan 就可以了,检查 chan 中的内容是出错退出的还是最终完成退出就行了。
    liyunlong41
        54
    liyunlong41  
       2021-02-09 15:28:38 +08:00
    @teawithlife 没有相互写数据,是为了一些健壮性考虑吧。第一个 case 如果从 errCh 中读到 err 了,<-done 表示等待其他 goroutine 也都结束,是怕有 goroutine 泄露。第二个 case 是考虑可能有极小概率 errCh 和 done 同时有数据,select 随机选择了 done channel,所以最终再判断下 errCh 是否有 err 。
    monkeyWie
        55
    monkeyWie  
    OP
       2021-02-09 19:50:17 +08:00
    @liyunlong41 #49 目前 35L 这种应该是最优雅的实现,我们用纯标准库实现的还是太复杂了哈哈
    teawithlife
        56
    teawithlife  
       2021-02-09 20:55:51 +08:00
    @liyunlong41 #54 “相互写数据”这个表述不太严谨。不过你说的有道理,从健壮性来说,确实需要考虑这些极端情况。


    @monkeyWie #55 个人觉得 35L 的写法确实比较优雅,但是从效率来说,还是 49L 的写法更好一些,因为 49L 的协程数量是 M 个,而 35L 的协程数量是 N 个,当 N>>M 时,虽然 golang 的协程足够轻量,但是也没必要这么浪费。
    monkeyWie
        57
    monkeyWie  
    OP
       2021-02-09 20:59:03 +08:00
    @teawithlife #56 35L 协程数量其实是 N 个,用信号量做了控制的
    monkeyWie
        58
    monkeyWie  
    OP
       2021-02-09 21:00:12 +08:00
    @monkeyWie #57 说出了,是 M 个
    teawithlife
        59
    teawithlife  
       2021-02-10 08:29:04 +08:00
    @monkeyWie #57 协程有 N 个,只不过通过信号量保证了其中只有 M 个在跑,其他的协程虽然在等待,也需要消耗少量资源
    u2r1Hqo6HExmNsrt
        60
    u2r1Hqo6HExmNsrt  
       2021-02-10 09:52:31 +08:00
    我之前研究过 go 的流控怎么写,感觉还是流控好,流控还可以控制几秒才做一个任务,单纯控制并发感觉不如流控。
    monkeyWie
        61
    monkeyWie  
    OP
       2021-02-10 10:55:47 +08:00   1
    @teawithlife #59 额,确实是 N 个协程,不过稍微改下就行了,把信号量控制放在循环里面
    https://play.golang.org/p/SP7a8MaDd8B
    MilletChili
        62
    MilletChili  
       2021-02-10 11:23:42 +08:00
    感觉不需要写太多代码,直接用 channel 控制就好了
    https://play.golang.org/p/JiD2EopqPkL
    monkeyWie
        63
    monkeyWie  
    OP
       2021-02-10 12:10:11 +08:00
    @MilletChili #62 这种思路好像也不错,不过如果要加上参数传递和结果、错误返回也还是挺复杂的
    monkeyWie
        64
    monkeyWie  
    OP
       2021-02-10 12:37:58 +08:00
    @MilletChili#62 可以尝试下用这种思路实现下这里的 run 方法,https://play.golang.org/p/Be7vNF4JH4-
    MilletChili
        65
    MilletChili  
       2021-02-10 13:57:42 +08:00
    @monkeyWie 嗯嗯,真要项目搞,还是用一些开源包好点
    troywinter
        66
    troywinter  
       2021-02-10 16:42:15 +08:00
    控制并发直接用 Semaphore 就行了,至于遇到错误怎么退出 goroutine 应该是你的业务代码自己实现,不同业务场景需要有不同的处理。
    YouLMAO
        67
    YouLMAO  
       2021-02-11 22:50:02 +08:00 via Android
    errgroup.withcontext 楼主是不是没学过 go 呀? 这样只要一个 err,context 就会取消,全部都返回了
    tolerance
        68
    tolerance  
       2021-02-13 00:37:32 +08:00
    把 channel 关了就可以
    kevinwan
        69
    kevinwan  
       2021-02-17 13:09:27 +08:00 via iPhone
    go-zero 下有个 mr 包解决这种场景,遇到 error 可以 cancel 所有任务
    abccccabc
        70
    abccccabc  
       2021-02-19 10:21:51 +08:00
    各位问一下,你们是怎样看 play.golang.org/p/xxxxxx 代码的?难道要 pa 强??
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     909 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 43ms UTC 23:23 PVG 07:23 LAX 15:23 JFK 18:23
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86