golang标准库sync.Pool原理及源码简析


pool关键作用:

  1. 减轻GC的压力。
  2. 复用对象内存。有时不一定希望复用内存,单纯是想减轻GC压力也可主动给pool塞对象。

Pool’s purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.

原理简述

sync.Pool就是围绕New字段、Get和Put方法来使用。用过都懂,比较简单就不介绍了。

Go是提供goroutine进行并发编程,在并发环境下,sync.Pool的使用不会造成严重性能问题是它的设计考虑点。

容易想到的方法是Pool对象为每个P都分配一个空间,这样在P上运行的G进行Get和Put操作时,就可以在P本地的空间上读写。这样方法比Pool对象维护一个全局空间有明显好处,全局空间的读写肯定要加锁。

即使每个P都有了自己的本地空间,也不是说就可以完全避免锁使用。不要忘了Pool提供了内存复用功效,每个P上的G都使用的是P本地的空间的话,那内存复用就有局限性,只能局限在一个P上。

而pool提供的内存复用是覆盖所有P。意思是,一个G在执行Get方法时,发生G所在的P上,没有可复用的对象。这时就到别的P那儿去偷。偷这个动作就要加锁了。因为偷取别人可复用对象时候,别人也可能同时在读写。

前面开始说每个P有自己的空间,作用是避免锁,后面又说到别的P上偷对象,又要加锁。是不是矛盾了。

不矛盾,让我们来看看sync.Pool的实现原理。

sync.Pool对象底层两个关键字段,local和localSize,前者是指向一个数组,数组大小存在localSize。localSize的大小跟P个数保持一致。数组每个元素就是代表每个P自己的本地空间,类型是poolLocal

poolLocal类型有两个关键字段,private和shared

来理一下在Pool对象上读写的逻辑:

  1. Get操作时,先返回本地P上的private上的对象。
  2. 如果private为空,继续从本地P上的shared找,这里要加锁。
  3. 如果shared也没有,就到别的P那儿,从shared里偷。
  4. 所有其它P都遍历过了,没有任何对象可偷。就返回nil或调用New函数。

  5. Put操作时,优先放private。

  6. private已经被放了,那就放到shared的最后。

用一张图来表示:

sync_pool

sync.Pool的特性

sync.Pool的缺点

pool的Get()并非成本低廉,最坏情况可能会上锁runtime.GOMAXPROCS(0)次。

所以,多Goroutine与多P的情况下,使用Pool的效果才会突显。否则要经历无谓的锁成本。

简单的常用场景

bytes.Buffer作为临时对象放在池子里,这样减轻每次都需要创建的消耗。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type Dao struct {
    bp      sync.Pool
}

func New(c *conf.Config) (d *Dao) {
    d = &Dao{
        bp: sync.Pool{
            New: func() interface{} {
                return &bytes.Buffer{}
            },
        },
    }
    return
}

func (d *Dao) Infoc(args ...string) (value string, err error) {
    if len(args) == 0 {
        return
    }

    // fetch a buf from bufpool
    buf, ok := d.bp.Get().(*bytes.Buffer)
    if !ok {
        return "", ErrType
    }

    // append first arg
    if _, err := buf.WriteString(args[0]); err != nil {
        return "", err
    }

    for _, arg := range args[1:] {
        // append ,arg
        if _, err := buf.WriteString(defaultSpliter); err != nil {
            return "", err
        }

        if _, err := buf.WriteString(strings.Replace(arg, defaultSpliter, defaultReplacer, -1)); err != nil {
            return "", err
        }
    }

    value = buf.String()
    buf.Reset()
    d.bp.Put(buf)
    return

}

带注释的源码

sync.Pool数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// pool 的数据结构
type Pool struct {
	noCopy noCopy
	// 指向一个数组,个数与P相等,每个元素的类型为poolLocalInternal
	local     unsafe.Pointer
	// local数组的大小
	localSize uintptr
	// 创建pool对象时,用户必须提供的new函数
	New func() interface{}
}

type poolLocalInternal struct {
	// 私有对象,每个P都有,用于不同g执行get和put可以无锁操作
	private interface{}
	// 共享对象数组,每个P都有一个,同一个P上不同g可以多次执行put方法,需要有地方能存储。并且别的p上的g可能过来偷,所以要加锁
	shared  []interface{}
	// 对shared进行加锁,private不用加锁
	Mutex
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

Get方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	// ...
	// 拿到当前P对应的pool
	l := p.pin()
	if l.private == nil {
		// 私有区有位置的话直接放私有区
		l.private = x
		x = nil
	}
	runtime_procUnpin()
	if x != nil {
		// 否则放在共享区里
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock()
	}
  // ...
}

func (p *Pool) pin() *poolLocal {
	// 拿到当前P的ID
	pid := runtime_procPin()
	s := atomic.LoadUintptr(&p.localSize)
	l := p.local
	if uintptr(pid) < s {
		// 定义pool对象时,s取值为0。只有经过pinSlow后,p.localSize的值才被设置
		// 如果local数组已经初始化,就可以把对应P的本地pool返回
		return indexLocal(l, pid)
	}
	// 否则就得重建local
	return p.pinSlow()
}

func (p *Pool) pinSlow() *poolLocal {
	runtime_procUnpin()
	// 锁上所有的pool对象
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
    // pinSlow是一个创建local的方法。在获得allPoolsMu锁前,可能被别的P先获取,这种情况下local就已经被初始化了
		// 所以在获得allPoolsMu锁后需要再检查一次uintptr(pid) < s
		return indexLocal(l, pid)
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// local的大小默认就是P的个数
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // 设置local
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // 设置localSize
	return &local[pid]
}

Put方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	// ...
	// 拿到当前P对应的pool
	l := p.pin()
	if l.private == nil {
		// 私有区有位置的话直接放私有区
		l.private = x
		x = nil
	}
	runtime_procUnpin()
	if x != nil {
		// 否则放在共享区里
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock()
	}
	// ...
}

runtime_procPin和runtime_procUnpin

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
	return procPin()
}

//go:nosplit
func procPin() int {
	_g_ := getg()
	mp := _g_.m

	mp.locks++
	return int(mp.p.ptr().id)
}

//go:linkname sync_atomic_runtime_procUnpin sync/atomic.runtime_procUnpin
//go:nosplit
func sync_atomic_runtime_procUnpin() {
	procUnpin()
}

//go:nosplit
func procUnpin() {
	_g_ := getg()
	_g_.m.locks--
}