上下文context是对计时器(timer)+通道(channel)+同步锁(sync.Mutex)的封装,主要用于多个协程间的统一控制,如取消和定时。理论上,能用上下文的地方都可以用计时器+通道+同步锁的方式来改写,那为什么还要用上下文呢。本文就从一个例子开始推导一下这个问题。仍然还是用《Go语言:context使用的示例》里的例子,采用context的实现方式请见前文,这里不再累述。
假设有这样一个应用场景,一个公司(main)有一名经理(manager)和两名工人(worker),公司下班(main exit)有两种可能:一:工人(worker)的工作时间已经达到合同约定的最大时长;二:经理(manager)提前叫停收工。两种可能满足其中一个即可下班。
1)首先我们先简化问题,仅以timer实现上述场景的第二种下班条件:
package main import ( "fmt" "time" ) //worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停 const MAX_WORKING_DURATION = 5 * time.Second //达到实际工作时长后,manager可以提前叫停 const ACTUAL_WORKING_DURATION = 2 * time.Second func main() { ch := make(chan struct{}) go worker(ch, "[1]") go worker(ch, "[2]") go manager(ch) <-ch //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed") } func manager(ch chan struct{}) { time.Sleep(ACTUAL_WORKING_DURATION) fmt.Println("manager called cancel()") close(ch) } func worker(ch chan struct{}, name string) { for { select { case <-ch: fmt.Println(name, "return for ctxWithCancel.Done()") return default: fmt.Println(name, "working") } time.Sleep(1 * time.Second) } }输出:
[2] working [1] working [1] working [2] working manager called cancel() [1] return for ctxWithCancel.Done() [2] return for ctxWithCancel.Done() company closed从输出来看,manger协程在延时2秒后关闭了通道,worker检测到通道关闭后退出,main退出。整个过程符合预期。
2)我们再加入下班的第一种条件:
在main函数里加入一个timer,当达到worker的最大工作时长时,关闭通道ch。其余代码不变。
func main() { ch := make(chan struct{}) go worker(ch, "[1]") go worker(ch, "[2]") go manager(ch) timer := time.NewTimer(MAX_WORKING_DURATION) select{ case <- timer.C: close(ch) } <-ch //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed") }输出:
[1] working [2] working [2] working [1] working manager called cancel() [1] return for ctxWithCancel.Done() [2] return for ctxWithCancel.Done() panic: close of closed channel goroutine 1 [running]: main.main() /home/go-test/src/tutorial/trycontext_timer.go:24 +0x10e exit status 2最后一行报错了,试图关闭一个已经关闭的通道。因为manager已经在2秒的时候把通道ch关闭了,所以定时器5秒的时候再关通道ch就报错了。这是一个协程同步的问题,要修正这个问题就需要加入同步锁和标识通道是否关闭的标志变量,在关闭通道时使用同步锁锁定再检测标志变量的状态,完成后释放同步锁。
修正之后:
package main import ( "fmt" "sync" "time" ) //worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停 const MAX_WORKING_DURATION = 5 * time.Second //达到实际工作时长后,manager可以提前叫停 const ACTUAL_WORKING_DURATION = 2 * time.Second type ctx struct { mu sync.Mutex closed bool } func main() { ch := make(chan struct{}) go worker(ch, "[1]") go worker(ch, "[2]") var c ctx go manager(ch, &c) timer := time.NewTimer(MAX_WORKING_DURATION) select { case <-timer.C: c.mu.Lock() if c.closed == true { c.mu.Unlock() return } close(ch) c.closed = true c.mu.Unlock() } <-ch //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed") } func manager(ch chan struct{}, c *ctx) { time.Sleep(ACTUAL_WORKING_DURATION) fmt.Println("manager called cancel()") c.mu.Lock() if c.closed == true { c.mu.Unlock() return } close(ch) c.closed = true c.mu.Unlock() } func worker(ch chan struct{}, name string) { for { select { case <-ch: fmt.Println(name, "return for ctxWithCancel.Done()") return default: fmt.Println(name, "working") } time.Sleep(1 * time.Second) } }这里加入了一个ctx结构体以指针的方式在协程之间共享,同时把timer对通道的关闭以及manager对通道的关闭都加上了同步锁和标志变量检测。这样一来,整个运行就正常了:
[1] working [2] working [2] working [1] working manager called cancel() [2] return for ctxWithCancel.Done() [1] return for ctxWithCancel.Done()修改ACTUAL_WORKING_DURATION = 10 * time.Second,让超时先发生,输出也符合预期:
[1] working [2] working [2] working [1] working [2] working [1] working [2] working [1] working [2] working [1] working [2] return for ctxWithCancel.Done() [1] return for ctxWithCancel.Done() company closed就功能而言,这段代码已经完全可用了,既实现了超时控制又实现了取消控制。但是,这段代码存在冗余,又不方便重复使用,因此还需要重构:
首先把通道以及对通道的取消操作封装到ctx结构体:
package main import ( "fmt" "sync" "time" ) //worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停 const MAX_WORKING_DURATION = 5 * time.Second //达到实际工作时长后,manager可以提前叫停 const ACTUAL_WORKING_DURATION = 10 * time.Second type ctx struct{ mu sync.Mutex closed bool done chan struct{} } func New() (*ctx, func()){ c := ctx{} c.done = make(chan struct{}) return &c, func(){c.cancel()} } func (c *ctx)Done() chan struct{}{ return c.done } func (c *ctx)cancel(){ c.mu.Lock() if c.closed == true{ c.mu.Unlock() return } close(c.done) c.closed = true c.mu.Unlock() } func main() { c, cancelFunc := New() go worker(c, "[1]") go worker(c, "[2]") go manager(c, cancelFunc) timer := time.NewTimer(MAX_WORKING_DURATION) select { case <-timer.C: cancelFunc() } <-c.Done() //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed") } func manager(c *ctx, cancelFunc func()) { time.Sleep(ACTUAL_WORKING_DURATION) fmt.Println("manager called cancel()") cancelFunc() } func worker(c *ctx, name string) { for { select { case <-c.Done(): fmt.Println(name, "return for ctxWithCancel.Done()") return default: fmt.Println(name, "working") } time.Sleep(1 * time.Second) } }这样一来,主体程序就简洁多了,对通道的操作都被封装在ctx结构体里。到这一步,其实已经可以看出ctx结构体就是带有cancel功能的上下文的雏形。
我们再把上面定时器超时部分的代码也封装到ctx结构体
最后的代码:
package main import ( "fmt" "sync" "time" ) //worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停 const MAX_WORKING_DURATION = 5 * time.Second //达到实际工作时长后,manager可以提前叫停 const ACTUAL_WORKING_DURATION = 10 * time.Second type ctx struct{ mu sync.Mutex closed bool done chan struct{} } func New() (*ctx, func()){ c := ctx{} c.done = make(chan struct{}) return &c, func(){c.cancel()} } func NewWithTimeout(dur time.Duration) (*ctx, func()){ c := ctx{} c.done = make(chan struct{}) timer := time.NewTimer(dur) go func() { select { case <-timer.C: c.cancel() } }() return &c, func(){c.cancel()} } func (c *ctx)Done() chan struct{}{ return c.done } func (c *ctx)cancel(){ c.mu.Lock() if c.closed == true{ c.mu.Unlock() return } close(c.done) c.closed = true c.mu.Unlock() } func main() { c, cancelFunc := NewWithTimeout(MAX_WORKING_DURATION) go worker(c, "[1]") go worker(c, "[2]") go manager(c, cancelFunc) <-c.Done() //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed") } func manager(c *ctx, cancelFunc func()) { time.Sleep(ACTUAL_WORKING_DURATION) fmt.Println("manager called cancel()") cancelFunc() } func worker(c *ctx, name string) { for { select { case <-c.Done(): fmt.Println(name, "return for ctxWithCancel.Done()") return default: fmt.Println(name, "working") } time.Sleep(1 * time.Second) } }总结:
本文为了实现多协程同步的问题,从定时器+通道的方式逐步推导,最终得到了一个可重复使用的结构体ctx,而这个结构体实际就是context的雏形,整个过程是一个“造轮子”的过程,golang的context已经把这个轮子造好了。所以在多协程同步的场景下,都应该使用context。