sync once

sync.Once只有一个方法,sync.Do(),通常我们用来调用在多个地方执行保证只执行一次的方法,如初始化,关闭fd等 具体代码如下,通过atomic获取done,为0则抢锁执行do传进来的方法,执行完后将done atomic变成1

type Once struct {
	done uint32
	m    Mutex
}
func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

sync.pool

主要为了减少gc,减少gc在mark的时候需要标记的inuse_objects数量过多。通俗点来说就是一个内存池,我们自己实现g et,put。
需要注意的是,仅存在与pool中,在外没有被其它地方引用,可能会被pool删除,下面我们来看看调用sync.Pool具体流程
img.png
根据流程图我们先一起梳理下,所有的pool都被存储在allPools这个数组中,注意不要不停的生成新的sync.Pool,allPools是加锁去append

sync.pool init

pool的初始化调用的是pool.poolCleanup的方法,主要逻辑如下,将old里每个p清空,将allpools里p的local,localSize移动到victim,victimSize
然后将allpool移动给oldPools,方便后续清空

func poolCleanup() {
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}
	
	oldPools, allPools = allPools, nil
}

pool.get

现在我们来看看get的逻辑

  • 通过调用pool.Get->pool.pin->pool.pinSlow获取一个poolLocal,具体代码如下
func (p *Pool) Get() any {
    l, pid := p.pin()
}
func (p *Pool) pin() (*poolLocal, int) {
	//获取pid,使g绑定p并禁止抢占(本地队列)
	pid := runtime_procPin()
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	//如果pid比我本地Pool.localSize小则直接返回
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	//生成新的poolLocal
	return p.pinSlow()
}
func (p *Pool) pinSlow() (*poolLocal, int) {
	//解除独占
	runtime_procUnpin()
	//加锁
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	//将poolLocal追加到allPools数组里
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	//local根据p的个数设置数组长度
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	return &local[pid], pid
}

然后我们接着看后续逻辑

func (p *Pool) Get() any {
	l, pid := p.pin()
	//最优先元素,类似于p.runnext,或者cpu l1 缓存
	x := l.private
	l.private = nil
	//如果没取到private
	if x == nil {
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		//去shared取,pool.local[pid].poolLocal.shared
		x, _ = l.shared.popHead()
		//shared没拿到,去别的pid里对应的poolLocal.shared偷一个
		if x == nil {
			x = p.getSlow(pid)
		}
	}
}

我们来看看从别的poolLocal获取内存具体的操作

func (p *Pool) getSlow(pid int) any {
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	//从别的slow里偷
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}
	//在victim里找
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
    //在victim里别的p上偷
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)
    //都没有返回nil
	return nil
}

若果没有从getSlow上获取,还会调用pool.New,也就是我们sync.Pool{New:func}的方法,都拿不到则返回为nil

if x == nil && p.New != nil {
    x = p.New()
}
return x

总结下,get的具体逻辑为

  • 是否有Pool,没有则生成Pool并添加到runtime.allPools上
  • 从Pool.poolLocal[pid].poolLocalInternal.private上拿,拿不到去shared上拿,还拿不到去别的pid上的shared拿
  • 再拿不到从别的即将要被清空的Pool.victim上拿,逻辑如上,还拿不到则执行sync.Pool{New:func}。还拿不到则返回nil

pool.put

下面我们再来看看Pool.put,先放到Pool.poolLocal[pid].private,如果有值了,调用Pool.poolLocal[pid].shared.pushHead pushHead里如果没值,则生成poolChainElt,且val长度为8,如果满了,则扩容,每次创建新的poolChainElt并扩容val长度都会*2,最大10亿多,并将新生成的
poolChainElt通过双向链表挂到之前的poolChainElt后面

// Put adds x to the pool.
func (p *Pool) Put(x any) {
	if x == nil {
		return
	}
	//获取一个[pid]poolLocal
	l, _ := p.pin()
    //[pid]poolLocal.poolLocalInternal.Private为空则赋值
	if l.private == nil {
		l.private = x
	} else {
        //放[pid]poolLocal.poolLocalInternal.shared.head上
		l.shared.pushHead(x)
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}
//l.shared.pushHead(x)
func (c *poolChain) pushHead(val any) {
	d := c.head
	//head为空则生成一个poolChainElt,且val为8长度的双向链表节点
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}
    //放入节点的head
	if d.pushHead(val) {
		return
	}
	//没放入成功,满了 则生成下一个链表节点,并将节点容量并乘2
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	//扩容后重新放入
	d2.pushHead(val)
}

sync gc部分

同上sync.pool.init 将Pool.local移动到Pool.victim ,将Pool.localSize移动到Pool.victimSize并将上一次的Pool.victim,Pool.victimSize清空

lock 锁实现基础

锁的定义如下,先看下state字段,sema后文会详细介绍

// src/sync/mutex.go
type Mutex struct {
	//32位 前29位表示有多少人g等待互斥锁的释放,后三位分别为mutexLocked,mutexWoken,mutexStarving
	state int32
	//信号量 对应在src/runtime/sema.go上的semTabSize存储,挂着等待锁的g
	sema  uint32
}
  • mutexLocked表示是否已经加锁 1为已加锁
  • mutexWoken 表示从正常模式被从唤醒
  • mutexStarving 进入饥饿模式

锁正常模式和饥饿模式

正常模式下,锁是先入先出,排队获取锁,新唤醒的g与新创建的g竞争时(这时候占有cpu),大概率拿不到锁,go里面解决办法是出现超过1ms没拿到锁的g,锁进入饥饿模式 饥饿模式下,互斥锁会直接交给等待队列最前面的g,新的g直接丢在队尾,且不能自旋(没有cpu片)
取消条件为队尾的g获取到了锁,或者当前g获取锁1ms内

加锁逻辑

加锁逻辑,如果状态位是0,则改成1然后加锁成功,如果已经有锁的情况下,进入m.lockSlow()逻辑

// src/sync/mutex.go
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

如果当前不能加锁,则进入自旋等模式等待锁释放,大体流程如下

  • 通过runtime_canSpin判断是否能进入自旋
    • 运行在多 CPU 的机器上
    • 当前g获取锁进入自旋少于4次
    • 至少有个p runq为空
  • 通过自旋等待互斥锁的释放
  • 计算互斥锁的最新状态
  • 更新互斥锁的状态并获取锁
func (m *Mutex) lockSlow() {
	//省略自旋获取锁代码
	//通过信号量保证只会有一个g获取到锁,也就是我们开头的Mutex.sema
	runtime_SemacquireMutex(&m.sema, queueLifo, 1)
}

下面我们看看信号量的逻辑,了解完信号量后接着分析runtime_SemacquireMutex

信号量详解

信号量的存储方式如下图,全局一个semtable,semtable是一个251长度的数据,每一个数组元素根据&m.sema hash后,用二叉树存储
比如lock A addr=1,lock b addr = 252,根据hash后,都在semtable[1]里,用树存储,而等待锁lock A的所有的g都通过sudog用
waitlink连接,具体结构图如下 sync.lock.jpg 下面我们接着看runtime_SemacquireMutex函数

//runtime_SemacquireMutex()->semacquire1()
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
	//获取当前的g
	gp := getg()
    //获取一个sudog
	s := acquireSudog()
	//通过地址hash得到二叉树跟目录
	root := semtable.rootFor(addr)
	for {
		lockWithRank(&root.lock, lockRankRoot)
        //root上等待锁的sudog的个数
		root.nwait.Add(1)
		if cansemacquire(addr) {
			root.nwait.Add(-1)
			unlock(&root.lock)
			break
		}
        //找到该addr在树上对应的节点
		root.queue(addr, s, lifo)
		//阻塞gopark,解锁的时候goready恢复
		goparkunlock(&root.lock, reason, traceEvGoBlockSync, 4+skipframes)
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	releaseSudog(s)
}

信号量解锁详解

具体代码与加锁的信号量处理差不多,简略代码如下,有兴趣的可以看看原来代码

// src/runtime/sema.go
func semrelease1(addr *uint32, handoff bool, skipframes int) {
    readyWithTime(s, 5+skipframes)
}
func readyWithTime(s *sudog, traceskip int) {
  if s.releasetime != 0 {
    s.releasetime = cputicks()
  }
    goready(s.g, traceskip)
}

读写锁RWMutex

我们依次分析获取写锁和读锁的实现原理

type RWMutex struct {
	w           Mutex        // 复用读写锁
	writerSem   uint32       // 写等待读信号量
	readerSem   uint32       // 读等待写信号量
	readerCount atomic.Int32 // 执行的读操作数量
	readerWait  atomic.Int32 // 写操作被阻塞时等待的读操作个数
}

写锁

复用mutex锁,分几种情况

  • 已有写锁,rw.w.Lock()将会阻塞,上文mutex逻辑,同时将readerCount设置最小值
  • readerCount有值,说明有读锁,调用runtime_SemacquireRWMutex休眠,等待读锁释放writerSem信号量
func (rw *RWMutex) Lock() {
	rw.w.Lock()
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
}

读锁

readerCount为负数(-1<<30),则说明有写锁
直接调用runtime_SemacquireRWMutexR,如果有写锁,会阻塞readerSem 如果该方法的结果为非负数,则通过信号量readerSem阻塞,后续等待Unlock解写锁唤醒
bmap.png

func (rw *RWMutex) RLock() {
	if rw.readerCount.Add(1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
	}
}