跳到主要内容
  1. Skills/
  2. 后端编程/

go 语言中的 channel 与协程间通信

··字数 2255·5 分钟
go

go 中有一种数据类型 chan,它本身的用途是消息通道,用来在 goroutines 间实现接收、发送消息。同时有缓存功能,因此可视它为 协程队列。 注意,是语言级别上的支持,不同函数库支持。这一点可以让它的表达语法设计得更简单。 这个跨协程消息通信功能,可以非常简单地实现其它语言中较麻烦的并发任务系统、工作队列系统。

如何创建、使用 channel #

~/Projects/go/examples
➜  mkdir channel && cd channel

~/Projects/go/examples/zerovalue
➜  go mod init youwu.today/go/channel

~/Projects/go/examples/zerovalue
➜  touch channel.go

channel.go 中添加如下示例代码:

// channel.go
package main

import "fmt"

func main() {
    fmt.Println("😀😀 有悟的 go channel 示例 😀😀")
}

不创建基础工程也无所谓,如果没有依赖第三方库,放在一个 main.go 代码文件中,使用 go run main.go 来测试也是可行的。

在本文开头有提到 channel 是一个通道、队列,那么我们关心的应该就是如何创建这个通道、将数据装到通道中、从通道中提取数据。 golang 为它设计了一个操作符,left <- right,当 left 为 channel 时,表示向通道中写入数据(发送),当 right 为通道时,表示从通道提取数据(接收)。

package main

import  "fmt"

func main() {
    // fmt.Println("😀😀 有悟的 go channel 示例 😀😀")
    simpleChan()
}

func simpleChan() {
    // 声明一 chan 类型的变量
    var ch chan string
    ch = make(chan string)
    // 向 channel 中发送 string 类型数据
    go func() {
        ch <- "ping"
    }()
    // 创建一个 string 类型的变量,用来保存从 channel 队列提取的数据
    var v string
    v = <-ch
    fmt.Println(v)
}
~/Projects/go/examples/channel 
➜  go build channel.go && ./channel
ping

根据 go 变量声明的语法,上面可以简写为:

// channel.go
...
    ch := make(chan string)
    go func() {
     ch <- "ping"
    }()
    v := <-ch
    fmt.Println(v)
...

两个操作语句就可以完成了数据入队列(ch <- "ping"),数据出队列(v = <-ch)的动作。这里有个问题需要注意,channel 的接收与发送需要分别在两个 goroutine 中,如果你是直接看英文的文档、或者其他介绍的文章,可能没有指出这个要求。它是 协程的。

g o r o u t i n e 1 r < q e u c e e u i e v e o u t c h a n n e l < s q e u n e d u e i n g o r o u t i n e 2

若发送与接收没有分布在两个 goroutine 中,会出现这样的报错信息:

// channel.go
    ch := make(chan string)
    ch <- "ping"
    v := <-ch
~/Projects/go/examples/channel 
➜  go build channel.go && ./channel
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /Users/youwu.today/Projects/go/examples/channel/channel.go:9 +0x37

用一句话总结:channel 是连接 concurrent goroutines 的管道(pipe)。

场景示例:简单的 worker pool 编程模型 #

用 channel 的跨协程消息通讯(数据传输)功能,我们可以实现一个 worker pool。

  • 一个 jobs 队列,缓存需要处理的 jobs
  • 几个 worker 工作协程,从 jobs 队列取任务来处理
  • 一个 results 队列,保存任务处理结果
// channel.go
package main

import (
    "fmt"
    "time"
)

func main() {
    workpools()
}

func workpools() {
    const number_of_jobs = 5
    const number_of_workers = 2
    jobs := make(chan int, number_of_jobs)
    results := make(chan string, number_of_jobs)

    // 控制并行度,每个 worker 函数都运行在单独的 goroutine 中
    for w := 1; w <= number_of_workers; w++ {
        go worker(w, jobs, results)
    }

    // 向 任务队列写入任务
    for i := 1; i <= number_of_jobs; i++ {
        jobs <- i
    }
    fmt.Println("布置 job 后,关闭 jobs channel")
    close(jobs)

    // 监听 results channel,只要有内容就会被取走
    for i := 1; i <= number_of_jobs; i++ {
        fmt.Printf("结果: %s\n", <-results)
    }
}

// worker 逻辑:一个不断从 jobs chan 中取任务的循环
// 并将结果放在 out channel 中待取
func worker(id int, jobs <-chan int, out chan<- string) {
    fmt.Printf("worker #%d 启动\n", id)
    for job := range jobs {
        fmt.Printf("worker #%d 开始 工作%d\n", id, job)
        // sleep 模拟 『正在处理任务』
        time.Sleep(time.Millisecond * 500)
        fmt.Printf("worker #%d 结束 工作%d\n", id, job)

        out <- fmt.Sprintf("worker #%d 工作%d", id, job)
    }
    fmt.Printf("worker #%d 退出\n", id)
}

本例仅用于讲解演示,请勿在生产系统的程序中照搬。

上面过程,使用了 go func(){}()(生成 go协程)、chan(传递数据的管道) 简单地实现了一个具有并发功能的 工作者资源池模型。在例子中,设定了一定并发度的 worker routine,这些函数会从 jobs 任务 队列中获取任务来处理,并把结果发送到 results channel 中。

go 运行时按照自己的调度规则,来协调多个 worker 和主协程之间的运行。 worker 协程启动后,等待 jobs channel 被写入数据。对于熟悉 java、c++ 等其它编程语言的人来说,for job := range jobs 非常有迷惑性,它并不需要 jobs 的长度是已知的。for range 非常神奇,在 jobs channel 有值时才进行循环迭代。

m a i n r o u t i n e s e n d j o b r j p e o p o s p b o l u o s l l l l l t l j j o o b b w o r k e r # 1 w o r k e r # 2 r s e a s v u e l t r s e a s r v u e e l s t u l t s

提醒 :

注意 worker 函数的两个参数类型的区别,jobsout,一个是 <-chan int,另一个是 chan<- string

编译并运行:

~/Projects/go/examples/channel 
➜  go build channel.go && ./channel
布置 job 后,关闭 jobs channel
worker #2 启动
worker #2 开始 工作1
worker #1 启动
worker #1 开始 工作2
worker #1 结束 工作2
worker #1 开始 工作3
结果: worker #1 工作2
worker #2 结束 工作1
worker #2 开始 工作4
结果: worker #2 工作1
worker #2 结束 工作4
worker #2 开始 工作5
结果: worker #2 工作4
worker #1 结束 工作3
worker #1 退出
结果: worker #1 工作3
worker #2 结束 工作5
worker #2 退出
结果: worker #2 工作5

自己动手:

  1. jobs := make(chan int, number_of_jobs) 改为 make(chan int, 0) 会出现 “fatal error: all goroutines are asleep - deadlock!”
  2. jobs := make(chan int, number_of_jobs) 改为 make(chan int, 1)make(chan int, 2),观察 『布置 job 后,关闭 jobs channel』 这一句打印的位置

以上两个课后作业,有助于理解 channel 的功能与特点。

再次声明,不要在程序的生产环境中使用上述代码。它仅作演示所用,没有考虑很多需要处理的情况,并不健壮。