channel总结

csp

CSP 全称是 “Communicating Sequential Processes”,其主要含义就是说不要通过共享内存来通信,而是通过通信来共享内存

底层数据结构

channel的源码实现在src/runtime/chan.go中,底层数据结构解析:

type hchan struct {
    qcount   uint           //   chan 里元素数量
    dataqsiz uint           // chan 底层循环数组的长度
    buf      unsafe.Pointer // 指向底层循环数组的指针  只针对有缓冲的 channel
    elemsize uint16 //chan 中元素的大小
    closed   uint32 //chan是否被关闭
    elemtype *_type // chan 中的元素类型
    sendx    uint   // 已发送元素在循环数组中的索引
    recvx    uint   // 已接收元素在循环数组中的索引   recvq    waitq  // 等待接收的 goroutine 队列
    sendq    waitq  // 等待发送的 goroutine 队列

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex  //保护hchan中的所有字段
}

其中几个重点字段:

  • buf:指向底层循环数组,只有缓冲型的channel才会有
  • sendx、recvx均指向底层循环数组,表示当前可以发送或者可以接受的元素位置索引。
  • recxq、sendq 分别表示被阻塞的goroutine,这些goroutine由于尝试从channel中读取数据而被阻塞了。
  • waitq是sudog的一个双向链表,而sudog其实是goroutine的一层封装。
    type waitq struct {
        first *sudog
        last  *sudog
    }
    
    type sudog struct {
        // The following fields are protected by the hchan.lock of the
        // channel this sudog is blocking on. shrinkstack depends on
        // this for sudogs involved in channel ops.
    
        g *g
    
        next *sudog
        prev *sudog
        elem unsafe.Pointer // data element (may point to stack)
    
        // The following fields are never accessed concurrently.
        // For channels, waitlink is only accessed by g.
        // For semaphores, all fields (including the ones above)
        // are only accessed when holding a semaRoot lock.
    
        acquiretime int64
        releasetime int64
        ticket      uint32
    
        // isSelect indicates g is participating in a select, so
        // g.selectDone must be CAS'd to win the wake-up race.
        isSelect bool
    
        // success indicates whether communication over channel c
        // succeeded. It is true if the goroutine was awoken because a
        // value was delivered over channel c, and false if awoken
        // because c was closed.
        success bool
    
        parent   *sudog // semaRoot binary tree
        waitlink *sudog // g.waiting list or semaRoot
        waittail *sudog // semaRoot
        c        *hchan // channel
    }
    
  • lock 来保证每个读channel或者写channel都是原子性的

例如创建一个容量为6,元素为int类型的channel数据结构如下:

16317732637675

channel发送和接收元素的本质是什么

channel发送和接受元素的本质都是值拷贝。无论是从 sender goroutine 的栈到 chan buf,还是从 chan buf 到 receiver goroutine,或者是直接从 sender goroutine 到 receiver goroutine。很好的诠释了csp理论,只通过通信来共享内存。

channel 发送数据的过程

带有缓冲的channel,buf是一个循环连表。

当channel 没有满的时候发送数据

  • 第一:加锁
  • 第二:把数据从goroutine中copy到buf中
  • 第三:释放锁。

当channel 中数据已经满了或者channel是一个没有缓存的channel

go的goroutine是一个用户态的线程,用户态的线程是需要用户手动去调度的,go运行时scheduler会帮我们完成调度的事情。

当channel中数据已经满了或者是一个没有缓冲区的channel,那么就会触发goroutine的调度。

比如现在有一个已经满了的channel (ch),现在一个运行中的goroutine G1向channel中send(ch <- 1),这时会主动调用go的调度器,让G1等待,并让出其M,让给其他G使用。同时G1也会被抽象成一个含有G1指针和send元素的sudog的结构体保存到hchan中的sendq队列中,等待被唤醒。

那么G1什么时候会被唤醒呢,那么就得等待其他goroutine来消费channel中buf的数据的时候,比如一个goroutine G2这个时候来消费channel (<- ch),G2将buf中的数据取出的时候,channel会将sendq中的G1推出,将G1的数据拷贝到buf中,并且调用go的scheduler唤醒G1,并将G1放到可运行的goroutine队列中,也就是GMP中的P中。

channel接收数据的过程

当channel中有数据的时候

  • 第一:加锁
  • 第二:把数据从buf中copy到goroutine中
  • 第三:释放锁

当channel中没有数据

当一个运行中的goroutine G1从一个没有数据的channel中读取数据时,G1会主动调用go的scheduler,让G1等待,让出M,让其他G使用,G1还会被抽象为一个含有G1指针和recv空元素的sudog结构体保存到recvq中等待被唤醒。

当另一个goroutine G2向channel中发送数据时,正常情况下:G2应该会将channel锁住,将G2的数据copy到buf中,但是,这边go做了一个优化,直接把数据从G2直接copy到了G1的栈中,在唤醒过程中,G1无需再获取channel的锁,然后再从channel中获取数据,减少了内存copy。

channel资源泄露

channel可能会引发goroutine泄露。

泄露的原因是goroutine操作channel后,处于发送或者接收的阻塞状态,而channel处于满或者空的状态,一直得不到改变。同时,垃圾回收机制也不会回收此类资源,进而导致goroutine会一直在等待队列中。

另外,程序运行过程中,对于一个 channel,如果没有任何 goroutine 引用了,gc 会对其进行回收操作,不会引起内存泄漏。

channel应用场景

停止信号

比如说服务的优雅停止。

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

任务定时

与timer结合,一般是实现超时控制定时执行

  • 超时控制
    select {
    case <-time.After(100 * time.Millisecond):
    case <-s.stopc:
        return false
    }
    
  • 定时执行
    func worker() {
    ticker := time.Tick(1 * time.Second)
    for {
        select {
        case <- ticker:
            // 执行定时任务
            fmt.Println("执行 1s 定时任务")
            }
        }
    }
    

解耦生产方和消费方

服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行:

控制并发数

有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。

//最多并发三个
var limit = make(chan int, 3)
func main() {
    // …………
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    // …………
}

构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。

这里,limit <- 1 放在 func 内部而不是外部,原因是:

如果在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太一样。

channel关闭是否还可以读取数据

从一个有缓冲的 channel 里读数据,当 channel 被关闭,依然能读出有效值。只有当返回的 ok 为 false 时,读出的数据才是无效的。

func main() {
    ch := make(chan int, 5)
    ch <- 18
    close(ch)
    x, ok := <-ch
    if ok {
        fmt.Println("received: ", x)
    }
    x, ok = <-ch
    if !ok {
        fmt.Println("channel closed, data invalid.")
    }
}

先创建了一个有缓冲的 channel,向其发送一个元素,然后关闭此 channel。之后两次尝试从 channel 中读取数据,第一次仍然能正常读出值。第二次返回的 ok 为 false,说明 channel 已关闭,且通道里没有数据。

channel happend-before事件

如果事件A与事件B存在 happend-before关系,即a -> b,b完成的结果一定要体现这种关系。

golang的channel,因为是在gorountine中传递,所以happend-before就显得非常必要。channel的happend-before关系如下:

  • 第n个send一定happend-before第n个receive finished,无论缓冲型还是非缓冲型。
  • 对于容量为m的缓冲型channel,第n个receive一定happend-before第n+m个send finished。
  • 对于非缓冲型的channel,第n个receive一定happend-before第n个send finished。(其实这就是第二种情况的特例,m=0的情况)。
  • channel close一定happend-before receive得到通知。

关闭channel流程

func closechan(c *hchan) {
    // 关闭一个 nil channel,panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    // 上锁
    lock(&c.lock)
    // 如果 channel 已经关闭
    if c.closed != 0 {
        unlock(&c.lock)
        // panic
        panic(plainError("close of closed channel"))
    }
    // …………
    // 修改关闭状态
    c.closed = 1
    var glist *g
    // 将 channel 所有等待接收队列的里 sudog 释放
    for {
        // 从接收队列里出队一个 sudog
        sg := c.recvq.dequeue()
        // 出队完毕,跳出循环
        if sg == nil {
            break
        }
        // 如果 elem 不为空,说明此 receiver 未忽略接收数据
        // 给它赋一个相应类型的零值
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 取出 goroutine
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 相连,形成链表
        gp.schedlink.set(glist)
        glist = gp
    }
    // 将 channel 等待发送队列里的 sudog 释放
    // 如果存在,这些 goroutine 将会 panic
    for {
        // 从发送队列里出队一个 sudog
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        // 发送者会 panic
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 形成链表
        gp.schedlink.set(glist)
        glist = gp
    }
    // 解锁
    unlock(&c.lock)
    // Ready all Gs now that we've dropped the channel lock.
    // 遍历链表
    for glist != nil {
        // 取最后一个
        gp := glist
        // 向前走一步,下一个唤醒的 g
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        // 唤醒相应 goroutine
        goready(gp, 3)
    }
}

整体流程:

一个channel中,recvq和sendq中保存着阻塞的接受者和发送者。关闭channel后,对于接受者,会收到一个对应类型的零值,对于发送者,会直接panic。

close首先会上一把锁,将该channel锁住,接着将recvq和sendq中goroutine全部连成一个sudog链表,在解锁,然后将链表上的goroutine全部唤醒。

优雅的关闭channel

关于channel,有几个地方很不方便:

  • 无法在不改变channel自身状态的情况下,判断一个channel是否已经关闭。
  • 关闭一个已经closed 的channel,会导致panic。
  • 向一个closed的channel发送数据会导致panic。

关闭channel原则:
不要再receiver侧关闭channel,也不要在多个sender侧关闭channel。

根据receiver和sender的个数,可以分为以下几种情况:

  1. 一个sender,一个receiver
  2. 一个sender,N个receiver
  3. N个sender,一个receiver
  4. N个sender,M个receiver

针对1、2情况,都可以在sender侧关闭channel。

针对3情况,可以增加一个传递关闭信号的channel,receiver通过信号channel下大关闭指令,sender监听到关闭指令之后,停止发送数据。

func main() {
    rand.Seed(time.Now().UnixNano())
    const Max = 100000
    const NumSenders = 1000
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()
    }
    // the receiver
    go func() {
        for value := range dataCh {
            if value == Max-1 {
                fmt.Println("send stop signal to senders.")
                close(stopCh)
                return
            }
            fmt.Println(value)
        }
    }()
    select {
    case <- time.After(time.Hour):
    }
}

针对4情况:

和第 3 种情况不同,这里有 M 个 receiver,如果直接还是采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,就会重复关闭一个 channel,导致 panic。因此需要增加一个中间人,M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。

func main() {
    rand.Seed(time.Now().UnixNano())
    const Max = 100000
    const NumReceivers = 10
    const NumSenders = 1000
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
    // It must be a buffered channel.
    toStop := make(chan string, 1)
    var stoppedBy string
    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()
    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(Max)
                if value == 0 {
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            for {
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == Max-1 {
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }
                    fmt.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }
    select {
    case <- time.After(time.Hour):
    }
}

操作channel情况总结

操作 nil channel closed channel not nil, not closed channel
close panic panic  正常关闭
读 <- ch 阻塞 读到对应类型的零值 阻塞或者正常读
写 ch <- 阻塞 panic 阻塞或者正常写
暂无评论

发送评论 编辑评论


|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇