通俗易懂剖析Go Channel:理解并发通信的核心机制

通俗易懂剖析Go Channel:理解并发通信的核心机制

本文来自 Go就业训练营 小韬同学的投稿。

也强烈安利大家多写博客,不仅能倒逼自己学习总结,也能作为简历的加分项,提高求职面试的竞争力。

你想想看:面试官看到你简历中的博客主页有几十篇文章,几千粉丝是什么感觉。要比你空洞洞的写一句“热爱技术”强太多啦!

正文

我们在学习与使用Go语言的过程中,对channel并不陌生,channel是Go语言与众不同的特性之一,也是非常重要的一环,深入理解Channel,相信能够在使用的时候更加的得心应手。

一、Channel基本用法

1、channel类别

channel在类型上,可以分为两种:

  • 双向channel:既能接收又能发送的channel
  • 单向channel:只能发送或只能接收的channel,即单向channel可以为分为:
    • 只写channel
    • 只读channel

声明并初始化如下如下:

func main() {  
    // 声明并初始化  
    var ch chan string = make(chan string) // 双向channel  
    var readCh <-chan string = make(<-chan string) // 只读channel  
    var writeCh chan<- string = make(chan<- string) // 只写channel  
}  

上述定义中,<-表示单向的channel。如果箭头指向chan,就表示只写channel,可以往chan里边写入数据;如果箭头远离chan,则表示为只读channel,可以从chan读数据。

在定义channel时,可以定义任意类型的channel,因此也同样可以定义chan类型的channel。例如:

a := make(chan<- chan int)   // 定义类型为 chan int 的写channel  
b := make(chan<- <-chan int) // 定义类型为 <-chan int 的写channel  
c := make(<-chan <-chan int) // 定义类型为 <-chan int 的读channel  
d := make(chan (<-chan int)) // 定义类型为 (<-chan int) 的读channel  

channel未初始化时,其零值为nilnil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞。

func main() {  
    var ch chan string  
    fmt.Println(ch) // <nil>  
}  

通过make我们可以初始化一个channel,并且可以设置其容量的大小,如下初始化了一个类型为string,其容量大小为512channel

var ch chan string = make(chan string, 512)  

当初始化定义了channel的容量,则这样的channel叫做buffered chan,即有缓冲channel。如果没有设置容量,channel的容量为0,这样的channel叫做unbuffered chan,即无缓冲channel

有缓冲channel中,如果channel中还有数据,则从这个channel接收数据时不会被阻塞。如果channel的容量还未满,那么向这个channel发送数据也不会被阻塞,反之则会被阻塞。

无缓冲channel则只有当读写操作都准备好后,才不会阻塞,这也是unbuffered chan在使用过程中非常需要注意的一点,否则可能会出现常见的bug。

channel的常见操作:

  1. 发送数据

往channel发送一个数据使用ch <-

func main() {  
    var ch chan int = make(chan int, 512)  
    ch <- 2000  
}  

上述的ch可以是chan int类型,也可以是单向chan <-int

  1. 接收数据

从channel接收一条数据可以使用<-ch

func main() {  
    var ch chan int = make(chan int, 512)  
    ch <- 2000 // 发送数据  
  
    data := <-ch // 接收数据  
    fmt.Println(data) // 2000  
}  

ch 类型是 chan T,也可以是单向<-chan T

在接收数据时,可以返回两个返回值。第一个返回值返回channel中的元素,第二个返回值为bool类型,表示是否成功地从channel中读取到一个值。

如果第二个参数是false,则表示channel已经被close而且channel中没有缓存的数据,这个时候第一个值返回的是零值。

func main() {  
    var ch chan int = make(chan int, 512)  
    ch <- 2000 // 发送数据  
  
    data1, ok1 := <-ch // 接收数据  
    fmt.Printf("data1 = %d, ok1 = %t\n", data1, ok1) // data1 = 2000, ok1 = true  
    close(ch)  // 关闭channel  
    data2, ok2 := <-ch  // 接收数据  
    fmt.Printf("data2 = %d, ok2 = %t", data2, ok2) // data2 = 0, ok2 = false  
}  

所以,如果从channel读取到一个零值,可能是发送操作真正发送的零值,也可能是closed关闭channel并且channel没有缓存元素产生的零值,这是需要注意判别的一个点。

  1. 其他操作

Go内建的函数closecaplen都可以对chan类型进行操作。

  • close:关闭channel。
  • cap:返回channel的容量。
  • len:返回channel缓存中还未被取走的元素数量。
func main() {  
    var ch chan int = make(chan int, 512)  
    ch <- 100  
    ch <- 200  
    fmt.Println("ch len:", len(ch)) // ch len: 2  
    fmt.Println("ch cap:", cap(ch)) // ch cap: 512  
}  

发送操作接收操作可以作为select语句中的case clause,例如:

func main() {  
    var ch = make(chan int, 512)  
    for i := 0; i < 10; i++ {  
       select {  
       case ch <- i:  
       case v := <-ch:  
          fmt.Println(v)  
       }  
    }  
}  

for-range语句同样可以在chan中使用,例如:

func main() {  
    var ch = make(chan int, 512)  
    ch <- 100  
    ch <- 200  
    ch <- 300  
    for v := range ch {  
       fmt.Println(v)  
    }  
}  
  
// 执行结果  
100  
200  
300  

2、select介绍

在Go语言中,select语句用于监控一组case语句,根据特定的条件执行相对应的case语句或default语句,与switch类似,但不同之处在于select语句中所有case中的表达式都必须是channel的发送或接收操作。select使用示例代码如下:

select {  
case <-ch1:  
    fmt.Println("ch1")  
case ch2 <- 1:  
    fmt.Println("ch2")  
}  

上述代码中,select关键字让当前goroutine同时等待ch1 的可读和ch2的可写,在满足任意一个case分支之前,select 会一直阻塞下去,直到其中的一个 channel 转为就绪状态时执行对应case分支的代码。如果多个channel同时就绪的话则随机选择一个case执行。

当使用空select时,空的 select 语句会直接阻塞当前的goroutine,使得该goroutine进入无法被唤醒的永久休眠状态。空select,即select内不包含任何case

select{  
    
}  

另外当select语句内只有一个case分支时,如果该case分支不满足,那么当前select就变成了一个阻塞的channel读/写操作。

select {  
case <-ch1:  
    fmt.Println("ch1")  
}  

上述select中,当ch1可读时,会执行打印操作,反之则阻塞当前goroutine

select语句内包含default分支时,如果select内的所有case都不满足,则会执行default分支的逻辑,用于当其他case都不满足时执行一些默认操作。

select {  
case <-ch1:  
    fmt.Println("ch1")  
case ch2 <- 1:  
    fmt.Println("ch2")  
default:  
    fmt.Println("default")  
}  

上述代码中,当ch1可读或ch2可写时,会执行相应的打印操作,否则就执行default语句中的代码,相当于一个非阻塞的channel读取操作。

select的使用可以总结为:

  • select不存在任何的case且没有default分支:永久阻塞当前 goroutine;
  • select只存在一个case且没有default分支:阻塞的发送/接收;
  • select存在多个case:随机选择一个满足条件的case执行;
  • select存在default,其他case都不满足时:执行default语句中的代码;

二、Channel实现原理

从代码的角度剖析channel的实现,能够让我们更好的去使用channel

我们可以从chan类型的数据结构、初始化以及三个操作发送、接收和关闭这几个方面来了解channel

1、chan数据结构

chan类型的数据结构定义位于runtime.hchan,其结构体定义如下:

type hchan struct {  
    qcount   uint           // total data in the queue  
    dataqsiz uint           // size of the circular queue  
    buf      unsafe.Pointer // points to an array of dataqsiz elements  
    elemsize uint16  
    closed   uint32  
    elemtype *_type // element type  
    sendx    uint   // send index  
    recvx    uint   // receive index  
    recvq    waitq  // list of recv waiters  
    sendq    waitq  // list of send waiters  
  
    // lock protects all fields in hchan, as well as several  
    // fields in sudogs blocked on this channel.  
    //  
    // Do not change another G's status while holding this lock  
    // (in particular, do not ready a G), as this can deadlock  
    // with stack shrinking.  
    lock mutex  
}  

解释一下上述各个字段的意义:

  • qcount:表示chan中已经接收到的数据且还未被取走的元素个数。内建函数len可以返回这个字段的值。
  • datasiz:循环队列的大小。chan在实现上使用一个循环队列来存放元素的个数,循环队列适用于生产者-消费者的场景。
  • buf:存放元素的循环队列bufferbuf 字段是一个指向队列缓冲区的指针,即指向一个dataqsiz元素的数组。buf 字段是使用 unsafe.Pointer 类型来表示队列缓冲区的起始地址。unsafe.Pointer是一种特殊的指针类型,它可以用于指向任何类型的数据。由于队列缓冲区的类型是动态分配的,所以不能直接使用某个具体类型的指针来表示。
  • elemtypeelemsizeelemtype表示chan中元素的数据类型,elemsize表示其大小。当chan定义后,它的元素类型是固定的,即普通类型或者指针类型,因此元素大小也是固定的。
  • sendx:处理发送数据操作的指针在buf队列中的位置。当channel接收到了新的数据时,该指针就会加上elemsize,移动到下一个位置。buf 的总大小是elemsize的整数倍且buf是一个循环列表。
  • recvx:处理接收数据操作的指针在buf队列中的位置。当从buf中取出数据,此指针会移动到下一个位置。
  • recvq:当接收操作发现channel中没有数据可读时,会被则色,此时会被加入到recvq队列中。
  • sendq:当发送操作发现buf队列已满时,会被进行阻塞,此时会被加入到sendq队列中。

2、chan初始化

channel在进行初始化时,Go编译器会根据是否传入容量的大小,来选择调用makechan64,还是makechanmakechan64在实现上底层还是调用makechan来进行初始化,makechan64只是对size做了检查。

makechan函数根据chan的容量的大小和元素的类型不同,初始化不同的存储空间。省略一些检查代码,makechan函数的主要逻辑如下:

func makechan(t *chantype, size int) *hchan {  
    elem := t.elem  
      
    ...  
  
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))  
      
    ...  
      
    var c *hchan  
    switch {  
    case mem == 0:  
       // 队列或元素大小为零,不必创建buf  
       c = (*hchan)(mallocgc(hchanSize, nil, true))  
       c.buf = c.raceaddr()  
    case elem.ptrdata == 0:  
       // 元素不包含指针,分配一块连续的内存给hchan数据结构和buf  
       // hchan数据结构后面紧接着就是buf,在一次调用中分配hchan和buf  
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))  
       c.buf = add(unsafe.Pointer(c), hchanSize)  
    default:  
       // 元素包含指针,单独分配buf  
       c = new(hchan)  
       c.buf = mallocgc(mem, elem, true)  
    }  
  
    // 记录元素大小、类型、容量  
    c.elemsize = uint16(elem.size)  
    c.elemtype = elem  
    c.dataqsiz = uint(size)  
    lockInit(&c.lock, lockRankHchan)  
      
    ...  
      
    return c  
}  

3、send发送操作

Go在编译发送数据给channel时,会把发送操作send转换成chansend1函数,而chansend1函数会调用chansend函数。

func chansend1(c *hchan, elem unsafe.Pointer) {  
    chansend(c, elem, true, getcallerpc())  
}  

我们可以来分段分析chansend函数的实现逻辑。

第一部分:

主要是对chan进行判断,判断chan是否为nil,若为nil,则判断是否需要将当前goroutine进行阻塞,阻塞通过gopark来对调用者goroutine park(阻塞休眠)。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  
    // 第一部分  
    if c == nil { // 判断chan是否为nil  
       if !block { // 判断是否需要阻塞当前goroutine  
          return false  
       }  
       // 调用这goroutine park,进行阻塞休眠  
       gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)  
       throw("unreachable")  
    }  
      
    ...  
}  

第二部分

第二部分的逻辑判断是当你往一个容量已满的chan实例发送数据,且不想当前调用的goroutine被阻塞时(chan未被关闭),那么处理的逻辑是直接返回。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  
    ...  
    // 第二部分  
    if !block && c.closed == 0 && full(c) {  
        return false  
    }  
    ...  
}  

第三部分

第三部分的逻辑判断是首先进行互斥锁加锁,然后判断当前chan是否关闭,如果chan已经被close了,则释放互斥锁并panic,即对已关闭的chan发送数据会panic

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  
    ...  
    // 第三部分  
    lock(&c.lock) // 开始加锁  
  
    if c.closed != 0 { // 判断channel是否关闭  
        unlock(&c.lock)  
        panic(plainError("send on closed channel"))  
    }  
    ...  
}  

第四部分

第四部分的逻辑主要是判断接收队列中是否有正在等待的接收方receiver。如果存在正在等待的receiver(说明此时buf中没有缓存的数据),则将他从接收队列中弹出,直接将需要发送到channel的数据交给这个receiver,而无需放入到buf中,让发送操作速度更快一些。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  
    ...  
      
    // 第四部分  
    if sg := c.recvq.dequeue(); sg != nil {  
       // 找到了一个正在等待的接收者。我们传递我们想要发送的值  
       // 直接传递给receiver接收者,绕过channel buf缓存区(如果receiver有的话)  
       send(c, sg, ep, func() { unlock(&c.lock) }, 3)  
       return true  
    }  
  
    ...  
}  

第五部分

当等待队列中并没有正在等待的receiver,则说明当前buf还没有满,此时将发送的数据放入到buf中。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  
    ...  
      
    // 第五部分  
    if c.qcount < c.dataqsiz { // 判断buf是否满了  
       // channel buf还有可用的空间. 将发送数据入buf循环队列.  
       qp := chanbuf(c, c.sendx)  
       if raceenabled {  
          racenotify(c, c.sendx, nil)  
       }  
       typedmemmove(c.elemtype, qp, ep)  
       c.sendx++  
       if c.sendx == c.dataqsiz {  
          c.sendx = 0  
       }  
       c.qcount++  
       unlock(&c.lock)  
       return true  
    }  
      
    ...  
}  

第六部分

当逻辑走到第六部分,说明正在处理buf已满的情况。如果buf已满,则发送操作的goroutine就会加入到发送者的等待队列,直到被唤醒。当goroutine被唤醒时,数据或者被取走了,或者chan已经被关闭了。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  
    ...  
    // 第六部分  
      
    // chansend1函数调用不会进入if块里,因为chansend1的block=true  
    if !block {  
       unlock(&c.lock)  
       return false  
    }  
      
    ...  
      
    c.sendq.enqueue(mysg) // 加入发送队列  
      
    ...  
      
    gopark(chanpark***mit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 阻塞  
      
    ...  
}  

4、recv接收操作

channel中接收数据时,Go会将代码转换成chanrecv1函数。如果需要返回两个返回值,则会转换成chanrecv2chanrecv1函数和chanrecv2都会调用chanrecv函数。chanrecv1chanrecv2传入的 block参数的值是true,两种调用都是阻塞方式,因此在分析chanrecv函数的实现时,可以不考虑 block=false的情况。

// 从已编译代码中进入 <-c 的入口点  
func chanrecv1(c *hchan, elem unsafe.Pointer) {  
    chanrecv(c, elem, true)  
}  
  
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {  
    _, received = chanrecv(c, elem, true)  
    return  
}  

同样,省略一些检查类的代码,我们也可以分段分析chanrecv函数的逻辑。

第一部分

第一部分主要判断当前进行接收操作的chan实例是否为nil,若为nil,则从nil chan中接收数据的调用这goroutine会被阻塞。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  
    ...  
    // 第一部分  
    if c == nil { // 判断chan是否为nil  
       if !block { // 是否阻塞,默认为block=true  
          return  
       }  
       // 进行阻塞  
       gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)  
       throw("unreachable")  
    }  
    ...  
}  

第二部分
这一部分只要是考虑block=falsec为空的情况,block=false的情况我们可以不做考虑。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  
    ...  
    // 检查未获得锁的失败非阻塞操作。  
    if !block && empty(c) {  
        ...  
    }  
    ...  
}  

第三部分

第三部分的逻辑为判断当前chan是否被关闭,若当前chan已经被close了,并且缓存队列中没有缓冲的元素时,返回truefalse

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  
  
    ...  
     
    lock(&c.lock) // 加锁,返回时释放锁  
      
    // 第三部分  
    if c.closed != 0 { // 当chan已被关闭时  
        if c.qcount == 0 { // 且 buf区 没有缓存的数据了  
              
            ...  
              
            unlock(&c.lock) // 解锁  
            if ep != nil {  
               typedmemclr(c.elemtype, ep)  
            }  
            return true, false  
        }  
    }   
    ...  
}  

第四部分

第四部分是处理通道未关闭且buf缓存队列已满的情况。只有当缓存队列已满时,才能够从发送等待队列获取到sender。若当前的chanunbufferchan,即无缓冲区channel时,则直接将sender的发送数据传递给receiver。否则就从缓存队列的头部读取一个元素值,并将获取的sender携带的值加入到buf循环队列的尾部。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  
    ...  
    if c.closed != 0 { // 当chan已被关闭时  
      
    } else { // 第四部分,通道未关闭  
       // 如果sendq队列中有等待发送的sender  
       if sg := c.sendq.dequeue(); sg != nil {  
          // 存在正在等待的sender,如果缓存区的容量为0则直接将发送方的值传递给接收方  
          // 反之,则从缓存队列的头部获取数据,并将获取的sender的发送值加入到缓存队列尾部  
          recv(c, sg, ep, func() { unlock(&c.lock) }, 3)  
          return true, true  
       }  
    }  
      
    ...  
}  

第五部分

第五部分的主要逻辑是处理发送队列中没有等待的senderbuf中有缓存的数据。该段逻辑与外出的互斥锁共用一把锁,因此不存在并发问题。当buf缓存区有缓存元素时,则取出该元素传递给receiver,同时移动接收指针。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  
    ...  
      
    // 第五部分  
    if c.qcount > 0 { // 发送队列中没有等待的sender,且buf中有缓存数据  
        // 直接从缓存队列中获取数据  
        qp := chanbuf(c, c.recvx)  
        if raceenabled {  
           racenotify(c, c.recvx, nil)  
        }  
        if ep != nil {  
           typedmemmove(c.elemtype, ep, qp)  
        }  
        typedmemclr(c.elemtype, qp)  
        c.recvx++ // 移动接收指针  
        if c.recvx == c.dataqsiz { // 指针若已到末尾则进行重置(循环队列)  
           c.recvx = 0  
        }  
        c.qcount-- // 获取数据后,buf缓存区元素个数减一  
        unlock(&c.lock) // 解锁  
        return true, true  
    }  
  
    if !block { // block=true  
        unlock(&c.lock)  
        return false, false  
    }  
    ...  
}  

第六部分

第六部分的逻辑主要是处理buf缓存区中没有缓存数据的情况。当buf缓存区没有缓存数据时,那么当前的receiver就会被阻塞,直到它从sender中接收了数据,或者是chanclose,才会返回。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  
    ...  
    c.recvq.enqueue(mysg) // 将当前接收操作入接收队列  
      
    ...  
      
    // 进行阻塞,等待唤醒  
    gopark(chanpark***mit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)  
    ...  
}  

5、close关闭

close函数主要用于channel的关闭,Go编译器会替换成closechan函数的调用。省略一些检查下的代码后,closechan函数的主要逻辑如下:

  • 如果当前channil,则直接panic
  • 如果当前chan已关闭,再次close则直接panic
  • 如果chan不为nilchan也没有closed,就把等待队列中的 sender(writer)receiver(reader)从队列中全部移除并唤醒。
func closechan(c *hchan) {  
    if c == nil { // 若当前chan未nil,则直接panic  
       panic(plainError("close of nil channel"))  
    }  
  
    lock(&c.lock) // 加锁  
      
    if c.closed != 0 { // 若当前chan已经关闭,则直接panic  
       unlock(&c.lock)  
       panic(plainError("close of closed channel"))  
    }  
      
    ...  
  
    c.closed = 1 // 设置当前channel的状态为已关闭  
  
    var glist gList  
  
    // 释放接收队列中所有的reader  
    for {  
       sg := c.recvq.dequeue()  
       if sg == nil {  
          break  
       }  
       if sg.elem != nil {  
          typedmemclr(c.elemtype, sg.elem)  
          sg.elem = nil  
       }  
       if sg.releasetime != 0 {  
          sg.releasetime = cputicks()  
       }  
       gp := sg.g  
       gp.param = unsafe.Pointer(sg)  
       sg.su***ess = false  
       if raceenabled {  
          raceacquireg(gp, c.raceaddr())  
       }  
       glist.push(gp)  
    }  
  
    // 释放发送队列中所有的writer (它们会panic)  
    for {  
       sg := c.sendq.dequeue()  
       if sg == nil {  
          break  
       }  
       sg.elem = nil  
       if sg.releasetime != 0 {  
          sg.releasetime = cputicks()  
       }  
       gp := sg.g  
       gp.param = unsafe.Pointer(sg)  
       sg.su***ess = false  
       if raceenabled {  
          raceacquireg(gp, c.raceaddr())  
       }  
       glist.push(gp)  
    }  
    unlock(&c.lock)  
  
    for !glist.empty() {  
       gp := glist.pop()  
       gp.schedlink = 0  
       goready(gp, 3)  
    }  
}  

三、总结

通过学习channel的基本使用,了解其操作背后的实现原理,可以帮助我们更好的使用channel,避免一些操作不当而导致的panic或者说是bug,让我们在使用channel时能够更加的得心应手。

channel的值和状态有多种情况,而不同的操作(send、recv、close)又可能得到不同的结果,这是使用 channel 类型时需要经常注意的点,我们可以将不同channel值下的不同操作进行一个总结,特别注意操作channel时会产生panic的情况,已经可能会导致线程阻塞的情况,都是有可能导致死锁与goroutine泄漏的罪魁祸首。

channel执行操作\channel状态 channel为nil channel buf为空 channel buf已满 channel buf未满且不为空 channel已关闭
receive接收操作 阻塞 阻塞 读取数据 读取数据 返回buf中缓存的数据
send发送操作 阻塞 写入数据 阻塞 写入数据 panic
close关闭 panic 关闭channel,buf中没有缓存数据 关闭channel,保留已缓存的数据 关闭channel,保留已缓存的数据 panic

又出成绩啦

我们又出成绩啦!大厂Offer集锦!遥遥领先!

这些朋友赢麻了!

这是一个专注程序员升职加薪の知识星球

答疑解惑

需要「简历优化」、「就业辅导」、「职业规划」的朋友可以联系我。

加我微信:wangzhongyang1993

关注我的同名公众号:王中阳Go

转载请说明出处内容投诉
CSS教程_站长资源网 » 通俗易懂剖析Go Channel:理解并发通信的核心机制

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买