Channel基础

核心哲学

Do not communicate by sharing memory; instead, share memory by communicating.

“不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存”,channel便是这一理念的支持和体现

基本操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var ch chan int          // 零值通道,此外,通道的零值是为nil

ch1 := make(chan int)    // 创建非缓冲通道
close(ch1)               // 关闭通道

ch2 := make(chan struct{})  // struct{}类型的通道


ch3 := make(chan int, 1)  // 创建容量为1的缓冲通道
ch3 <- 1                  // 往通道中写1
data, ok := <- ch2        // 从通道中读取, 根据ok可知是否成功获取值,ok可省

len(ch2)                  // 通道当前长度
cap(ch2)                  // 通道容量

for {                     // 使用select进行多通道操作
    select {
    case <-ch2:           // 接收操作
        fmt.Println("ch1")
    case ch3 <- 2:        // 发送操作
        fmt.Println("ch2")
    default:
        fmt.Println("default")
    }
}

非缓冲通道和缓冲通道的区别

一个容量为0的channel称为一个非缓冲通道(unbuffered channel),一个容量不为0的channel称为一个缓冲通道(buffered channel)

我初学的时候以为容量为1的channel是非缓冲通道,但实际它是缓冲通道。

只有容量是0的通道是非缓冲通道,非缓冲通道初始化后是阻塞的。

非缓冲通道

非缓冲通道非常特殊,因为对于无缓冲通道,发送方会阻塞通道,直到接收方接收到来自通道的数据,而接收方也会阻塞通道,直到发送方将数据发送到通道中。只有接收方和发送方同时处于就绪状态,发送方才能发送成功,或者接收方才能接收成功。

这令人十分困惑,会让人觉得前后矛盾。看下面两个输出等价的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
	"fmt"
)

func main() {
	done := make(chan struct{})

	go func() {

		fmt.Printf("hello")
		done <- struct{}{}
	}()

	<- done
	fmt.Println(" world")
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
	"fmt"
)

func main() {
	done := make(chan struct{})

	go func() {

		fmt.Printf("hello")
		<- done
	}()

	done <- struct{}{}
	fmt.Println(" world")
}

这两份代码都会输出hello world,但是第二种非常丧心病狂,违反认知。

可以理解为两个goroutine,一个执行到channel的接收,另一个执行到channel的发送,这样才能够继续执行后面的程序,否则将一直阻塞。

单向通道仅起限制作用

单向通道分两种,只发送通道(send only channel) 和 只接收通道(read only channel)

单向通道同样令初学者困惑,只发送怎么接收?只接收怎么发送?

实际上单向通道的用途仅仅是为了限制。单向channel必须配合普通的双向channel使用,且必须先有双向通道,并将这个双向通道转换为单向通道进行使用。

函数将普通通道转换为只读通道,并返回了这个只读通道,那么用户拿到这个channel只能读取而不能发送消息,从而达到限制作用。如Go中的time.Tick,如果用户非法往channel里写数据,那么将会乱套。

使用channel可能出现的边界情况

设channel内的元素类型为T,读取操作定义为data, ok := <-ch

操作 nil通道 非nil但已关闭的通道 非nil且未关闭的通道
关闭 panic: close of nil channel panic: close of closed channel 成功关闭
发送数据 永久阻塞。所有线程休眠后提示deadlock panic: send on closed channel 通道容量不足时阻塞或者成功发送
接收数据 永久阻塞。所有线程休眠后提示deadlock 永不阻塞。若通道中数据已经被全部读取,则返回值是T的零值,ok为false,可以一直读取;若通道中还残留数据,可以继续读取,ok为true 通道无数据时阻塞或者成功接收

下面举两个例子说明比较容易理解错误的对nil通道进行发送会发生什么(接收同理):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var ch1 chan struct{}

	wg := sync.WaitGroup{}
    wg.Add(1)
	go func() {
		for {
			fmt.Println("sleep")
			time.Sleep(time.Second * 10)
			fmt.Println("wake up")
			break
		}
		wg.Done()
	}()

	fmt.Println("block at nil channel send")
	ch1 <- struct {}{}
	fmt.Println("pass nil channel send")

	wg.Wait()
	fmt.Println("pass wait")
}

这个例子中,nil通道在主线程中,执行到ch1 <- struct {}{}会阻塞,等10秒子线程执行完毕后,会检测到主线程阻塞了,且没有其他子线程在工作,报错:fatal error: all goroutines are asleep - deadlock!

此外,如果去掉goroutine中的break,就可以观察到对nil通道的发送处于一直阻塞的现象。

下面举一个nil通道在子线程中的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		var ch1 chan struct{}
		for {
			ch1 <- struct{}{}
			fmt.Println("nil channel send, done")
			break
		}
		wg.Done()
	}()
	go func() {
		for {
			time.Sleep(time.Second * 10)
		}
		wg.Done()
	}()

	wg.Wait()
}

这个例子将永久阻塞,没有任何输出。
此外,如果去掉第二个gorouine,那么会同样报错:fatal error: all goroutines are asleep - deadlock!

关闭channel

关闭channel在实际生产中是个比较复杂的事。

n个发送者,m个接收者,发送者关闭会发生什么?哪个发送者关闭?接收者关闭会发生什么?哪个接收者可以关闭?

其实不必陷入琐碎的细节里,关闭channel核心都围绕着,让发送者或者接收者知道channel要关闭了,执行对应的关闭逻辑即可。

此外,接收者通过接收的布尔返回值是能感知到channel的关闭的,所以关键在于处理发送者。

使用一个额外的channel通知发送者停止发送,这使得发送者变成接收者,这个问题便解决了。

Channel底层实现

数据结构

channel底层主要由一个存放channel数据的循环队列buf,以及存放被阻塞的接收者和发送者gortouine的两个双向链表recvqsendq组成。 runtime.hchan

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type hchan struct {
	qcount   uint           // channel中已存在的元素个数
	dataqsiz uint           // channel中循环队列的容量
	buf      unsafe.Pointer // 指向一个长度为dataqsiz的数组。
	elemsize uint16         // 每个channel内元素的大小
	closed   uint32         // 是否关闭,1关闭,0未关闭
	elemtype *_type // channel中每个元素类型
	sendx    uint   // 发送的下标(在buf中的下标)
	recvx    uint   // 接收的下标(在buf中的下标)
	recvq    waitq  // 等待接收的goroutine队列
	sendq    waitq  // 等待发送的goroutine队列
	
	// lock保护了所有hchan中的字段,以及部分sudog的字段。
	lock mutex
}

recvqsendq的类型是waitq,它是一个双向链表,有指向头尾的两个指针,每个链表节点的类型是runtime.sudogsudog代表着被channel阻塞的g

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type waitq struct {
	first *sudog
	last  *sudog
}

type sudog struct {
	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer 

	acquiretime int64
	releasetime int64
	ticket      uint32

	isSelect bool

	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

创建channel

runtime/chan.go中关于channel创建的方法:

  • func makechan(t *chantype, size int) *hchan {}
  • func makechan64(t *chantype, size int64) *hchan {}

大致原理:分配内存,设置相关字段。

待续

发送数据

runtime/chan.go中channel发送数据相关的方法:

  • func chansend1(c *hchan, elem unsafe.Pointer) {}
  • func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {}
  • func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {}
  • func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {}

原理:

  • 直接发送机制:直接将数据发送给等待者
  • 如果channel未满则直接加入
  • 如果channel满则阻塞(使用select时不会)

goroutine调度

待续

接收数据

runtime/chan.go中channel接收数据相关方法:

  • func chanrecv1(c *hchan, elem unsafe.Pointer){}
  • func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool){}
  • func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}

原理:

  • 直接接收:从sendq中拿到goroutine,并接收数
  • 从buf中读取
  • 等待buf不为空

待续

Channel花式用法(各种应用场景)

推荐细阅读并动手实践:通道用例大全