1 设计原理

由于我们服务器基本上都是基于linux,所以本文我们只讲golang在linux的实现

1.1 I/O模型

操作系统中包含阻塞I/O,非阻塞I/O,信号驱动I/O,异步I/O,多路复用I/O,我们本文仅介绍多路复用I/O 也就是golang网络轮训器实现的原理。

1.2 多路复用io

  • select select有了epoll后就很少用了,本文大概过一下,select仅支持1024个文件描述符(数组大小为1024),即fd,内存开销大,时间复杂度为便利数组O(n)
  • epoll 2.8内核版本之前会惊群,有兴趣的读者可以看看//todo 写一篇关于epoll的文章,对照源码以及画图更好讲述 epoll的数据结构为rbtree对<fd,event>的存储,ready队列存储就绪io,所以时间复杂度为O(logn) epoll的回调,分为et(边缘触发,有消息变化触发一次),lt(水平触发,消息回调触发多次)

2 golang tcp netpoll

golang netpoll在linux上是基于epoll封装的,我们先看下初始化的流程 net.ListenTcp

// src/net/tcpsock.go
func ListenTCP(network string, laddr *TCPAddr){}
#src/net/tcpsock_posix.go
->fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", ctrlCtxFn)
// src/net/ipsock_posix.go
-->socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlCtxFn)
//src/net/sock_posix.go
--->socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error)
---->err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn);
----->syscall.Bind(fd.pfd.Sysfd, lsa);
------->fd.init();
// src/net/fd_unix.go
-------->fd.pfd.Init(fd.net, true)
--------->serverInit.Do(runtime_pollServerInit)
//return pollDesc
---------->ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))

上述是go调用net.listenTcp的流程,对应系统调用为

  • 调用syscall.Bind,
  • 调用syscall.Listen
  • 生成fd,查看epoll是否初始化,未初始化则初始化
  • 将fd绑定到对应的epoll事件中
  • 设置成非阻塞模式
  • 返回pollDesc,每一个listen都对应着一个pollDesc,后文我们在详细了解runtime.epoll里相关的结构体与函数 接下来我们看下runtime里poll初始化的具体代码,linux epoll为例,对应上文runtime_pollServerInit函数
// src/runtime/netpoll_epoll.go
func netpollinit() {
	var errno uintptr
	//创建epoll描述符
	epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if errno != 0 {
		println("runtime: epollcreate failed with", errno)
		throw("runtime: netpollinit failed")
	}
	r, w, errpipe := nonblockingPipe()
	if errpipe != 0 {
		println("runtime: pipe failed with", -errpipe)
		throw("runtime: pipe failed")
	}
	ev := syscall.EpollEvent{
		Events: syscall.EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
	//添加到epoll事件上
	errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
	if errno != 0 {
		println("runtime: epollctl failed with", errno)
		throw("runtime: epollctl failed")
	}
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

然后我们再来看下runtime_pollOpen函数,主要是将fd添加到epoll中,下面我们看下,怎么处理对fd的读写事件

// src/runtime/netpoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	errno := netpollopen(fd, pd)
}
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
    var ev syscall.EpollEvent
	//设置成epoll ET模式(边缘触发,仅监控fd发生更改时通知,比如仅有新数据来时通知)
    ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.Data)) = pd
	//将listen对应的fd添加到epoll事件中
    return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

2.1 golang netpoll accept

通过listen.accept()可以获取一个conn结构体,后续的读写操作都基于conn对象,下面我们看下conn里封装了哪些对象,以及流程

func (fd *netFD) accept() (netfd *netFD, err error) {
    //for循环读,处理掉了syscall.EAGAIN等,返回则为
	d, rsa, errcall, err := fd.pfd.Accept()
	//新建fd
	netfd, err = newFD(d, fd.family, fd.sotype, fd.net); 
    //初始化并加入epoll中
	err = netfd.init()
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}

2.2 golang netpoll read/write

read/write本质上都是调用的golang runtime里封装好的runtime_pollWait,下面我们重点看下该函数

// src/internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
	for {
        fd.pd.waitRead(fd.isFile)
	}
}
// src/internal/poll/fd_poll_runtime.go
func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

2.2 golang netpoll runtime_pollWait

将调用者陷入gopark状态,等待调度的时候通过netpoll唤醒

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
	}
	return pollNoError
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	//将该g进入gopark
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := gpp.Swap(pdNil)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

下面我们回顾下做了哪些事,将读/写初始化fd,然后将fd绑定到epoll里同时调用

  • 调用net.listen,会检查epoll是否初始化,未初始化则调用 runtime/netpoll.go里的netpollGenericInit()初始化
  • 然后调用runtime_pollOpen()里通过 pollcache.alloc()从pollDesc链表上拿到获取一个pollDesc
  • 然后accept/read/write会将对应的fd添加到pollDesc里,同时将对应的g通过gopark阻塞,等待调度的netpoll函数阻塞
    下面我们再来看下runtime里具体的实现

3 golang runtime netpoll

由于不同操作系统实现了自己的io多路复用,go实现了多版本的网络轮训,我们仅讲解linux下的epoll我们先来看下方法

3.1 netpoll调度部分

netpoll通过EpollWait获取对应的事件,并通过netpollready添加到glist里,返回glist链表

func netpoll(delay int64) gList {
	//省略根据delay计算延时多久调用
	//事件数组
	var events [128]syscall.EpollEvent
retry:
	//通过EpollWait获取事件的数量
	n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
	if errno != 0 {
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}
	//根据事件数量遍历出对应个g
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := events[i]
		if ev.Events == 0 {
			continue
		}

		if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
			if ev.Events != syscall.EPOLLIN {
				println("runtime: netpoll: break fd ready for", ev.Events)
				throw("runtime: netpoll: break fd ready for something unexpected")
			}
			if delay != 0 {
				// netpollBreak could be picked up by a
				// nonblocking poll. Only read the byte
				// if blocking.
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
				netpollWakeSig.Store(0)
			}
			continue
		}

		var mode int32
		if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.Data))
			pd.setEventErr(ev.Events == syscall.EPOLLERR)
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

3.2 netpoll调度位置

主要是在监控线程 以及每一个m上的findRunnable里

  • sysmon()
  • findRunnable()