Skip to main content
  1. 经验技巧分享/
  2. 后端编程/

go 语言中的 channel 与协程间通信

··658 字
go

go 中有一种数据类型 chan,它本身的用途是消息通道,用来在 goroutines 间实现接收、发送消息。同时有缓存功能,因此可视它为 协程队列。 注意,是语言级别上的支持,不同函数库支持。这一点可以让它的表达语法设计得更简单。 这个跨协程消息通信功能,可以非常简单地实现其它语言中较麻烦的并发任务系统、工作队列系统。

如何创建、使用 channel #

为了下面例子演示的需要,先创建一个工程:
~/Projects/go/examples
➜  mkdir channel && cd channel

~/Projects/go/examples/zerovalue
➜  go mod init youwu.today/go/channel

~/Projects/go/examples/zerovalue
➜  touch channel.go

channel.go 中添加如下示例代码:

// channel.go
package main

import "fmt"

func main() {
    fmt.Println("😀😀 有悟的 go channel 示例 😀😀")
}

不创建基础工程也无所谓,如果没有依赖第三方库,放在一个 main.go 代码文件中,使用 go run main.go 来测试也是可行的。

在本文开头有提到 channel 是一个通道、队列,那么我们关心的应该就是如何创建这个通道、将数据装到通道中、从通道中提取数据。 golang 为它设计了一个操作符,left <- right,当 left 为 channel 时,表示向通道中写入数据(发送),当 right 为通道时,表示从通道提取数据(接收)。

package main

import  "fmt"

func main() {
    // fmt.Println("😀😀 有悟的 go channel 示例 😀😀")
    simpleChan()
}

func simpleChan() {
    // 声明一 chan 类型的变量
    var ch chan string
    ch = make(chan string)
    // 向 channel 中发送 string 类型数据
    go func() {
        ch <- "ping"
    }()
    // 创建一个 string 类型的变量,用来保存从 channel 队列提取的数据
    var v string
    v = <-ch
    fmt.Println(v)
}
~/Projects/go/examples/channel 
➜  go build channel.go && ./channel
ping

根据 go 变量声明的语法,上面可以简写为:

// channel.go
...
    ch := make(chan string)
    go func() {
     ch <- "ping"
    }()
    v := <-ch
    fmt.Println(v)
...

两个操作语句就可以完成了数据入队列(ch <- "ping"),数据出队列(v = <-ch)的动作。这里有个问题需要注意,channel 的接收与发送需要分别在两个 goroutine 中,如果你是直接看英文的文档、或者其他介绍的文章,可能没有指出这个要求。它是 协程的。

┌─┐                                ┌─┐
│g│                                │g│
│o│                                │o│
│r│                                │r│
│o│                                │o│
│u│                                │u│
│t│                                │t│
│i│ receive/             send/     │i│
│n│ queue out┌─────────┐ queue in  │n│
│e│◀─────────│ channel │◀──────────│e│
│1│          └─────────┘           │2│
└─┘                                └─┘

若发送与接收没有分布在两个 goroutine 中,会出现这样的报错信息:

// channel.go
    ch := make(chan string)
    ch <- "ping"
    v := <-ch
~/Projects/go/examples/channel 
➜  go build channel.go && ./channel
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /Users/youwu.today/Projects/go/examples/channel/channel.go:9 +0x37

用一句话总结:channel 是连接 concurrent goroutines 的管道(pipe)。

场景示例:简单的 worker pool 编程模型 #

用 channel 的跨协程消息通讯(数据传输)功能,我们可以实现一个 worker pool。

  • 一个 jobs 队列,缓存需要处理的 jobs
  • 几个 worker 工作协程,从 jobs 队列取任务来处理
  • 一个 results 队列,保存任务处理结果
// channel.go
package main

import (
    "fmt"
    "time"
)

func main() {
    workpools()
}

func workpools() {
    const number_of_jobs = 5
    const number_of_workers = 2
    jobs := make(chan int, number_of_jobs)
    results := make(chan string, number_of_jobs)

    // 控制并行度,每个 worker 函数都运行在单独的 goroutine 中
    for w := 1; w <= number_of_workers; w++ {
        go worker(w, jobs, results)
    }

    // 向 任务队列写入任务
    for i := 1; i <= number_of_jobs; i++ {
        jobs <- i
    }
    fmt.Println("布置 job 后,关闭 jobs channel")
    close(jobs)

    // 监听 results channel,只要有内容就会被取走
    for i := 1; i <= number_of_jobs; i++ {
        fmt.Printf("结果: %s\n", <-results)
    }
}

// worker 逻辑:一个不断从 jobs chan 中取任务的循环
// 并将结果放在 out channel 中待取
func worker(id int, jobs <-chan int, out chan<- string) {
    fmt.Printf("worker #%d 启动\n", id)
    for job := range jobs {
        fmt.Printf("worker #%d 开始 工作%d\n", id, job)
        // sleep 模拟 『正在处理任务』
        time.Sleep(time.Millisecond * 500)
        fmt.Printf("worker #%d 结束 工作%d\n", id, job)

        out <- fmt.Sprintf("worker #%d 工作%d", id, job)
    }
    fmt.Printf("worker #%d 退出\n", id)
}

本例仅用于讲解演示,请勿在生产系统的程序中照搬。

上面过程,使用了 go func(){}()(生成 go协程)、chan(传递数据的管道) 简单地实现了一个具有并发功能的 工作者资源池模型。在例子中,设定了一定并发度的 worker routine,这些函数会从 jobs 任务 队列中获取任务来处理,并把结果发送到 results channel 中。

go 运行时按照自己的调度规则,来协调多个 worker 和主协程之间的运行。 worker 协程启动后,等待 jobs channel 被写入数据。对于熟悉 java、c++ 等其它编程语言的人来说,for job := range jobs 非常有迷惑性,它并不需要 jobs 的长度是已知的。for range 非常神奇,在 jobs channel 有值时才进行循环迭代。

                            ┌─┐                      
                            │w│                      
┌─┐                         │o│                      
│m│                         │r│                      
│a│                         │k│   save               
│i│               poll job──│e│──result─────┐        
│n│               ▼         │r│             ▼        
│r│send job  ╔═════════╗    │#│┌─┐     ╔═════════╗   
│o│─────────▶║  jobs   ║    │1││w│     ║ results ║◀─┐
│u│          ╚═════════╝    └─┘│o│     ╚═════════╝  │
│t│               ▲            │r│          ▲       │
│i│               │            │k│     save │       │
│n│              poll job──────│e│────result┘       │
│e│                            │r│                  │
└─┘                            │#│                  │
 │                             │2│                  │
 │              poll           └─┘                  │
 └─────────────result───────────────────────────────┘

提醒 :

注意 worker 函数的两个参数类型的区别,jobsout,一个是 <-chan int,另一个是 chan<- string

编译并运行:

~/Projects/go/examples/channel 
➜  go build channel.go && ./channel
布置 job 后,关闭 jobs channel
worker #2 启动
worker #2 开始 工作1
worker #1 启动
worker #1 开始 工作2
worker #1 结束 工作2
worker #1 开始 工作3
结果: worker #1 工作2
worker #2 结束 工作1
worker #2 开始 工作4
结果: worker #2 工作1
worker #2 结束 工作4
worker #2 开始 工作5
结果: worker #2 工作4
worker #1 结束 工作3
worker #1 退出
结果: worker #1 工作3
worker #2 结束 工作5
worker #2 退出
结果: worker #2 工作5

自己动手:

  1. jobs := make(chan int, number_of_jobs) 改为 make(chan int, 0) 会出现 “fatal error: all goroutines are asleep - deadlock!”
  2. jobs := make(chan int, number_of_jobs) 改为 make(chan int, 1)make(chan int, 2),观察 『布置 job 后,关闭 jobs channel』 这一句打印的位置

以上两个课后作业,有助于理解 channel 的功能与特点。

再次声明,不要在程序的生产环境中使用上述代码。它仅作演示所用,没有考虑很多需要处理的情况,并不健壮。