探索goroutine的创建


go 1.9.3

GOARCH="amd64" GOOS="darwin"

本文使用delve进行调试

创建一个goroutine

一条go语句就能很容易地创建出一个goroutine。(本文也基于这个程序进行分析)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// $GOPATH/test/main.go
package main

import (
	"fmt"
)

// ch is the completion signal between the two goroutines of this program:
// hello closes it, and main blocks receiving from it.
var ch = make(chan bool)

// hello prints a greeting and then closes ch, which unblocks the receive
// in main.
func hello() {
	fmt.Println("hello world")
	close(ch)
}

// main starts hello on a new goroutine and then blocks on ch until hello
// closes it, so the program does not exit before the greeting is printed.
func main() {
	go hello()
	<-ch // wait for hello to signal completion
}

定位goroutine的创建函数

由于go除了初始化引导部分是使用手写汇编实现之外,其他的像调度器、内存管理、GC都由runtime实现。所以在这儿就只保留与runtime相关的语句。

对照我们的工程代码,不难发现:go hello() 这条语句被编译成了对 runtime.newproc 的调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
$ dlv debug test
Type 'help' for list of commands.
(dlv) disassemble -l main.main
TEXT main.main(SB) /Users/cbsheng/goproject/src/test/main.go
    # 省略部分代码
	main.go:15	0x10b2740	e88b0af8ff		call $runtime.newproc
    # 省略部分代码
	main.go:16	0x10b2759	e8321ff5ff		call $runtime.chanrecv1
    # 省略部分代码
	main.go:14	0x10b2768	e8c30bfaff		call $runtime.morestack_noctxt
(dlv)

继续看看runtime.newproc源码在哪儿。

1
2
3
(dlv) b runtime.newproc
Breakpoint 1 set at 0x10331d0 for runtime.newproc() /usr/local/go/src/runtime/proc.go:2919
(dlv)

找到文件,就来看看庐山真面目吧。

newproc()干的活比较简单,只是获取参数的起始地址,以及调用者的PC(即go hello()这条语句所在的位置),然后通过systemstack()调用newproc1()。真正干活的是newproc1()。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
// Cannot split the stack because it assumes that the arguments
// are available sequentially after &fn; they would not be
// copied if a stack split occurred.
//
// newproc itself only gathers two values and delegates: the address of
// the first argument byte and the caller's PC. The real work happens in
// newproc1, whose returned g is discarded here.
//go:nosplit
func newproc(siz int32, fn *funcval) {
	// The goroutine's arguments start one pointer-sized slot past &fn.
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	// PC of the caller, i.e. of the go statement; passed on as callerpc.
	pc := getcallerpc(unsafe.Pointer(&siz))
	// Run newproc1 via systemstack, with nret = 0 (no result bytes).
	systemstack(func() {
		newproc1(fn, (*uint8)(argp), siz, 0, pc)
	})
}

runtime.newproc1()

newproc1() 就比较长了,这儿概括下它做了的事情:

  1. 从TLS拿到当前运行的G实例,并且使绑定到当前线程的M实例不可抢占。
  2. 从M实例上取到P实例,如果P实例本地上有free goroutine就拿过去,没有就到全局调度器那儿偷一些过来。这两个地方都没有,就按照最低栈大小2K new一个G实例(即goroutine)。
  3. 然后设置好G实例上的各种寄存器的信息,SP、PC等。
  4. 将G实例的状态变更为Grunnable,放到P实例的本地可运行队列里等待调度执行,若队列满了,就把一半的G移到全局调度器下。
  5. 释放M实例的不可抢占状态。返回新的G实例。

如果是程序刚启动,经由runtime.rt0_go调用newproc(进而调用newproc1)时,实质干的事情就是创建一个G,把runtime.main(它随后会调用main.main)放进去。在执行mstart时,触发调度。所以main实际是在一个新的G里运行的,而不是g0。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Create a new g running fn with narg bytes of arguments starting
// at argp and returning nret bytes of results.  callerpc is the
// address of the go statement that created this. The new g is put
// on the queue of g's waiting to run.
//
// In outline: preemption is held off through m.locks, a g is obtained
// (reused via gfget, or freshly allocated via malg), the arguments are
// copied onto its stack, its saved scheduling context is initialised,
// its status moves from _Gdead to _Grunnable, it receives a goroutine ID,
// and it is queued on the current P with runqput. The new g is returned.
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	_g_.m.locks++ // disable preemption because it can be holding p in a local var
	// Bytes of arguments plus results, rounded up to a multiple of 8.
	siz := narg + nret
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
	// Reject argument/result frames that would not fit in a minimum-size
	// (_StackMin) stack once the extra space above is accounted for.
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	// Try to reuse a free g through this P.
	// NOTE(review): gfget presumably falls back to the scheduler's global
	// free list when the P has none; confirm against gfget.
	newg := gfget(_p_)
	if newg == nil {
		// Nothing to reuse: allocate a new g with a _StackMin-sized stack.
		newg = malg(_StackMin)
		// Move the new g from _Gidle to _Gdead.
		casgstatus(newg, _Gidle, _Gdead)
		// Register the g with allgadd.
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	// Initial stack pointer: totalSize bytes below the top of the stack.
	sp := newg.stack.hi - totalSize
	spArg := sp

	if usesLR {
		// Architectures that keep the return address in a link register.
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		// Copy the caller's arguments onto the new g's stack.
		memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
		// This is a stack-to-stack copy. If write barriers
		// are enabled and the source stack is grey (the
		// destination is always black), then perform a
		// barrier copy. We do this *after* the memmove
		// because the destination stack may have garbage on
		// it.
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			// We're in the prologue, so it's always stack map index 0.
			bv := stackmapdata(stkmap, 0)
			bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
		}
	}

	// Zero the whole newg.sched structure before filling it in.
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	// Record the initial SP in the saved scheduling context.
	newg.sched.sp = sp
	// Record the same SP on the g itself as stktopsp.
	newg.stktopsp = sp
	// Seed the saved PC with goexit.
	// NOTE(review): presumably gostartcallfn below turns this into fn's
	// return address so that fn returns into goexit; confirm.
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	// Link the saved scheduling context back to its g.
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	// Adjust the saved context for starting fn (see gostartcallfn).
	gostartcallfn(&newg.sched, fn)
	// Remember the PC of the go statement that created this g.
	newg.gopc = callerpc
	// Remember the entry PC of the function the g will run.
	newg.startpc = fn.fn

	if _g_.m.curg != nil {
		// Inherit labels from the goroutine currently running on this M.
		// NOTE(review): presumably profiler labels; confirm.
		newg.labels = _g_.m.curg.labels
	}

	// Goroutines that isSystemGoroutine reports as system goroutines are
	// counted in sched.ngsys.
	if isSystemGoroutine(newg) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	// Clear gcscanvalid on the new g.
	// NOTE(review): presumably this marks any earlier stack scan as stale
	// rather than forbidding GC scanning; confirm against the g struct.
	newg.gcscanvalid = false
	// Transition the g from _Gdead to _Grunnable.
	casgstatus(newg, _Gdead, _Grunnable)

	// If this P's cached range of goroutine IDs is used up, reserve another
	// batch of _GoidCacheBatch IDs from sched.goidgen.
	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	// Assign the next cached ID to the new g.
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	// Queue the new g on the current P.
	// NOTE(review): the true argument presumably asks runqput to make it
	// the next g to run; confirm against runqput.
	runqput(_p_, newg, true)

	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
		// Reached only once mainStarted is set, with at least one idle P
		// and no spinning M.
		// NOTE(review): wakep presumably wakes or starts an M to run an
		// idle P; confirm.
		wakep()
	}
	// Undo the m.locks++ above.
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
	return newg
}