Menu

  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay
  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay

Go语言并发编程

4
Feb
2016

Go语言并发编程

By Alex
/ in Go
/ tags 并发编程
0 Comments

与传统语言不同,Go是为并发而设计的语言,它在语言层次上提供了对并发编程的大量支持。

Goroutine
注意点

修改多个Goroutine同时访问的数据时,必须串行化访问,方法有两种:

  1. 通过通道操作
  2. 使用sync、sync/atomic包提供的同步原语

在不改变语义的前提下,编译期和处理器可能对读写操作进行重新排序。由于重新排序的存在,不同Goroutine看到的执行顺序可能不同。

在多Goroutine访问共享变量的情况下,必须使用同步机制,确保读操作能看到期望的(happens before的)写。

Happens Before
初始化

Go应用初始化时只有一个Goroutine,它能够创建更多的Goroutine,这些Goroutine会并发的运行。

如果包p导入了包q,则q的init函数的执行完毕,发生在任何p的init函数之前。

协程创建

go语句的执行,发送在Goroutine主体逻辑执行之前。

协程销毁

Goroutine的销毁不保证发生在任何事件之前。

通道通信

向一个通道发送数据,发生在接收数据之前:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
var c = make(chan int, 10)
var a string
 
func f() {
    a = "hello, world"
    c <- 0
}
 
func main() {
    go f()
    <-c
    print(a)
}

上面的例子会确保一定能打印hello, world。 

简介

Goroutine是由Go运行时管理的轻量级线程,不依赖于操作系统级别的线程机制。

下面是Goroutine简单的例子:

Go
1
2
3
4
5
6
7
8
9
10
11
12
func worker(name string) {
    for i := 0; i < 2; {
        time.Sleep(time.Second)
        fmt.Println("%v-%v", name, i)
        i++
    }
}
func main() {
    // 使用go关键字,后面跟着一个函数调用,即可发动一个Goroutine
    go worker("W")
    worker("M")
}

Goroutine和程序主体部分在同一地址空间运行,因此它们共享变量时必须进行同步。sync包提供了相关的同步原语。

Go运行时为Goroutine实现线程的多路复用 —— 如果某个Goroutine阻塞(例如调用了某个阻塞性的系统调用),则运行时自动将阻塞Goroutine所在的OS线程上分配的其余Goroutine转移到其它Runnable的OS线程上。

每个Goroutine都需要栈资源,Go运行时支持动态栈大小,新创建的Goroutine的栈大小为几个KB,并根据需要进行扩展/收缩。这种动态栈大小让Goroutine平均资源消耗很小,在单台机器上创建10万级别的Goroutine很常见,但是创建相同数量的OS线程,负担要大得多。

G-M-P模型

Goroutine的调度模型由Go运行时实现。调度模型发生过一次较大的重构。

G-M-P调度模型从Go 1.2版本引入,并沿用至今。在此模型中,有三个重要的角色:

  1. G:对应Go语言中的Goroutine,运行时需要维护每个Goroutine的执行栈、执行状态等信息
  2. P:逻辑处理器,每个逻辑处理器对应一个局部运行队列,G可以在其上排队等候执行。P决定了Go程序中可以并行执行的Goroutine的数量(当然不能超过物理CPU的核心数)。对于G来说,P就是运行它的CPU
  3. M:对应操作系统线程。M不保存任何G的信息,这让G跨M调度成为可能

P必须绑定到M(逻辑处理器必须绑定到OS线程)才能让本地运行队列中的G有机会运行。当P绑定到M后会执行调度循环,调度循环会:

  1. 从P的本地运行队列中获取可运行的G
  2. 切换到G的执行栈
  3. 执行G的函数

gmp

抢占式调度

如果G中执行了死循环,那么G将永远占据对应的P和M。这样会导致相同P中的其它G没有机会执行,更严重的是,如果只有一个P的情况下,程序中所有其它G都会被饿死。

为了解决上面的问题,Go 1.2引入抢占式调度。Go会在每个函数的入口都添加一段额外的代码,以保证Go运行时有机会检查,并执行抢占式调度。这种抢占式调度只能解决部分问题,对于没有任何函数调用的单纯死循环仍然无能为力。

Go程序启动时,运行时会创建一个名为sysmon的M(监控线程),此M不需要绑定P即可运行。sysmon每隔20us~10ms就会运行并执行对长时间运行的G发起抢占调度:

  1. 设置G的抢占标识位为true
  2. 当G下一次调用任何函数时,Go运行时即执行抢占
  3. G被放入P的本地运行队列,等待下一次调度
阻塞和调度

Go的理念是,基于阻塞性的接口编程,并使用Goroutine和Channel处理并发,而不是使用回调或者Future。对于Goroutine而言,所有IO操作都是阻塞性的。但是,大部分IO操作都不会导致M阻塞,这避免了大量创建操作系统线程。

通道/网络阻塞

Go运行时实现了netpoller,它将操作系统提供的异步网络IO转换为Go中的阻塞性IO,这一方面防止M被阻塞,另一方面保持了Go语言内部的阻塞性接口风格。

当G在通道操作或者网络IO上阻塞时,运行时调度器将其放入某个等待队列,M会尝试执行下一个可运行的G。如果没有可运行的G则M和P解绑并进入休眠状态。

当通道操作完成或者网络IO可用时,等待队列中的G被唤醒,标记为可运行并放入到某个P的本地运行队列中,绑定M并执行。

系统调用阻塞

对于普通文件(Non-Pollable)的IO操作会导致M阻塞,等待IO操作完成后被操作系统唤醒。这种情况下,P会和M分离,寻求其它的M进行绑定,如果没有空闲的M则会创建新的M。这意味着大量的IO可能导致很多操作系统线程被创建。

发生任何其它阻塞性系统调用时,Go运行时的调度逻辑类似。

逻辑处理器

Go运行时会在逻辑处理器P上调度Goroutine,每个P会绑定一个操作系统线程M。每个P上可能有多个Goroutine等待被执行,这些Goroutine形成的队列被叫做本地运行队列。

新创建的Goroutine会被放置到一个全局运行队列中,Go运行时的调度器会进行调度,把Goroutine分配给某个P。

调用 runtime.GOMAXPROCS(1)可以设置逻辑处理器的数量。逻辑处理器的数量应该和物理核心数量一致,设置的过大并不能提升性能:

Go
1
runtime.GOMAXPROCS(runtime.NumCPU())

GOMAXPROCS最大可以设置到256。

调度器状态

设置环境变量GODEBUG可以查看运行时调度器的状态,例如设置 GODEBUG=schedtrace=1000则Go运行时每秒钟都会在控制台上打印调度器状态:

Shell
1
2
3
4
5
6
7
8
9
10
11
SCHED 10ms: gomaxprocs=8 idleprocs=6 threads=4 spinningthreads=1 idlethreads=1 runqueue=0 [1 0 0 0 0 0 0 0]
 
# SCHED,固定前缀,表示这一行调试信息由运行时调度器打印
# 10ms,从程序启动到打印信息经过的时间
# gomaxprocs 最大P数量
# idleprocs 空闲的P数量
# threads 操作系统线程数量
# spinningthreads 正在自旋的操作系统线程数量
# idlethread 空闲的操作系统线程数量
# runqueue 全局运行队列中G的数量
# [1 0 0 0 0 0 0 0] 各P的本地队列中G的数量

设置环境变量 GODEBUG=schedtrace=1000,scheddetail=1可以获得更加详细的信息。 

应用的退出

应用程序退出时,不会等待Goroutine执行完毕,而是会立即退出。解决办法包括:

  1. 使用WaitGroup,让主Goroutine等待Worker Goroutine完成
  2. 如果Worker是长期运行的消息处理循环,应当从主Goroutine向其发送信号,导致其退出。最常见的用法是stop chan
通道

基于通信来共享内存,而不是基于共享内存进行通信 —— Share memory by communicating; don't communicate by sharing memory。

通道简介

Goroutine和Channel受到CSP并发模型(Communicating Sequential Processes)的启发。在CSP中,多个Process使用Channel进行通信,并且这种通信通常是同步式的。

Go语言中的通道是一种强类型的管道,你可以使用通道操作符 <-来从通道接收数据、发送数据到通道:

Go
1
2
3
4
5
// 类似于切片,你需要先创建才能使用通道
ch := make(chan int)
 
ch <- v      // 将值v发送到通道ch
v := <-ch    // 从ch接收数据并赋值给v

下面是两个Goroutine通过Channel进行通信的例子:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func receiver(ch chan int8) {
    for val := range ch {
        fmt.Printf("Received value: %v\n", val)
    }
}
func sender(ch chan int8) {
    for i := 0; i < 3; i++ {
        ch <- int8(i)
        time.Sleep(time.Second)
    }
    close(ch)
}
func main() {
    ch := make(chan int8)
    go sender(ch)
    go receiver(ch)
    time.Sleep(time.Second * 5)
}
通道阻塞

Goroutine可能会永久的阻塞在通道上,对于发送者来说,只要接收方没有响应,就会发生这种情况。

考虑下面的例子:

Go
1
2
3
4
5
6
7
8
func First(query string, replicas ...Search) Result {  
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

此函数从一系列replicas中返回第一个replica(query)调用的结果。除了第一个执行完毕的Goroutine之外,其它的都会阻塞,原因是主Goroutine仅仅从通道获取一个数据后就退出了。

解决此问题的方法有多种:

  1. 使用具有足够大缓冲的通道: c := make(chan Result,len(replicas))
  2. 使用带有default的select:
    Go
    1
    2
    3
    4
    5
    6
    7
    searchReplica := func(i int) {
        select {
        case c <- replicas[i](query):
        // 如果通道c不可写,则执行下面的分支
        default:
        }
    } 
  3. 使用一个“取消”通道:
    Go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    done := make(chan struct{})
    // 外层函数退出时,通道被关闭
    defer close(done)
     
    searchReplica := func(i int) {
        select {
        case c <- replicas[i](query):
        // 那时任何协程都可以读取通道并退出
        case <- done:
        }
    } 
缓冲通道

默认情况下,通过通道发送、接收数据都会阻塞,直到通道的另外一端响应。 这样,Goroutine不需要明确的锁定或者条件变量即可同步。

你可以创建具有缓冲区的通道 —— 发送操作仅仅在缓冲区满的情况下阻塞,接收操作仅仅在缓冲区为空的情况下阻塞:

Go
1
2
// 第二个参数为缓冲区大小
ch := make(chan int, 100)
关闭通道

你可以调用 close(chan)函数提示通道不再有更多的值可以被接收,通道的接收者可以为接收表达式赋第二个值,以测试通道是否关闭:

Go
1
2
// 如果通道没有更多的值可以接收,并且已经针对通道调用过close,则ok = false
v, ok := <-ch

注意:

  1. 仅仅应当由发送者关闭通道,而不是消费者。如果尝试向已经关闭的通道发送数据,会导致Panic
  2. 通道不同于文件,通常你不需要显式关闭通道。除非你需要显式的告知接收者(例如提示其结束for...range循环)
  3. 从已经关闭的通道接收是安全的,第二返回值将为false
for...range

for i := range c格式的循环用于遍历通道,直到通道被关闭。下面这个例子中,子Goroutine产生斐波纳契数列,主Goroutine使用for...range循环遍历其输出:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func fibonacci(n int, c chan uint64) {
    x, y := uint64(0), uint64(1)
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    }
    close(c)
}
 
func main() {
    c := make(chan uint64, 50)
    go fibonacci(cap(c), c)
    for i := range c {
        fmt.Printf("%v ", i)
    }
}

多个Goroutine同时遍历单个通道是允许的,不会出现竞态条件。但是要注意:

  1. 尽量使用参数在Goroutine之间传递chan,而不是使用全局变量
  2. 对于单个Goroutine来说,不要又读又写,容易死锁
  3. 如果使用不带缓冲的Chan不会出现死锁,那么使用带缓冲的Chan也不会出现死锁。反之不成立。可以先使用不带缓冲的Chan,当需要性能提升时,增加缓冲
通道的通道

Go中的通道是一等公民,可以像其它值一样被传来传去。甚至,你可以通过通道来传递另外一个通道。下面是一个远程计算服务的例子:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 远程计算请求
type Request struct {
    // 计算参数
    args []int
    // 计算逻辑
    f func([]int) int
    // 处理结果返回的通道
    resultChan chan int
}
 
// 客户端
func Client(queue chan *Request) {
 
    request := &Request{[]int{1, 2, 3, 4, 5}, func(a []int) (s int) {
        for _, v := range a {
            s += v
        }
        return
    }, make(chan int)}
    queue <- request
    fmt.Printf("Result from server: %v", <-request.resultChan)
}
 
// 远程计算服务
func Server(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}
func main() {
    queue := make(chan *Request)
    go Server(queue)
    go Client(queue)
    time.Sleep(time.Second)
}

 可以看到,客户端通过通道发送请求,请求对象中包含了一个通道,服务器向此通道发送计算结果。

nil通道

在零值通道上发送、接收都会永久阻塞。

单向通道
Go
1
2
3
4
5
6
var send chan<- int      // 在chan关键字后侧加<-表示仅仅能发送
var receive <-chan int   // 在chan关键字左侧加<-表示仅仅能接收
 
// 单向通道常常用于函数或方法的参数中
func counter(out chan<- int) {
}
select

select...case语句允许Goroutine在多个通信操作符上等待,当至少有一个操作符(可以在default分支中立即退出)准备好(可读或可写),Select才从阻塞中恢复。如果同时有多个操作符准备好,Select会随机执行其中一个分支: 

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func fibonacci(c chan uint64, quit chan bool) {
    x, y := uint64(0), uint64(1)
    for {
        select {
        // 发送端
        case c <- x:
            x, y = y, x+y
            // 这里的break用于终止case执行,如果要跳出for,可以使用标签或者goto
            break
        // 接收端,注意接收数据后可不赋值给任何变量
        case <-quit:
            fmt.Println("Quit signaled")
            return
        // 如果没有任何case满足条件,则执行default
        default:
            // 可以在此休眠,执行阻塞性收发,等等
        }
    }
}
 
func main() {
    c := make(chan uint64)
    q := make(chan bool)
    go fibonacci(c, q)
    // 从匿名函数启动Groutine
    go func() {
        for i := 0; i < 50; i++ {
            fmt.Println(<-c)
        }
        q <- true
    }()
    time.Sleep(time.Second)
}

下面是Istio代码中的一个示例:

Go
1
2
3
4
5
6
7
8
select {
case client.pushChannel <- &XdsEvent{}:
    // 意味着推送通道可用,事件推送成功
case <-client.doneChannel:
    // 意味着客户端已经关闭连接
case <-time.After(PushTimeout):
    // 意味着在超时之前,推送通道不可用,客户端也没关闭。可能是网络慢卡住了
}

感受一下,用非常简洁的代码就把正常、异常情况处理好了。 

通道用法示例
定时器
Go
1
2
3
4
5
6
7
8
9
10
11
// Ticker会定期向通道写入数据
tickChan := time.NewTicker(c.flaggerWindow).C
for {
    select {
        // 定期触发这个case
    case <-tickChan:
        c.scheduleCanaries()
    case <-stopCh:
        return nil
    }
}
等待子例程
Go
1
2
3
4
5
6
7
8
c := make(chan int)  // 分配一个通道
// 在Go程中启动排序。当它完成后,在通道上发送信号
go func() {
    list.Sort()
    c <- 1  // 发送信号,什么值无所谓
}()
doSomethingForAWhile()
<-c   // 等待排序结束,丢弃发来的值
信号量

还有缓冲的通道可以作为信号量使用:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var sem = make(chan int, MaxOutstanding)
 
func handle(r *Request) {
    sem <- 1 // 等待活动队列清空。
    process(r)  // 可能需要很长时间
    <-sem    // 完成;使下一个请求可以运行
}
 
func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)
    }
} 
锁
Mutex

通道是跨Goroutine通信的利器,但是某些时候Goroutine之间不需要传递数据,我们只需要保证在任何时刻,仅仅它们其中的一个能够访问(同步)某项资源以避免冲突,这时你可以使用互斥量。

竞态条件

发生Goroutine之间的竞态条件和传统的基于线程的语言类似:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var count uint32 = 0
var wg sync.WaitGroup
 
func inc() {
    defer wg.Done()
    count++     // 非原子操作
}
func main() {
    const N = 500
    wg.Add(N)
    for i := 0; i < N; i++ {
        go inc()
    }
    wg.Wait()
    fmt.Printf("Expected: %v, Actual: %v", N, count) // Expected: 500, Actual: 488
}

运行上段程序,可以看到计数器的结果不符合预期,每次都会不一样。这是由于对共享变量count的++操作不是原子操作,不受保护的访问会导致写覆盖。

现实中的例子要复杂的多,可能很难察觉到问题。你可以为go build 传入 --race参数,这样,在运行代码时,Go会自动检测竞态条件:

Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
==================
WARNING: DATA RACE
Read at 0x0000005bddb0 by goroutine 7:
  main.inc()
      /home/alex/Go/projects/go-study/application.go:13 +0x69
 
Previous write at 0x0000005bddb0 by goroutine 6:
  main.inc()
      /home/alex/Go/projects/go-study/application.go:13 +0x83
 
Goroutine 7 (running) created at:
  main.main()
      /home/alex/Go/projects/go-study/application.go:19 +0x6e
 
Goroutine 6 (finished) created at:
  main.main()
      /home/alex/Go/projects/go-study/application.go:19 +0x6e
==================
Found 1 data race(s)
使用互斥锁

 Go标准库中定义的 sync.Mutex,实现了如下接口:

Go
1
2
3
4
5
6
type Locker interface {
    // 调用此方法可以锁定共享资源
    Lock()
    // 调用此方法可以解除对共享资源的锁定
    Unlock()
}

使用互斥锁来改造上面的计数器代码:

Go
1
2
3
4
5
6
7
8
var lock sync.Mutex
 
func inc() {
    defer wg.Done()
    defer lock.Unlock()
    lock.Lock()
    count++
}

即可同步化对共享变量count的访问。下面是另外一个使用互斥锁的例子:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import (
    "fmt"
    "sync"
    "time"
)
 
// 并发安全的计数器
type SafeCounter struct {
    v map[string]int
    // 锁
    mux sync.Mutex
}
 
//  为某个键进行计数
func (c *SafeCounter) Inc(key string) {
    // 在进行不安全操作前后进行加锁、解锁
    c.mux.Lock()
    c.v[key]++
    c.mux.Unlock()
}
 
func (c *SafeCounter) Value(key string) int {
    c.mux.Lock()
    // 可以使用defer,确保此方法退出时会执行解锁
    defer c.mux.Unlock()
    return c.v[key]
}
 
func main() {
    c := SafeCounter{v: make(map[string]int)}
    const key = "default-counter"
    for i := 0; i < 10000000; i++ {
        go c.Inc(key)
    }
 
    time.Sleep(time.Second)
    fmt.Println(c.Value(key))
} 
RWMutex

互斥锁要求所有访问共享资源的Goroutine串行化执行,因此性能较差。在读多写少的情况下,使用读写锁可以提升性能,读写锁的特点是:

  1. 读锁不排斥其它的读锁,但是排除任何写锁
  2. 写锁排除任何锁
Go
1
2
3
4
5
6
7
8
9
10
11
var rw sync.RWMutex
 
// 加读锁
rw.RLock()
// 解读锁
rw.RUnlock()
 
// 加写锁
rw.Lock()
// 解写锁
rw.Unlock()
原子操作

在Go语言中,唯一被保证是原子性的,就是sync.atomic中定义的那些操作。也就是说,哪怕是简单的赋值,也需要保护:

Go
1
2
3
clearCacheMutex.Lock()
clearCacheTimerSet = false
clearCacheMutex.Unlock()

使用互斥量是最常见的保证原子性的方法,sync.atomic包相对不常用。

sync.atomic

此包提供了很多低级的原子性内存访问原语,可以用于实现同步算法。 使用此包时要想当小心,推荐优先基于通道或者锁实现同步。

函数 说明
AddInt32

func AddInt32(addr *int32, delta int32) (new int32)

原子的给addr的值加delta,并返回返回新值

类似的函数包括AddInt64、AddUint32、AddUintptr等

 

CompareAndSwapInt32

CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

 针对int32针对CAS算法,示例:

Go
1
2
3
4
5
lock := int32(0)
// 如果旧值和新值不同,则原子的设置为新值
if atomic.CompareAndSwapInt32(&lock, 0, 1) {
    fmt.Printf("Locked: %v", lock) // Locked: 1
}

 类似的函数包括CompareAndSwapInt64、CompareAndSwapPointer等

LoadInt32 原子的加载值
StoreInt32 原子的保存值
并发控制
WaitGroup

等待组类似于Java的CountdownLatch:

Go
1
2
3
4
5
6
7
8
9
10
11
var wg sync.WaitGroup
const N = 100
wg.Add(N) // 需要N次Done调用才能解除Wait()
for i := 0; i < N; i++ {
    go func() {
        wg.Done()
        wg.Wait()  // 等待所有协作者完成
    }()
}
wg.Done()
wg.Wait()  // 等待所有协作者完成

注意:传递WaitGroup时应当传递其指针。

Context

使用Context可以方便的控制多个相互协作的Goroutine,下面是一个简单的例子:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import (
    "context"
    "log"
    "math/rand"
    "time"
)
 
func main() {
    // 使用空白上下文为根,产生一个子上下文
    // 当返回的cancel函数被调用后,子上下文的Done通道自动关闭
    ctx, cancel := context.WithCancel(context.Background())
 
    for i := 0; i < 5; i++ {
        go genRandNum(ctx)
    }
    // 让Goroutine工作一会儿
    time.Sleep(time.Second * 3)
    cancel() // 发出取消上下文的信号
    time.Sleep(time.Second * 3)
}
func genRandNum(ctx context.Context) {
    for {
        select {
        // 检测上下文是否取消
        case <-ctx.Done():
            // 如果上下文已经取消,则退出Goroutine
            log.Println("Stop random number generation.")
            return
        default:
            // 如果上下文尚未取消,则产生随机数
            log.Printf("Generated number: %v", rand.Int())
            time.Sleep(time.Second)
        }
    }
}

context.WithCancel()、context.WithTimeout()调用返回的第二个值,即cancel()函数,其作用是触发ctx.Done()通道可读(关闭)。

Context接口 

Context接口定义了四个方法:

Go
1
2
3
4
5
6
7
8
9
10
11
type Context interface {
    // 返回设置的最后期限,如果没有设置最后期限,第二个返回参数为false
    Deadline() (deadline time.Time, ok bool)
    // 返回一个只读的通道,如果此通道可读取,则意味着父Context已经发起了取消请求
    // 从该通道读取到东西后,当前Goroutine应该进行资源清理并退出
    Done() <-chan struct{}
    // 返回上下文被取消的原因
    Err() error
    // 返回上下文上绑定的属性
    Value(key interface{}) interface{}
}

内置的实现包括:

Go
1
2
3
4
5
6
7
8
9
10
var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)
func Background() Context {
    return background
}
func TODO() Context {
    return todo
}

background、todo都是emptyCtx类型,这是一个不可取消、没有设置最后期限、没有绑定任何属性的Context。

context.Background()通常在main函数中用于根Context。 

上下文树

emptyCtx不能做任何事情,我们通常将其作为根Context,并利用下面的函数来构建上下文树:

Go
1
2
3
4
5
6
7
8
// 产生一个子上下文和取消函数,调用函数可以取消上下文
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 产生一个子上下文,在到达最后期限时,会自动取消,也可以手工取消
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
// 类似上面
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
// 通过上下文共享数据
func WithValue(parent Context, key, val interface{}) Context

注意:调用取消函数会导致当前上下文以及任何后代上下文被取消,也就是Done()关闭

注意点
  1. 以参数的方式传递Context,并且不会将其包装在结构中
  2. Context作为第一个参数
  3. 不要传递nil Context
  4. 避免通过Context传递不必要的数据
  5. Context可以跨Goroutine安全访问
协程调度

调度器会在GC、go语句、阻塞channel操作、阻塞系统调用和lock操作后运行。它也会在非内联函数调用后执行。

你也可以显式的唤起调度器:

Go
1
2
3
for !done {
    runtime.Gosched()
}

但是一个空循环,就可以阻止调度器运作。 

协程池

使用Go语言实现一个线程池是很简单的。下面分析Istio源码中包含的线程池的例子。

Istio协程池
代码
Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package pool
 
import (
    "sync"
)
 
// 代表工作协程需要执行的函数
type WorkFunc func(param interface{})
 
// 协程池结构,持有一组可重用的协程,工作函数可以调度到协程上
type GoroutinePool struct {
    queue          chan work      // 工作队列,缓冲通道
    wg             sync.WaitGroup // 用于在所有工作完成之前阻止shutdown操作
    singleThreaded bool           // 提示是否运行在单线程模式
}
 
// 放入队列的工作,即工作函数+参数
type work struct {
    fn    WorkFunc
    param interface{}
}
 
// 创建新的协程池
func NewGoroutinePool(queueDepth int, singleThreaded bool) *GoroutinePool {
    gp := &GoroutinePool{
        queue:          make(chan work, queueDepth), // 队列深度即通道缓冲大小
        singleThreaded: singleThreaded,
    }
// 至少添加一个工作协程
    gp.AddWorkers(1)
    return gp
}
 
// 协程池实现io.Closer接口。在关闭时,必须在等待组上等待 —— 等待所有工作协程退出
func (gp *GoroutinePool) Close() error {
    if !gp.singleThreaded {
// 关闭通道
        close(gp.queue)
// 等待
        gp.wg.Wait()
    }
    return nil
}
 
// 调度一份工作,在未来执行
// 调用者必须确保,工作函数仅仅依赖于通过params传递的“外部数据” —— 要保证这一点,你可以
// 传递一个普通命名函数给此方法,而不是一个内联的匿名函数(容易形成闭包)
func (gp *GoroutinePool) ScheduleWork(fn WorkFunc, param interface{}) {
    if gp.singleThreaded {
// 单线程模式下,执行运行
        fn(param)
    } else {
// 否则,写入到通道。如果通道满了,可能会导致调用者阻塞
        gp.queue <- work{fn: fn, param: param}
    }
}
 
// 添加工作协程到池中,增加并行度
func (gp *GoroutinePool) AddWorkers(numWorkers int) {
    if !gp.singleThreaded {
// 增加需要等待的数量
        gp.wg.Add(numWorkers)
        for i := 0; i < numWorkers; i++ {
// 工作协程的工作循环
            go func() {
// 取出一个工作并执行
                for work := range gp.queue {
                    work.fn(work.param)
                }
// 如果通道关闭,则减少等待组计数。所有协程都执行到这一步后,Close()调用才能从阻塞中返回
                gp.wg.Done()
            }()
        }
    }
}
测试
Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package pool
 
import (
    "sync"
    "testing"
)
 
func TestWorkerPool(t *testing.T) {
    const numWorkers = 123
    const numWorkItems = 456
 
    parameterMismatch := false
 
    for i := 0; i < 2; i++ {
        gp := NewGoroutinePool(128, i == 0)
        gp.AddWorkers(numWorkers)
 
        wg := &sync.WaitGroup{}
        wg.Add(numWorkItems)
 
        for i := 0; i < numWorkItems; i++ {
            passedParam := i // 在栈上捕获变量,避免创建闭包
            gp.ScheduleWork(func(param interface{}) { // 调度
                paramI := param.(int)
                if paramI != passedParam {
                    parameterMismatch = true
                }
                wg.Done()
            }, passedParam)
        }
 
        // 等待所有工作执行完毕
        wg.Wait()
 
        if parameterMismatch {
            t.Fatal("Passed parameter was not as expected")
        }
 
        // 关闭池
        _ = gp.Close()
    }
}

 

← IntelliJ IDEA知识集锦
Framework7学习笔记(三):高级 →

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code class="" title="" data-url=""> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre class="" title="" data-url=""> <span class="" title="" data-url="">

Related Posts

  • POSIX线程编程
  • Java5新特性
  • Python并发编程
  • Java线程与并发编程
  • Linux信号、进程和会话

Recent Posts

  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
  • A Comprehensive Study of Kotlin for Java Developers
  • 背诵营笔记
  • 利用LangChain和语言模型交互
  • 享学营笔记
ABOUT ME

汪震 | Alex Wong

江苏淮安人,现居北京。目前供职于腾讯云,专注容器方向。

GitHub:gmemcc

Git:git.gmem.cc

Email:gmemjunk@gmem.cc@me.com

ABOUT GMEM

绿色记忆是我的个人网站,域名gmem.cc中G是Green的简写,MEM是Memory的简写,CC则是我的小天使彩彩名字的简写。

我在这里记录自己的工作与生活,同时和大家分享一些编程方面的知识。

GMEM HISTORY
v2.00:微风
v1.03:单车旅行
v1.02:夏日版
v1.01:未完成
v0.10:彩虹天堂
v0.01:阳光海岸
MIRROR INFO
Meta
  • Log in
  • Entries RSS
  • Comments RSS
  • WordPress.org
Recent Posts
  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
    In this blog post, I will walk ...
  • A Comprehensive Study of Kotlin for Java Developers
    Introduction Purpose of the Study Understanding the Mo ...
  • 背诵营笔记
    Day 1 Find Your Greatness 原文 Greatness. It’s just ...
  • 利用LangChain和语言模型交互
    LangChain是什么 从名字上可以看出来,LangChain可以用来构建自然语言处理能力的链条。它是一个库 ...
  • 享学营笔记
    Unit 1 At home Lesson 1 In the ...
  • K8S集群跨云迁移
    要将K8S集群从一个云服务商迁移到另外一个,需要解决以下问题: 各种K8S资源的迁移 工作负载所挂载的数 ...
  • Terraform快速参考
    简介 Terraform用于实现基础设施即代码(infrastructure as code)—— 通过代码( ...
  • 草缸2021
    经过四个多月的努力,我的小小荷兰景到达极致了状态。

  • 编写Kubernetes风格的APIServer
    背景 前段时间接到一个需求做一个工具,工具将在K8S中运行。需求很适合用控制器模式实现,很自然的就基于kube ...
  • 记录一次KeyDB缓慢的定位过程
    环境说明 运行环境 这个问题出现在一套搭建在虚拟机上的Kubernetes 1.18集群上。集群有三个节点: ...
  • eBPF学习笔记
    简介 BPF,即Berkeley Packet Filter,是一个古老的网络封包过滤机制。它允许从用户空间注 ...
  • IPVS模式下ClusterIP泄露宿主机端口的问题
    问题 在一个启用了IPVS模式kube-proxy的K8S集群中,运行着一个Docker Registry服务 ...
  • 念爷爷
      今天是爷爷的头七,十二月七日、阴历十月廿三中午,老人家与世长辞。   九月初,回家看望刚动完手术的爸爸,发

  • 6 杨梅坑

  • liuhuashan
    深圳人才公园的网红景点 —— 流花山

  • 1 2020年10月拈花湾

  • 内核缺陷触发的NodePort服务63秒延迟问题
    现象 我们有一个新创建的TKE 1.3.0集群,使用基于Galaxy + Flannel(VXLAN模式)的容 ...
  • Galaxy学习笔记
    简介 Galaxy是TKEStack的一个网络组件,支持为TKE集群提供Overlay/Underlay容器网 ...
TOPLINKS
  • Zitahli's blue 91 people like this
  • 梦中的婚礼 64 people like this
  • 汪静好 61 people like this
  • 那年我一岁 36 people like this
  • 为了爱 28 people like this
  • 小绿彩 26 people like this
  • 彩虹姐姐的笑脸 24 people like this
  • 杨梅坑 6 people like this
  • 亚龙湾之旅 1 people like this
  • 汪昌博 people like this
  • 2013年11月香山 10 people like this
  • 2013年7月秦皇岛 6 people like this
  • 2013年6月蓟县盘山 5 people like this
  • 2013年2月梅花山 2 people like this
  • 2013年淮阴自贡迎春灯会 3 people like this
  • 2012年镇江金山游 1 people like this
  • 2012年徽杭古道 9 people like this
  • 2011年清明节后扬州行 1 people like this
  • 2008年十一云龙公园 5 people like this
  • 2008年之秋忆 7 people like this
  • 老照片 13 people like this
  • 火一样的六月 16 people like this
  • 发黄的相片 3 people like this
  • Cesium学习笔记 90 people like this
  • IntelliJ IDEA知识集锦 59 people like this
  • 基于Kurento搭建WebRTC服务器 38 people like this
  • Bazel学习笔记 37 people like this
  • PhoneGap学习笔记 32 people like this
  • NaCl学习笔记 32 people like this
  • 使用Oracle Java Mission Control监控JVM运行状态 29 people like this
  • Ceph学习笔记 27 people like this
  • 基于Calico的CNI 27 people like this
Tag Cloud
ActiveMQ AspectJ CDT Ceph Chrome CNI Command Cordova Coroutine CXF Cygwin DNS Docker eBPF Eclipse ExtJS F7 FAQ Groovy Hibernate HTTP IntelliJ IO编程 IPVS JacksonJSON JMS JSON JVM K8S kernel LB libvirt Linux知识 Linux编程 LOG Maven MinGW Mock Monitoring Multimedia MVC MySQL netfs Netty Nginx NIO Node.js NoSQL Oracle PDT PHP Redis RPC Scheduler ServiceMesh SNMP Spring SSL svn Tomcat TSDB Ubuntu WebGL WebRTC WebService WebSocket wxWidgets XDebug XML XPath XRM ZooKeeper 亚龙湾 单元测试 学习笔记 实时处理 并发编程 彩姐 性能剖析 性能调优 文本处理 新特性 架构模式 系统编程 网络编程 视频监控 设计模式 远程调试 配置文件 齐塔莉
Recent Comments
  • qg on Istio中的透明代理问题
  • heao on 基于本地gRPC的Go插件系统
  • 黄豆豆 on Ginkgo学习笔记
  • cloud on OpenStack学习笔记
  • 5dragoncon on Cilium学习笔记
  • Archeb on 重温iptables
  • C/C++编程:WebSocketpp(Linux + Clion + boostAsio) – 源码巴士 on 基于C/C++的WebSocket库
  • jerbin on eBPF学习笔记
  • point on Istio中的透明代理问题
  • G on Istio中的透明代理问题
  • 绿色记忆:Go语言单元测试和仿冒 on Ginkgo学习笔记
  • point on Istio中的透明代理问题
  • 【Maven】maven插件开发实战 – IT汇 on Maven插件开发
  • chenlx on eBPF学习笔记
  • Alex on eBPF学习笔记
  • CFC4N on eBPF学习笔记
  • 李运田 on 念爷爷
  • yongman on 记录一次KeyDB缓慢的定位过程
  • Alex on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • haolipeng on 基于本地gRPC的Go插件系统
  • 吴杰 on 基于C/C++的WebSocket库
©2005-2025 Gmem.cc | Powered by WordPress | 京ICP备18007345号-2