1 Design Principles
Since our servers almost all run Linux, this article only covers Go's implementation on Linux.
1.1 I/O models
Operating systems provide blocking I/O, non-blocking I/O, signal-driven I/O, asynchronous I/O, and multiplexed I/O. This article covers only multiplexed I/O, which is what Go's network poller is built on.
1.2 Multiplexed I/O
- select: rarely used now that epoll exists, so we only skim it. select supports at most 1024 file descriptors (the fd array size is fixed at 1024), the fd set must be copied between user and kernel space on every call (high memory overhead), and readiness is found by traversing the whole array, i.e. O(n).
- epoll: older kernels suffered from the thundering-herd problem on wakeup (//todo: a dedicated epoll article, walking the kernel source with diagrams, would tell this story better). epoll stores <fd, event> pairs in a red-black tree, so adding and removing fds is O(log n), while ready I/O is collected on a ready queue, so retrieving ready events is cheap. epoll has two trigger modes: ET (edge-triggered: an fd is reported once per state change, e.g. only when new data arrives) and LT (level-triggered: an fd is reported on every wait for as long as it stays ready).
2 golang tcp netpoll
On Linux, Go's netpoll is a wrapper around epoll. Let's first walk the initialization flow:
net.ListenTCP
// src/net/tcpsock.go
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error)
// src/net/tcpsock_posix.go
-> fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", ctrlCtxFn)
// src/net/ipsock_posix.go
--> socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlCtxFn)
// src/net/sock_posix.go
---> socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error)
----> err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn)
-----> syscall.Bind(fd.pfd.Sysfd, lsa)
------> fd.init()
// src/net/fd_unix.go
-------> fd.pfd.Init(fd.net, true)
--------> serverInit.Do(runtime_pollServerInit)
// returns a pollDesc
---------> ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
The above is the flow of net.ListenTCP in Go. The corresponding steps are:
- call syscall.Bind
- call syscall.Listen
- create the fd; check whether epoll has been initialized, and initialize it if not
- register the fd with the corresponding epoll event set
- put the fd in non-blocking mode
- return a pollDesc; every listener owns one pollDesc (we will look at the related runtime structs and functions in detail later)

Next, let's look at the concrete poll initialization code in the runtime, using Linux epoll as the example. This is the runtime_pollServerInit mentioned above.
// src/runtime/netpoll_epoll.go
func netpollinit() {
	var errno uintptr
	// create the epoll descriptor
	epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if errno != 0 {
		println("runtime: epollcreate failed with", errno)
		throw("runtime: netpollinit failed")
	}
	r, w, errpipe := nonblockingPipe()
	if errpipe != 0 {
		println("runtime: pipe failed with", -errpipe)
		throw("runtime: pipe failed")
	}
	ev := syscall.EpollEvent{
		Events: syscall.EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
	// register the break pipe's read end with epoll
	errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
	if errno != 0 {
		println("runtime: epollctl failed with", errno)
		throw("runtime: epollctl failed")
	}
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}
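To see this whole flow end to end from the user side, here is a minimal self-contained example (echoOnce is our own helper, not part of the net package): net.Listen runs the call chain traced above, ending with the listen fd registered in epoll.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// echoOnce starts a listener (net.Listen runs the whole init chain traced
// above), dials it, sends msg, and returns the echoed reply.
func echoOnce(msg string) string {
	ln, err := net.Listen("tcp", "127.0.0.1:0") // ":0" picks a free port
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	go func() {
		// Accept parks this goroutine until epoll reports the listen fd ready.
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		buf := make([]byte, len(msg))
		if _, err := io.ReadFull(conn, buf); err != nil {
			return
		}
		conn.Write(buf)
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	conn.Write([]byte(msg))
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(conn, buf); err != nil {
		panic(err)
	}
	return string(buf)
}

func main() {
	fmt.Println(echoOnce("hello")) // hello
}
```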
Now let's look at runtime_pollOpen, which mainly adds the fd to epoll. After that, we'll see how read/write events on the fd are handled.
// src/runtime/netpoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	// ... pd := pollcache.alloc() and pd field setup (elided)
	errno := netpollopen(fd, pd)
}
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
	var ev syscall.EpollEvent
	// epoll ET mode (edge-triggered: notified only on fd state changes,
	// e.g. only when new data arrives)
	ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.Data)) = pd
	// add the fd (here, the listen fd) to the epoll event set
	return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}
2.1 golang netpoll accept
listener.Accept() returns a conn; all subsequent reads and writes go through that conn object. Let's see what it wraps and how the flow goes:
func (fd *netFD) accept() (netfd *netFD, err error) {
	// loops internally, retrying on syscall.EAGAIN etc., until a connection arrives
	d, rsa, errcall, err := fd.pfd.Accept()
	// wrap the accepted fd in a new netFD
	netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
	// initialize it and register it with epoll
	err = netfd.init()
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}
2.2 golang netpoll read/write
Read and write both ultimately go through runtime_pollWait, wrapped by the Go runtime. Let's focus on that path:
// src/internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
	for {
		// ... syscall.Read; on EAGAIN, wait until the fd is readable (elided)
		fd.pd.waitRead(fd.isFile)
	}
}
// src/internal/poll/fd_poll_runtime.go
func (pd *pollDesc) wait(mode int, isFile bool) error {
	if pd.runtimeCtx == 0 {
		return errors.New("waiting for unsupported file type")
	}
	res := runtime_pollWait(pd.runtimeCtx, mode)
	return convertErr(res, isFile)
}
2.3 golang netpoll runtime_pollWait
This parks the caller via gopark; the goroutine is later woken through netpoll during scheduling.
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
	}
	return pollNoError
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	// park this goroutine via gopark
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := gpp.Swap(pdNil)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}
Let's recap what has happened so far:
- net.Listen checks whether epoll has been initialized and, if not, initializes it via netpollGenericInit() in runtime/netpoll.go
- runtime_pollOpen() then obtains a pollDesc from the pollDesc free list via pollcache.alloc()
- accept/read/write associate the corresponding fd with that pollDesc, and park the calling goroutine via gopark, to be woken when netpoll reports the fd ready during scheduling
Next, let's look at the concrete implementation inside the runtime.
3 golang runtime netpoll
Since each operating system has its own I/O multiplexing mechanism, Go ships multiple network poller implementations; we only cover the epoll one on Linux. Let's start with the function itself.
3.1 The netpoll scheduling hook
netpoll fetches ready events via EpollWait, turns each one into a runnable goroutine via netpollready, appending to a gList, and returns that list:
func netpoll(delay int64) gList {
	// (elided) compute waitms from delay
	// event buffer
	var events [128]syscall.EpollEvent
retry:
	// EpollWait returns the number of ready events
	n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
	if errno != 0 {
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}
	// walk the ready events and collect the corresponding goroutines
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := events[i]
		if ev.Events == 0 {
			continue
		}
		if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
			if ev.Events != syscall.EPOLLIN {
				println("runtime: netpoll: break fd ready for", ev.Events)
				throw("runtime: netpoll: break fd ready for something unexpected")
			}
			if delay != 0 {
				// netpollBreak could be picked up by a
				// nonblocking poll. Only read the byte
				// if blocking.
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
				netpollWakeSig.Store(0)
			}
			continue
		}
		var mode int32
		if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.Data))
			pd.setEventErr(ev.Events == syscall.EPOLLERR)
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}
3.2 Where netpoll is called
Mainly from the monitor thread, and from findRunnable on each M:
- sysmon()
- findRunnable()