Go 语言并发模型:像 Unix Pipe 那样使用 channel - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
oscarzhao
V2EX    程序员

Go 语言并发模型:像 Unix Pipe 那样使用 channel

  •  
  •   oscarzhao
    oscarzhao 2016-08-13 23:36:10 +08:00 4081 次点击
    这是一个创建于 3356 天前的主题,其中的信息可能已经有所发展或是发生改变。

    简介

    Go 语言的并发原语允许开发者以类似于 Unix Pipe 的方式构建数据流水线 (data pipelines),数据流水线能够高效地利用 I/O 和多核 CPU 的优势。

    本文要讲的就是一些使用流水线的一些例子,流水线的错误处理也是本文的重点。

    阅读建议

    数据流水线充分利用了多核特性,代码层面是基于 channel 类型 和 go 关键字。

    channel 和 go 贯穿本文的始终。如果你对这两个概念不太了解,建议先阅读之前发布的两篇文章:Go 语言内存模型(上 /下)

    如果你对操作系统中"生产者"和"消费者"模型比较了解的话,也将有助于对本文中流水线的理解。

    本文中绝大多数讲解都是基于代码进行的。换句话说,如果你看不太懂某些代码片段,建议补全以后,在机器或 play.golang.org 上运行一下。对于某些不明白的细节,可以手动添加一些语句以助于理解。

    由于 Go 语言并发模型 的英文原文 Go Concurrency Patterns: Pipelines and cancellation 篇幅比较长,本文只包含 理论推导和简单的例子。 下一篇文章我们会对 "并行 MD5" 这个现实生活的例子进行详细地讲解。

    什么是 "流水线" (pipeline)?

    对于"流水线"这个概念, Go 语言中并没有正式的定义,它只是很多种并发方式的一种。这里我给出一个非官方的定义:一条流水线是 是由多个阶段组成的,相邻的两个阶段由 channel 进行连接; 每个阶段是由一组在同一个函数中启动的 goroutine 组成。在每个阶段,这些 goroutine 会执行下面三个操作:

    1. 通过 inbound channels 从上游接收数据
    2. 对接收到的数据执行一些操作,通常会生成新的数据
    3. 将新生成的数据通过 outbound channels 发送给下游

    除了第一个和最后一个阶段,每个阶段都可以有任意个 inbound 和 outbound channel 。 显然,第一个阶段只有 outbound channel ,而最后一个阶段只有 inbound channel 。 我们通常称第一个阶段为"生产者""源头",称最后一个阶段为"消费者""接收者"

    首先,我们通过一个简单的例子来演示这个概念和其中的技巧。后面我们会更出一个真实世界的例子。

    流水线入门:求平方数

    假设我们有一个流水线,它由三个阶段组成。

    第一阶段是 gen 函数,它能够将一组整数转换为 channel , channel 可以将数字发送出去。 gen 函数首先启动一个 goroutine ,该 goroutine 发送数字到 channel ,当数字发送完时关闭 channel 。 代码如下:

    func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } 

    第二阶段是 sq 函数,它从 channel 接收一个整数,然后返回 一个 channel ,返回的 channel 可以发送 接收到整数的平方。 当它的 inbound channel 关闭,并且把所有数字均发送到下游时,会关闭 outbound channel 。代码如下:

    func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } 

    main 函数 用于设置流水线并运行最后一个阶段。最后一个阶段会从第二阶段接收数字,并逐个打印出来,直到来自于上游的 inbound channel 关闭。代码如下:

    func main() { // 设置流水线 c := gen(2, 3) out := sq(c) // 消费输出结果 fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 } 

    由于 sq 函数的 inbound channel 和 outbound channel 类型一样,所以组合任意个 sq 函数。比如像下面这样使用:

    func main() { // 设置流水线并消费输出结果 for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } } 

    如果我们稍微修改一下 gen 函数,便可以模拟 haskell 的惰性求值。有兴趣的读者可以自己折腾一下。

    流水线进阶:扇入和扇出

    扇出:同一个 channel 可以被多个函数读取数据,直到 channel 关闭。 这种机制允许将工作负载分发到一组 worker ,以便更好地并行使用 CPU 和 I/O 。

    扇入:多个 channel 的数据可以被同一个函数读取和处理,然后合并到一个 channel ,直到所有 channel 都关闭。

    下面这张图对 扇入 有一个直观的描述:

    扇入

    我们修改一下上个例子中的流水线,这里我们运行两个 sq 实例,它们从同一个 channel 读取数据。 这里我们引入一个新函数 merge 对结果进行"扇入"操作:

    func main() { in := gen(2, 3) // 启动两个 sq 实例,即两个 goroutines 处理 channel "in" 的数据 c1 := sq(in) c2 := sq(in) // merge 函数将 channel c1 和 c2 合并到一起,这段代码会消费 merge 的结果 for n := range merge(c1, c2) { fmt.Println(n) // 打印 4 9, 或 9 4 } } 

    merge 函数 将多个 channel 转换为一个 channel ,它为每一个 inbound channel 启动一个 goroutine ,用于将数据 拷贝到 outbound channel 。 merge 函数的实现见下面代码 (注意 wg 变量):

    func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为每一个输入 channel cs 创建一个 goroutine output // output 将数据从 c 拷贝到 out ,直到 c 关闭,然后 调用 wg.Done output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // 启动一个 goroutine ,用于所有 output goroutine 结束时,关闭 out // 该 goroutine 必须在 wg.Add 之后启动 go func() { wg.Wait() close(out) }() return out } 

    在上面的代码中,每个 inbound channel 对应一个 output 函数。所有 output goroutine 被创建以后, merge 启动一个额外的 goroutine , 这个 goroutine 会等待所有 inbound channel 上的发送操作结束以后,关闭 outbound channel 。

    对已经关闭的 channel 执行发送操作(ch<-)会导致异常,所以我们必须保证所有的发送操作都在关闭 channel 之前结束。 sync.WaitGroup 提供了一种组织同步的方式。 它保证 merge 中所有 inbound channel (cs ...<-chan int) 均被正常关闭, output goroutine 正常结束后,关闭 out channel 。

    停下来思考一下

    在使用流水线函数时,有一个固定的模式:

    1. 在一个阶段,当所有发送操作 (ch<-) 结束以后,关闭 outbound channel
    2. 在一个阶段, goroutine 会持续从 inbount channel 接收数据,直到所有 inbound channel 全部关闭

    在这种模式下,每一个接收阶段都可以写成 range 循环的方式, 从而保证所有数据都被成功发送到下游后, goroutine 能够立即退出。

    在现实中,阶段并不总是接收所有的 inbound 数据。有时候是设计如此:接收者可能只需要数据的一个子集就可以继续执行。 更常见的情况是:由于前一个阶段返回一个错误,导致该阶段提前退出。 这两种情况下,接收者都不应该继续等待后面的值被传送过来。

    我们期望的结果是:当后一个阶段不需要数据时,上游阶段能够停止生产。

    在我们的例子中,如果一个阶段不能消费所有的 inbound 数据,试图发送这些数据的 goroutine 会永久阻塞。看下面这段代码片段:

     // 只消费 out 的第一个数据 out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // 由于我们不再接收 out 的第二个数据 // 其中一个 goroutine output 将会在发送时被阻塞 } 

    显然这里存在资源泄漏。一方面 goroutine 消耗内存和运行时资源,另一方面 goroutine 栈中的堆引用会阻止 gc 执行回收操作。 既然 goroutine 不能被回收,那么他们必须自己退出。

    我们重新整理一下流水线中的不同阶段,保证在下游阶段接收数据失败时,上游阶段也能够正常退出。 一个方式是使用带有缓冲的管道作为 outbound channel 。缓存可以存储固定个数的数据。 如果缓存没有用完,那么发送操作会立即返回。看下面这段代码示例:

    c := make(chan int, 2) // 缓冲大小为 2 c <- 1 // 立即返回 c <- 2 // 立即返回 c <- 3 // 该操作会被阻塞,直到有一个 goroutine 执行 <-c ,并接收到数字 1 

    如果在创建 channel 时就知道要发送的值的个数,使用 buffer 就能够简化代码。 仍然使用求平方数的例子,我们对 gen 函数进行重写。我们将这组整型数拷贝到一个 缓冲 channel 中,从而避免创建一个新的 goroutine :

    func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out } 

    回到 流水线中被阻塞的 goroutine ,我们考虑让 merge 函数返回一个缓冲管道:

    func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // 在本例中存储未读的数据足够了 // ... 其他部分代码不变 ... 

    尽管这种方法解决了这个程序中阻塞 goroutine 的问题,但是从长远来看,它并不是好办法。 缓存大小选择为 1 是建立在两个前提之上:

    1. 我们已经知道 merge 函数有两个 inbound channel
    2. 我们已经知道下游阶段会消耗多少个值

    这段代码很脆弱。如果我们在传入一个值给 gen 函数,或者下游阶段读取的值变少, goroutine 会再次被阻塞。

    为了从根本上解决这个问题,我们需要提供一种机制,让下游阶段能够告知上游发送者停止接收的消息。 下面我们看下这种机制。

    显式取消 (Explicit cancellation)

    当 main 函数决定退出,并停止接收 out 发送的任何数据时,它必须告诉上游阶段的 goroutine 让它们放弃 正在发送的数据。 main 函数通过发送数据到一个名为 done 的 channel 实现这样的机制。 由于有两个潜在的 发送者被阻塞,它发送两个值。如下代码所示:

    func main() { in := gen(2, 3) // 启动两个运行 sq 的 goroutine // 两个 goroutine 的数据均来自于 in c1 := sq(in) c2 := sq(in) // 消耗 output 生产的第一个值 done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // 告诉其他发送者,我们将要离开 // 不再接收它们的数据 done <- struct{}{} done <- struct{}{} } 

    发送数据的 goroutine 使用一个 select 表达式代替原来的操作, select 表达式只有在接收到 out 或 done 发送的数据后,才会继续进行下去。 done 的值类型为 struct{} ,因为它发送什么值不重要,重要的是它发送没发送: 接收事件发生意味着 channel out 的发送操作被丢弃。 goroutine output 基于 inbound channel c 继续执行 循环,所以上游阶段不会被阻塞。(后面我们会讨论如何让循环提前退出)。 使用 done channel 方式实现的 merge 函数如下:

    func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为 cs 的的每一个 输入 channel // 创建一个 goroutine 。 output 函数将 // 数据从 c 拷贝到 out ,直到 c 关闭, // 或者接收到 done 信号; // 然后调用 wg.Done() output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ... 

    这种方法有一个问题:每一个下游的接收者需要知道潜在被阻上游发送者的个数,然后向这些发送者发送信号让它们提前退出。 时刻追踪这些数目是一项繁琐且易出错的工作。

    我们需要一种方式能够让未知数目、且个数不受限制的 goroutine 停止向下游发送数据。在 Go 语言中,我们可以通过关闭一个 channel 实现,因为在一个已关闭 channel 上执行接收操作(<-ch)总是能够立即返回,返回值是对应类型的零值。关于这点的细节,点击这里查看。

    换句话说,我们只要关闭 done channel ,就能够让解开对所有发送者的阻塞。对一个管道的关闭操作事实上是对所有接收者的广播信号。

    我们把 done channel 作为一个参数传递给每一个 流水线上的函数,通过 defer 表达式声明对 done channel 的关闭操作。 因此,所有从 main 函数作为源头被调用的函数均能够收到 done 的信号,每个阶段都能够正常退出。 使用 done 对 main 函数重构以后,代码如下:

    func main() { // 设置一个 全局共享的 done channel , // 当流水线退出时,关闭 done channel // 所有 goroutine 接收到 done 的信号后, // 都会正常退出。 done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // 将 sq 的工作分发给两个 goroutine // 这两个 goroutine 均从 in 读取数据 c1 := sq(done, in) c2 := sq(done, in) // 消费 outtput 生产的第一个值 out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // defer 调用时, done channel 会被关闭。 } 

    现在,流水线中的每个阶段都能够在 done channel 被关闭时返回。 merge 函数中的 output 代码也能够顺利返回,因为它 知道 done channel 关闭时,上游发送者 sq 会停止发送数据。 在 defer 表达式执行结束时,所有调用链上的 output 都能保证 wg.Done() 被调用:

    func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为 cs 的每一个 channel 创建一个 goroutine // 这个 goroutine 运行 output ,它将数据从 c // 拷贝到 out ,直到 c 关闭,或者 接收到 done // 的关闭信号。人啊后调用 wg.Done() output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ... 

    同样的原理, done channel 被关闭时, sq 也能够立即返回。在 defer 表达式执行结束时,所有调用链上的 sq 都能保证 out channel 被关闭。代码如下:

    func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out } 

    这里,我们给出几条构建流水线的指导:

    1. 当所有发送操作结束时,每个阶段都关闭自己的 outbound channels
    2. 每个阶段都会一直从 inbound channels 接收数据,直到这些 channels 被关闭,或发送者解除阻塞状态。

    流水线通过两种方式解除发送者的阻塞:

    1. 提供足够大的缓冲保存发送者发送的数据
    2. 接收者放弃 channel 时,显式地通知发送者。

    结论

    本文介绍了 Go 语言中构建数据流水线的一些技巧。流水线的错误处理比较复杂,流水线的每个阶段都可能阻塞向下游发送数据, 下游阶段也可能不再关注上游发送的数据。上面我们介绍了通过关闭一个 channel ,向流水线中的所有 goroutine 发送一个 "done" 信号;也定义了 构建流水线的正确方法。

    下一篇文章,我们将通过一个 并行 md5 的例子来说明本文所讲的一些理念和技巧。

    原作者 Sameer Ajmani ,翻译 Oscar

    下期预告: Go 语言并发模型:以并行 md5 计算为例。原文链接

    相关链接:

    1. 原文链接: https://blog.golang.org/pipelines
    2. Go 并发模型: http://talks.golang.org/2012/concurrency.slide#1
    3. Go 高级并发模型: http://blog.golang.org/advanced-go-concurrency-patterns

    扫码关注微信公众号“深入 Go 语言”

    在这里

    5 条回复    2016-08-16 09:40:18 +08:00
    pubby
        1
    pubby  
       2016-08-14 00:25:23 +08:00
    翻译得不错

    感觉 merge 应该再改造一下

    output := func(c <-chan int) {
    defer wg.Done()
    for { // 不依赖上游 chan
    select {
    case <-done: // 放第一个,尽早退出
    return
    case n,ok:=<- c:
    if !ok { // inbound closed
    return
    }
    out <- n
    }
    }
    }
    pubby
        2
    pubby  
       2016-08-14 00:28:19 +08:00
    写错了,跟 switch 搞混了 -_-
    struCoder
        3
    struCoder  
       2016-08-15 13:56:19 +08:00
    我之前翻译的一篇老外的关于 golang 中的特性文章,与 lz 表达的很有相似度。
    http://www.strucoder.com/2016/03/15/gozhong-de-bing-fa-ke-shi-hua/
    oscarzhao
        4
    oscarzhao  
    OP
       2016-08-15 16:49:59 +08:00
    @struCoder 这篇文章好像在哪里看过
    struCoder
        5
    struCoder  
       2016-08-16 09:40:18 +08:00
    @oscarzhao 不会吧,国内好像就是我一个人翻译了老外的这篇文章的样子。。。。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     923 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 26ms UTC 21:45 PVG 05:45 LAX 14:45 JFK 17:45
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86