The scheduler involves three data structures: M (an OS thread), P (a processor, i.e. the scheduling context), and G (a goroutine).
M represents an OS thread; P acts as the scheduler that helps each thread manage its own goroutines; G is Go's coroutine, the goroutine. The number of Ps can be set with runtime.GOMAXPROCS(n int). Note that the number of Ps does not determine the number of Ms: at program startup the runtime initializes procs Ps, but only one M is started at first, namely M0, with a 64 KB stack (other goroutines start with a 2 KB stack by default), to run the runtime code.
So when are the other threads created?
When a goroutine is woken up, it has to run on an M (its context must be restored). P helps M manage goroutines, and restoring that context is also P's job. If at wake-up time there is still an idle P, and no other M is out stealing goroutines (an M steals goroutines from other threads when both its local queue and the global queue are empty), that means all the other Ms are busy, so a new M is created and handed to the idle P to manage goroutines for it.
In a nutshell: one M is created at the start; whenever the existing Ms cannot keep up and there are still idle Ps with no work being done for them, new Ms are created, until there are procs Ms (procs is set via runtime.GOMAXPROCS). A small example follows.
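As a quick, hedged illustration of the relationship described above (my example, not from the original post): runtime.NumCPU reports the detected core count (ncpu), while runtime.GOMAXPROCS both queries and changes the number of Ps (passing 0 only queries).

package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println("NumCPU     :", runtime.NumCPU())      // ncpu, detected at startup
	fmt.Println("GOMAXPROCS :", runtime.GOMAXPROCS(0)) // current number of Ps (0 = query only)

	old := runtime.GOMAXPROCS(2) // change the number of Ps; internally this goes through procresize
	fmt.Println("previous   :", old)
	fmt.Println("GOMAXPROCS :", runtime.GOMAXPROCS(0)) // now 2
}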
Go represents a goroutine with the struct g:
type g struct {
stack stack // bounds of the current stack: [stack.lo, stack.hi)
stackguard0 uintptr // stack-growth / preemption check value; normally stack.lo + StackGuard
stackguard1 uintptr // the same check for the C stack growth prologue (used on g0 and gsignal)
_panic *_panic // innermost panic
_defer *_defer // innermost defer
m *m // the M this goroutine currently belongs to
sched gobuf // scheduling context (saved sp, pc, ...)
...
schedlink guintptr // next goroutine when this one sits in a scheduler list such as the global run queue
...
preempt bool // preemption flag; set to true when this goroutine should be preempted
...
}
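The stack and stackguard0 fields are what make goroutine stacks growable. A minimal sketch of the effect (my example, assuming nothing beyond the standard library): deep recursion needs far more than the initial 2 KB, and the prologue check against stackguard0 lets the runtime grow the stack transparently.

package main

import "fmt"

// Each frame keeps a 128-byte buffer alive, so 100,000 frames need on the
// order of tens of megabytes of stack, far more than the 2 KB a goroutine
// starts with. The function prologue's comparison against stackguard0
// triggers stack growth, so this just works with no manual stack management.
func deep(n int) byte {
	var buf [128]byte
	buf[0] = byte(n)
	if n == 0 {
		return buf[0]
	}
	return deep(n - 1)
}

func main() {
	fmt.Println(deep(100000)) // prints 0
}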
gobuf holds a goroutine's scheduling context. Scheduling a goroutine essentially means putting it on a CPU: the saved register values are restored and execution resumes.
type gobuf struct {
// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
//
// ctxt is unusual with respect to GC: it may be a
// heap-allocated funcval, so GC needs to track it, but it
// needs to be set and cleared from assembly, where it's
// difficult to have write barriers. However, ctxt is really a
// saved, live register, and we only ever exchange it between
// the real register and the gobuf. Hence, we treat it as a
// root during stack scanning, which means assembly that saves
// and restores it doesn't need write barriers. It's still
// typed as a pointer so that any other writes from Go get
// write barriers.
sp uintptr // stack pointer
pc uintptr // program counter
g guintptr // the goroutine this gobuf belongs to
ctxt unsafe.Pointer
ret sys.Uintreg // saved syscall return value, so it is not lost if another goroutine preempts us after the call
lr uintptr // link register (used on architectures such as ARM)
bp uintptr // saved base pointer (rbp), used when frame pointers are enabled
}
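What gobuf enables can be observed from ordinary Go code. In the hedged sketch below (mine, not the post's), runtime.Gosched saves the calling goroutine's execution state (conceptually its gobuf: sp, pc, ...) and yields the processor; the scheduler later restores that state and execution resumes right after the Gosched call. The exact interleaving of the output is not guaranteed.

package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1) // a single P makes the yield/resume easier to observe
	done := make(chan struct{})

	go func() {
		for i := 0; i < 3; i++ {
			fmt.Println("worker", i)
			runtime.Gosched() // save this goroutine's context and yield the P
		}
		close(done)
	}()

	for i := 0; i < 3; i++ {
		fmt.Println("main  ", i)
		runtime.Gosched() // yield so the worker gets a turn; we resume here afterwards
	}
	<-done
}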
In Go, M represents an actual operating-system thread:
type m struct {
g0 *g // the goroutine that does the M's own housekeeping (scheduling and other runtime work); it is the m's first goroutine
...
gsignal *g // goroutine used for signal handling
tls [6]uintptr // thread-local storage
mstartfn func()
curg *g // current running goroutine
...
p puintptr // the P (processor) currently attached to this M
nextp puintptr // the P staged for this M to use next
oldp puintptr // the P that was attached before entering a system call
...
spinning bool // this M has run out of goroutines and is stealing work from other Ms
blocked bool // m is blocked on a note
...
park note // an M with no goroutines sleeps on park; another M has to wake it up through park
alllink *m // on allm // linked list of all Ms
...
thread uintptr // thread handle
...
}
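The number of Ms can be observed indirectly. In the sketch below (my example; exact counts vary by platform and Go version), several goroutines lock themselves to their threads with runtime.LockOSThread and then sleep, which parks those Ms together with them, so the runtime typically has to create extra OS threads; the "threadcreate" pprof profile counts threads created.

package main

import (
	"fmt"
	"runtime"
	"runtime/pprof"
	"time"
)

func main() {
	before := pprof.Lookup("threadcreate").Count()

	for i := 0; i < 8; i++ {
		go func() {
			runtime.LockOSThread()  // wire this goroutine to its M
			time.Sleep(time.Second) // the locked M is parked along with it
		}()
	}

	time.Sleep(100 * time.Millisecond) // give the goroutines a chance to run
	after := pprof.Lookup("threadcreate").Count()
	fmt.Printf("OS threads created: before=%d after=%d\n", before, after)
}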
In Go, P represents a scheduler/processor: it provides the context an M needs so that the M can execute many goroutines:
type p struct {
m muintptr // back-link to the associated M (may be nil)
...
runqhead uint32 // head of the P's local goroutine run queue
runqtail uint32 // tail of the P's local goroutine run queue
runq [256]guintptr // the local run queue itself; a circular queue, like the structure used in sync.Pool
...
sudogcache []*sudog // cache of sudog structures, used by channels
sudogbuf [128]*sudog // fixed backing array for sudogcache
...
pad cpu.CacheLinePad // padding to prevent false sharing
}
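The pad field relies on the trick sketched below (my illustration of the idea, not the runtime's code; internal/cpu cannot be imported from user code, so a 64-byte cache line is assumed): padding per-worker hot fields out to separate cache lines keeps concurrent writers from invalidating each other's cache line (false sharing).

package main

import (
	"fmt"
	"sync"
	"unsafe"
)

// One counter per worker, each padded out to a full (assumed 64-byte) cache
// line so that the workers' writes do not contend on the same line.
type counter struct {
	n uint64
	_ [64 - unsafe.Sizeof(uint64(0))]byte
}

func main() {
	counters := make([]counter, 4)
	var wg sync.WaitGroup
	for i := range counters {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for j := 0; j < 1000000; j++ {
				counters[i].n++
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(counters[0].n, "sizeof(counter) =", unsafe.Sizeof(counter{}))
}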
The schedt struct holds the scheduler's global state: bookkeeping for Ms and Ps and the global goroutine run queue:
type schedt struct {
...
lock mutex // global scheduler lock
// When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
// sure to call checkdead().
// bookkeeping for idle Ms
midle muintptr // list of idle Ms waiting for work
nmidle int32 // number of idle Ms waiting for work
nmidlelocked int32 // number of locked m's waiting for work
mnext int64 // number of m's that have been created and next M ID
maxmcount int32 // maximum number of Ms allowed (10000)
nmsys int32 // number of system m's not counted for deadlock
nmfreed int64 // cumulative number of freed m's
ngsys uint32 // number of system goroutines; updated atomically
// bookkeeping for idle Ps
pidle puintptr // idle p's
npidle uint32
nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
// the global run queue of goroutines
runq gQueue
runqsize int32
...
// Global cache of dead G's: exited goroutines are kept here and reused
// the next time a goroutine is created.
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
...
}
allgs []*g // all Gs
allm *m // linked list of all Ms, including m0 below
allp []*p // all Ps; len(allp) == gomaxprocs
ncpu int32 // number of CPU cores, detected by the runtime at startup
gomaxprocs int32 // maximum number of Ps; defaults to ncpu but can be changed via GOMAXPROCS
sched schedt // the scheduler object, recording the scheduler's state
m0 m // the process's main thread
g0 g // m0's g0, i.e. m0.g0 = &g0
Below is a hello world written in Go. The code contains nothing that initializes the scheduler, yet the scheduler clearly gets initialized, so the program's real entry point cannot be main.main. Let's use gdb to trace, step by step, how Go initializes its scheduler.
// test.go
package main
func main() {
println("hello, world!")
}
go build -gcflags "-N -l" test.go
Note for macOS users: since Go 1.11 the debug information is compressed, so on macOS you also need the following setting (see "Debug Go Program With Gdb On Macos"):
export GOFLAGS="-ldflags=-compressdwarf=false"
$ sudo gdb test
(gdb) info files
Symbols from "/Users/journey/workspace/src/tool/gdb/test".
Local exec file:
`/Users/journey/workspace/src/tool/gdb/test', file type mach-o-x86-64.
Entry point: 0x104cd00
0x0000000001001000 - 0x00000000010515b1 is .text
0x00000000010515c0 - 0x000000000108162a is __TEXT.__rodata
0x0000000001081640 - 0x0000000001081706 is __TEXT.__symbol_stub1
0x0000000001081720 - 0x0000000001081e80 is __TEXT.__typelink
0x0000000001081e80 - 0x0000000001081e88 is __TEXT.__itablink
0x0000000001081e88 - 0x0000000001081e88 is __TEXT.__gosymtab
0x0000000001081ea0 - 0x00000000010bfacd is __TEXT.__gopclntab
0x00000000010c0000 - 0x00000000010c0020 is __DATA.__go_buildinfo
0x00000000010c0020 - 0x00000000010c0128 is __DATA.__nl_symbol_ptr
0x00000000010c0140 - 0x00000000010c0d08 is __DATA.__noptrdata
0x00000000010c0d20 - 0x00000000010c27f0 is .data
0x00000000010c2800 - 0x00000000010ddc90 is .bss
0x00000000010ddca0 - 0x00000000010e01e8 is __DATA.__noptrbss
(gdb) b *0x104cd00
Breakpoint 1 at 0x104cd00: file /usr/local/go/src/runtime/rt0_darwin_amd64.s, line 8.
$ ls rt0_*   # in /usr/local/go/src/runtime
rt0_aix_ppc64.s rt0_darwin_amd64.s rt0_freebsd_arm.s rt0_linux_arm64.s rt0_nacl_386.s rt0_netbsd_arm64.s rt0_plan9_amd64.s
rt0_android_386.s rt0_darwin_arm.s rt0_illumos_amd64.s rt0_linux_mips64x.s rt0_nacl_amd64p32.s rt0_openbsd_386.s rt0_plan9_arm.s
rt0_android_amd64.s rt0_darwin_arm64.s rt0_js_wasm.s rt0_linux_mipsx.s rt0_nacl_arm.s rt0_openbsd_amd64.s rt0_solaris_amd64.s
rt0_android_arm.s rt0_dragonfly_amd64.s rt0_linux_386.s rt0_linux_ppc64.s rt0_netbsd_386.s rt0_openbsd_arm.s rt0_windows_386.s
rt0_android_arm64.s rt0_freebsd_386.s rt0_linux_amd64.s rt0_linux_ppc64le.s rt0_netbsd_amd64.s rt0_openbsd_arm64.s rt0_windows_amd64.s
rt0_darwin_386.s rt0_freebsd_amd64.s rt0_linux_arm.s rt0_linux_s390x.s rt0_netbsd_arm.s rt0_plan9_386.s rt0_windows_arm.s
TEXT _rt0_amd64_darwin(SB),NOSPLIT,$-8 // 8 bytes of arguments + return values
JMP _rt0_amd64(SB)
(gdb) b _rt0_amd64
Breakpoint 2 at 0x1049350: file /usr/local/go/src/runtime/asm_amd64.s, line 15.
Here argc and argv are first placed in the DI and SI registers, and then execution jumps to runtime.rt0_go, the main process-initialization function. Recall the amd64 convention used here:
argument 0 goes in the DI general-purpose register
argument 1 goes in the SI general-purpose register
argument 2 goes in the DX general-purpose register
argument 3 goes in the CX general-purpose register
TEXT _rt0_amd64(SB),NOSPLIT,$-8 // 8 bytes of arguments + return values
MOVQ 0(SP), DI // argc
LEAQ 8(SP), SI // argv
JMP runtime·rt0_go(SB)
(gdb) b runtime.rt0_go
Breakpoint 3 at 0x1049360: file /usr/local/go/src/runtime/asm_amd64.s, line 89.
This function is rather long, so let's walk through rt0_go in segments.
First the arguments are copied from the general-purpose registers into AX and BX, then the real stack pointer (the hardware SP register) is adjusted: SP is decremented by 39 and aligned down to a 16-byte boundary (the CPU's SSE instructions require 16-byte-aligned memory addresses), and argc and argv are stored at SP+16 and SP+24.
Go assembly defines pseudo-registers; whether a symbol prefix is present tells the pseudo and hardware registers apart: a+8(SP) uses Go's pseudo SP, while 8(SP) refers to the hardware register.
Next g0 is created, and g0.stackguard0, g0.stackguard1, g0.stack.lo and g0.stack.hi are initialized (in effect, a chunk of the OS stack is carved up, with each piece assigned to one of these fields by convention).
TEXT runtime·rt0_go(SB),NOSPLIT,$0
// copy arguments forward on an even stack
MOVQ DI, AX // argc
MOVQ SI, BX // argv
SUBQ $(4*8+7), SP // 2args 2auto
ANDQ $~15, SP
MOVQ AX, 16(SP)
MOVQ BX, 24(SP)
// create istack out of the given (operating system) stack.
// _cgo_init may update stackguard.
// initialize g0, Go's first goroutine
// give g0 roughly 64 KB of stack space
//
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024+104)(SP), BX // BX = SP - 64 * 1024 + 104
MOVQ BX, g_stackguard0(DI) // g0.g_stackguard0 = SP - 64 * 1024 + 104
MOVQ BX, g_stackguard1(DI) // g0.g_stackguard1 = SP - 64 * 1024 + 104
MOVQ BX, (g_stack+stack_lo)(DI) // g0.stack.lo = SP - 64 * 1024 + 104
MOVQ SP, (g_stack+stack_hi)(DI) // g0.stack.hi = SP
Memory layout after g0 has been created (diagram in the original post).
Then we skip over a stretch of code that detects the CPU model and initializes CGO.
...
// bind m0 to the main thread
LEAQ runtime·m0+m_tls(SB), DI // load the address of m0's tls (thread-local storage) member into DI
CALL runtime·settls(SB) // call settls to set up thread-local storage (on macOS settls is a no-op; the thread's TLS is already set up)
// verify that TLS works: store 0x123 into it, then check that tls[0] reads back 0x123; abort otherwise
// store through it, to make sure it works
get_tls(BX)
MOVQ $0x123, g(BX)
MOVQ runtime·m0+m_tls(SB), AX
CMPQ AX, $0x123
JEQ 2(PC)
CALL runtime·abort(SB)
ok:
// set the per-goroutine and per-mach "registers"
// store g0 into m0's thread-local storage slot tls[0]
get_tls(BX) // put the address of m0.tls[0] into BX
LEAQ runtime·g0(SB), CX // put the address of g0 into CX
MOVQ CX, g(BX) // m0.tls[0] = &g0
LEAQ runtime·m0(SB), AX // put the address of m0 into AX
// link m0 and g0 to each other
// save m->g0 = g0
MOVQ CX, m_g0(AX) // m0.g0 = g0
// save m0 to g0->m
MOVQ AX, g_m(CX) // g0.m = m0
CLD // convention is D is always left cleared
CALL runtime·check(SB)
Memory layout after m0 is set up (diagram in the original post).
With g0 and m0 both created and initialized, the next step is scheduler initialization.
// initialize m0 (this actually happens inside schedinit, via mcommoninit)
// push argc and argv onto the stack
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
// process the command-line arguments
CALL runtime·args(SB)
// get the number of CPU cores
CALL runtime·osinit(SB)
// initialize the scheduler
CALL runtime·schedinit(SB)
The function below has the scheduling-unrelated code stripped out; the rough flow is:
func schedinit() {
// get g0
_g_ := getg()
if raceenabled {
_g_.racectx, raceprocctx0 = raceinit()
}
// set the maximum number of threads
sched.maxmcount = 10000
...
// initialize m0; the m0 and g0 binding was already established earlier,
// so this mostly sanity-checks fields and links m0 into the allm list
mcommoninit(_g_.m)
...
sched.lastpoll = uint64(nanotime())
// ncpu was already determined in osinit
procs := ncpu
// if GOMAXPROCS is set and valid, use it for procs
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}
func procresize(nprocs int32) *p {
old := gomaxprocs
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// Grow allp if necessary.
if nprocs > int32(len(allp)) { // at bootstrap len(allp) == 0
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
if nprocs <= int32(cap(allp)) { // existing capacity is enough; just reslice
allp = allp[:nprocs]
} else { // grow allp
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}
// initialize new P‘s
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
}
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
_g_ := getg() // get g0
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // during bootstrap g0.m is not yet bound to a P, so g0.m.p == 0
// continue to use the current P
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else {
// release the current P and acquire allp[0].
//
// We must do this before destroying our current P
// because p.destroy itself has write barriers, so we
// need to do that from a valid P.
if _g_.m.p != 0 {
if trace.enabled {
// Pretend that we were descheduled
// and then scheduled again to keep
// the trace sane.
traceGoSched()
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p.ptr().m = 0
}
_g_.m.p = 0
_g_.m.mcache = nil
p := allp[0]
p.m = 0
p.status = _Pidle
acquirep(p) // associate allp[0] with m0
if trace.enabled {
traceGoStart()
}
}
// destroy any Ps that are no longer needed
// release resources from unused P's
for i := nprocs; i < old; i++ {
p := allp[i]
p.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
// Trim allp.
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
// put the idle Ps on the idle list
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
if _g_.m.p.ptr() == p { // allp[0] is already associated with m0, so it is not put on the idle list
continue
}
p.status = _Pidle
if runqempty(p) {
pidleput(p)
} else {
p.m.set(mget())
p.link.set(runnablePs)
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
return runnablePs
}
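After initialization, the relationship between Ms, Ps and Gs can be pictured with the Graphviz description below (my framing of the post's diagram): a global list of Gs, running Gs each bound to an M/P pair, and waiting Gs queued behind their Ps.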
digraph {
rankdir=TB;
subgraph cluster_all{
label="全局G";
G10[shape=circle,label="G ",style=filled,fillcolor=purple]
G11[shape=circle,label="G ",style=filled,fillcolor=purple]
G12[shape=circle,label="G ",style=filled,fillcolor=purple]
G10->G11[dir=none]
G11->G12[dir=none]
}
subgraph cluster_in{
label="运行中G"
G1[shape=circle,label="G1",style=filled,fillcolor=red]
G2[shape=circle,label="G2",style=filled,fillcolor=red]
G3[shape=circle,label="G3",style=filled,fillcolor=red]
}
M1[shape=triangle,label="M1",style=filled,fillcolor=yellow]
P1[shape=record,label="P1",style=filled,fillcolor=blue]
M2[shape=triangle,label="M2",style=filled,fillcolor=yellow]
P2[shape=record,label="P2",style=filled,fillcolor=blue]
M3[shape=triangle,label="M3",style=filled,fillcolor=yellow]
P3[shape=record,label="P3",style=filled,fillcolor=blue]
subgraph cluster_wait{
label="等待中G"
G4[shape=circle,label="G ",style=filled,fillcolor=green]
G5[shape=circle,label="G ",style=filled,fillcolor=green]
G6[shape=circle,label="G ",style=filled,fillcolor=green]
G7[shape=circle,label="G ",style=filled,fillcolor=green]
G8[shape=circle,label="G ",style=filled,fillcolor=green]
G9[shape=circle,label="G ",style=filled,fillcolor=green]
}
G1->M1[dir=none]
M1->P1[dir=none]
P1->G4[dir=none]
G4->G5[dir=none]
G2->M2[dir=none]
M2->P2[dir=none]
P2->G6[dir=none]
G6->G7[dir=none]
G3->M3[dir=none]
M3->P3[dir=none]
P3->G8[dir=none]
G8->G9[dir=none]
}
Source: https://www.cnblogs.com/wuwangchuxin0924/p/13264054.html