go 语言中的 channel 与协程间通信
目录
go 中有一种数据类型 chan
,它本身的用途是消息通道,用来在 goroutines 间实现接收、发送消息。同时有缓存功能,因此可视它为 跨 协程队列。
注意,是语言级别上的支持,不同函数库支持。这一点可以让它的表达语法设计得更简单。
这个跨协程消息通信功能,可以非常简单地实现其它语言中较麻烦的并发任务系统、工作队列系统。
如何创建、使用 channel #
~/Projects/go/examples
➜ mkdir channel && cd channel
~/Projects/go/examples/zerovalue
➜ go mod init youwu.today/go/channel
~/Projects/go/examples/zerovalue
➜ touch channel.go
在 channel.go 中添加如下示例代码:
// channel.go
package main
import "fmt"
func main() {
fmt.Println("😀😀 有悟的 go channel 示例 😀😀")
}
不创建基础工程也无所谓,如果没有依赖第三方库,放在一个 main.go
代码文件中,使用 go run main.go
来测试也是可行的。
在本文开头有提到 channel 是一个通道、队列,那么我们关心的应该就是如何创建这个通道、将数据装到通道中、从通道中提取数据。
golang 为它设计了一个操作符,left <- right
,当 left
为 channel 时,表示向通道中写入数据(发送),当 right
为通道时,表示从通道提取数据(接收)。
package main
import "fmt"
func main() {
// fmt.Println("😀😀 有悟的 go channel 示例 😀😀")
simpleChan()
}
func simpleChan() {
// 声明一 chan 类型的变量
var ch chan string
ch = make(chan string)
// 向 channel 中发送 string 类型数据
go func() {
ch <- "ping"
}()
// 创建一个 string 类型的变量,用来保存从 channel 队列提取的数据
var v string
v = <-ch
fmt.Println(v)
}
~/Projects/go/examples/channel
➜ go build channel.go && ./channel
ping
根据 go 变量声明的语法,上面可以简写为:
// channel.go
...
ch := make(chan string)
go func() {
ch <- "ping"
}()
v := <-ch
fmt.Println(v)
...
两个操作语句就可以完成了数据入队列(ch <- "ping"
),数据出队列(v = <-ch
)的动作。这里有个问题需要注意,channel 的接收与发送需要分别在两个 goroutine 中,如果你是直接看英文的文档、或者其他介绍的文章,可能没有指出这个要求。它是 跨 协程的。
若发送与接收没有分布在两个 goroutine 中,会出现这样的报错信息:
// channel.go
ch := make(chan string)
ch <- "ping"
v := <-ch
~/Projects/go/examples/channel
➜ go build channel.go && ./channel
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/Users/youwu.today/Projects/go/examples/channel/channel.go:9 +0x37
用一句话总结:channel 是连接 concurrent goroutines 的管道(pipe)。
场景示例:简单的 worker pool 编程模型 #
用 channel 的跨协程消息通讯(数据传输)功能,我们可以实现一个 worker pool。
- 一个 jobs 队列,缓存需要处理的 jobs
- 几个 worker 工作协程,从 jobs 队列取任务来处理
- 一个 results 队列,保存任务处理结果
// channel.go
package main
import (
"fmt"
"time"
)
func main() {
workpools()
}
func workpools() {
const number_of_jobs = 5
const number_of_workers = 2
jobs := make(chan int, number_of_jobs)
results := make(chan string, number_of_jobs)
// 控制并行度,每个 worker 函数都运行在单独的 goroutine 中
for w := 1; w <= number_of_workers; w++ {
go worker(w, jobs, results)
}
// 向 任务队列写入任务
for i := 1; i <= number_of_jobs; i++ {
jobs <- i
}
fmt.Println("布置 job 后,关闭 jobs channel")
close(jobs)
// 监听 results channel,只要有内容就会被取走
for i := 1; i <= number_of_jobs; i++ {
fmt.Printf("结果: %s\n", <-results)
}
}
// worker 逻辑:一个不断从 jobs chan 中取任务的循环
// 并将结果放在 out channel 中待取
func worker(id int, jobs <-chan int, out chan<- string) {
fmt.Printf("worker #%d 启动\n", id)
for job := range jobs {
fmt.Printf("worker #%d 开始 工作%d\n", id, job)
// sleep 模拟 『正在处理任务』
time.Sleep(time.Millisecond * 500)
fmt.Printf("worker #%d 结束 工作%d\n", id, job)
out <- fmt.Sprintf("worker #%d 工作%d", id, job)
}
fmt.Printf("worker #%d 退出\n", id)
}
本例仅用于讲解演示,请勿在生产系统的程序中照搬。
上面过程,使用了 go func(){}()
(生成 go协程)、chan
(传递数据的管道) 简单地实现了一个具有并发功能的 工作者资源池模型。在例子中,设定了一定并发度的 worker routine,这些函数会从 jobs 任务 队列中获取任务来处理,并把结果发送到 results channel 中。
go 运行时按照自己的调度规则,来协调多个 worker 和主协程之间的运行。
worker 协程启动后,等待 jobs channel 被写入数据。对于熟悉 java、c++ 等其它编程语言的人来说,for job := range jobs
非常有迷惑性,它并不需要 jobs 的长度是已知的。for range
非常神奇,在 jobs channel 有值时才进行循环迭代。
提醒 :
注意
worker
函数的两个参数类型的区别,jobs
、out
,一个是<-chan int
,另一个是chan<- string
。
编译并运行:
~/Projects/go/examples/channel
➜ go build channel.go && ./channel
布置 job 后,关闭 jobs channel
worker #2 启动
worker #2 开始 工作1
worker #1 启动
worker #1 开始 工作2
worker #1 结束 工作2
worker #1 开始 工作3
结果: worker #1 工作2
worker #2 结束 工作1
worker #2 开始 工作4
结果: worker #2 工作1
worker #2 结束 工作4
worker #2 开始 工作5
结果: worker #2 工作4
worker #1 结束 工作3
worker #1 退出
结果: worker #1 工作3
worker #2 结束 工作5
worker #2 退出
结果: worker #2 工作5
自己动手:
- 把
jobs := make(chan int, number_of_jobs)
改为make(chan int, 0)
会出现 “fatal error: all goroutines are asleep - deadlock!”- 把
jobs := make(chan int, number_of_jobs)
改为make(chan int, 1)
、make(chan int, 2)
,观察 『布置 job 后,关闭 jobs channel』 这一句打印的位置以上两个课后作业,有助于理解 channel 的功能与特点。
再次声明,不要在程序的生产环境中使用上述代码。它仅作演示所用,没有考虑很多需要处理的情况,并不健壮。