go的channel源码层理解


基于源码1.9.3

原理简述

Go中的channel分为了带buffer和无buffer的。

带buffer的,底层是用了一个循环数组,消息加到数组的最后就会折返到数组的开始。数组满了的时候,就会阻塞发送操作。

无buffer的,底层就不依赖数组了。因为在无buffer的channel上收发消息行为是会直接阻塞的,这种情况处理不妥善会出现死锁问题。一般可以先把读channel读的goroutine先启动起来,再启动写channel。

1
2
3
4
5
6
7
8
9
// 这段代码就会出现死锁,解决方案就是把A和B对调
func main() {
	c := make(chan int)
	c <- 1  // A. 先写channel
	go func() {  // B. 再读channel
		fmt.Println(<-c)
	}()
	time.Sleep(3 * time.Second)
}

当不能继续发送或读取时,就会出现阻塞。具体动作是把当前的goroutine挂起,挂到channel上。而channel是一个结构体(下文配源码)。里面有两个指针(recvq和sendq),分别指向阻塞读和阻塞写的goroutine队列。

在channel上发送和读取消息的逻辑有点不一样。

发送消息时:

  1. channel已经关闭,那就不能发。panic掉。
  2. 看一下有没有阻塞在读操作上的goroutine,有的话,赶紧把数据复制给它。把它安排在下一次调度切换上。(p.runnext)
  3. 没有被阻塞的goroutine。如果带buffer,buffer还有空位,就放在buffer里。否则就阻塞挂起当前发送消息的goroutine。

所以要注意,不要依赖发送到channel的顺序。发送消息时遇到被阻塞读操作的goroutine时,会先满足它们,即使buffer里还有数据。

读取消息时:

  1. channel已经关闭,也可以读,只是读出来的数据为空。
  2. 看一下有没有阻塞的写操作的goroutine,有的话唤醒它。读取它发送的数据(A)。
  3. 读取后的数据(A)放哪,视乎是带buffer还是无buffer。无buffer的话,就直接把写数据(A)给读取者。带buffer的话,就先看buffer里是否有数据(B),有就把数据(B)给读取者,再把数据(A)放到原来数据(B)空出来的位置上。

整个读写过程,都是需要加锁的。毕竟channel是在多个goroutine中被使用。

此外,无论是带buffer或无buffer,数据在 发送者-buffer-读取者 三者流转时。都是采用拷贝的方式(typedmemmove)。这也呼应了go里不存引用传递,都是值传递。所以使用channel时,避免使用过大的数据。而且要改变从channel里读取出来的值时,channel的类型要使用指针类型。

Go提倡通过通信来共享内存,而非通过共享内存来通信,正是使用了channel来达到这个效果。这个就是Hoare提出的CSP(Communicating Sequential Processes)。这种方式也在Occam和Erlang上进行了验证,是没问题的。所以Go也使用上。

了解原理后,会发现channel底层还是用了锁。但channel提供给开发者是一种更高层的并发编程思维和用法,它更易用和直接,减少bug。而非pthread那种基于信号量,锁等较底层的并发编程方式。

源码介绍

channel的数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// src/runtime/chan.go
type hchan struct {
	qcount   uint           // 通道的len
	dataqsiz uint           // 通道的cap
	buf      unsafe.Pointer // 指向buffer channel底层的数组
	elemsize uint16  // channel里元素的大小
	closed   uint32  // 标记channel是否close,1 == closed
	elemtype *_type // 通道类型
	sendx    uint   // 指向下一个写位置
	recvx    uint   // 同上,指向下一个读位置
	recvq    waitq  // 阻塞在读上的g,双向链表
	sendq    waitq  // 阻塞在写上的g,双向链表

	// 用于并发控制
	lock mutex
}

通过channel写数据

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		// 不能往一个nil的channel里写数据
		if !block {
			// 不阻塞就直接返回吧
			return false
		}
		// 阻塞当前g,将当前g与m脱离
		gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	// ...

	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// 加锁
	lock(&c.lock)

	// 不能给已关闭的channel发送数据
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// 有别的g被阻塞着在等待,唤醒它,赶紧喂它,然后返回成功
	// 所以使用无buffer channel时,必须要先启动进行读channel操作,否则就会出现死锁问题
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// buffer channel 还有位置,那就把数据放到buffer里
	if c.qcount < c.dataqsiz {
		// 通过指针运算,找到应该插入buf的位置
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// 然后将发送的数据,拷贝到buf对应位置上。所以对于不论对于指针或非指针,都是值拷贝
		typedmemmove(c.elemtype, qp, ep)
		// 计算好下次插入的位置。底层是循环数组,要注意掉头
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// 记录循环数组元素总数
		c.qcount++
		// 解锁
		unlock(&c.lock)
		// 返回成功
		return true
	}

	// 发送不了,也不阻塞,就返回失败吧
	if !block {
		unlock(&c.lock)
		return false
	}

	// buf里没有空间了,阻塞自己吧。把g自己放进channel的链表里等待投喂。然后gopark挂起自己这个g
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.selectdone = nil
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 把gp这个g放在p的runnext下,等待调度
	goready(gp, skip+1)
}

通过channel读数据

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// ...

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// 加锁
	lock(&c.lock)

	// close 了的channel还能读的
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(unsafe.Pointer(c))
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	// 有别的g被阻塞着,唤醒它,让它发送数据
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// qcount>0,说明这是个buffer channel,并且buffer里有数据
	// 那就读出来返回
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 无论是buffer还是unbuffer,都没数据读了,只好阻塞自己,加到chan的链表里了
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.selectdone = nil
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// unbuffer channel
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// 直接从g上把数据拷贝到ep的位置上
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// buffer里有数据,把buffer里的读出来
		// buffer channel
		// buf里有数据,把它读出来
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
		if ep != nil {
			// 读出来的数据要写到ep对应的地址上
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		// 然后再将被阻塞写的g的数据到buffer里
		// 然后再把阻塞着的g它对应的数据,放到上面buf空出来的位置上
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 好了,被阻塞写的g写完后,唤醒它吧
	goready(gp, skip+1)
}

recvDirect和sendDirect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 直接从被阻塞写的g上,把数据拷贝到dst的地址上
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// sg被阻塞写的g,elem是对应栈或堆上变量的地址,上面存有要发送的数据
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// sg被阻塞读的g,elem是对应栈或堆上变量的地址
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}