Ants库学习

目录

Ants

ants 是一个高性能且低损耗的 goroutine 池。

https://github.com/panjf2000/ants

主要思想:以空间换时间。写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 ants,可以实例化一个 goroutine 池,复用 goroutine ,节省资源,提升性能。

Ants依赖于Go底层的一些实现,比如Locker、sync.Pool、sync.cond

Locker

Locker是一个接口,任何实现了Locker方法的结构体(类)都可以称作为锁🔒

type Locker interface{
    Lock()
    Unlock()
}

最典型的sync.Mutex互斥锁就实现了Locker接口,这是一种相对重量级的锁。因此,Ants的作者自己实现了一种轻量级的自旋锁(Spin Lock在短期内,并发程度不是很激烈的情况下,自旋锁会有一定的优势)

该自旋锁的实现原理:

  1. 使用无符号整型来标识锁的状态,0-未上锁,1-已上锁
  2. 加锁时0->1,解锁1->0 CAS
  3. 通过设置一个backoff值来反应抢锁激烈程度,每次抢锁失败都等待backoff次CPU时间片;backoff值随着失败的次数而增加,最高达到16 利用指数退避算法,参见 https://en.wikipedia.org/wiki/Exponential_backoff

Sync.Pool 对象池

sync.Pool是Golang标准库下并发安全的对象池,适合用于有大量对象资源会存在反复构造和销毁的场景,可缓存资源进行复用,以提高性能并减轻GC压力 syns.Pool里的一些操作和其中的回收机制总体上来看都跟BufferPool很像,但具体细节又有所不同。

sync.Pool中有一个local [P]poolLocal 数组,其中len(p) = GOMAXPROCS,也即为每一个p调度器都维护了一个poolLocal类型的元素,poolLocal中分为private对象和sharedList对象(加锁)

核心方法:

Pool.Pin()

Pool.Pin() 将Goroutine和P绑定起来,在执行unpin()方法之前不能解绑。目的:当前Goroutine可能后续会从Pool中无锁化的取到private,如果此时发生了Goroutine的调度切换,可能会产生意料之外的错误,因此需要绑定。

Pool.Get()

已有的话想用直接拿,不要创造

  1. 先拿私有
  2. 私有没了,去那SharedList里的head,可能涉及到加锁
  3. 再没有,去上一轮的poolLocal中拿
  4. 都没有,调用New()方法创造 (调用用户提前声明的构造器函数)

Pool.Put()

用完了,放回对象池

放回的时候也要先调用Pin()方法,因为先放回的时候也要先尝试放回private,只要和当前的private有交互,就需要和p绑定,这样才能保证和private交互的无锁化。如果当前private为空,则放入,否则放入sharedList的头部。放完后执行Unpin()操作。

回收机制

存入pool的对像会被不定期的回收,因此pool没有容量的概念,即便存入大量元素,也不会发生内存泄露,对象的回收时机不固定,取决于GC的时机

经过一轮GC后Local的数据会被转移至victim(victim [p]poolLocal victim存的是上一轮的数据),再经过一轮GC,victim中的数据就会真正的被GC掉。

Ants 协程池

使用协程池的理由

  1. 在绝大多数情况下,不需要用到协程池,因为goroutine本身已经很强大,但是会有极端的场景出现。但是当goroutine的量级达到一定程度的时候,大批量的协程创建/销毁成本过高。
  2. 对并发资源的控制,通过协程池创建协程,从而对全局的并发度进行控制,限定池中协程的数量。
  3. 可以实时查看当前全局并发协程的数量
  4. 有一个紧急入口可以一次性的释放全局协程

Ants数据结构

type Pool struct {
	capacity     int32       //池子的容量
	running      int32       //运行中的协程数量  已经有多少个goroutine被取出
	lock         sync.Locker //自实现的自旋锁
	workers      workerArray //协程列表  goroutine被封装成了go worker 
	state        int32       //池子的状态
    cond         *sync.Cond  //并发协调器   通过调用wait()和signal() 来协调协程  已用完
	workerCache  sync.Pool   //协程回收站
	waiting      int32       //阻塞等待的协程数量
	purgeDone    int32
	stopPurge    context.CancelFunc
	ticktockDone int32
	stopTicktock context.CancelFunc
	now          atomic.Value
	options      *Options //一些定制化配置
}

workerArray和workerCache的区别workerArray是当前还真实存在且可用的go worker,workerCache是协程回收站workerCache中的协程是已经被逻辑删除但还没有被物理删除需要被GC掉何时被删除这取决于GC的时机

go worker

type goWorker struct {
    pool *Pool   //并不意味着goWorker包含了pool,而是goWorker需要*Pool来找到其属于哪个Pool,找到其回池的路径
    task chan func() //非常精彩的设计,所谓的协程池,它能够创建协程,但是创建的协程不能返回,需要其长时间运作,如果go func()真实返回了,那么在后续的使用go Worker还需要调用go func(),这其实是没有达到复用协程的目的。通过不停的轮询该chan来侦测外界是否有任务传递进来,如果没有就阻塞住,并不会返回。
    recycleTime time.Time //回收到ceche时间    workArray -> workerCache
}

options

在创建pool时通过构造器传递参数,从而定制化pool

type Options struct{
    DisablePurge bool //是否允许回收空闲的goWorker
    ExpiryDuration time.Duration //空闲多长时间进行回收,仅当上个参数为False时生效
    MaxBlockingTasks int //是否设置为阻塞模式,如果是,goWorker不够时不等待,直接返回err
    Nonblocking bool  //阻塞还是非阻塞标识  通过cond  wait() signal()
    PanicHandler func(interface{}) //定制panic处理逻辑  ants实现了一个默认的panic兜底(方法栈打印)
}

workerArray

type workerArray interface {
	len() int
	isEmpty() bool
	insert(worker *goWorker) error
	detach() *goWorker
	retrieveExpiry(duration time.Duration) []*goWorker //使用到了二分查找 n -> logn 加速
	reset()
}
取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦