The split flow
Worker goroutine startup. startWorkers launches the raft worker, the store worker, and the background task workers, including the split-check worker:
func (bs *Raftstore) startWorkers(peers []*peer) {
    ctx := bs.ctx
    workers := bs.workers
    router := bs.router
    bs.wg.Add(2) // raftWorker, storeWorker
    rw := newRaftWorker(ctx, router)
    go rw.run(bs.closeCh, bs.wg)
    sw := newStoreWorker(ctx, bs.storeState)
    go sw.run(bs.closeCh, bs.wg)
    router.sendStore(message.Msg{Type: message.MsgTypeStoreStart, Data: ctx.store})
    for i := 0; i < len(peers); i++ {
        regionID := peers[i].regionId
        _ = router.send(regionID, message.Msg{RegionID: regionID, Type: message.MsgTypeStart})
    }
    engines := ctx.engine
    cfg := ctx.cfg
    workers.splitCheckWorker.Start(runner.NewSplitCheckHandler(engines.Kv, NewRaftstoreRouter(router), cfg))
    workers.regionWorker.Start(runner.NewRegionTaskHandler(engines, ctx.snapMgr))
    workers.raftLogGCWorker.Start(runner.NewRaftLogGCTaskHandler())
    workers.schedulerWorker.Start(runner.NewSchedulerTaskHandler(ctx.store.Id, ctx.schedulerClient, NewRaftstoreRouter(router)))
    go bs.tickDriver.run()
}
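For context: the SplitCheckTask consumed by the split-check worker is produced on the peer's tick path, which is not shown in this section. A rough sketch of that producer, assuming the shape of TinyKV's onSplitRegionCheckTick (names like SizeDiffHint and PeerTickSplitRegionCheck are assumptions based on the TinyKV codebase):

// Sketch (assumed names): on every split-check tick, the leader enqueues a
// SplitCheckTask for its region onto the split-check worker's channel.
func (d *peerMsgHandler) onSplitRegionCheckTick() {
    d.ticker.schedule(PeerTickSplitRegionCheck) // re-arm the tick
    // Avoid queueing duplicate scans, and skip followers and small regions.
    if len(d.ctx.splitCheckTaskSender) > 0 || !d.IsLeader() {
        return
    }
    if d.SizeDiffHint < d.ctx.cfg.RegionSplitSize/8 {
        return
    }
    d.ctx.splitCheckTaskSender <- &runner.SplitCheckTask{
        Region: d.Region(),
    }
    d.SizeDiffHint = 0
}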
Point 1: Worker.Start. Each worker runs a single goroutine that consumes tasks from its channel until it receives TaskStop:
func (w *Worker) Start(handler TaskHandler) {
    w.wg.Add(1)
    go func() {
        defer w.wg.Done()
        if s, ok := handler.(Starter); ok {
            s.Start()
        }
        for {
            Task := <-w.receiver
            if _, ok := Task.(TaskStop); ok {
                return
            }
            handler.Handle(Task)
        }
    }()
}
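To see the loop above in isolation, here is a self-contained toy version of the same worker pattern; all names in it (Task, TaskStop, the inline handler) are illustrative, not TinyKV code:

package main

import (
    "fmt"
    "sync"
)

// Task is anything the handler knows how to process.
type Task interface{}

// TaskStop is a sentinel that makes the worker goroutine exit.
type TaskStop struct{}

func main() {
    tasks := make(chan Task, 8)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            t := <-tasks
            if _, ok := t.(TaskStop); ok {
                return // stop sentinel received: exit the loop
            }
            fmt.Println("handled:", t) // stands in for handler.Handle(t)
        }
    }()
    tasks <- "check region 1"
    tasks <- TaskStop{}
    wg.Wait()
}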
func (r *splitCheckHandler) Handle(t worker.Task) {
    spCheckTask, ok := t.(*SplitCheckTask)
    if !ok {
        log.Errorf("unsupported worker.Task: %+v", t)
        return
    }
    region := spCheckTask.Region
    regionId := region.Id
    log.Debugf("executing split check worker.Task: [regionId: %d, startKey: %s, endKey: %s]", regionId,
        hex.EncodeToString(region.StartKey), hex.EncodeToString(region.EndKey))
    key := r.splitCheck(regionId, region.StartKey, region.EndKey)
    if key != nil {
        _, userKey, err := codec.DecodeBytes(key)
        if err == nil {
            // It's not a raw key.
            // To make sure the keys of same user key locate in one Region, decode and then encode to truncate the timestamp
            key = codec.EncodeBytes(userKey)
        }
        msg := message.Msg{
            Type:     message.MsgTypeSplitRegion,
            RegionID: regionId,
            Data: &message.MsgSplitRegion{
                RegionEpoch: region.GetRegionEpoch(),
                SplitKey:    key,
            },
        }
        err = r.router.Send(regionId, msg)
        if err != nil {
            log.Warnf("failed to send check result: [regionId: %d, err: %v]", regionId, err)
        }
    } else {
        log.Debugf("no need to send, split key not found: [regionId: %v]", regionId)
    }
}
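splitCheck itself is not shown above; conceptually it scans the region's key range in order and returns the first key at which the accumulated size crosses the split threshold (nil if the region is still small). A self-contained sketch of that idea, with a hypothetical entry type standing in for the real engine iterator:

// entry is a hypothetical stand-in for one KV pair seen during the scan.
type entry struct{ key, value []byte }

// findSplitKey walks entries in key order and returns the key at which the
// accumulated size first exceeds splitSize, or nil if it never does.
func findSplitKey(entries []entry, splitSize uint64) []byte {
    var size uint64
    for _, e := range entries {
        size += uint64(len(e.key) + len(e.value))
        if size > splitSize {
            // Copy the key: real iterators tend to reuse their key buffers.
            return append([]byte{}, e.key...)
        }
    }
    return nil
}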
The peerSender the router writes to is the same channel as the raft worker's raftCh, so the MsgTypeSplitRegion message lands in the peer's message loop and is handled by onPrepareSplitRegion:
func (d *peerMsgHandler) onPrepareSplitRegion(regionEpoch *metapb.RegionEpoch, splitKey []byte, cb *message.Callback) {
    if err := d.validateSplitRegion(regionEpoch, splitKey); err != nil {
        cb.Done(ErrResp(err))
        return
    }
    region := d.Region()
    d.ctx.schedulerTaskSender <- &runner.SchedulerAskSplitTask{
        Region:   region,
        SplitKey: splitKey,
        Peer:     d.Meta,
        Callback: cb,
    }
}
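On the other end of schedulerTaskSender, the scheduler worker asks the scheduler to approve the split and allocate IDs for the new region and its peers, then turns the answer into an AdminCmdType_Split command. A sketch of that step, following the shape of TinyKV's SchedulerTaskHandler.onAskSplit (the exact body is not shown in these notes):

// Sketch (assumed names): scheduler worker side of the ask-split task.
func (r *SchedulerTaskHandler) onAskSplit(t *runner.SchedulerAskSplitTask) {
    // Ask the scheduler for a new region ID and new peer IDs.
    resp, err := r.SchedulerClient.AskSplit(context.TODO(), t.Region)
    if err != nil {
        log.Error(err)
        return
    }
    req := &raft_cmdpb.RaftCmdRequest{
        Header: &raft_cmdpb.RaftRequestHeader{
            RegionId:    t.Region.Id,
            Peer:        t.Peer,
            RegionEpoch: t.Region.RegionEpoch,
        },
        AdminRequest: &raft_cmdpb.AdminRequest{
            CmdType: raft_cmdpb.AdminCmdType_Split,
            Split: &raft_cmdpb.SplitRequest{
                SplitKey:    t.SplitKey,
                NewRegionId: resp.NewRegionId,
                NewPeerIds:  resp.NewPeerIds,
            },
        },
    }
    // This is the SendRaftCommand shown below.
    r.router.SendRaftCommand(req, t.Callback)
}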
How the request is issued
This is what triggers point 1 above:
func (r *RaftstoreRouter) SendRaftCommand(req *raft_cmdpb.RaftCmdRequest, cb *message.Callback) error {
    cmd := &message.MsgRaftCmd{
        Request:  req,
        Callback: cb,
    }
    regionID := req.Header.RegionId
    return r.router.send(regionID, message.NewPeerMsg(message.MsgTypeRaftCmd, regionID, cmd))
}
From there every message pulled off raftCh, including the MsgTypeRaftCmd above, is dispatched through handleMsg.
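A sketch of that dispatch loop, following the shape of TinyKV's raftWorker.run (the batching details are simplified here):

// Sketch (assumed names): the raft worker drains raftCh in batches, hands
// each message to a peerMsgHandler, then drives the Raft ready loop.
func (rw *raftWorker) run(closeCh <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        var msgs []message.Msg
        select {
        case <-closeCh:
            return
        case msg := <-rw.raftCh:
            msgs = append(msgs, msg)
        }
        // Drain whatever else is already queued, then handle the batch.
        pending := len(rw.raftCh)
        for i := 0; i < pending; i++ {
            msgs = append(msgs, <-rw.raftCh)
        }
        peerStateMap := make(map[uint64]*peerState)
        for _, msg := range msgs {
            peerState := rw.getPeerState(peerStateMap, msg.RegionID)
            if peerState == nil {
                continue
            }
            newPeerMsgHandler(peerState.peer, rw.ctx).HandleMsg(msg)
        }
        for _, peerState := range peerStateMap {
            newPeerMsgHandler(peerState.peer, rw.ctx).HandleRaftReady()
        }
    }
}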
Heartbeats update the region. On each region heartbeat, the (mock) scheduler client records the leader, pending peers, and epoch changes, and replies with any pending operator:
func (m *MockSchedulerClient) RegionHeartbeat(req *schedulerpb.RegionHeartbeatRequest) error {
    if err := m.checkBootstrap(); err != nil {
        return err
    }
    m.Lock()
    defer m.Unlock()

    regionID := req.Region.GetId()
    for _, p := range req.Region.GetPeers() {
        delete(m.pendingPeers, p.GetId())
    }
    for _, p := range req.GetPendingPeers() {
        m.pendingPeers[p.GetId()] = p
    }
    m.leaders[regionID] = req.Leader

    if err := m.handleHeartbeatVersion(req.Region); err != nil {
        return err
    }
    if err := m.handleHeartbeatConfVersion(req.Region); err != nil {
        return err
    }

    resp := &schedulerpb.RegionHeartbeatResponse{
        Header:      &schedulerpb.ResponseHeader{ClusterId: m.clusterID},
        RegionId:    regionID,
        RegionEpoch: req.Region.GetRegionEpoch(),
        TargetPeer:  req.Leader,
    }
    if op := m.operators[regionID]; op != nil {
        if m.tryFinished(op, req.Region, req.Leader) {
            delete(m.operators, regionID)
        } else {
            m.makeRegionHeartbeatResponse(op, resp)
        }
        log.Debugf("[region %d] schedule %v", regionID, op)
    }

    store := m.stores[req.Leader.GetStoreId()]
    store.heartbeatResponseHandler(resp)
    return nil
}
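For completeness, the heartbeat handled above originates on the peer side. A sketch of that producer, following the shape of TinyKV's peer.HeartbeatScheduler (not shown in these notes):

// Sketch (assumed names): the peer clones its region metadata and enqueues
// a heartbeat task for the scheduler worker, which calls RegionHeartbeat.
func (p *peer) HeartbeatScheduler(ch chan<- worker.Task) {
    clonedRegion := new(metapb.Region)
    err := util.CloneMsg(p.Region(), clonedRegion)
    if err != nil {
        return
    }
    ch <- &runner.SchedulerRegionHeartbeatTask{
        Region:          clonedRegion,
        Peer:            p.Meta,
        PendingPeers:    p.CollectPendingPeers(),
        ApproximateSize: p.ApproximateSize,
    }
}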