	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	acquirem() // disable preemption because it can be holding p in a local var
	_p_ := _g_.m.p.ptr() // the p bound to the current m
	newg := gfget(_p_)   // get an idle g, preferring p's local gFree list; if that is empty, fall back to sched's global gFree list
	if newg == nil {
		newg = malg(_StackMin)           // no idle g available: allocate a new g with a minimum-size stack
		casgstatus(newg, _Gidle, _Gdead) // set its status to _Gdead before storing it into the allg slice
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}
	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}
	// initialize the new g's stack layout
	totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
	totalSize = alignUp(totalSize, sys.StackAlign)
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	// the following assignments to newg.sched are critical
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp // save the current stack top address
	newg.stktopsp = sp // save the current stack top address
	// set pc to the address of goexit plus PCQuantum
	newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg)) // save the address of newg itself
	gostartcallfn(&newg.sched, fn) // the key trick: rewrite sched so that fn appears to have been called from goexit
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	} else {
		// Only user goroutines inherit pprof labels.
		if _g_.m.curg != nil {
			newg.labels = _g_.m.curg.labels
		}
	}
	// Track initial transition?
	newg.trackingSeq = uint8(fastrand())
	if newg.trackingSeq%gTrackingPeriod == 0 {
		newg.tracking = true
	}
	casgstatus(newg, _Gdead, _Grunnable) // switch the g's status to _Grunnable
	gcController.addScannableStack(_p_, int64(newg.stack.hi-newg.stack.lo))
	// allocate a goroutine id
	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	releasem(_g_.m)
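Before moving on to runqput, the goid batching at the end of newproc1 deserves a standalone illustration: instead of bumping sched.goidgen once per goroutine, each P reserves a batch of _GoidCacheBatch ids with a single atomic add and then hands them out without touching the global counter again. Below is a minimal sketch of that arithmetic outside the runtime; the names goidGen, goidCacheBatch and pCache are invented for the example, and only the batching scheme itself mirrors the code above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const goidCacheBatch = 16 // stands in for _GoidCacheBatch

// goidGen stands in for sched.goidgen: the last id already handed out globally.
var goidGen uint64

// pCache mimics the per-P fields goidcache/goidcacheend.
type pCache struct {
	goidcache, goidcacheend uint64
}

// nextGoid returns the next goroutine id, refilling the local batch from the
// global counter only when the batch is exhausted.
func (c *pCache) nextGoid() uint64 {
	if c.goidcache == c.goidcacheend {
		// One atomic add reserves the range [old goidGen+1, old goidGen+goidCacheBatch].
		c.goidcache = atomic.AddUint64(&goidGen, goidCacheBatch)
		c.goidcache -= goidCacheBatch - 1
		c.goidcacheend = c.goidcache + goidCacheBatch
	}
	id := c.goidcache
	c.goidcache++
	return id
}

func main() {
	var c pCache
	fmt.Println(c.nextGoid(), c.nextGoid(), c.nextGoid()) // 1 2 3
}
```

With a fresh counter the first cache reserves ids 1 through 16, so the program prints 1 2 3, which matches the runtime comment that the main goroutine receives goid 1 at startup.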
// Parameter _p_: the p bound to the current m.
// Parameter gp: the newly created g.
// Parameter next: whether gp should go into runnext first.
func runqput(_p_ *p, gp *g, next bool) {
	if randomizeScheduler && next && fastrandn(2) == 0 { // under randomized scheduling, skip runnext half of the time
		next = false
	}
	if next { // try to install gp as runnext; if runnext was empty we are done, otherwise the old runnext gets kicked to the tail of the local queue
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}
retry: // if the local run queue has room, append gp at the tail; otherwise move half of it, plus gp, to the global run queue
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}
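The fast path of runqput is easier to reason about when the ring buffer is pulled out on its own: only the owning P writes the tail, consumers advance the head, and the queue is full when tail minus head reaches the buffer length. Below is a simplified, single-consumer sketch under those assumptions; the names localQueue, tryPut and tryGet are invented, Go's sync/atomic stands in for the runtime's internal atomics, and real stealers would CAS the head rather than store it.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const qsize = 256 // fixed-size ring, like the per-P runq

type localQueue struct {
	head uint32 // advanced by consumers (the runtime CASes it when stealing)
	tail uint32 // written only by the owning producer
	buf  [qsize]int
}

// tryPut appends v at the tail and reports false when the queue is full,
// the point where runqput falls back to runqputslow.
func (q *localQueue) tryPut(v int) bool {
	h := atomic.LoadUint32(&q.head) // load-acquire in the runtime
	t := q.tail
	if t-h >= qsize { // unsigned subtraction also works across wraparound
		return false
	}
	q.buf[t%qsize] = v
	atomic.StoreUint32(&q.tail, t+1) // store-release publishes the new item
	return true
}

// tryGet pops from the head; kept deliberately simple for a single consumer.
func (q *localQueue) tryGet() (int, bool) {
	h := atomic.LoadUint32(&q.head)
	t := atomic.LoadUint32(&q.tail)
	if h == t {
		return 0, false
	}
	v := q.buf[h%qsize]
	atomic.StoreUint32(&q.head, h+1)
	return v, true
}

func main() {
	var q localQueue
	q.tryPut(42)
	fmt.Println(q.tryGet()) // 42 true
}
```

Because the tail has a single writer, the put side needs no CAS at all: a plain read of the tail plus a release store is enough, which is why runqput's fast path is so much cheaper than runqputslow.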