Overview
Basic flow
- go func() creates a G
- The G is put on a P's local run queue, or balanced to the global queue
- An M is woken up or created to run the work
- The M enters the schedule loop (schedule)
- It fetches a runnable G and executes it
- It cleans up and re-enters the schedule loop (a minimal user-level example of kicking off this flow follows below)
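As a reference point for the flow above, here is a minimal, self-contained sketch (my own illustration using only standard APIs, not runtime code): every go statement below goes through newproc and runqput as described, and wakep may start another M if idle P's exist; the WaitGroup only keeps main alive.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) { // each go statement creates a G via runtime.newproc
			defer wg.Done()
			fmt.Println("goroutine", id)
		}(i)
	}
	wg.Wait() // keep main alive until every G has run
}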
PMG
Essentially this makes CPU scheduling more fine-grained: the smallest scheduling unit changes from a thread to a goroutine.
P: Processor, analogous to a CPU core; it limits how many tasks can run concurrently. A worker thread must be bound to a P to execute tasks, otherwise it sleeps until an idle P becomes available and wakes it up. The P also supplies resources for execution, such as memory for object allocation and the local run queue. A thread has exclusive use of the P it is bound to, so it can operate on those resources efficiently without locks.
G: Goroutine. Everything inside the process runs as a goroutine, including the runtime's own services and the main.main entry function. Note that a G is not the executing entity itself: it only holds the state of a concurrent task and provides the stack memory the task runs on. It is similar to a thread's call stack, just much smaller, managed entirely in user space, and starting with only a few KB of stack, which is the precondition for handling massive concurrency.
M: the actual execution entity (a machine thread). It binds to a P and runs G tasks in a schedule loop. By adjusting registers, the M points its execution stack at the G's own stack memory, allocates stack frames there, and runs the task function. When a task is switched out midway, only the relevant register state has to be saved back into the G; any M can later restore that state and resume execution. The thread only executes and no longer owns task state, which is what makes it possible to move concurrent tasks across threads and multiplex them.
Although a P and an M form an execution pair, their counts are not one-to-one. The number of P's is relatively constant, defaulting to ncpu (it may be more or fewer); runtime.GOMAXPROCS() sets the upper limit. M's are created on demand by the scheduler.
A G's initial stack is only 2KB, and creating one is just an object allocation in user space, far cheaper than entering the kernel to create a thread. The scheduler lets multiple M's run the schedule loop, continuously fetching and executing tasks, which is why it can handle tens of thousands of concurrent tasks.
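A small, hedged example of inspecting and changing the number of P's from user code (standard runtime APIs; the printed values depend on the machine):

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) only queries the current value, i.e. the number of P's.
	fmt.Println("NumCPU:    ", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))

	// Setting it changes how many G's may execute in parallel;
	// M's are still created on demand by the scheduler.
	prev := runtime.GOMAXPROCS(2)
	fmt.Println("previous GOMAXPROCS:", prev)
}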
Initialization
procresize compares len(allp) with nprocs to see whether allp needs to be reallocated, and creates any missing P's with new(p).
It then checks whether the current g.m.p has a valid id, i.e. id < nprocs. If so, the current P keeps being used; otherwise the current P is unbound and the M is bound to allp[0].
Surplus P's are released in a loop via p.destroy(). Then allp is traversed: if a P's runq is empty it is put on the idle P list; otherwise an M is taken from the idle M list and bound to it. Finally the linked list of P's that still have local work is returned.
"Releasing" a P means moving its local goroutine work (pp.runq, pp.gFree, pp.runnext) to the global queue, setting its mcache to nil, and marking the P itself _Pdead.
procresize(procs) is called from schedinit and startTheWorldWithSema. After schedinit, every P except the one bound to the current thread is idle; startTheWorldWithSema wakes up every P that has work.
Note: pp.gFree must be manipulated with the lock held, since other P's may come here to steal tasks.
runtime/proc.go
allp []*p // len(allp) == gomaxprocs; may change at safe points, otherwise immutable
func schedinit() {
	sched.maxmcount = 10000
	procs := ncpu
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
}

func procresize(nprocs int32) *p {
	old := gomaxprocs
	if nprocs > int32(len(allp)) { // allp is too small: grow it
		nallp := make([]*p, nprocs)
		// Copy everything up to allp's cap so we
		// never lose old allocated Ps.
		copy(nallp, allp[:cap(allp)])
		allp = nallp
	}
	// initialize new P's
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		}
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}
	_g_ := getg()
	if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
		// continue to use the current P
		_g_.m.p.ptr().status = _Prunning
		_g_.m.p.ptr().mcache.prepareForSweep()
	} else {
		// release the current P and acquire allp[0]
		if _g_.m.p != 0 {
			_g_.m.p.ptr().m = 0
		}
		_g_.m.p = 0
		_g_.m.mcache = nil
		p := allp[0]
		p.m = 0
		p.status = _Pidle
		acquirep(p)
	}
	// release resources from unused P's
	for i := nprocs; i < old; i++ {
		p := allp[i]
		p.destroy()
		// can't free P itself because it can be referenced by an M in syscall
	}
	// ... finally build and return the list of P's that have local work
}
Arguments of a go statement are pushed from right to left, and addresses get smaller the further down the stack you go.
package main

import "time"

func add(x, y int) int { return x + y }

func main() {
	x := 0x100
	y := 0x200
	go add(x, y)
	time.Sleep(time.Second)
}
go tool objdump -s "main\.main" test
MOVL $0x18, 0(SP)     // first argument: total size of the call arguments, at the bottom of the frame
LEAQ 0x3c355(IP), AX
MOVQ AX, 0x8(SP)      // second argument: address of the funcval (the function to run)
MOVQ $0x100, 0x10(SP) // third: the function's first argument
MOVQ $0x200, 0x18(SP) // fourth: the function's second argument, at the top
CALL runtime.newproc(SB)
In runtime/proc.go, newproc(siz int32, fn *funcval) takes only two parameters, yet main pushed four values; the values after siz are fn followed by its arguments, which newproc copies out of the caller's frame.
func newproc(siz int32, fn *funcval) {}
The goroutine struct
- stack: the stack bounds
- sched: saves the execution context when the G is switched out
- startpc: the task function
- gcAssistBytes: GC assist credit; if positive, the G does not need to assist the GC
type g struct {
// Stack parameters.
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
// stackguard1 is the stack pointer compared in the C stack growth prologue.
// It is stack.lo+StackGuard on g0 and gsignal stacks.
// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
stack stack // offset known to runtime/cgo
stackguard0 uintptr // offset known to liblink
stackguard1 uintptr // offset known to liblink
m *m // current m; offset known to arm liblink
sched gobuf // saves the execution context when the G is descheduled
goid int64
gopc uintptr // pc of go statement that created this goroutine
startpc uintptr // pc of goroutine function
// gcAssistBytes is this G's GC assist credit in terms of
// bytes allocated. If this is positive, then the G has credit
// to allocate gcAssistBytes bytes without assisting. If this
// is negative, then the G must correct this by performing
// scan work. We track this in bytes to make it fast to update
// and check for debt in the malloc hot path. The assist ratio
// determines how this corresponds to scan work debt.
gcAssistBytes int64
}
Creating a new G
- Get a free G from the current g.m.p's local free list (p.gFree). If it is empty, move a batch from sched.gFree.stack (free G's that still own a stack) or sched.gFree.noStack into p.gFree and pop one from there; if the G obtained has no stack, allocate one.
- If there is no free G at all, call malg to allocate a minimal (2KB) goroutine and add it to the allg list.
- Compute the required size and align it.
- Use newg.stack.hi - totalSize to determine sp, i.e. where the arguments are placed; this becomes the goroutine's stack pointer.
- Copy the call arguments onto the new stack.
- Initialize the context-save area, i.e. the newg.sched fields.
- Initialize the remaining basic state.
- Set newg's status to _Grunnable.
- Assign the goid.
- Put newg into the current P's runq.
// The minimum size of stack used by Go code
_StackMin = 2048
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
_g_ := getg()
siz := narg
siz = (siz + 7) &^ 7 // round the argument size up to 8 bytes
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
sp := newg.stack.hi - totalSize
spArg := sp
if narg > 0 {
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg)) // copy the arguments from the caller's stack
}
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.startpc = fn.fn
casgstatus(newg, _Gdead, _Grunnable)
newg.goid = int64(_p_.goidcache)
runqput(_p_, newg, true)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
}
Threads
After the new G is enqueued, wakep() is called; if no M is currently spinning, startm either wakes an idle M or creates a new one.
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
// be conservative about spinning threads
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
func startm(_p_ *p, spinning bool) {
	if _p_ == nil {
		_p_ = pidleget() // take an idle P
	}
	mp := mget() // take an M from the idle M list
	if mp == nil {
		// No idle M: create a new one. If the caller wants a spinning M,
		// fn (mspinning) marks it as spinning before it starts scheduling.
		var fn func()
		if spinning {
			fn = mspinning
		}
		newm(fn, _p_)
		return
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	mp.nextp.set(_p_)
	notewakeup(&mp.park) // wake the parked M so it resumes the schedule loop
}
The m struct:
- g0: provides the system stack
- mstartfn: the startup function
- p: the currently bound P
- nextp: a P stashed temporarily until the M starts running
- spinning: whether the M is spinning (out of work and actively looking for more)
type m struct {
g0 *g // goroutine with scheduling stack
mstartfn func()
curg *g // current running goroutine
nextp puintptr
p puintptr // attached p for executing go code (nil if not executing go code)
spinning bool // m is out of work and is actively looking for work
park note
schedlink muintptr
}
Creating an M
The most distinctive thing about an M is that it carries a G named g0 with a default 8KB stack, which serves as the thread's system stack. Whenever management code needs to run, the thread temporarily switches onto g0's stack, fully isolated from user logic. The systemstack calls you frequently see in the runtime are exactly this: switch to the g0 stack and run the management operation there.
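systemstack is runtime-internal and cannot be called from user code; inside the runtime such calls look roughly like the sketch below (illustrative only, not a quote of any particular call site):

// Runs fn on g0's (the system) stack, then switches back to the caller's G stack.
// Used for work that must not run on the small, growable user stack,
// e.g. stack growth itself, scheduler bookkeeping, or GC helpers.
systemstack(func() {
	startm(nil, true) // hypothetical call site, shown only for illustration
})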
// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn)
mp.nextp.set(_p_)
newm1(mp)
}
func newm1(mp *m) {
newosproc(mp)
}
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi)
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
}
M initialization checks the number of M's and throws if it exceeds the limit; mp.alllink = allm keeps the M reachable so the garbage collector does not free it.
func mcommoninit(mp *m) {
checkmcount()
mpreinit(mp)
// Add to allm so garbage collector doesn't free g->m
// when it is just in a register or thread-local storage.
mp.alllink = allm
}
func checkmcount() {
// sched lock is held
if mcount() > sched.maxmcount {
print("runtime: program exceeds ", sched.maxmcount, "-thread limit\n")
throw("thread exhaustion")
}
}
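The 10000-thread ceiling checked by checkmcount is what the standard library exposes as runtime/debug.SetMaxThreads. A hedged example of raising it:

package main

import (
	"fmt"
	"runtime/debug"
)

func main() {
	// SetMaxThreads returns the previous limit (10000 by default).
	// If the process later exceeds the new limit, the runtime throws
	// "thread exhaustion", exactly as checkmcount does above.
	prev := debug.SetMaxThreads(20000)
	fmt.Println("previous thread limit:", prev)
}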
// Called to initialize a new m (including the bootstrap m).
// Called on the parent thread (main thread in case of bootstrap), can allocate memory.
func mpreinit(mp *m) {
mp.gsignal = malg(32 * 1024) // Linux wants >= 2K
mp.gsignal.m = mp
}
Getting an M
mget
type schedt struct {
midle muintptr // idle m's waiting for work
nmidle int32 // number of idle m's waiting for work
maxmcount int32 // maximum number of m's allowed (or die)
}
// Try to get an m from midle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func mget() *m {
mp := sched.midle.ptr()
if mp != nil {
sched.midle = mp.schedlink
sched.nmidle--
}
return mp
}
func stopm() {
	_g_ := getg()
	mput(_g_.m)            // put the M on sched's idle M list
	notesleep(&_g_.m.park) // park the thread until notewakeup is called
	noteclear(&_g_.m.park) // reset the note for the next sleep
}
Execution
An M has two starting points for running concurrent tasks: the thread start function mstart, and resuming the schedule loop after a stopm sleep is woken up.
On systems where the g0 stack cannot be used, the required space is carved out directly on the system stack.
In schedule, once a runnable G has been obtained it is handed to execute, which calls gogo(&gp.sched).
// mstart is the entry-point for new Ms.
func mstart() {
_g_ := getg()
osStack := _g_.stack.lo == 0
if osStack {
// Initialize stack bounds from system stack.
// Cgo may have left stack size in stack.hi.
// minit may update the stack bounds.
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
mstart1()
}
func mstart1() {
schedule()
}
func schedule() {
_g_ := getg()
var gp *g
var inheritTime bool
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr()) // local run queue first
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
execute(gp, inheritTime)
}
func execute(gp *g, inheritTime bool) {
_g_ := getg()
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
_g_.m.curg = gp
gp.m = _g_.m
gogo(&gp.sched)
}
After initialization, the address of goexit has been pushed onto the top of the G's stack. When the task function started by gogo finishes, its trailing RET instruction restores that goexit address into PC/IP, which performs the post-task cleanup and re-enters the schedule loop.
In asm_arm64.s:
MOVD buf+0(FP), R5 // load &gp.sched into R5
MOVD gobuf_g(R5), g // load gobuf.g into the g register
BL runtime·save_g(SB) // save/refresh g
MOVD 0(g), R4 // make sure g is not nil
MOVD gobuf_sp(R5), R0
MOVD R0, RSP // restore SP: switch onto the G's stack
MOVD gobuf_bp(R5), R29
MOVD gobuf_lr(R5), LR
MOVD gobuf_ret(R5), R0
MOVD gobuf_ctxt(R5), R26
MOVD $0, gobuf_sp(R5) // clear the gobuf fields to help the garbage collector
MOVD $0, gobuf_bp(R5)
MOVD $0, gobuf_ret(R5)
MOVD $0, gobuf_lr(R5)
MOVD $0, gobuf_ctxt(R5)
CMP ZR, ZR // set condition codes for == test, needed by stack split
MOVD gobuf_pc(R5), R6 // load the G task function's address
B (R6) // jump to it and start executing
In proc.go, newg.sched.pc stores funcPC(goexit). gostartcall moves the saved sp down, puts the goexit address where the return address would sit, and then sets pc to the address of the fn that will actually run.
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
gostartcallfn(&newg.sched, fn)
}
// adjust Gobuf as if it executed a call to fn
// and then did an immediate gosave.
func gostartcallfn(gobuf *gobuf, fv *funcval) {
var fn unsafe.Pointer
if fv != nil {
fn = unsafe.Pointer(fv.fn)
} else {
fn = unsafe.Pointer(funcPC(nilfunc))
}
gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
// adjust Gobuf as if it executed a call to fn with context ctxt
// and then did an immediate gosave.
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
sp := buf.sp
if sys.RegSize > sys.PtrSize {
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = 0
}
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = buf.pc
buf.sp = sp
buf.pc = uintptr(fn)
buf.ctxt = ctxt
}
In goexit (goexit0), the G's status is set to _Gdead, dropg detaches the current G from its M (m.curg and g.m are set to nil), the G is put into gFree to wait for reuse, and finally schedule() starts the next round:
casgstatus(gp, _Grunning, _Gdead)
dropg()
gfput(_g_.m.p.ptr(), gp)
schedule()
In schedule, findrunnable looks for a G that needs to run. If nothing is found locally it falls back to runqsteal, stealing half of another P's local run queue into its own; if it still cannot find work it will even steal the victim's runnext.
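The real logic lives in runtime.runqsteal/runqgrab and uses lock-free ring buffers; purely as an illustration of the "steal half" idea (my own toy code, not the runtime's), it amounts to:

package main

import "fmt"

// stealHalf moves half of the victim's queued work to the thief.
// The runtime does this with atomics on fixed-size ring buffers (runqgrab).
func stealHalf(victim, thief []int) (newVictim, newThief []int) {
	n := (len(victim) + 1) / 2 // the runtime also rounds the half up
	if n == 0 {
		return victim, thief
	}
	return victim[n:], append(thief, victim[:n]...)
}

func main() {
	v := []int{1, 2, 3, 4, 5}
	var t []int
	v, t = stealHalf(v, t)
	fmt.Println(v, t) // [4 5] [1 2 3]
}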
morestack: growing the stack
stackguard0 sits at offset 0x10 in g, right after the stack bounds. The function prologue compares it against SP with CMPQ 0x10(CX), SP; if SP is at or below stackguard0 the stack is about to overflow and must grow, so control jumps to 0x48cfc9, which calls runtime.morestack_noctxt(SB).
type g struct {
stack stack // offset known to runtime/cgo
stackguard0 uintptr // offset known to liblink
}
TEXT main.main(SB) /home/darcyaf/Development/go/src/readsrc/main.go
main.go:8 0x48cfa0 64488b0c25f8ffffff MOVQ FS:0xfffffff8, CX
main.go:8 0x48cfa9 483b6110 CMPQ 0x10(CX), SP
main.go:8 0x48cfad 761a JBE 0x48cfc9
main.go:8 0x48cfaf 4883ec08 SUBQ $0x8, SP
main.go:8 0x48cfb3 48892c24 MOVQ BP, 0(SP)
main.go:8 0x48cfb7 488d2c24 LEAQ 0(SP), BP
main.go:9 0x48cfbb e870ffffff CALL main.test(SB)
main.go:10 0x48cfc0 488b2c24 MOVQ 0(SP), BP
main.go:10 0x48cfc4 4883c408 ADDQ $0x8, SP
main.go:10 0x48cfc8 c3 RET
main.go:8 0x48cfc9 e8d247fcff CALL runtime.morestack_noctxt(SB)
main.go:8 0x48cfce ebd0 JMP main.main(SB)
morestack_noctxt calls newstack(), which allocates a stack twice the old size (oldsize*2) and finally resumes execution via gogo:
func newstack() {
// Allocate a bigger segment and move the stack.
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize * 2
if newsize > maxstacksize {
print("runtime: goroutine stack exceeds ", maxstacksize, "-byte limit\n")
throw("stack overflow")
}
casgstatus(gp, _Grunning, _Gcopystack)
copystack(gp, newsize, true)
casgstatus(gp, _Gcopystack, _Grunning)
gogo(&gp.sched)
}
stackfree releases the stack space; markroot and gfput also free stacks, and freeStackSpans returns empty stack spans from the stackpool to the heap.
shrinkstack tries to shrink a stack to half its size, but will not shrink if more than 1/4 of the stack is in use.
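A small, hedged demonstration of the growth path described above (plain user code, nothing runtime-specific): deep recursion with a sizable frame quickly exceeds the initial 2KB stack and triggers morestack → newstack repeatedly, doubling the stack each time.

package main

import "fmt"

// Each frame holds a 1KB array, so a few levels of recursion already
// overflow the initial 2KB stack and trigger morestack -> newstack,
// which copies the stack into one twice as large.
func deep(n int) int {
	var buf [1024]byte
	buf[0] = byte(n)
	if n == 0 {
		return int(buf[0])
	}
	return deep(n-1) + int(buf[0])
}

func main() {
	fmt.Println(deep(1000)) // the stack grows transparently
}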
System calls
To support concurrent scheduling, Go wraps syscalls and cgo specially so that other tasks can be scheduled while a call blocks for a long time. In the standard library's syscall package, system calls are split into Syscall and RawSyscall; the difference is that Syscall adds entersyscall/exitsyscall, which is the key to allowing rescheduling.
reentersyscall saves the caller's pc and sp and switches the G's status to _Gsyscall.
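A hedged example of the difference on Linux/amd64: both wrappers issue SYS_GETPID, but syscall.Syscall is bracketed by entersyscall/exitsyscall so the P can be handed to another M if the call blocks, while syscall.RawSyscall skips that bookkeeping and should only be used for calls that never block.

package main

import (
	"fmt"
	"syscall"
)

func main() {
	// Syscall: wrapped with entersyscall/exitsyscall, safe for blocking calls.
	pid, _, _ := syscall.Syscall(syscall.SYS_GETPID, 0, 0, 0)

	// RawSyscall: no scheduler bookkeeping; acceptable here because getpid never blocks.
	rawPid, _, _ := syscall.RawSyscall(syscall.SYS_GETPID, 0, 0, 0)

	fmt.Println(pid, rawPid)
}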