Ants
ants 是一个高性能且低损耗的 goroutine 池。
https://github.com/panjf2000/ants
主要思想:以空间换时间。写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 ants
,可以实例化一个 goroutine 池,复用 goroutine ,节省资源,提升性能。
Ants依赖于Go底层的一些实现,比如Locker、sync.Pool、sync.cond
Locker
Locker是一个接口,任何实现了Locker方法的结构体(类)都可以称作为锁🔒
type Locker interface{
Lock()
Unlock()
}
最典型的sync.Mutex互斥锁就实现了Locker接口,这是一种相对重量级的锁。因此,Ants的作者自己实现了一种轻量级的自旋锁(Spin Lock在短期内,并发程度不是很激烈的情况下,自旋锁会有一定的优势)
该自旋锁的实现原理:
- 使用无符号整型来标识锁的状态,0-未上锁,1-已上锁
- 加锁时0->1,解锁1->0 CAS
- 通过设置一个backoff值来反应抢锁激烈程度,每次抢锁失败都等待backoff次CPU时间片;backoff值随着失败的次数而增加,最高达到16 利用指数退避算法,参见 https://en.wikipedia.org/wiki/Exponential_backoff
Sync.Pool 对象池
sync.Pool是Golang标准库下并发安全的对象池,适合用于有大量对象资源会存在反复构造和销毁的场景,可缓存资源进行复用,以提高性能并减轻GC压力
syns.Pool里的一些操作和其中的回收机制总体上来看都跟BufferPool很像,但具体细节又有所不同。
sync.Pool中有一个local [P]poolLocal 数组,其中len(p) = GOMAXPROCS,也即为每一个p调度器都维护了一个poolLocal类型的元素,poolLocal中分为private对象和sharedList对象(加锁)
核心方法:
Pool.Pin()
Pool.Pin() 将Goroutine和P绑定起来,在执行unpin()方法之前不能解绑。目的:当前Goroutine可能后续会从Pool中无锁化的取到private,如果此时发生了Goroutine的调度切换,可能会产生意料之外的错误,因此需要绑定。
Pool.Get()
已有的话想用直接拿,不要创造
- 先拿私有
- 私有没了,去那SharedList里的head,可能涉及到加锁
- 再没有,去上一轮的poolLocal中拿
- 都没有,调用New()方法创造 (调用用户提前声明的构造器函数)
Pool.Put()
用完了,放回对象池
放回的时候也要先调用Pin()方法,因为先放回的时候也要先尝试放回private,只要和当前的private有交互,就需要和p绑定,这样才能保证和private交互的无锁化。如果当前private为空,则放入,否则放入sharedList的头部。放完后执行Unpin()操作。
回收机制
存入pool的对像会被不定期的回收,因此pool没有容量的概念,即便存入大量元素,也不会发生内存泄露,对象的回收时机不固定,取决于GC的时机
经过一轮GC后Local的数据会被转移至victim(victim [p]poolLocal victim存的是上一轮的数据),再经过一轮GC,victim中的数据就会真正的被GC掉。
Ants 协程池
使用协程池的理由
- 在绝大多数情况下,不需要用到协程池,因为goroutine本身已经很强大,但是会有极端的场景出现。但是当goroutine的量级达到一定程度的时候,大批量的协程创建/销毁成本过高。
- 对并发资源的控制,通过协程池创建协程,从而对全局的并发度进行控制,限定池中协程的数量。
- 可以实时查看当前全局并发协程的数量
- 有一个紧急入口可以一次性的释放全局协程
Ants数据结构
type Pool struct {
capacity int32 //池子的容量
running int32 //运行中的协程数量 已经有多少个goroutine被取出
lock sync.Locker //自实现的自旋锁
workers workerArray //协程列表 goroutine被封装成了go worker
state int32 //池子的状态
cond *sync.Cond //并发协调器 通过调用wait()和signal() 来协调协程 已用完
workerCache sync.Pool //协程回收站
waiting int32 //阻塞等待的协程数量
purgeDone int32
stopPurge context.CancelFunc
ticktockDone int32
stopTicktock context.CancelFunc
now atomic.Value
options *Options //一些定制化配置
}
workerArray和workerCache的区别:workerArray是当前还真实存在且可用的go worker,workerCache是协程回收站,workerCache中的协程是已经被逻辑删除,但还没有被物理删除,需要被GC掉。何时被删除,这取决于GC的时机
go worker
type goWorker struct {
pool *Pool //并不意味着goWorker包含了pool,而是goWorker需要*Pool来找到其属于哪个Pool,找到其回池的路径
task chan func() //非常精彩的设计,所谓的协程池,它能够创建协程,但是创建的协程不能返回,需要其长时间运作,如果go func()真实返回了,那么在后续的使用go Worker还需要调用go func(),这其实是没有达到复用协程的目的。通过不停的轮询该chan来侦测外界是否有任务传递进来,如果没有就阻塞住,并不会返回。
recycleTime time.Time //回收到ceche时间 workArray -> workerCache
}
options
在创建pool时通过构造器传递参数,从而定制化pool
type Options struct{
DisablePurge bool //是否允许回收空闲的goWorker
ExpiryDuration time.Duration //空闲多长时间进行回收,仅当上个参数为False时生效
MaxBlockingTasks int //是否设置为阻塞模式,如果是,goWorker不够时不等待,直接返回err
Nonblocking bool //阻塞还是非阻塞标识 通过cond wait() signal()
PanicHandler func(interface{}) //定制panic处理逻辑 ants实现了一个默认的panic兜底(方法栈打印)
}
workerArray
type workerArray interface {
len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
retrieveExpiry(duration time.Duration) []*goWorker //使用到了二分查找 n -> logn 加速
reset()
}