一、前言
channel一个类型管道,通过它可以在Golang的多个goroutine之间发送和接受消息,实现信息的通信,并且是Golang在语言层面提供的goroutine间的通信方式。
众所周知,Go依赖于成为CSP(***municating Sequential Processes)的并发模型,通过Channel实现这种同步模式。Go并发的核心哲学是不要通过共享内存的方式进行通信;相反,通过沟通来分享信息;
下面以简单的示例来演示Go如何通过Channel来实现通信的:
package main
import (
"fmt"
"time"
)
func goRoutineA(a <-chan int) {
val := <-a
fmt.Println("goRoutineA received the data", val)
}
func goRoutineB(b chan int) {
val := <-b
fmt.Println("goRoutineB received the data", val)
}
func main() {
ch := make(chan int, 3)
go goRoutineA(ch)
go goRoutineB(ch)
ch <- 3
time.Sleep(time.Second * 1)
}
结果为:goRoutineA received the data 3
上面只是个简单的例子,只输出goRoutineA,没有执行goRoutineB,说明Channel仅允许被一个goroutine读写。
二、channel应用场景
- 数据交流:当做并发的buffer或者queue,解决生产者 - 消费者问题。多个goroutine可以并发当做生产者(producer)和消费者(consumer);
- 数据传递:一个goroutine将数据交给另一个goroutine,相当于把数据的拥有权托付出去;
- 信号通知:一个goroutine可以将信号(closing、closed、data ready)等信号传递给另一个goroutine或者另一组goroutine;
- 任务编排:可以让一组goroutine按照一定的顺序并发或者串行的执行,这就是编排能力;
- 锁机制:利用channel实现互斥机制;
三、通道channel基本信息
3.1、声明和初始化
var name chan T
其中,name代表chan的名字,为自定义内容;chan T代表通道的类型,T代表通道中的元素类型。在声明时,channel必须与一个实际的类型T绑定在一起,代表通道中能够读取和传递的元素类型。
上述代码仅完成了通道的声明,还未进行初始化,一个没有初始化的通道会被置为nil,这一点可以通过简单的输出得出:
func main() {
var c chan string
fmt.Println(c)
}
最后输出结果为:<nil>
因此在通道声明完成以后,还需要对其进行初始化,初始化需要使用make操作符,make会初始化通道,并在内存中分配通道的空间;
var c = make(chan int, 10)
通道的表示形式有三种:
3.1.1、chan T 读写双向通道
// 处理双向通道的示例
func processData(ch chan int) {
// 发送数据
ch <- 42
// 接收数据
data := <-ch
fmt.Println("Processed:", data)
}
func main() {
ch := make(chan int, 1)
go processData(ch)
// 主协程也可以双向操作
time.Sleep(100 * time.Millisecond)
ch <- 100
time.Sleep(100 * time.Millisecond)
fmt.Println("Final value:", <-ch)
}
该类型的通道灵活性更强,同时对于使用者的要求也更高,需要100%明确每次读取、写入的数据是否符合预期;
3.1.2、chan<-T 只能写入通道
当函数只需要向通道发送数据而不需要读取时使用,确保函数不会意外读取通道数据。
func main() {
var c = make(chan<- int)
c <- 1
select {
case val := <-c:
fmt.Println("goRoutineA received the data", val)
default:
return
}
}
尝试从一个只能写入的通道中读取数据的时候,此时会报编译错误:
invalid operation: cannot receive from send-only channel a (variable of type chan<- int)
3.1.3、<-chan T 只能读取通道
// 数据消费者,只从通道读取数据
func dataConsumer(ch <-chan int) {
for value := range ch {
fmt.Println("Consumed:", value)
time.Sleep(200 * time.Millisecond)
}
fmt.Println("Channel closed")
}
// 数据生成器,返回一个只接收通道
func dataGenerator(count int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < count; i++ {
ch <- i * i
}
}()
return ch
}
func main() {
// 获取只接收通道
dataCh := dataGenerator(5)
// 只能从通道读取,不能发送
dataConsumer(dataCh)
// 下面的代码会编译错误
// dataCh <- 100 // 不能向只接收通道发送数据
}
尝试往一个只能读取数据的通道中写入数据的时候,此时会报编译错误:
invalid operation: cannot send to receive-only channel dataCh (variable of type <-chan int)
3.1.4、答疑解惑
可能会有部分初学者对于上述三种通道有疑惑,只读通道,仅允许从通道中读取数据,不允许写入,那我读取的数据从哪儿来的呢?仅允许写入通道,写进去的数据肯定是希望第三方(其他goroutine)感知到,进而取走进行消费处理的,那不允许读取,我写进去的意义是什么呢?
A:只发送通道并不是数据的终点,而是管道的一头。数据写入后,是通过其他关联的双向通道或只接收通道被消费的。关键点在于“通道的引用”和“通道的方向约束”是两回事。在 Go 中,通道是引用类型。当您创建一个通道并将其传递给多个函数时,所有函数操作的是同一个底层通道,只是通过类型系统限制了不同函数对通道的操作权限。
例:
// 只发送通道 - 只能写入数据
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
fmt.Printf("生产者发送: %d\n", i)
ch <- i // 写入数据
time.Sleep(100 * time.Millisecond)
}
close(ch) // 发送方关闭通道
}
// 只接收通道 - 只能读取数据
func consumer(ch <-chan int) {
for value := range ch {
fmt.Printf("消费者收到: %d\n", value)
time.Sleep(200 * time.Millisecond)
}
fmt.Println("消费者: 通道已关闭")
}
func main() {
// 创建一个双向通道
dataChannel := make(chan int, 3)
// 启动消费者 - 传递只接收通道
go consumer(dataChannel)
// 生产者使用只发送通道
producer(dataChannel)
fmt.Println("程序结束")
}
3.1.4.1、为什么使用方向约束?
- 类型安全:防止函数意外执行不应该的操作
- 代码清晰:明确函数对通道的使用意图
- 编译时检查:错误在编译时就被发现,而不是运行时
3.1.4.2、关键理解点
- 通道是共享的:chan<- T 和 <-chan T 都是对同一个底层通道的视图
- 数据流向:通过只发送通道写入的数据,会出现在对应的接收通道中
- 内存模型:通道提供了 goroutine 之间的安全通信机制,确保数据同步
这种设计模式使得 Go 的并发编程更加安全和清晰,每个函数只做它应该做的事情,不会意外干扰其他部分的数据流。
3.1.5、通道关闭
close(c) //关闭通道
在正常读取的情况下,通道返回的ok为true。通道在关闭时仍然会返回,但是data为其类型的零值,ok也变为了false。和通道读取不同的是,不能向已经关闭的通道中写入数据。
var c = make(chan int)
close(c)
c<-5 //panic: send on closed channel
通道关闭会通知所有正在读取通道的协程,相当于向所有读取协程中都写入了数据。
四、原理剖析
4.1、channel对应的底层实现函数
在探究channel源码之前,我们至少需要先找到channel在Golang的具体实现在哪。因为我们在使用channel时,用的是<-符号,并不能直接在go源码中找到其实现。但是Golang的编译器必然会将<-符号翻译成底层对应的实现。
我们可以使用Go自带的命令: go tool ***pile -N -l -S hello.go, 将代码翻译成对应的汇编指令。
或者,直接可以使用***piler Explorer这个在线工具。对于上述示例代码可以直接在这个链接看其汇编结果: go.godbolt.org/z/3xw5Cj
通过仔细查看以上示例代码对应的汇编指令,可以发现以下的对应关系:
- channel的构造语句 make(chan int), 对应的是 runtime.makechan函数
- 发送语句 c <- 1, 对应的是runtime.chansend1函数
- 接收语句 x := <- c, 对应的是runtime.chanrecv1函数
以上几个函数的实现都位于go源码中的runtime/chan.go代码文件中。我们接下来针对这几个函数,探究下channel的实现。
4.2、channel的构造
channel的构造语句 make(chan int),将会被golang编译器翻译为runtime.makechan函数, 其函数签名如下:
func makechan(t *chantype, size int) *hchan
其中,t *chantype即构造channel时传入的元素类型。size int即用户指定的channel缓冲区大小,不指定则为0。该函数的返回值是*hchan。hchan则是channel在golang中的内部实现。其定义如下:
// hchan 通道结构
type hchan struct {
qcount uint // 队列中数据元素个数(len)
dataqsiz uint // 队列可容纳数据元素的容量(cap)
buf unsafe.Pointer // 指向队列的指针
elemsize uint16 // 单个数据元素的大小
closed uint32 // 通道是否关闭的标志
elemtype *_type // 数据元素类型
sendx uint // 队列下标:表示元素写入时存放至队列中的位置
recvx uint // 队列下标:表示元素从队列的该位置读取
recvq waitq // 等待接收数据的goroutine队列
sendq waitq // 等待发送数据的goroutine队列
lock mutex // 锁,保护hchan中的所有字段
}
// waitq 等待队列(实质上是一个双向链表)
type waitq struct {
first *sudog // 指向双向链表头节点
last *sudog // 指向双向链表尾节点
// sudog表示一个在等待队列中的goroutine
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 指向数据
c *hchan // channel
...
}
lock 锁保护 hchan 中的所有字段,以及此通道上被阻塞的 sudogs 中的字段。
hchan中的所有属性大致可以分为三类:
- buffer相关的属性。例如buf、dataqsiz、qcount等。 当channel的缓冲区大小不为0时,buffer中存放了待接收的数据。使用ring buffer实现。
- waitq相关的属性,可以理解为是一个FIFO的标准队列。其中recvq中是正在等待接收数据的goroutine,sendq中是等待发送数据的goroutine。waitq使用双向链表实现。
- 其他属性,例如lock、elemtype、closed等。
4.2.1、hchan类型
一个channel只能传递一种类型的值,类型信息存储在hchan数据结构体中,_type结构体中包含elemtype及elemsize等。
elemetype代表类型,用于数据传递过程中的赋值
elemesize代码类型大小,用于在buf中定位元素位置
4.2.2、hchan环形队列
hchan内部实现了一个环形队列(ring buffer)作为缓冲区,队列的长度是创建channel时指定的。下图展示了一个可缓存4个元素的channel的示意图:
- dataqsiz指示队列长度为4,即可缓存4个元素
- buf指向队列的内存区
- qcount表示队列中还有3个元素
- sendx指示后续写入的数据存储的位置,取值[0,3)
- recvx指示从该位置读取数据,取值[0,3)
4.2.3、hchan等待队列
从channel读数据,如果channel缓冲区为空或者没有缓存区,当前goroutine会被封装成sudog加入至等待队列recvq,然后进入阻塞状态。上图展示了一个缓冲队列数据为空的channel,有3个goroutine阻塞等待读取数据。
向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被封装成sudog加入至等待队列sendq,然后进入阻塞状态。上图展示了一个缓冲队列已满的channel,有4个goroutine阻塞等待写入数据。
处于等待队列中的goroutine会在其他goroutine操作channel时被唤醒:(一次操作,只会唤醒一个阻塞goroutine)
因读数据阻塞的goroutine会被向channel写入数据的goroutine唤醒
因写数据阻塞的goroutine会被从channel读出数据的goroutine唤醒
4.2.4、ring buffer实现
channel中使用了ring buffer(环形缓冲区)来缓存写入的数据。ring buffer有很多好处,而且非常适合用来实现FIFO式的固定长度队列。
在channel中,ring buffer的实现如下:
hchan中有两个与buffer相关的变量: recvx和sendx。其中sendx表示buffer中可写的index, recvx表示buffer中可读的index。 从recvx到sendx之间的元素,表示已正常存放入buffer中的数据。
我们可以直接使用buf[recvx]来读取到队列的第一个元素,使用buf[sendx] = x来将元素放到队尾。
4.2.4.1、buffer的写入
当buffer未满时,将数据放入到buffer中的操作如下:
qp := chanbuf(c, c.sendx)
// 将数据拷贝到buffer中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
其中chanbuf(c, c.sendx)相当于c.buf[c.sendx]。以上过程非常简单,就是将数据拷贝到buffer的sendx的位置上。
接着,将sendx移到下一个位置上。如果sendx已到达最后一位,则将其置为0,这是一个典型的头尾相连的做法。
4.2.4.2、buffer的读取
当buffer未满时,此时sendq里面也一定是空的(因为如果buffer未满,用于发送数据的goroutine肯定不会排队,而是直接放数据到buffer中,具体逻辑参考上文向channel发送数据一节),这时候对于channel的读取过程chanrecv就比较简单了,直接从buffer中读取即可,也是一个移动recvx的过程。与上文buffer的写入基本一致。
而sendq里面有已等待的goroutine的时候,此时buffer一定是满的。这个时候channel的读取逻辑如下:
/ 相当于c.buf[c.recvx]
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
以上代码中,ep接收数据的变量对应的地址。例如,在x := <- c中,表示变量x的地址。
而sg代表从sendq中取出的第一个sudog。并且:
typedmemmove(c.elemtype, ep, qp)表示buffer中的当前可读元素拷贝到接收变量的地址处。
typedmemmove(c.elemtype, qp, sg.elem)表示将sendq中goroutine等待发送的数据拷贝到buffer中。因为此后进行了recv++, 因此相当于把sendq中的数据放到了队尾。
简单来说,这里channel将buffer中队首的数据拷贝给了对应的接收变量,同时将sendq中的元素拷贝到了队尾,这样可以才可以做到数据的FIFO(先入先出)。
接下来可能有点绕,c.sendx = c.recvx, 这句话实际的作用相当于c.sendx = (c.sendx+1) % c.dataqsiz,因为此时buffer依然是满的,所以sendx == recvx是成立的。