Go语言并发编程
与传统语言不同,Go是为并发而设计的语言,它在语言层次上提供了对并发编程的大量支持。
修改多个Goroutine同时访问的数据时,必须串行化访问,方法有两种:
- 通过通道操作
- 使用sync、sync/atomic包提供的同步原语
在不改变语义的前提下,编译期和处理器可能对读写操作进行重新排序。由于重新排序的存在,不同Goroutine看到的执行顺序可能不同。
在多Goroutine访问共享变量的情况下,必须使用同步机制,确保读操作能看到期望的(happens before的)写。
Go应用初始化时只有一个Goroutine,它能够创建更多的Goroutine,这些Goroutine会并发的运行。
如果包p导入了包q,则q的init函数的执行完毕,发生在任何p的init函数之前。
go语句的执行,发送在Goroutine主体逻辑执行之前。
Goroutine的销毁不保证发生在任何事件之前。
向一个通道发送数据,发生在接收数据之前:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
var c = make(chan int, 10) var a string func f() { a = "hello, world" c <- 0 } func main() { go f() <-c print(a) } |
上面的例子会确保一定能打印hello, world。
Goroutine是由Go运行时管理的轻量级线程,不依赖于操作系统级别的线程机制。
下面是Goroutine简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 |
func worker(name string) { for i := 0; i < 2; { time.Sleep(time.Second) fmt.Println("%v-%v", name, i) i++ } } func main() { // 使用go关键字,后面跟着一个函数调用,即可发动一个Goroutine go worker("W") worker("M") } |
Goroutine和程序主体部分在同一地址空间运行,因此它们共享变量时必须进行同步。sync包提供了相关的同步原语。
Go运行时为Goroutine实现线程的多路复用 —— 如果某个Goroutine阻塞(例如调用了某个阻塞性的系统调用),则运行时自动将阻塞Goroutine所在的OS线程上分配的其余Goroutine转移到其它Runnable的OS线程上。
每个Goroutine都需要栈资源,Go运行时支持动态栈大小,新创建的Goroutine的栈大小为几个KB,并根据需要进行扩展/收缩。这种动态栈大小让Goroutine平均资源消耗很小,在单台机器上创建10万级别的Goroutine很常见,但是创建相同数量的OS线程,负担要大得多。
Goroutine的调度模型由Go运行时实现。调度模型发生过一次较大的重构。
G-M-P调度模型从Go 1.2版本引入,并沿用至今。在此模型中,有三个重要的角色:
- G:对应Go语言中的Goroutine,运行时需要维护每个Goroutine的执行栈、执行状态等信息
- P:逻辑处理器,每个逻辑处理器对应一个局部运行队列,G可以在其上排队等候执行。P决定了Go程序中可以并行执行的Goroutine的数量(当然不能超过物理CPU的核心数)。对于G来说,P就是运行它的CPU
- M:对应操作系统线程。M不保存任何G的信息,这让G跨M调度成为可能
P必须绑定到M(逻辑处理器必须绑定到OS线程)才能让本地运行队列中的G有机会运行。当P绑定到M后会执行调度循环,调度循环会:
- 从P的本地运行队列中获取可运行的G
- 切换到G的执行栈
- 执行G的函数
如果G中执行了死循环,那么G将永远占据对应的P和M。这样会导致相同P中的其它G没有机会执行,更严重的是,如果只有一个P的情况下,程序中所有其它G都会被饿死。
为了解决上面的问题,Go 1.2引入抢占式调度。Go会在每个函数的入口都添加一段额外的代码,以保证Go运行时有机会检查,并执行抢占式调度。这种抢占式调度只能解决部分问题,对于没有任何函数调用的单纯死循环仍然无能为力。
Go程序启动时,运行时会创建一个名为sysmon的M(监控线程),此M不需要绑定P即可运行。sysmon每隔20us~10ms就会运行并执行对长时间运行的G发起抢占调度:
- 设置G的抢占标识位为true
- 当G下一次调用任何函数时,Go运行时即执行抢占
- G被放入P的本地运行队列,等待下一次调度
Go的理念是,基于阻塞性的接口编程,并使用Goroutine和Channel处理并发,而不是使用回调或者Future。对于Goroutine而言,所有IO操作都是阻塞性的。但是,大部分IO操作都不会导致M阻塞,这避免了大量创建操作系统线程。
Go运行时实现了netpoller,它将操作系统提供的异步网络IO转换为Go中的阻塞性IO,这一方面防止M被阻塞,另一方面保持了Go语言内部的阻塞性接口风格。
当G在通道操作或者网络IO上阻塞时,运行时调度器将其放入某个等待队列,M会尝试执行下一个可运行的G。如果没有可运行的G则M和P解绑并进入休眠状态。
当通道操作完成或者网络IO可用时,等待队列中的G被唤醒,标记为可运行并放入到某个P的本地运行队列中,绑定M并执行。
对于普通文件(Non-Pollable)的IO操作会导致M阻塞,等待IO操作完成后被操作系统唤醒。这种情况下,P会和M分离,寻求其它的M进行绑定,如果没有空闲的M则会创建新的M。这意味着大量的IO可能导致很多操作系统线程被创建。
发生任何其它阻塞性系统调用时,Go运行时的调度逻辑类似。
Go运行时会在逻辑处理器P上调度Goroutine,每个P会绑定一个操作系统线程M。每个P上可能有多个Goroutine等待被执行,这些Goroutine形成的队列被叫做本地运行队列。
新创建的Goroutine会被放置到一个全局运行队列中,Go运行时的调度器会进行调度,把Goroutine分配给某个P。
调用 runtime.GOMAXPROCS(1)可以设置逻辑处理器的数量。逻辑处理器的数量应该和物理核心数量一致,设置的过大并不能提升性能:
1 |
runtime.GOMAXPROCS(runtime.NumCPU()) |
GOMAXPROCS最大可以设置到256。
设置环境变量GODEBUG可以查看运行时调度器的状态,例如设置 GODEBUG=schedtrace=1000则Go运行时每秒钟都会在控制台上打印调度器状态:
1 2 3 4 5 6 7 8 9 10 11 |
SCHED 10ms: gomaxprocs=8 idleprocs=6 threads=4 spinningthreads=1 idlethreads=1 runqueue=0 [1 0 0 0 0 0 0 0] # SCHED,固定前缀,表示这一行调试信息由运行时调度器打印 # 10ms,从程序启动到打印信息经过的时间 # gomaxprocs 最大P数量 # idleprocs 空闲的P数量 # threads 操作系统线程数量 # spinningthreads 正在自旋的操作系统线程数量 # idlethread 空闲的操作系统线程数量 # runqueue 全局运行队列中G的数量 # [1 0 0 0 0 0 0 0] 各P的本地队列中G的数量 |
设置环境变量 GODEBUG=schedtrace=1000,scheddetail=1可以获得更加详细的信息。
应用程序退出时,不会等待Goroutine执行完毕,而是会立即退出。解决办法包括:
- 使用WaitGroup,让主Goroutine等待Worker Goroutine完成
- 如果Worker是长期运行的消息处理循环,应当从主Goroutine向其发送信号,导致其退出。最常见的用法是stop chan
基于通信来共享内存,而不是基于共享内存进行通信 —— Share memory by communicating; don't communicate by sharing memory。
Goroutine和Channel受到CSP并发模型(Communicating Sequential Processes)的启发。在CSP中,多个Process使用Channel进行通信,并且这种通信通常是同步式的。
Go语言中的通道是一种强类型的管道,你可以使用通道操作符 <-来从通道接收数据、发送数据到通道:
1 2 3 4 5 |
// 类似于切片,你需要先创建才能使用通道 ch := make(chan int) ch <- v // 将值v发送到通道ch v := <-ch // 从ch接收数据并赋值给v |
下面是两个Goroutine通过Channel进行通信的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
func receiver(ch chan int8) { for val := range ch { fmt.Printf("Received value: %v\n", val) } } func sender(ch chan int8) { for i := 0; i < 3; i++ { ch <- int8(i) time.Sleep(time.Second) } close(ch) } func main() { ch := make(chan int8) go sender(ch) go receiver(ch) time.Sleep(time.Second * 5) } |
Goroutine可能会永久的阻塞在通道上,对于发送者来说,只要接收方没有响应,就会发生这种情况。
考虑下面的例子:
1 2 3 4 5 6 7 8 |
func First(query string, replicas ...Search) Result { c := make(chan Result) searchReplica := func(i int) { c <- replicas[i](query) } for i := range replicas { go searchReplica(i) } return <-c } |
此函数从一系列replicas中返回第一个replica(query)调用的结果。除了第一个执行完毕的Goroutine之外,其它的都会阻塞,原因是主Goroutine仅仅从通道获取一个数据后就退出了。
解决此问题的方法有多种:
- 使用具有足够大缓冲的通道: c := make(chan Result,len(replicas))
- 使用带有default的select:
1234567searchReplica := func(i int) {select {case c <- replicas[i](query):// 如果通道c不可写,则执行下面的分支default:}} - 使用一个“取消”通道:
1234567891011done := make(chan struct{})// 外层函数退出时,通道被关闭defer close(done)searchReplica := func(i int) {select {case c <- replicas[i](query):// 那时任何协程都可以读取通道并退出case <- done:}}
默认情况下,通过通道发送、接收数据都会阻塞,直到通道的另外一端响应。 这样,Goroutine不需要明确的锁定或者条件变量即可同步。
你可以创建具有缓冲区的通道 —— 发送操作仅仅在缓冲区满的情况下阻塞,接收操作仅仅在缓冲区为空的情况下阻塞:
1 2 |
// 第二个参数为缓冲区大小 ch := make(chan int, 100) |
你可以调用 close(chan)函数提示通道不再有更多的值可以被接收,通道的接收者可以为接收表达式赋第二个值,以测试通道是否关闭:
1 2 |
// 如果通道没有更多的值可以接收,并且已经针对通道调用过close,则ok = false v, ok := <-ch |
注意:
- 仅仅应当由发送者关闭通道,而不是消费者。如果尝试向已经关闭的通道发送数据,会导致Panic
- 通道不同于文件,通常你不需要显式关闭通道。除非你需要显式的告知接收者(例如提示其结束for...range循环)
- 从已经关闭的通道接收是安全的,第二返回值将为false
for i := range c格式的循环用于遍历通道,直到通道被关闭。下面这个例子中,子Goroutine产生斐波纳契数列,主Goroutine使用for...range循环遍历其输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func fibonacci(n int, c chan uint64) { x, y := uint64(0), uint64(1) for i := 0; i < n; i++ { c <- x x, y = y, x+y } close(c) } func main() { c := make(chan uint64, 50) go fibonacci(cap(c), c) for i := range c { fmt.Printf("%v ", i) } } |
多个Goroutine同时遍历单个通道是允许的,不会出现竞态条件。但是要注意:
- 尽量使用参数在Goroutine之间传递chan,而不是使用全局变量
- 对于单个Goroutine来说,不要又读又写,容易死锁
- 如果使用不带缓冲的Chan不会出现死锁,那么使用带缓冲的Chan也不会出现死锁。反之不成立。可以先使用不带缓冲的Chan,当需要性能提升时,增加缓冲
Go中的通道是一等公民,可以像其它值一样被传来传去。甚至,你可以通过通道来传递另外一个通道。下面是一个远程计算服务的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
// 远程计算请求 type Request struct { // 计算参数 args []int // 计算逻辑 f func([]int) int // 处理结果返回的通道 resultChan chan int } // 客户端 func Client(queue chan *Request) { request := &Request{[]int{1, 2, 3, 4, 5}, func(a []int) (s int) { for _, v := range a { s += v } return }, make(chan int)} queue <- request fmt.Printf("Result from server: %v", <-request.resultChan) } // 远程计算服务 func Server(queue chan *Request) { for req := range queue { req.resultChan <- req.f(req.args) } } func main() { queue := make(chan *Request) go Server(queue) go Client(queue) time.Sleep(time.Second) } |
可以看到,客户端通过通道发送请求,请求对象中包含了一个通道,服务器向此通道发送计算结果。
在零值通道上发送、接收都会永久阻塞。
1 2 3 4 5 6 |
var send chan<- int // 在chan关键字后侧加<-表示仅仅能发送 var receive <-chan int // 在chan关键字左侧加<-表示仅仅能接收 // 单向通道常常用于函数或方法的参数中 func counter(out chan<- int) { } |
select...case语句允许Goroutine在多个通信操作符上等待,当至少有一个操作符(可以在default分支中立即退出)准备好(可读或可写),Select才从阻塞中恢复。如果同时有多个操作符准备好,Select会随机执行其中一个分支:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
func fibonacci(c chan uint64, quit chan bool) { x, y := uint64(0), uint64(1) for { select { // 发送端 case c <- x: x, y = y, x+y // 这里的break用于终止case执行,如果要跳出for,可以使用标签或者goto break // 接收端,注意接收数据后可不赋值给任何变量 case <-quit: fmt.Println("Quit signaled") return // 如果没有任何case满足条件,则执行default default: // 可以在此休眠,执行阻塞性收发,等等 } } } func main() { c := make(chan uint64) q := make(chan bool) go fibonacci(c, q) // 从匿名函数启动Groutine go func() { for i := 0; i < 50; i++ { fmt.Println(<-c) } q <- true }() time.Sleep(time.Second) } |
下面是Istio代码中的一个示例:
1 2 3 4 5 6 7 8 |
select { case client.pushChannel <- &XdsEvent{}: // 意味着推送通道可用,事件推送成功 case <-client.doneChannel: // 意味着客户端已经关闭连接 case <-time.After(PushTimeout): // 意味着在超时之前,推送通道不可用,客户端也没关闭。可能是网络慢卡住了 } |
感受一下,用非常简洁的代码就把正常、异常情况处理好了。
1 2 3 4 5 6 7 8 9 10 11 |
// Ticker会定期向通道写入数据 tickChan := time.NewTicker(c.flaggerWindow).C for { select { // 定期触发这个case case <-tickChan: c.scheduleCanaries() case <-stopCh: return nil } } |
1 2 3 4 5 6 7 8 |
c := make(chan int) // 分配一个通道 // 在Go程中启动排序。当它完成后,在通道上发送信号 go func() { list.Sort() c <- 1 // 发送信号,什么值无所谓 }() doSomethingForAWhile() <-c // 等待排序结束,丢弃发来的值 |
还有缓冲的通道可以作为信号量使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
var sem = make(chan int, MaxOutstanding) func handle(r *Request) { sem <- 1 // 等待活动队列清空。 process(r) // 可能需要很长时间 <-sem // 完成;使下一个请求可以运行 } func Serve(queue chan *Request) { for { req := <-queue go handle(req) } } |
通道是跨Goroutine通信的利器,但是某些时候Goroutine之间不需要传递数据,我们只需要保证在任何时刻,仅仅它们其中的一个能够访问(同步)某项资源以避免冲突,这时你可以使用互斥量。
发生Goroutine之间的竞态条件和传统的基于线程的语言类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
var count uint32 = 0 var wg sync.WaitGroup func inc() { defer wg.Done() count++ // 非原子操作 } func main() { const N = 500 wg.Add(N) for i := 0; i < N; i++ { go inc() } wg.Wait() fmt.Printf("Expected: %v, Actual: %v", N, count) // Expected: 500, Actual: 488 } |
运行上段程序,可以看到计数器的结果不符合预期,每次都会不一样。这是由于对共享变量count的++操作不是原子操作,不受保护的访问会导致写覆盖。
现实中的例子要复杂的多,可能很难察觉到问题。你可以为go build 传入 --race参数,这样,在运行代码时,Go会自动检测竞态条件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
================== WARNING: DATA RACE Read at 0x0000005bddb0 by goroutine 7: main.inc() /home/alex/Go/projects/go-study/application.go:13 +0x69 Previous write at 0x0000005bddb0 by goroutine 6: main.inc() /home/alex/Go/projects/go-study/application.go:13 +0x83 Goroutine 7 (running) created at: main.main() /home/alex/Go/projects/go-study/application.go:19 +0x6e Goroutine 6 (finished) created at: main.main() /home/alex/Go/projects/go-study/application.go:19 +0x6e ================== Found 1 data race(s) |
Go标准库中定义的 sync.Mutex,实现了如下接口:
1 2 3 4 5 6 |
type Locker interface { // 调用此方法可以锁定共享资源 Lock() // 调用此方法可以解除对共享资源的锁定 Unlock() } |
使用互斥锁来改造上面的计数器代码:
1 2 3 4 5 6 7 8 |
var lock sync.Mutex func inc() { defer wg.Done() defer lock.Unlock() lock.Lock() count++ } |
即可同步化对共享变量count的访问。下面是另外一个使用互斥锁的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
import ( "fmt" "sync" "time" ) // 并发安全的计数器 type SafeCounter struct { v map[string]int // 锁 mux sync.Mutex } // 为某个键进行计数 func (c *SafeCounter) Inc(key string) { // 在进行不安全操作前后进行加锁、解锁 c.mux.Lock() c.v[key]++ c.mux.Unlock() } func (c *SafeCounter) Value(key string) int { c.mux.Lock() // 可以使用defer,确保此方法退出时会执行解锁 defer c.mux.Unlock() return c.v[key] } func main() { c := SafeCounter{v: make(map[string]int)} const key = "default-counter" for i := 0; i < 10000000; i++ { go c.Inc(key) } time.Sleep(time.Second) fmt.Println(c.Value(key)) } |
互斥锁要求所有访问共享资源的Goroutine串行化执行,因此性能较差。在读多写少的情况下,使用读写锁可以提升性能,读写锁的特点是:
- 读锁不排斥其它的读锁,但是排除任何写锁
- 写锁排除任何锁
1 2 3 4 5 6 7 8 9 10 11 |
var rw sync.RWMutex // 加读锁 rw.RLock() // 解读锁 rw.RUnlock() // 加写锁 rw.Lock() // 解写锁 rw.Unlock() |
在Go语言中,唯一被保证是原子性的,就是sync.atomic中定义的那些操作。也就是说,哪怕是简单的赋值,也需要保护:
1 2 3 |
clearCacheMutex.Lock() clearCacheTimerSet = false clearCacheMutex.Unlock() |
使用互斥量是最常见的保证原子性的方法,sync.atomic包相对不常用。
此包提供了很多低级的原子性内存访问原语,可以用于实现同步算法。 使用此包时要想当小心,推荐优先基于通道或者锁实现同步。
函数 | 说明 | ||
AddInt32 |
func AddInt32(addr *int32, delta int32) (new int32) 原子的给addr的值加delta,并返回返回新值 类似的函数包括AddInt64、AddUint32、AddUintptr等
|
||
CompareAndSwapInt32 |
CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) 针对int32针对CAS算法,示例:
类似的函数包括CompareAndSwapInt64、CompareAndSwapPointer等 |
||
LoadInt32 | 原子的加载值 | ||
StoreInt32 | 原子的保存值 |
等待组类似于Java的CountdownLatch:
1 2 3 4 5 6 7 8 9 10 11 |
var wg sync.WaitGroup const N = 100 wg.Add(N) // 需要N次Done调用才能解除Wait() for i := 0; i < N; i++ { go func() { wg.Done() wg.Wait() // 等待所有协作者完成 }() } wg.Done() wg.Wait() // 等待所有协作者完成 |
注意:传递WaitGroup时应当传递其指针。
使用Context可以方便的控制多个相互协作的Goroutine,下面是一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
import ( "context" "log" "math/rand" "time" ) func main() { // 使用空白上下文为根,产生一个子上下文 // 当返回的cancel函数被调用后,子上下文的Done通道自动关闭 ctx, cancel := context.WithCancel(context.Background()) for i := 0; i < 5; i++ { go genRandNum(ctx) } // 让Goroutine工作一会儿 time.Sleep(time.Second * 3) cancel() // 发出取消上下文的信号 time.Sleep(time.Second * 3) } func genRandNum(ctx context.Context) { for { select { // 检测上下文是否取消 case <-ctx.Done(): // 如果上下文已经取消,则退出Goroutine log.Println("Stop random number generation.") return default: // 如果上下文尚未取消,则产生随机数 log.Printf("Generated number: %v", rand.Int()) time.Sleep(time.Second) } } } |
context.WithCancel()、context.WithTimeout()调用返回的第二个值,即cancel()函数,其作用是触发ctx.Done()通道可读(关闭)。
Context接口定义了四个方法:
1 2 3 4 5 6 7 8 9 10 11 |
type Context interface { // 返回设置的最后期限,如果没有设置最后期限,第二个返回参数为false Deadline() (deadline time.Time, ok bool) // 返回一个只读的通道,如果此通道可读取,则意味着父Context已经发起了取消请求 // 从该通道读取到东西后,当前Goroutine应该进行资源清理并退出 Done() <-chan struct{} // 返回上下文被取消的原因 Err() error // 返回上下文上绑定的属性 Value(key interface{}) interface{} } |
内置的实现包括:
1 2 3 4 5 6 7 8 9 10 |
var ( background = new(emptyCtx) todo = new(emptyCtx) ) func Background() Context { return background } func TODO() Context { return todo } |
background、todo都是emptyCtx类型,这是一个不可取消、没有设置最后期限、没有绑定任何属性的Context。
context.Background()通常在main函数中用于根Context。
emptyCtx不能做任何事情,我们通常将其作为根Context,并利用下面的函数来构建上下文树:
1 2 3 4 5 6 7 8 |
// 产生一个子上下文和取消函数,调用函数可以取消上下文 func WithCancel(parent Context) (ctx Context, cancel CancelFunc) // 产生一个子上下文,在到达最后期限时,会自动取消,也可以手工取消 func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) // 类似上面 func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) // 通过上下文共享数据 func WithValue(parent Context, key, val interface{}) Context |
注意:调用取消函数会导致当前上下文以及任何后代上下文被取消,也就是Done()关闭
- 以参数的方式传递Context,并且不会将其包装在结构中
- Context作为第一个参数
- 不要传递nil Context
- 避免通过Context传递不必要的数据
- Context可以跨Goroutine安全访问
调度器会在GC、go语句、阻塞channel操作、阻塞系统调用和lock操作后运行。它也会在非内联函数调用后执行。
你也可以显式的唤起调度器:
1 2 3 |
for !done { runtime.Gosched() } |
但是一个空循环,就可以阻止调度器运作。
使用Go语言实现一个线程池是很简单的。下面分析Istio源码中包含的线程池的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
package pool import ( "sync" ) // 代表工作协程需要执行的函数 type WorkFunc func(param interface{}) // 协程池结构,持有一组可重用的协程,工作函数可以调度到协程上 type GoroutinePool struct { queue chan work // 工作队列,缓冲通道 wg sync.WaitGroup // 用于在所有工作完成之前阻止shutdown操作 singleThreaded bool // 提示是否运行在单线程模式 } // 放入队列的工作,即工作函数+参数 type work struct { fn WorkFunc param interface{} } // 创建新的协程池 func NewGoroutinePool(queueDepth int, singleThreaded bool) *GoroutinePool { gp := &GoroutinePool{ queue: make(chan work, queueDepth), // 队列深度即通道缓冲大小 singleThreaded: singleThreaded, } // 至少添加一个工作协程 gp.AddWorkers(1) return gp } // 协程池实现io.Closer接口。在关闭时,必须在等待组上等待 —— 等待所有工作协程退出 func (gp *GoroutinePool) Close() error { if !gp.singleThreaded { // 关闭通道 close(gp.queue) // 等待 gp.wg.Wait() } return nil } // 调度一份工作,在未来执行 // 调用者必须确保,工作函数仅仅依赖于通过params传递的“外部数据” —— 要保证这一点,你可以 // 传递一个普通命名函数给此方法,而不是一个内联的匿名函数(容易形成闭包) func (gp *GoroutinePool) ScheduleWork(fn WorkFunc, param interface{}) { if gp.singleThreaded { // 单线程模式下,执行运行 fn(param) } else { // 否则,写入到通道。如果通道满了,可能会导致调用者阻塞 gp.queue <- work{fn: fn, param: param} } } // 添加工作协程到池中,增加并行度 func (gp *GoroutinePool) AddWorkers(numWorkers int) { if !gp.singleThreaded { // 增加需要等待的数量 gp.wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { // 工作协程的工作循环 go func() { // 取出一个工作并执行 for work := range gp.queue { work.fn(work.param) } // 如果通道关闭,则减少等待组计数。所有协程都执行到这一步后,Close()调用才能从阻塞中返回 gp.wg.Done() }() } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package pool import ( "sync" "testing" ) func TestWorkerPool(t *testing.T) { const numWorkers = 123 const numWorkItems = 456 parameterMismatch := false for i := 0; i < 2; i++ { gp := NewGoroutinePool(128, i == 0) gp.AddWorkers(numWorkers) wg := &sync.WaitGroup{} wg.Add(numWorkItems) for i := 0; i < numWorkItems; i++ { passedParam := i // 在栈上捕获变量,避免创建闭包 gp.ScheduleWork(func(param interface{}) { // 调度 paramI := param.(int) if paramI != passedParam { parameterMismatch = true } wg.Done() }, passedParam) } // 等待所有工作执行完毕 wg.Wait() if parameterMismatch { t.Fatal("Passed parameter was not as expected") } // 关闭池 _ = gp.Close() } } |
Leave a Reply