当前位置: 代码迷 >> 综合 >> Go语言学习笔记(十二)------协程(goroutine)与通道(channel)
  详细解决方案

Go语言学习笔记(十二)------协程(goroutine)与通道(channel)

热度:4   发布时间:2023-12-10 18:12:48.0

一、并发、并行和协程

1.Go 语言为构建并发程序的基本代码块是 协程(goroutine) 与通道 (channel)。他们需要语言,编译器,和runtime的支持。Go 语言提供的垃圾回收器对并发编程至关重要。不要通过共享内存来通信,而通过通信来共享内存。通信强制协作。

2.一个应用程序是运行在机器上的一个进程;进程是一个运行在自己内存地址空间里的独立执行体。一个进程由一个或多个操作系统线程组成,这些线程其实是共享同一个内存地址空间的一起工作的执行体。几乎所有'正式'的程序都是多线程的,以便让用户或计算机不必等待,或者能够同时服务多个请求(如 Web 服务器),或增加性能和吞吐量(例如,通过对不同的数据集并行执行代码)。

3.一个并发程序可以在一个处理器或者内核上使用多个线程来执行任务,但是只有同一个程序在某个时间点同时运行在多核或者多处理器上才是真正的并行。并行是一种通过使用多处理器以提高速度的能力。所以并发程序可以是并行的,也可以不是。公认的,使用多线程的应用难以做到准确,最主要的问题是内存中的数据共享,它们会被多线程以无法预知的方式进行操作,导致一些无法重现或者随机的结果(称作 竞态 )。不要使用全局变量或者共享内存,它们会给你的代码在并发运算的时候带来危险。解决之道在于同步不同的线程,对数据加锁,这样同时就只有一个线程可以变更数据。在 Go 的标准库 sync 中有一些工具用来在低级别的代码中实现加锁;不过过去的软件开发经验告诉我们这会带来更高的复杂度,更容易使代码出错以及更低的性能,所以这个经典的方法明显不再适合现代多核/多处理器编程: thread-per-connection 模型不够有效。

4.Go 更倾向于其他的方式,在诸多比较合适的范式中,有个被称作 Communicating Sequential Processes(顺序通信处理) (CSP, C. Hoare 发明的)还有一个叫做 message passing-model(消息传递) (已经运用在了其他语言中,比如Erlang)。在 Go 中,应用程序并发处理的部分被称作 goroutines(协程) ,它可以进行更有效的并发运算。在协程和操作系统线程之间并无一对一的关系:协程是根据一个或多个线程的可用性,映射(多路复用,执行于)在他们之上的;协程调度器在 Go 运行时很好的完成了这个工作。

5.协程工作在相同的地址空间中,所以共享内存的方式一定是同步的;这个可以使用 sync 包来实现,不过我们很不鼓励这样做:Go 使用 channels 来同步协程当系统调用(比如等待 I/O)阻塞协程时,其他协程会继续在其他线程上工作。协程的设计隐藏了许多线程创建和管理方面的复杂工作。协程是轻量的,比线程更轻。它们痕迹非常不明显(使用少量的内存和资源):使用 4K 的栈内存就可以在堆中创建它们。因为创建非常廉价,必要的时候可以轻松创建并运行大量的协程(在同一个地址空间中 100,000 个连续的协程)。并且它们对栈进行了分割,从而动态的增加(或缩减)内存的使用;栈的管理是自动的,但不是由垃圾回收器管理的,而是在协程退出后自动释放。
6.存在两种并发方式:确定性的(明确定义排序)和非确定性的(加锁/互斥从而未定义排序)。Go 的协程和通道理所当然的支持确定性的并发方式(例如通道具有一个 sender 和一个 receiver)。我们会使用一个常见的算法问题(工人问题)来对比两种处理方式。
7.协程是通过使用关键字 go 调用(执行)一个函数或者方法来实现的(也可以是匿名或者 lambda 函数)。这样会在当前的计算过程中开始一个同时进行的函数,在相同的地址空间中并且分配了独立的栈,比如: go sum(bigArray) ,在后台计算总和。协程的栈会根据需要进行伸缩,不出现栈溢出;开发者不需要关心栈的大小。当协程结束的时候,它会静默退出:用来启动这个协程的函数不会得到任何的返回值。
8.任何 Go 程序都必须有的 main() 函数也可以看做是一个协程,尽管它并没有通过 go 来启动。协程可以在程序初始化的过程中运行(在 init() 函数中)。
9.在一个协程中,比如它需要进行非常密集的运算,你可以在运算循环中周期的使用 runtime.Gosched() :这会让出处理器,允许运行其他协程;它并不会使当前协程挂起,所以它会自动恢复执行。使用 Gosched() 可以使计算均匀分布,使通信不至于迟迟得不到响应。

10.并发和并行的差异:Go的重点不在于并行的首要位置:并发程序可能是并行的,也可能不是。并行是一种通过使用多处理器以提高速度的能力。但往往是,一个设计良好的并发程序在并行方面的表现也非常出色。在当前的运行时(2012 年一月)实现中,Go 默认没有并行指令,只有一个独立的核心或处理器被专门用于 Go 程序,不论它启动了多少个协程;所以这些协程是并发运行的,但他们不是并行运行的:同一时间只有一个协程会处在运行状态。这个情况在以后可能会发生改变,不过届时,为了使你的程序可以使用多个核心运行,这时协程就真正的是并行运行了,你必须使用 GOMAXPROCS 变量。这会告诉运行时有多少个协程同时执行。

11.使用 GOMAXPROCS:在 gc 编译器下(6g 或者 8g)你必须设置 GOMAXPROCS 为一个大于默认值 1 的数值来允许运行时支持使用多于 1 个的操作系统线程,所有的协程都会共享同一个线程除非将 GOMAXPROCS 设置为一个大于 1 的数。当 GOMAXPROCS大于 1 时,会有一个线程池管理许多的线程。通过 gccgo 编译器 GOMAXPROCS 有效的与运行中的协程数量相等。假设 n 是机器上处理器或者核心的数量。如果你设置环境变量 GOMAXPROCS>=n,或者执行 runtime.GOMAXPROCS(n) ,接下来协程会被分割(分散)到 n 个处理器上。更多的处理器并不意味着性能的线性提升。有这样一个经验法则,对于n 个核心的情况设置 GOMAXPROCS 为 n-1 以获得最佳性能,也同样需要遵守这条规则:协程的数量 > 1 +,GOMAXPROCS > 1。所以如果在某一时间只有一个协程在执行,不要设置 GOMAXPROCS!
还有一些通过实验观察到的现象:在一台 1 颗 CPU 的笔记本电脑上,增加 GOMAXPROCS 到 9 会带来性能提升。在一台 32 核的机器上,设置 GOMAXPROCS=8 会达到最好的性能,在测试环境中,更高的数值无法提升性能。如果设置一个很大的 GOMAXPROCS 只会带来轻微的性能下降;设置 GOMAXPROCS=100,使用 top 命令和 H 选项查看到只有 7 个活动的线程。增加 GOMAXPROCS 的数值对程序进行并发计算是有好处的;

总结:GOMAXPROCS 等同于(并发的)线程数量,在一台核心数多于1个的机器上,会尽可能有等同于核心数的线程在并行运行。

12. 命令行指定使用的核心数量:使用 flags 包,如下:

var numCores = flag.Int("n", 2, "number of CPU cores to use")
in main()
flag.Parse()
runtime.GOMAXPROCS(*numCores)

协程可以通过调用 runtime.Goexit() 来停止,尽管这样做几乎没有必要。

13.main() , longWait() 和 shortWait() 三个函数作为独立的处理单元按顺序启动,然后开始并行运行。每一个函数都在运行的开始和结束阶段输出了消息。为了模拟他们运算的时间消耗,我们使用了 time 包中的 Sleep 函数。 Sleep() 可以按照指定的时间来暂停函数或协程的执行,这里使用了纳秒(ns,符号 1e9 表示 1 乘 10 的 9 次方,e=指数)。他们按照我们期望的顺序打印出了消息,几乎都一样,可是我们明白这是模拟出来的,以并行的方式。我们让 main()函数暂停 10 秒从而确定它会在另外两个协程之后结束。如果不这样(如果我们让 main() 函数停止 4 秒), main()会提前结束, longWait() 则无法完成。如果我们不在 main() 中等待,协程会随着程序的结束而消亡。当 main() 函数返回的时候,程序退出:它不会等待任何其他非 main 协程的结束。这就是为什么在服务器程序中,每一个请求都会启动一个协程来处理, server() 函数必须保持运行状态。通常使用一个无限循环来达到这样的目的。另外,协程是独立的处理单元,一旦陆续启动一些协程,你无法确定他们是什么时候真正开始执行的。你的代码逻辑必须独立于协程调用的顺序。

package main
import ("fmt""time"
)
func main() {fmt.Println("In main()")go longWait()go shortWait()fmt.Println("About to sleep in main()")// sleep works with a Duration in nanoseconds (ns)纳秒 !time.Sleep(10 * 1e9)fmt.Println("At the end of main()")
}
func longWait() {fmt.Println("Beginning longWait()")time.Sleep(5 * 1e9) // sleep for 5 secondsfmt.Println("End of longWait()")
}
func shortWait() {fmt.Println("Beginning shortWait()")time.Sleep(2 * 1e9) // sleep for 2 secondsfmt.Println("End of shortWait()")
}

输出:

In main()
About to sleep in main()
Beginning longWait()
Beginning shortWait()
End of shortWait()
End of longWait()
At the end of main() // after 10s

为了对比使用一个线程,连续调用的情况,移除 go 关键字,重新运行程序。现在输出:

In main()
Beginning longWait()
End of longWait()
Beginning shortWait()
End of shortWait()
About to sleep in main()
At the end of main() // after 17 s

协程更有用的一个例子应该是在一个非常长的数组中查找一个元素。将数组分割为若干个不重复的切片,然后给每一个切片启动一个协程进行查找计算。这样许多并行的协程可以用来进行查找任务,整体的查找时间会缩短(除以协程的数量)。

14.Go 协程(goroutines)和协程(coroutines):“Go协程(goroutines)” 即是 是 Go 语言中的协程,而“协程(coroutines)”指的是其他语言中的协程概念,仅在本节出现。在其他语言中,比如 C#,Lua 或者 Python 都有协程的概念。这个名字表明它和 Go协程有些相似,不过有两点不同:
Go 协程意味着并行(或者可以以并行的方式部署),协程一般来说不是这样的
Go 协程通过通道来通信;协程通过让出和恢复操作来通信
Go 协程比协程更强大,也很容易从协程的逻辑复用到 Go 协程

二、协程间的信道

1.Go 有一种特殊的类型,通道(channel),就像一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种通过通道进行通信的方式保证了同步性。数据在通道中进行传递:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。 数据的所有权(可以读写数据的能力)也因此被传递。工厂的传送带是个很有用的例子。一个机器(生产者协程)在传送带上放置物品,另外一个机器(消费者协程)拿到物品并打包。通道服务于通信的两个目的:值的交换,同步的,保证了两个计算(协程)任何时候都是可知状态。

2.声明通道: var identifier chan datatype,未初始化的通道的值是nil。所以通道只能传输一种类型的数据,比如 chan int 或者 chan string ,所有的类型都可以用于通道,空接口interface{} 也可以。甚至可以(有时非常有用)创建通道的通道。通道实际上是类型化消息的队列:使数据得以传输。它是先进先出(FIFO)的结构所以可以保证发送给他们的元素的顺序。通道也是引用类型,所以我们使用make() 函数来给它分配内存。这里先声明了一个字符串通道 ch1,然后创建了它(实例化):
var ch1 chan string
ch1 = make(chan string)
当然可以更短: ch1 := make(chan string) 。这里我们构建一个int通道的通道: chanOfChans := make(chan int) 。或者函数通道: funcChan := make(chan func()) 。所以通道是对象的第一类型:可以存储在变量中,作为函数的参数传递,从函数返回以及通过通道发送它们自身。另外它们是类型化的,允许类型检查,比如尝试使用整数通道发送一个指针。

3.通信操作符 <-,这个操作符直观的标示了数据的传输:信息按照箭头的方向流动。
流向通道(发送):ch <- int1 表示:用通道 ch 发送变量 int1(双目运算符,中缀 = 发送)
从通道流出(接收)三种方式:int2 = <- ch 表示:变量 int2 从通道 ch(一元运算的前缀操作符,前缀 = 接收)接收数据(获取新值);假设 int2 已经声明过了,如果没有的话可以写成: int2 := <- ch 。<- ch 可以单独调用获取通道的(下一个)值,当前值会被丢弃,但是可以用来验证,所以以下代码是合法的:

if <- ch != 1000{
...
}

操作符 <- 也被用来发送和接收,Go 尽管不必要,为了可读性,通道的命名通常以 ch 开头或者包含 chan 。示例:

package main
import ("fmt""time"
)
func main() {ch := make(chan string) //同一个通道作为参数才可以通信go sendData(ch)go getData(ch)time.Sleep(3*1e9)
}
func sendData(ch chan string) {ch <- "Washington"ch <- "Tripoli"ch <- "London"ch <- "Beijing"ch <- "Tokio"
}
func getData(ch chan string) {var input stringtime.Sleep(2e9)for {input = <-chfmt.Printf("%s ", input)}
}

4.通道阻塞。通信是同步且无缓冲的:在有接受者接收数据之前,发送不会结束。想象一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接收者准备好接收通道的数据然后发送者直接把数据发送给接收者。所以通道的发送/接收操作在对方准备好之前是阻塞的:
1)对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时(可以传入变量)。
2)对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。
尽管这看上去是非常严格的约束,实际在大部分情况下工作的很不错。程序 channel_block.go 验证了以上理论,一个协程在无限循环中给通道发送整数数据。不过因为没有接收者,只输出了一个数字 0。

package main
import ("fmt"
)
func main() {ch1 := make(chan int)go pump(ch1) // pump hangs//go suck(ch1)//time.Sleep(1e9)fmt.Println(<-ch1) // prints only 0
}
func pump(ch chan int) {for i := 0; ; i++ {ch <- i}
}
/*func suck(ch chan int) {for {fmt.Println(<-ch)}
}*/

为通道解除阻塞定义 suck 函数来在无限循环中读取通道,在 main() 中使用协程开始它,给程序 1 秒的时间来运行:输出了上万个整数。

5.通过一个(或多个)通道交换数据进行协程同步。可以在通道两端互相阻塞对方,形成了叫做死锁的状态。Go 运行时会检查并 panic,停止程序。死锁几乎完全是由糟糕的设计导致的。无缓冲通道会被阻塞。设计无阻塞的程序可以避免这种情况,或者使用带缓冲的通道。

package main
import (
"fmt"
)
func f1(in chan int) {
fmt.Println(<-in) //如果通道中没有数据,接收者就阻塞了。
}
func main() {
out := make(chan int)
out <- 2
go f1(out)
}

6.同步通道-使用带缓冲的通道,一个无缓冲通道只能包含 1 个元素,有时显得很局限。我们给通道提供了一个缓存,可以在扩展的 make 命令中设置它的容量,如下:

buf := 100
ch1 := make(chan string, buf)

buf 是通道可以同时容纳的元素(这里是 string)个数,在缓冲满载(缓冲被全部使用)之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。如果容量大于 0,通道就是异步的了:缓冲满载(发送)或变空(接收)之前通信不会阻塞,元素会按照发送的顺序被接收。如果容量是0或者未设置,通信仅在收发双方准备好的情况下才可以成功。
 ch :=make(chan type, value)
value == 0 -> synchronous, unbuffered (阻塞)
value > 0 -> asynchronous, buffered(非阻塞)取决于value元素
若使用通道的缓冲,你的程序会在“请求”激增的时候表现更好:更具弹性,专业术语叫:更具有伸缩性(scalable)。要在首要位置使用无缓冲通道来设计算法,只在不确定的情况下使用缓冲。

7.协程中用通道输出结果,为了知道计算何时完成,可以通过信道回报。在例子 go sum(bigArray) 中,要这样写:
ch := make(chan int)
go sum(bigArray, ch) // bigArray puts the calculated sum on ch
// .. do something else for a while
sum := <- ch // wait for, and retrieve the sum
也可以使用通道来达到同步的目的,这个很有效的用法在传统计算机中称为信号量(semaphore)。或者换个方式:通过通道发送信号告知处理已经完成(在协程中)。在其他协程运行时让 main 程序无限阻塞的通常做法是在 main 函数的最后放置一个{}。也可以使用通道让 main 程序等待协程完成,就是所谓的信号量模式。

8.信号量模式:协程通过在通道 ch 中放置一个值来处理结束的信号。 main 协程等待 <-ch 直到从中获取到值。我们期望从这个通道中获取返回的结果 用完整的信号量模式对长度为N的 float64 切片进行了 N 个 doSomething() 计算并同时完成,通道 sem 分配了相同的长度(切包含空接口类型的元素),待所有的计算都完成后,发送信号(通过放入值)。在循环中从通道sem 不停的接收数据来等待所有的协程完成。

type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N)
...
for i, xi := range data {
go func (i int, xi float64) {
res[i] = doSomething(i, xi)
sem <- empty
} (i, xi)
}
// wait for goroutines to finish
for i := 0; i < N; i++ { <-sem }

注意闭合: i 、 xi 都是作为参数传入闭合函数的,从外层循环中隐藏了变量 i 和 xi 。让每个协程有一份 i 和xi 的拷贝;另外,for 循环的下一次迭代会更新所有协程中 i 和 xi 的值。切片 res 没有传入闭合函数,因为协程不需要单独拷贝一份。切片 res 也在闭合函数中但并不是参数。

9. 实现并行的 for 循环,for 循环的每一个迭代是并行完成的,在 for 循环中并行计算迭代可能带来很好的性能提升。不过所有的迭代都必须是独立完成的:

for i, v := range data {
go func (i int, v float64) {
doSomething(i, v)
...
} (i, v)
}

10.用带缓冲通道实现一个信号量:信号量是实现互斥锁(排外锁)常见的同步机制,限制对资源的访问,解决读写问题,比如没有实现信号量的 sync 的Go 包,使用带缓冲的通道可以轻松实现:
带缓冲通道的容量和要同步的资源容量相同
通道的长度(当前存放的元素个数)与当前资源被使用的数量相同
容量减去通道的长度就是未处理的资源个数(标准信号量的整数值)
不用管通道中存放的是什么,只关注长度;因此我们创建了一个长度可变但容量为0(字节)的通道:
type Empty interface {}
type semaphore chan Empty
将可用资源的数量N来初始化信号量 semaphore : sem = make(semaphore, N)
然后直接对信号量进行操作:

// acquire n resources
func (s semaphore) P(n int) {
e := new(Empty)
for i := 0; i < n; i++ {
s <- e
}
}
// release n resouces
func (s semaphore) V(n int) {
for i:= 0; i < n; i++{
<- s
}
}

可以用来实现一个互斥的例子:

/* mutexes */
func (s semaphore) Lock() {
s.P(1)
}
func (s semaphore) Unlock(){
s.V(1)
}
/* signal-wait */
func (s semaphore) Wait(n int) {
s.P(n)
}
func (s semaphore) Signal() {
s.V(1)
}

习惯用法:通道工厂模式
编程中常见的另外一种模式如下:不将通道作为参数传递给协程,而用函数来生成一个通道并返回(工厂角色);函数内有个匿名函数被协程调用。在 channel_block2.go 加入这种模式便有了示例:

package main
import (
"fmt"
"time"
)
func main() {
stream := pump()
go suck(stream)
time.Sleep(1e9)
}
func pump() chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}
func suck(ch chan int) {
for {
fmt.Println(<-ch)
}
}

11.给通道使用 for 循环:for 循环的 range 语句可以用在通道 ch 上,便可以从通道中获取值,像这样:
for v := range ch {
fmt.Printf("The value is %v\n", v)
}
它从指定通道中读取数据直到通道关闭,才继续执行下边的代码。很明显,另外一个协程必须写入 ch (不然代码就阻塞在 for 循环了),而且必须在写入完成后才关闭。 suck 函数可以这样写,且在协程中调用这个动作,程序变成了这样:

package main
import (
"fmt"
"time"
)
func main() {
suck(pump())
time.Sleep(1e9)
}
func pump() chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}
func suck(ch chan int) {
go func() {
for v := range ch {
fmt.Println(v)
}
}()
}

习惯用法:通道迭代模式
这个模式用到了后边示例 producer_consumer.go 的生产者-消费者模式,通常,需要从包含了地址索引字段 items的容器给通道填入元素。为容器的类型定义一个方法 Iter() ,返回一个只读的通道items,如下:

func (c *container) Iter () <- chan item {
ch := make(chan item)
go func () {
for i:= 0; i < c.Len(); i++{ // or use a for-range loop
ch <- c.items[i]
}
} ()
return ch
}

在协程里,一个 for 循环迭代容器 c 中的元素(对于树或图的算法,这种简单的 for 循环可以替换为深度优先搜索)。调用这个方法的代码可以这样迭代容器:
for x := range container.Iter() { ... }
可以运行在自己的协程中,所以上边的迭代用到了一个通道和两个协程(可能运行在两个线程上)。就有了一个特殊的生产者-消费者模式。如果程序在协程给通道写完值之前结束,协程不会被回收;设计如此。这种行为看起来是错误的,但是通道是一种线程安全的通信。在这种情况下,协程尝试写入一个通道,而这个通道永远不会被读取,这可能是个 bug 而并非期望它被静默的回收。
习惯用法:生产者消费者模式
假设你有 Produce() 函数来产生 Consume 函数需要的值。它们都可以运行在独立的协程中,生产者在通道中放入给消费者读取的值。整个处理过程可以替换为无限循环:
for {
Consume(Produce())
}

12.通道的方向:通道类型可以用注解来表示它只发送或者只接收:
var send_only chan<- int // channel can only receive data
var recv_only <-chan int // channel can only send data
只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了,所以对只接收通道是没有意义的。通道创建的时候都是双向的,但也可以分配有方向的通道变量,就像以下代码:

var c = make(chan int) // bidirectional
go source(c)
go sink(c)
func source(ch chan<- int){
for { ch <- 1 }
}
func sink(ch <-chan int) {
for { <-ch }
}

习惯用法:管道和选择器模式
更具体的例子还有协程处理它从通道接收的数据并发送给输出通道:

sendChan := make(chan int)
reciveChan := make(chan string)
go processChannel(sendChan, receiveChan)
func processChannel(in <-chan int, out chan<- string) {
for inValue := range in {
result := ... /// processing inValue
out <- result
}
}

通过使用方向注解来限制协程对通道的操作。这里有一个来自 Go 指导的很赞的例子,打印了输出的素数,使用选择器(‘筛’)作为它的算法。每个 prime 都有一个选择器:

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package main
package main
import "fmt"
// Send the sequence 2, 3, 4, ... to channel 'ch'.
func generate(ch chan int) {
for i := 2; ; i++ {
ch <- i // Send 'i' to channel 'ch'.
}
}
// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func filter(in, out chan int, prime int) {
for {
i := <-in // Receive value of new variable 'i' from 'in'.
if i%prime != 0 {
out <- i // Send 'i' to channel 'out'.
}
}
}
// The prime sieve: Daisy-chain filter processes together.
func main() {
ch := make(chan int) // Create a new channel.
go generate(ch) // Start generate() as a goroutine.
for {
prime := <-ch
fmt.Print(prime, " ")
ch1 := make(chan int)
go filter(ch, ch1, prime)
ch = ch1
}
}

协程 filter(in, out chan int, prime int) 拷贝整数到输出通道,丢弃掉可以被 prime 整除的数字。然后每个 prime 又开启了一个新的协程,生成器和选择器并发请求。输出:
2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101
103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211 223
227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331 337 347 349
353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449 457 461 463 467 479
487 491 499 503 509 521 523 541 547 557 563 569 571 577 587 593 599 601 607 613 617 619
631 641 643 647 653 659 661 673 677 683 691 701 709 719 727 733 739 743 751 757 761 769
773 787 797 809 811 821 823 827 829 839 853 857 859 863 877 881 883 887 907 911 919 929
937 941 947 953 967 971 977 983 991 997 1009 1013...
第二个版本引入了上边的习惯用法:函数 sieve 、 generate 和 filter 都是工厂;它们创建通道并返回,而且使用了协程的 lambda 函数。 main 函数现在短小清晰:它调用 sieve() 返回了包含素数的通道,然后通过 fmt.Println(<-primes) 打印出来。

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"fmt"
)
// Send the sequence 2, 3, 4, ... to returned channel
func generate() chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}
// Filter out input values divisible by 'prime', send rest to returned channel
func filter(in chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
out <- i
}
}
}()
return out
}
func sieve() chan int {
out := make(chan int)
go func() {
ch := generate()
for {
prime := <-ch
ch = filter(ch, prime)
out <- prime
}
}()
return out
}
func main() {
primes := sieve()
for {
fmt.Println(<-primes)
}
}

三、协程的同步:关闭通道-测试阻塞的通道

通道可以被显式的关闭;尽管它们和文件不同:不必每次都关闭。只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。只有发送者需要关闭通道,接收者永远不会需要。第一个可以通过函数 close(ch) 来完成:这个将通道标记为无法通过发送操作 <- 接受更多的值;给已经关闭的通道发送或者再次关闭都会导致运行时的 panic。在创建一个通道后使用 defer 语句是个不错的办法(类似这种情况):

ch := make(chan float64)
defer close(ch)

第二个问题可以使用逗号,ok 操作符:用来检测通道是否被关闭。

v, ok := <-ch // ok is true if v received value

通常和 if 语句一起使用:

if v, ok := <-ch; ok {
process(v)
}

或者在 for 循环中接收的时候,当关闭或者阻塞的时候使用 break:

v, ok := <-ch
if !ok {
break
}
process(v)

示例程序中使用这些可以改进为版本 goroutine3.go,输出相同。实现非阻塞通道的读取,需要使用 select。

package main
import "fmt"
func main() {
ch := make(chan string)
go sendData(ch)
getData(ch)
}
func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokio"
close(ch)
}
func getData(ch chan string) {
/*for input := range ch {
process(input)
}*/
for {
input, open := <-ch
if !open {
break
}
fmt.Printf("%s ", input)
}
}

使用 for-range 语句来读取通道是更好的办法,因为这会自动检测通道是否关闭。通道迭代器中,两个协程经常是一个阻塞另外一个。如果程序工作在多核心的机器上,大部分时间只用到了一个处理器。可以通过使用带缓冲(缓冲空间大于 0)的通道来改善。比如,缓冲大小为 100,迭代器在阻塞之前,至少可以从容器获得 100 个元素。如果消费者协程在独立的内核运行,就有可能让协程不会出现阻塞。由于容器中元素的数量通常是已知的,需要让通道有足够的容量放置所有的元素。这样,迭代器就不会阻塞(尽管消费者协程仍然可能阻塞)。然后,这样有效的加倍了迭代容器所需要的内存使用量,所以通道的容量需要限制一下最大值。记录运行时间和性能测试可以帮助你找到最小的缓存容量带来最好的性能。

四、使用 select 切换协程

从不同的并发执行的协程中获取值可以通过关键字 select 来完成,它和 switch 控制语句非常相似,也被称作通信开关; select 监听进入通道的数据,也可以是用通道发送值的时候。default 语句是可选的;fallthrough 行为,和普通的 switch 相似,是不允许的。在任何一个 case 中执行 break 或者return ,select 就结束了。select 做的就是:选择处理列出的多个通信情况中的一个。如果都阻塞了,会等待直到其中一个可以处理;如果多个可以处理,随机选择一个;如果没有通道操作可以处理并且写了 default 语句,它就会执行: default 永远是可运行的(这就是准备好了,可以执行)。在 select 中使用发送操作并且有 default 可以确保发送不被阻塞!如果没有 default ,select 就会一直阻塞。select 语句实现了一种监听模式,通常用在(无限)循环中;在某种情况下,通过 break 语句使循环退出。

select {
case u:= <- ch1:
...
case v:= <- ch2:
...
...
default: // no value ready to be received
...
}

程序 中有 2 个通道 ch1 和 ch2 ,三个协程 pump1() 、 pump2() 和 suck() 。这是一个典型的生产者消费者模式。在无限循环中, ch1 和 ch2 通过 pump1() 和 pump2() 填充整数; suck() 也是在无限循环中轮询输入的,通过 select 语句获取 ch1 和 ch2 的整数并输出。选择哪一个 case 取决于哪一个通道收到了信息。程序在main 执行 1 秒后结束。

package main
import ("fmt""time"
)
func main() {ch1 := make(chan int)ch2 := make(chan int)go pump1(ch1)go pump2(ch2)go suck(ch1, ch2)time.Sleep(1e9)
}
func pump1(ch chan int) {for i := 0; ; i++ {ch <- i * 2}
}
func pump2(ch chan int) {for i := 0; ; i++ {ch <- i + 5}
}
func suck(ch1, ch2 chan int) { //同时读取两个通道的值for { //随机选择select { case v := <-ch1: // 如果通道一fmt.Printf("Received on channel 1: %d\n", v)case v := <-ch2:fmt.Printf("Received on channel 2: %d\n", v)}}
}

计算执行时间
再次声明这只是为了一边练习协程的概念一边找点乐子。如果你需要的话可使用 math.pi 中的 Pi;而且不使用协程会运算的更快。一个急速版本:使用 GOMAXPROCS ,开启和GOMAXPROCS 同样多个协程。
习惯用法:后台服务模式
服务通常是是用后台协程中的无限循环实现的,在循环中使用 select 获取并处理通道中的数据:

// Backend goroutine.
func backend() {
for {
select {
case cmd := <-ch1:
// Handle ...
case cmd := <-ch2:
...
case cmd := <-chStop:
// stop server
}
}
}

在程序的其他地方给通道 ch1 , ch2 发送数据,比如:通道 stop 用来清理结束服务程序。另一种方式(但是不太灵活)就是(客户端)在 chRequest 上提交请求,后台协程循环这个通道,使用 switch 根据请求的行为来分别处理:

func backend() {
for req := range chRequest {
switch req.Subjext() {
case A1: // Handle case ...
case A2: // Handle case ...
default:
// Handle illegal request ..
// ...
}
}
}

五、通道、超时和计时器(Ticker)

time 包中有一些有趣的功能可以和通道组合使用。其中就包含了 time.Ticker 结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值:

type Ticker struct {
C <-chan Time // the channel on which the ticks are delivered.
// contains filtered or unexported fields
...
}

时间间隔的单位是 ns(纳秒,int64),在工厂函数 time.NewTicker 中以 Duration 类型的参数传入: func Newticker(dur) *Ticker 。在协程周期性的执行一些事情(打印状态日志,输出,计算等等)的时候非常有用。调用 Stop() 使计时器停止,在 defer 语句中使用。这些都很好的适应 select 语句:

ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
...
case v:= <-ch2:
...
case <-ticker.C:
logState(status) // call some logging function logState
default: // no value ready to be received
...
}

time.Tick() 函数声明为 Tick(d Duration) <-chan Time ,当你想返回一个通道而不必关闭它的时候这个函数非常有用:它以 d 为周期给返回的通道发送时间,d是纳秒数。如果需要像下边的代码一样,限制处理频率函数client.Call() 是一个 RPC 调用,这里暂不赘述:

import "time"
rate_per_sec := 10
var dur Duration = 1e9 / rate_per_sec
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
<- chRate // rate limit our Service.Method RPC calls
go client.Call("Service.Method", req, ...)
}

这样只会按照指定频率处理请求: chRate 阻塞了更高的频率。每秒处理的频率可以根据机器负载(和/或)资源的情况而增加或减少。定时器(Timer)结构体看上去和计时器(Ticker)结构体的确很像(构造为 NewTimer(d Duration) ),但是它只发送一次时间,在 Dration d 之后。还有 time.After(d) 函数,声明如下:func After(d Duration) <-chan Time.在 Duration d 之后,当前时间被发到返回的通道;所以它和 NewTimer(d).C 是等价的;它类似 Tick() ,但是After() 只发送一次时间。下边有个很具体的示例,很好的阐明了 select 中 default 的作用:

package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(1e8)
boom := time.After(5e8)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(5e7)
}
}
}


输出:
.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
BOOM!
习惯用法:简单超时模式
要从通道 ch 中接收数据,但是最多等待1秒。先创建一个信号通道,然后启动一个 lambda 协程,协程在给通道发送数据之前是休眠的:

timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // one second
timeout <- true
}()

然后使用 select 语句接收 ch 或者 timeout 的数据:如果 ch 在 1 秒内没有收到数据,就选择到了 time 分支并放弃了 ch 的读取。

select {
case <-ch:
// a read from ch has occured
case <-timeout:
// the read from ch has timed out
break
}

第二种形式:取消耗时很长的同步调用
也可以使用 time.After() 函数替换 timeout-channel 。可以在 select 中通过 time.After() 发送的超时信号来停止协程的执行。以下代码,在 timeoutNs 纳秒后执行 select 的 timeout 分支后,执行 client.Call 的协程也随之结束,不会给通道 ch 返回值:

ch := make(chan error, 1)
go func() { ch <- client.Call("Service.Method", args, &reply) } ()
select {
case resp := <-ch
// use resp and reply
case <-time.After(timeoutNs):
// call timed out
break
}

注意缓冲大小设置为 1 是必要的,可以避免协程死锁以及确保超时的通道可以被垃圾回收。此外,需要注意在有多个case 符合条件时, select 对 case 的选择是伪随机的,如果上面的代码稍作修改如下,则 select 语句可能不会在定时器超时信号到来时立刻选中 time.After(timeoutNs) 对应的 case ,因此协程可能不会严格按照定时器设置的时间结束。

ch := make(chan int, 1)
go func() { for { ch <- 1 } } ()
L:
for {
select {
case <-ch:
// do something
case <-time.After(timeoutNs):
// call timed out
break L
}
}

第三种形式:假设程序从多个复制的数据库同时读取。只需要一个答案,需要接收首先到达的答案, Query 函数获取数据库的连接切片并请求。并行请求每一个数据库并返回收到的第一个响应:

func Query(conns []conn, query string) Result {
ch := make(chan Result, 1)
for _, conn := range conns {
go func(c Conn) {
select {
case ch <- c.DoQuery(query):
default:
}
}(conn)
}
return <- ch
}

再次声明,结果通道 ch 必须是带缓冲的:以保证第一个发送进来的数据有地方可以存放,确保放入的首个数据总会成功,所以第一个到达的值会被获取而与执行的顺序无关。正在执行的协程可以总是可以使用 runtime.Goexit() 来停止。
在应用中缓存数据:
应用程序中用到了来自数据库(或者常见的数据存储)的数据时,经常会把数据缓存到内存中,因为从数据库中获取数据的操作代价很高;如果数据库中的值不发生变化就没有问题。但是如果值有变化,我们需要一个机制来周期性的从数据库重新读取这些值:缓存的值就不可用(过期)了,而且我们也不希望用户看到陈旧的数据。

六、协程和恢复(recover)

一个用到 recover 的程序停掉了服务器内部一个失败的协程而不影响其他协程的工作。

func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work) // start the goroutine for that work
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Printf("Work failed with %s in %v", err, work)
}
}()
do(work)
}

上边的代码,如果 do(work) 发生 panic,错误会被记录且协程会退出并释放,而其他协程不受影响。因为 recover 总是返回 nil ,除非直接在 defer 修饰的函数中调用, defer 修饰的代码可以调用那些自身可以使用panic 和 recover 避免失败的库例程(库函数)。举例, safelyDo() 中 defer 修饰的函数可能在调用 recover 之前就调用了一个 logging 函数, panicking 状态不会影响 logging 代码的运行。因为加入了恢复模式,函数do (以及它调用的任何东西)可以通过调用 panic 来摆脱不好的情况。但是恢复是在 panicking 的协程内部的:不
能被另外一个协程恢复。

七、新旧模型对比:任务和worker

假设我们需要处理很多任务;一个worker处理一项任务。任务可以被定义为一个结构体(具体的细节在这里并不重要):

type Task struct {
// some state
}

使用通道进行同步:使用一个通道接受需要处理的任务,一个通道接受处理完成的任务(及其结果)。worker在协程中启动,其数量N应该根据任务数量进行调整。主线程扮演着Master节点角色,可能写成如下形式:

func main() {
pending, done := make(chan *Task), make(chan *Task)
go sendWork(pending) // put tasks with work on the channel
for i := 0; i < N; i++ { // start N goroutines to do work
go Worker(pending, done)
}
consumeWork(done) // continue with the processed tasks
}

worker的逻辑比较简单:从pending通道拿任务,处理后将其放到done通道中:

func Worker(in, out chan *Task) {
for {
t := <-in
process(t)
out <- t
}
}

这里并不使用锁:从通道得到新任务的过程没有任何竞争。随着任务数量增加,worker数量也应该相应增加,同时性能并不会像第一种方式那样下降明显。在pending通道中存在一份任务的拷贝,第一个worker从pending通道中获得第一个任务并进行处理,这里并不存在竞争(对一个通道读数据和写数据的整个过程是原子性的)。某一个任务会在哪一个worker中被执行是不可知的,反过来也是。worker数量的增多也会增加通信的开销,这会对性能有轻微的影响。从这个简单的例子中可能很难看出第二种模式的优势,但含有复杂锁运用的程序不仅在编写上显得困难,也不容易编写正确,使用第二种模式的话,就无需考虑这么复杂的东西了。因此,第二种模式对比第一种模式而言,不仅性能是一个主要优势,而且还有个更大的优势:代码显得更清晰、更优雅。一个更符合go语言习惯的worker写法:

IDIOM: Use an in- and out-channel instead of locking
func Worker(in, out chan *Task) {
for {
t := <-in
process(t)
out <- t
}
}

对于任何可以建模为Master-Worker范例的问题,一个类似于worker使用通道进行通信和交互、Master进行整体协调的方案都能完美解决。如果系统部署在多台机器上,各个机器上执行Worker协程,Master和Worker之间使用netchan或者RPC进行通信。怎么选择是该使用锁还是通道?
通道是一个较新的概念,本节我们着重强调了在go协程里通道的使用,但这并不意味着经典的锁方法就不能使用。go语言让你可以根据实际问题进行选择:创建一个优雅、简单、可读性强、在大多数场景性能表现都能很好的方案。如果你的问题适合使用锁,也不要忌讳使用它。go语言注重实用,什么方式最能解决你的问题就用什么方式,而不是强迫你使用一种编码风格。下面列出一个普遍的经验法则:
使用锁的情景:
访问共享数据结构中的缓存信息
保存应用程序上下文和状态信息数据
使用通道的情景:
与异步操作的结果进行交互
分发任务
传递数据所有权
当你发现你的锁使用规则变得很复杂时,可以反省使用通道会不会使问题变得简单些。

八、惰性生成器的实现:生成器是指当被调用时返回一个序列中下一个值的函数,例如:

generateInteger() => 0
generateInteger() => 1
generateInteger() => 2
....

生成器每次返回的是序列中下一个值而非整个序列;这种特性也称之为惰性求值:只在你需要时进行求值,同时保留相关变量资源(内存和cpu):这是一项在需要时对表达式进行求值的技术。例如,生成一个无限数量的偶数序列:要产生这样一个序列并且在一个一个的使用可能会很困难,而且内存会溢出!但是一个含有通道和go协程的函数能轻易实现这个需求。在例子中,我们实现了一个使用 int 型通道来实现的生成器。通道被命名为 yield 和 resume ,这些词经常在协程代码中使用。

package main
import (
"fmt"
)
var resume chan int
func integers() chan int {
yield := make(chan int)
count := 0
go func() {
for {
yield <- count
count++
}
}()
return yield
}
func generateInteger() int {
return <-resume
}
func main() {
resume = integers()
fmt.Println(generateInteger()) //=> 0
fmt.Println(generateInteger()) //=> 1
fmt.Println(generateInteger()) //=> 2
}

有一个细微的区别是从通道读取的值可能会是稍早前产生的,并不是在程序被调用时生成的。如果确实需要这样的行为,就得实现一个请求响应机制。当生成器生成数据的过程是计算密集型且各个结果的顺序并不重要时,那么就可以将生成器放入到go协程实现并行化。但是得小心,使用大量的go协程的开销可能会超过带来的性能增益。
这些原则可以概括为:通过巧妙地使用空接口、闭包和高阶函数,我们能实现一个通用的惰性生产器的工厂函数 BuildLazyEvaluator (这个应该放在一个工具包中实现)。工厂函数需要一个函数和一个初始状态作为输入参数,返回一个无参、返回值是生成序列的函数。传入的函数需要计算出下一个返回值以及下一个状态参数。在工厂函数中,创建一个通道和无限循环的go协程。返回值被放到了该通道中,返回函数稍后被调用时从该通道中取得该返回值。每当取得一个值时,下一个值即被计算。在下面的例子中,定义了一个 evenFunc 函数,其是一个惰性生成函数:在main函数中,我们创建了前10个偶数,每个都是通过调用 even() 函数取得下一个值的。为此,我们需要在 BuildLazyIntEvaluator 函数中具体化我们的生成函数,然后我们能够基于此做出定义。

package main
import ("fmt"
)
type Any interface{}
type EvalFunc func(Any) (Any, Any)
func main() {evenFunc := func(state Any) (Any, Any) {os := state.(int)ns := os + 2return os, ns}even := BuildLazyIntEvaluator(evenFunc, 0)for i := 0; i < 10; i++ {fmt.Printf("%vth even: %v\n", i, even())}
}
func BuildLazyEvaluator(evalFunc EvalFunc, initState Any) func() Any {retValChan := make(chan Any)loopFunc := func() {var actState Any = initStatevar retVal Anyfor {retVal, actState = evalFunc(actState)retValChan <- retVal}}retFunc := func() Any {return <- retValChan}go loopFunc()return retFunc
}
func BuildLazyIntEvaluator(evalFunc EvalFunc, initState Any) func() int {ef := BuildLazyEvaluator(evalFunc, initState)return func() int {return ef().(int)}
}

输出:

0th even: 0
1th even: 2
2th even: 4
3th even: 6
4th even: 8
5th even: 10
6th even: 12
7th even: 14
8th even: 16
9th even: 18

九、实现 Futures 模式

所谓Futures就是指:有时候在你使用某一个值之前需要先对其进行计算。这种情况下,你就可以在另一个处理器上进行该值的计算,到使用时,该值就已经计算完毕了。Futures模式通过闭包和通道可以很容易实现,类似于生成器,不同地方在于Futures需要返回一个值。参考条目文献给出了一个很精彩的例子:假设我们有一个矩阵类型,我们需要计算两个矩阵A和B乘积的逆,首先我们通过函数 Inverse(M) 分别对其进行求逆运算,在将结果相乘。如下函数 InverseProduct() 实现了如上过程:

func InverseProduct(a Matrix, b Matrix) {
a_inv := Inverse(a)
b_inv := Inverse(b)
return Product(a_inv, b_inv)
}

在这个例子中,a和b的求逆矩阵需要先被计算。那么为什么在计算b的逆矩阵时,需要等待a的逆计算完成呢?显然不必要,这两个求逆运算其实可以并行执行的。换句话说,调用 Product 函数只需要等到 a_inv 和 b_inv 的计算完成。如下代码实现了并行计算方式:

func InverseProduct(a Matrix, b Matrix) {
a_inv_future := InverseFuture(a) // start as a goroutine
b_inv_future := InverseFuture(b) // start as a goroutine
a_inv := <-a_inv_future
b_inv := <-b_inv_future
return Product(a_inv, b_inv)
}

InverseFuture 函数起了一个 goroutine 协程,在其执行闭包运算,该闭包会将矩阵求逆结果放入到future通道中:

func InverseFuture(a Matrix) chan Matrix {
future := make(chan Matrix)
go func() {
future <- Inverse(a)
}()
return future
}

当开发一个计算密集型库时,使用Futures模式设计API接口是很有意义的。在你的包使用Futures模式,且能保持友好的API接口。此外,Futures可以通过一个异步的API暴露出来。这样你可以以最小的成本将包中的并行计算移到用户代码中。

注:开始不懂了,比较深奥。

  相关解决方案