深入详解Go语言channel底层实现原理

深入详解Go语言channel底层实现原理

一、前言

channel一个类型管道,通过它可以在Golang的多个goroutine之间发送和接受消息,实现信息的通信,并且是Golang在语言层面提供的goroutine间的通信方式。
众所周知,Go依赖于成为CSP(***municating Sequential Processes)的并发模型,通过Channel实现这种同步模式。Go并发的核心哲学是不要通过共享内存的方式进行通信;相反,通过沟通来分享信息;

下面以简单的示例来演示Go如何通过Channel来实现通信的:

package main
import (
    "fmt"
    "time"
)
func goRoutineA(a <-chan int) {
    val := <-a
    fmt.Println("goRoutineA received the data", val)
}
func goRoutineB(b chan int) {
    val := <-b
    fmt.Println("goRoutineB  received the data", val)
}
func main() {
    ch := make(chan int, 3)
    go goRoutineA(ch)
    go goRoutineB(ch)
    ch <- 3
    time.Sleep(time.Second * 1)
}

结果为:goRoutineA received the data 3
上面只是个简单的例子,只输出goRoutineA,没有执行goRoutineB,说明Channel仅允许被一个goroutine读写。

二、channel应用场景

  • 数据交流:当做并发的buffer或者queue,解决生产者 - 消费者问题。多个goroutine可以并发当做生产者(producer)和消费者(consumer);
  • 数据传递:一个goroutine将数据交给另一个goroutine,相当于把数据的拥有权托付出去;
  • 信号通知:一个goroutine可以将信号(closing、closed、data ready)等信号传递给另一个goroutine或者另一组goroutine;
  • 任务编排:可以让一组goroutine按照一定的顺序并发或者串行的执行,这就是编排能力;
  • 锁机制:利用channel实现互斥机制;

三、通道channel基本信息

3.1、声明和初始化

var name chan T
其中,name代表chan的名字,为自定义内容;chan T代表通道的类型,T代表通道中的元素类型。在声明时,channel必须与一个实际的类型T绑定在一起,代表通道中能够读取和传递的元素类型。

上述代码仅完成了通道的声明,还未进行初始化,一个没有初始化的通道会被置为nil,这一点可以通过简单的输出得出:

func main() {
	var c chan string 
	fmt.Println(c)
}

最后输出结果为:<nil>
因此在通道声明完成以后,还需要对其进行初始化,初始化需要使用make操作符,make会初始化通道,并在内存中分配通道的空间;

var c = make(chan int, 10)

通道的表示形式有三种:

3.1.1、chan T 读写双向通道

// 处理双向通道的示例
func processData(ch chan int) {
    // 发送数据
    ch <- 42
    
    // 接收数据
    data := <-ch
    fmt.Println("Processed:", data)
}

func main() {
    ch := make(chan int, 1)
    go processData(ch)
    
    // 主协程也可以双向操作
    time.Sleep(100 * time.Millisecond)
    ch <- 100
    
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Final value:", <-ch)
}

该类型的通道灵活性更强,同时对于使用者的要求也更高,需要100%明确每次读取、写入的数据是否符合预期;

3.1.2、chan<-T 只能写入通道

当函数只需要向通道发送数据而不需要读取时使用,确保函数不会意外读取通道数据。

func main() {
	var c = make(chan<- int)
	c <- 1
	select {
	case val := <-c:
		fmt.Println("goRoutineA received the data", val)
	default:
		return
	}
}

尝试从一个只能写入的通道中读取数据的时候,此时会报编译错误:

invalid operation: cannot receive from send-only channel a (variable of type chan<- int)

3.1.3、<-chan T 只能读取通道

// 数据消费者,只从通道读取数据
func dataConsumer(ch <-chan int) {
    for value := range ch {
        fmt.Println("Consumed:", value)
        time.Sleep(200 * time.Millisecond)
    }
    fmt.Println("Channel closed")
}

// 数据生成器,返回一个只接收通道
func dataGenerator(count int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < count; i++ {
            ch <- i * i
        }
    }()
    return ch
}

func main() {
    // 获取只接收通道
    dataCh := dataGenerator(5)
    
    // 只能从通道读取,不能发送
    dataConsumer(dataCh)
    
    // 下面的代码会编译错误
    // dataCh <- 100 // 不能向只接收通道发送数据
}

尝试往一个只能读取数据的通道中写入数据的时候,此时会报编译错误:

invalid operation: cannot send to receive-only channel dataCh (variable of type <-chan int)

3.1.4、答疑解惑

可能会有部分初学者对于上述三种通道有疑惑,只读通道,仅允许从通道中读取数据,不允许写入,那我读取的数据从哪儿来的呢?仅允许写入通道,写进去的数据肯定是希望第三方(其他goroutine)感知到,进而取走进行消费处理的,那不允许读取,我写进去的意义是什么呢?

A:只发送通道并不是数据的终点,而是管道的一头。数据写入后,是通过其他关联的双向通道或只接收通道被消费的。关键点在于“通道的引用”和“通道的方向约束”是两回事。在 Go 中,通道是引用类型。当您创建一个通道并将其传递给多个函数时,所有函数操作的是同一个底层通道,只是通过类型系统限制了不同函数对通道的操作权限。
例:

// 只发送通道 - 只能写入数据
func producer(ch chan<- int) {
    for i := 0; i < 5; i++ {
        fmt.Printf("生产者发送: %d\n", i)
        ch <- i // 写入数据
        time.Sleep(100 * time.Millisecond)
    }
    close(ch) // 发送方关闭通道
}

// 只接收通道 - 只能读取数据
func consumer(ch <-chan int) {
    for value := range ch {
        fmt.Printf("消费者收到: %d\n", value)
        time.Sleep(200 * time.Millisecond)
    }
    fmt.Println("消费者: 通道已关闭")
}

func main() {
    // 创建一个双向通道
    dataChannel := make(chan int, 3)
    
    // 启动消费者 - 传递只接收通道
    go consumer(dataChannel)
    
    // 生产者使用只发送通道
    producer(dataChannel)
    
    fmt.Println("程序结束")
}
3.1.4.1、为什么使用方向约束?
  • 类型安全:防止函数意外执行不应该的操作
  • 代码清晰:明确函数对通道的使用意图
  • 编译时检查:错误在编译时就被发现,而不是运行时
3.1.4.2、关键理解点
  • 通道是共享的:chan<- T 和 <-chan T 都是对同一个底层通道的视图
  • 数据流向:通过只发送通道写入的数据,会出现在对应的接收通道中
  • 内存模型:通道提供了 goroutine 之间的安全通信机制,确保数据同步
    这种设计模式使得 Go 的并发编程更加安全和清晰,每个函数只做它应该做的事情,不会意外干扰其他部分的数据流。

3.1.5、通道关闭

close(c) //关闭通道

在正常读取的情况下,通道返回的ok为true。通道在关闭时仍然会返回,但是data为其类型的零值,ok也变为了false。和通道读取不同的是,不能向已经关闭的通道中写入数据。

var c = make(chan int)
close(c)
c<-5 //panic: send on closed channel

通道关闭会通知所有正在读取通道的协程,相当于向所有读取协程中都写入了数据。

四、原理剖析

4.1、channel对应的底层实现函数

在探究channel源码之前,我们至少需要先找到channel在Golang的具体实现在哪。因为我们在使用channel时,用的是<-符号,并不能直接在go源码中找到其实现。但是Golang的编译器必然会将<-符号翻译成底层对应的实现。

我们可以使用Go自带的命令: go tool ***pile -N -l -S hello.go, 将代码翻译成对应的汇编指令。

或者,直接可以使用***piler Explorer这个在线工具。对于上述示例代码可以直接在这个链接看其汇编结果: go.godbolt.org/z/3xw5Cj

通过仔细查看以上示例代码对应的汇编指令,可以发现以下的对应关系:

  1. channel的构造语句 make(chan int), 对应的是 runtime.makechan函数
  2. 发送语句 c <- 1, 对应的是runtime.chansend1函数
  3. 接收语句 x := <- c, 对应的是runtime.chanrecv1函数

以上几个函数的实现都位于go源码中的runtime/chan.go代码文件中。我们接下来针对这几个函数,探究下channel的实现。

4.2、channel的构造

channel的构造语句 make(chan int),将会被golang编译器翻译为runtime.makechan函数, 其函数签名如下:

func makechan(t *chantype, size int) *hchan

其中,t *chantype即构造channel时传入的元素类型。size int即用户指定的channel缓冲区大小,不指定则为0。该函数的返回值是*hchan。hchan则是channel在golang中的内部实现。其定义如下:

// hchan 通道结构
type hchan struct {
	qcount   uint           // 队列中数据元素个数(len)
	dataqsiz uint           // 队列可容纳数据元素的容量(cap)
	buf      unsafe.Pointer // 指向队列的指针
	elemsize uint16			// 单个数据元素的大小
	closed   uint32			// 通道是否关闭的标志
	elemtype *_type 	    // 数据元素类型
	sendx    uint   // 队列下标:表示元素写入时存放至队列中的位置
	recvx    uint   // 队列下标:表示元素从队列的该位置读取
	recvq    waitq  // 等待接收数据的goroutine队列
	sendq    waitq  // 等待发送数据的goroutine队列
	
	lock mutex 	 	// 锁,保护hchan中的所有字段
}

// waitq 等待队列(实质上是一个双向链表)
type waitq struct {
	first *sudog  // 指向双向链表头节点
	last  *sudog  // 指向双向链表尾节点
	// sudog表示一个在等待队列中的goroutine
}

type sudog struct {
	g *g
	next *sudog
	prev *sudog
	elem unsafe.Pointer // 指向数据
	c    *hchan     // channel
	...
}

lock 锁保护 hchan 中的所有字段,以及此通道上被阻塞的 sudogs 中的字段。

hchan中的所有属性大致可以分为三类:

  • buffer相关的属性。例如buf、dataqsiz、qcount等。 当channel的缓冲区大小不为0时,buffer中存放了待接收的数据。使用ring buffer实现。
  • waitq相关的属性,可以理解为是一个FIFO的标准队列。其中recvq中是正在等待接收数据的goroutine,sendq中是等待发送数据的goroutine。waitq使用双向链表实现。
  • 其他属性,例如lock、elemtype、closed等。

4.2.1、hchan类型

一个channel只能传递一种类型的值,类型信息存储在hchan数据结构体中,_type结构体中包含elemtype及elemsize等。

elemetype代表类型,用于数据传递过程中的赋值
elemesize代码类型大小,用于在buf中定位元素位置

4.2.2、hchan环形队列

hchan内部实现了一个环形队列(ring buffer)作为缓冲区,队列的长度是创建channel时指定的。下图展示了一个可缓存4个元素的channel的示意图:

  • dataqsiz指示队列长度为4,即可缓存4个元素
  • buf指向队列的内存区
  • qcount表示队列中还有3个元素
  • sendx指示后续写入的数据存储的位置,取值[0,3)
  • recvx指示从该位置读取数据,取值[0,3)

4.2.3、hchan等待队列


从channel读数据,如果channel缓冲区为空或者没有缓存区,当前goroutine会被封装成sudog加入至等待队列recvq,然后进入阻塞状态。上图展示了一个缓冲队列数据为空的channel,有3个goroutine阻塞等待读取数据。

向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被封装成sudog加入至等待队列sendq,然后进入阻塞状态。上图展示了一个缓冲队列已满的channel,有4个goroutine阻塞等待写入数据。

处于等待队列中的goroutine会在其他goroutine操作channel时被唤醒:(一次操作,只会唤醒一个阻塞goroutine)

因读数据阻塞的goroutine会被向channel写入数据的goroutine唤醒
因写数据阻塞的goroutine会被从channel读出数据的goroutine唤醒

4.2.4、ring buffer实现

channel中使用了ring buffer(环形缓冲区)来缓存写入的数据。ring buffer有很多好处,而且非常适合用来实现FIFO式的固定长度队列。

在channel中,ring buffer的实现如下:

hchan中有两个与buffer相关的变量: recvx和sendx。其中sendx表示buffer中可写的index, recvx表示buffer中可读的index。 从recvx到sendx之间的元素,表示已正常存放入buffer中的数据。

我们可以直接使用buf[recvx]来读取到队列的第一个元素,使用buf[sendx] = x来将元素放到队尾。

4.2.4.1、buffer的写入

当buffer未满时,将数据放入到buffer中的操作如下:

qp := chanbuf(c, c.sendx)
// 将数据拷贝到buffer中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
	c.sendx = 0
}
c.qcount++

其中chanbuf(c, c.sendx)相当于c.buf[c.sendx]。以上过程非常简单,就是将数据拷贝到buffer的sendx的位置上。

接着,将sendx移到下一个位置上。如果sendx已到达最后一位,则将其置为0,这是一个典型的头尾相连的做法。

4.2.4.2、buffer的读取

当buffer未满时,此时sendq里面也一定是空的(因为如果buffer未满,用于发送数据的goroutine肯定不会排队,而是直接放数据到buffer中,具体逻辑参考上文向channel发送数据一节),这时候对于channel的读取过程chanrecv就比较简单了,直接从buffer中读取即可,也是一个移动recvx的过程。与上文buffer的写入基本一致。

而sendq里面有已等待的goroutine的时候,此时buffer一定是满的。这个时候channel的读取逻辑如下:

/ 相当于c.buf[c.recvx]
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
	typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
	c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz		

以上代码中,ep接收数据的变量对应的地址。例如,在x := <- c中,表示变量x的地址。
而sg代表从sendq中取出的第一个sudog。并且:

typedmemmove(c.elemtype, ep, qp)表示buffer中的当前可读元素拷贝到接收变量的地址处。
typedmemmove(c.elemtype, qp, sg.elem)表示将sendq中goroutine等待发送的数据拷贝到buffer中。因为此后进行了recv++, 因此相当于把sendq中的数据放到了队尾。

简单来说,这里channel将buffer中队首的数据拷贝给了对应的接收变量,同时将sendq中的元素拷贝到了队尾,这样可以才可以做到数据的FIFO(先入先出)。

接下来可能有点绕,c.sendx = c.recvx, 这句话实际的作用相当于c.sendx = (c.sendx+1) % c.dataqsiz,因为此时buffer依然是满的,所以sendx == recvx是成立的。

转载请说明出处内容投诉
CSS教程网 » 深入详解Go语言channel底层实现原理

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买