golang标准库sync.Pool原理及源码简析
pool关键作用:
- 减轻GC的压力。
- 复用对象内存。有时不一定希望复用内存,单纯是想减轻GC压力也可主动给pool塞对象。
Pool’s purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.
原理简述
sync.Pool就是围绕New字段、Get和Put方法来使用。用过都懂,比较简单就不介绍了。
Go是提供goroutine进行并发编程,在并发环境下,sync.Pool的使用不会造成严重性能问题是它的设计考虑点。
容易想到的方法是Pool对象为每个P都分配一个空间,这样在P上运行的G进行Get和Put操作时,就可以在P本地的空间上读写。这样方法比Pool对象维护一个全局空间有明显好处,全局空间的读写肯定要加锁。
即使每个P都有了自己的本地空间,也不是说就可以完全避免锁使用。不要忘了Pool提供了内存复用功效,每个P上的G都使用的是P本地的空间的话,那内存复用就有局限性,只能局限在一个P上。
而pool提供的内存复用是覆盖所有P。意思是,一个G在执行Get方法时,发生G所在的P上,没有可复用的对象。这时就到别的P那儿去偷。偷这个动作就要加锁了。因为偷取别人可复用对象时候,别人也可能同时在读写。
前面开始说每个P有自己的空间,作用是避免锁,后面又说到别的P上偷对象,又要加锁。是不是矛盾了。
不矛盾,让我们来看看sync.Pool的实现原理。
sync.Pool对象底层两个关键字段,local和localSize,前者是指向一个数组,数组大小存在localSize。localSize的大小跟P个数保持一致。数组每个元素就是代表每个P自己的本地空间,类型是poolLocal。
poolLocal类型有两个关键字段,private和shared:
- shared是一个数组,读写要加锁。
- private只能存一个对象,读写不加锁。
来理一下在Pool对象上读写的逻辑:
- Get操作时,先返回本地P上的private上的对象。
- 如果private为空,继续从本地P上的shared找,这里要加锁。
- 如果shared也没有,就到别的P那儿,从shared里偷。
所有其它P都遍历过了,没有任何对象可偷。就返回nil或调用New函数。
Put操作时,优先放private。
private已经被放了,那就放到shared的最后。
用一张图来表示:
sync.Pool的特性
- 无大小限制。
- 自动清理,每次GC前会清掉Pool里的所有对象。所以不适用于做连接池。
- 每个P都会有一个本地的poolLocal,Get和Put优先在当前P的本地poolLocal操作。其次再进行跨P操作。
- 所以Pool的最大个数是runtime.GOMAXPROCS(0)。
sync.Pool的缺点
pool的Get()并非成本低廉,最坏情况可能会上锁runtime.GOMAXPROCS(0)次。
所以,多Goroutine与多P的情况下,使用Pool的效果才会突显。否则要经历无谓的锁成本。
简单的常用场景
bytes.Buffer作为临时对象放在池子里,这样减轻每次都需要创建的消耗。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
type Dao struct {
bp sync.Pool
}
func New(c *conf.Config) (d *Dao) {
d = &Dao{
bp: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
return
}
func (d *Dao) Infoc(args ...string) (value string, err error) {
if len(args) == 0 {
return
}
// fetch a buf from bufpool
buf, ok := d.bp.Get().(*bytes.Buffer)
if !ok {
return "", ErrType
}
// append first arg
if _, err := buf.WriteString(args[0]); err != nil {
return "", err
}
for _, arg := range args[1:] {
// append ,arg
if _, err := buf.WriteString(defaultSpliter); err != nil {
return "", err
}
if _, err := buf.WriteString(strings.Replace(arg, defaultSpliter, defaultReplacer, -1)); err != nil {
return "", err
}
}
value = buf.String()
buf.Reset()
d.bp.Put(buf)
return
}
|
带注释的源码
sync.Pool数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// pool 的数据结构
type Pool struct {
noCopy noCopy
// 指向一个数组,个数与P相等,每个元素的类型为poolLocalInternal
local unsafe.Pointer
// local数组的大小
localSize uintptr
// 创建pool对象时,用户必须提供的new函数
New func() interface{}
}
type poolLocalInternal struct {
// 私有对象,每个P都有,用于不同g执行get和put可以无锁操作
private interface{}
// 共享对象数组,每个P都有一个,同一个P上不同g可以多次执行put方法,需要有地方能存储。并且别的p上的g可能过来偷,所以要加锁
shared []interface{}
// 对shared进行加锁,private不用加锁
Mutex
}
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
|
Get方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// ...
// 拿到当前P对应的pool
l := p.pin()
if l.private == nil {
// 私有区有位置的话直接放私有区
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
// 否则放在共享区里
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
// ...
}
func (p *Pool) pin() *poolLocal {
// 拿到当前P的ID
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
// 定义pool对象时,s取值为0。只有经过pinSlow后,p.localSize的值才被设置
// 如果local数组已经初始化,就可以把对应P的本地pool返回
return indexLocal(l, pid)
}
// 否则就得重建local
return p.pinSlow()
}
func (p *Pool) pinSlow() *poolLocal {
runtime_procUnpin()
// 锁上所有的pool对象
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
// pinSlow是一个创建local的方法。在获得allPoolsMu锁前,可能被别的P先获取,这种情况下local就已经被初始化了
// 所以在获得allPoolsMu锁后需要再检查一次uintptr(pid) < s
return indexLocal(l, pid)
}
if p.local == nil {
allPools = append(allPools, p)
}
// local的大小默认就是P的个数
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // 设置local
atomic.StoreUintptr(&p.localSize, uintptr(size)) // 设置localSize
return &local[pid]
}
|
Put方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// ...
// 拿到当前P对应的pool
l := p.pin()
if l.private == nil {
// 私有区有位置的话直接放私有区
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
// 否则放在共享区里
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
// ...
}
|
runtime_procPin和runtime_procUnpin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
return procPin()
}
//go:nosplit
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
//go:linkname sync_atomic_runtime_procUnpin sync/atomic.runtime_procUnpin
//go:nosplit
func sync_atomic_runtime_procUnpin() {
procUnpin()
}
//go:nosplit
func procUnpin() {
_g_ := getg()
_g_.m.locks--
}
|