如何优雅的关闭 Go Channel「译」 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
codehole
V2EX    程序员

如何优雅的关闭 Go Channel「译」

  •  
  •   codehole
    pyloque 2018-04-08 17:57:55 +08:00 4128 次点击
    这是一个创建于 2813 天前的主题,其中的信息可能已经有所发展或是发生改变。

    Channel 关闭原则

    不要在消费端关闭 channel,不要在有多个并行的生产者时对 channel 执行关闭操作。

    也就是说应该只在[唯一的或者最后唯一剩下]的生产者协程中关闭 channel,来通知消费者已经没有值可以继续读了。只要坚持这个原则,就可以确保向一个已经关闭的 channel 发送数据的情况不可能发生。

    暴力关闭 channel 的正确方法

    如果想要在消费端关闭 channel,或者在多个生产者端关闭 channel,可以使用 recover 机制来上个保险,避免程序因为 panic 而崩溃。

    func SafeClose(ch chan T) (justClosed bool) { defer func() { if recover() != nil { justClosed = false } }() // assume ch != nil here. close(ch) // panic if ch is closed return true // <=> justClosed = true; return } 

    使用这种方法明显违背了上面的 channel 关闭原则,然后性能还可以,毕竟在每个协程只会调用一次 SafeClose,性能损失很小。

    同样也可以在生产消息的时候使用 recover 方法。

    func SafeSend(ch chan T, value T) (closed bool) { defer func() { if recover() != nil { // The return result can be altered // in a defer function call. closed = true } }() ch <- value // panic if ch is closed return false // <=> closed = false; return } 

    礼貌地关闭 channel 方法

    还有不少人经常使用用 sync.Once 来关闭 channel,这样可以确保只会关闭一次

    type MyChannel struct { C chan T once sync.Once } func NewMyChannel() *MyChannel { return &MyChannel{C: make(chan T)} } func (mc *MyChannel) SafeClose() { mc.once.Do(func() { close(mc.C) }) } 

    同样我们也可以使用 sync.Mutex 达到同样的目的。

    type MyChannel struct { C chan T closed bool mutex sync.Mutex } func NewMyChannel() *MyChannel { return &MyChannel{C: make(chan T)} } func (mc *MyChannel) SafeClose() { mc.mutex.Lock() if !mc.closed { close(mc.C) mc.closed = true } mc.mutex.Unlock() } func (mc *MyChannel) IsClosed() bool { mc.mutex.Lock() defer mc.mutex.Unlock() return mc.closed } 

    要知道 golang 的设计者不提供 SafeClose 或者 SafeSend 方法是有原因的,他们本来就不推荐在消费端或者在并发的多个生产端关闭 channel,比如关闭只读 channel 在语法上就彻底被禁止使用了。

    优雅地关闭 channel 的方法

    上文的 SafeSend 方法一个很大的劣势在于它不能用在 select 块的 case 语句中。而另一个很重要的劣势在于像我这样对代码有洁癖的人来说,使用 panic/recover 和 sync/mutex 来搞定不是那么的优雅。下面我们引入在不同的场景下可以使用的纯粹的优雅的解决方法。

    多个消费者,单个生产者。这种情况最简单,直接让生产者关闭 channel 好了。

    package main import ( "time" "math/rand" "sync" "log" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const MaxRandomNumber = 100000 const NumReceivers = 100 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int, 100) // the sender go func() { for { if value := rand.Intn(MaxRandomNumber); value == 0 { // The only sender can close the channel safely. close(dataCh) return } else { dataCh <- value } } }() // receivers for i := 0; i < NumReceivers; i++ { go func() { defer wgReceivers.Done() // Receive values until dataCh is closed and // the value buffer queue of dataCh is empty. for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() } 

    多个生产者,单个消费者。这种情况要比上面的复杂一点。我们不能在消费端关闭 channel,因为这违背了 channel 关闭原则。但是我们可以让消费端关闭一个附加的信号来通知发送端停止生产数据。

    package main import ( "time" "math/rand" "sync" "log" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const MaxRandomNumber = 100000 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(1) // ... dataCh := make(chan int, 100) stopCh := make(chan struct{}) // stopCh is an additional signal channel. // Its sender is the receiver of channel dataCh. // Its reveivers are the senders of channel dataCh. // senders for i := 0; i < NumSenders; i++ { go func() { for { // The first select here is to try to exit the goroutine // as early as possible. In fact, it is not essential // for this example, so it can be omitted. select { case <- stopCh: return default: } // Even if stopCh is closed, the first branch in the // second select may be still not selected for some // loops if the send to dataCh is also unblocked. // But this is acceptable, so the first select // can be omitted. select { case <- stopCh: return case dataCh <- rand.Intn(MaxRandomNumber): } } }() } // the receiver go func() { defer wgReceivers.Done() for value := range dataCh { if value == MaxRandomNumber-1 { // The receiver of the dataCh channel is // also the sender of the stopCh cahnnel. // It is safe to close the stop channel here. close(stopCh) return } log.Println(value) } }() // ... wgReceivers.Wait() } 

    就上面这个例子,生产者同时也是退出信号 channel 的接受者,退出信号 channel 仍然是由它的生产端关闭的,所以这仍然没有违背 channel 关闭原则。值得注意的是,这个例子中生产端和接受端都没有关闭消息数据的 channel,channel 在没有任何 goroutine 引用的时候会自行关闭,而不需要显示进行关闭。

    多个生产者,多个消费者

    这是最复杂的一种情况,我们既不能让接受端也不能让发送端关闭 channel。我们甚至都不能让接受者关闭一个退出信号来通知生产者停止生产。因为我们不能违反 channel 关闭原则。但是我们可以引入一个额外的协调者来关闭附加的退出信号 channel。

    package main import ( "time" "math/rand" "sync" "log" "strconv" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const MaxRandomNumber = 100000 const NumReceivers = 10 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int, 100) stopCh := make(chan struct{}) // stopCh is an additional signal channel. // Its sender is the moderator goroutine shown below. // Its reveivers are all senders and receivers of dataCh. toStop := make(chan string, 1) // The channel toStop is used to notify the moderator // to close the additional signal channel (stopCh). // Its senders are any senders and receivers of dataCh. // Its reveiver is the moderator goroutine shown below. var stoppedBy string // moderator go func() { stoppedBy = <- toStop close(stopCh) }() // senders for i := 0; i < NumSenders; i++ { go func(id string) { for { value := rand.Intn(MaxRandomNumber) if value == 0 { // Here, a trick is used to notify the moderator // to close the additional signal channel. select { case toStop <- "sender#" + id: default: } return } // The first select here is to try to exit the goroutine // as early as possible. This select blocks with one // receive operation case and one default branches will // be optimized as a try-receive operation by the // official Go compiler. select { case <- stopCh: return default: } // Even if stopCh is closed, the first branch in the // second select may be still not selected for some // loops (and for ever in theory) if the send to // dataCh is also unblocked. // This is why the first select block is needed. select { case <- stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } // receivers for i := 0; i < NumReceivers; i++ { go func(id string) { defer wgReceivers.Done() for { // Same as the sender goroutine, the first select here // is to try to exit the goroutine as early as possible. select { case <- stopCh: return default: } // Even if stopCh is closed, the first branch in the // second select may be still not selected for some // loops (and for ever in theory) if the receive from // dataCh is also unblocked. // This is why the first select block is needed. select { case <- stopCh: return case value := <-dataCh: if value == MaxRandomNumber-1 { // The same trick is used to notify // the moderator to close the // additional signal channel. select { case toStop <- "receiver#" + id: default: } return } log.Println(value) } } }(strconv.Itoa(i)) } // ... wgReceivers.Wait() log.Println("stopped by", stoppedBy) } 

    以上三种场景不能涵盖全部,但是它们是最常见最通用的三种场景,基本上所有的场景都可以划分为以上三类。

    结论

    没有任何场景值得你去打破 channel 关闭原则,如果你遇到这样的一种特殊场景,还是建议你好好思考一下自己设计,是不是该重构一下了。

    阅读相关文章,关注公众号「码洞」

    pagxir
        1
    pagxir  
       2018-04-08 18:42:26 +08:00 via Android
    tempdban
        2
    tempdban  
       2018-04-08 19:19:23 +08:00 via Android
    我一直有个疑问,为啥都要关闭 channel
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2511 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 27ms UTC 07:03 PVG 15:03 LAX 23:03 JFK 02:03
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86