BlockKeeper的逻辑是什么

免费建站   2024年05月10日 18:31  

这篇文章主要讲解了“BlockKeeper的逻辑是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“BlockKeeper的逻辑是什么”吧!

我们要发送什么样的数据请求,才能让比原节点把它持有的区块数据发给我?找到发送请求的代码

首先我们先要在代码中定位到,比原到底是在什么时候来向对方节点发送请求的。

在前一篇讲的是如何建立连接并验证身份,那么发出数据请求的操作,一定在上次的代码之后。按照这个思路,我们在SyncManager类中Switch启动之后,找到了一个叫BlockKeeper的类,相关的操作是在它里面完成的。

下面是老规矩,还是从启动开始,但是会更简化一些:

cmd/bytomd/main.go#L54

funcmain(){cmd:=cli.PrepareBaseCmd(commands.RootCmd,"TM",os.ExpandEnv(config.DefaultDataDir()))cmd.Execute()}

cmd/bytomd/commands/run_node.go#L41

funcrunNode(cmd*cobra.Command,args[]string)error{n:=node.NewNode(config)if_,err:=n.Start();err!=nil{//...}

node/node.go#L169

func(n*Node)OnStart()error{//...n.syncManager.Start()//...}

netsync/handle.go#L141

func(sm*SyncManager)Start(){gosm.netStart()//...gosm.syncer()}

注意sm.netStart(),我们在一篇中建立连接并验证身份的操作,就是在它里面完成的。而这次的这个问题,是在下面的sm.syncer()中完成的。

另外注意,由于这两个函数调用都使用了goroutine,所以它们是同时进行的。

sm.syncer()的代码如下:

netsync/sync.go#L46

func(sm*SyncManager)syncer(){sm.fetcher.Start()defersm.fetcher.Stop()//...for{select{case<-sm.newPeerCh:log.Info("Newpeerconnected.")//Makesurewehavepeerstoselectfrom,thensyncifsm.sw.Peers().Size()<minDesiredPeerCount{break}gosm.synchronise()//..}}

这里混入了一个叫fetcher的奇怪的东西,名字看起来好像是专门去抓取数据的,我们要找的是它吗?

可惜不是,fetcher的作用是从多个peer那里拿到了区块数据之后,对数据进行整理,把有用的放到本地链上。我们在以后会研究它,所以这里不展开讨论。

接着是一个for循环,当发现通道newPeerCh有了新数据(也就是有了新的节点连接上了),会判断一下当前自己连着的节点是否够多(大于等于minDesiredPeerCount,值为5),够多的话,就会进入sm.synchronise(),进行数据同步。

这里为什么要多等几个节点,而不是一连上就马上同步呢?我想这是希望有更多选择的机会,找到一个数据够多的节点。

sm.synchronise()还是属于SyncManager的方法。在真正调用到BlockKeeper的方法之前,它还做了一些比如清理已经断开的peer,找到最适合同步数据的peer等。其中“清理peer”的工作涉及到不同的对象持有的peer集合间的同步,略有些麻烦,但对当前问题帮助不大,所以我打算把它们放在以后的某个问题中回答(比如“当一个节点断开了,比原会有什么样的处理”),这里就先省略。

sm.synchronise()代码如下:

netsync/sync.go#L77

func(sm*SyncManager)synchronise(){log.Info("bkpeernum:",sm..peers.Len(),"swpeernum:",sm.sw.Peers().Size(),"",sm.sw.Peers().List())//...peer,bestHeight:=sm.peers.BestPeer()//...ifbestHeight>sm.chain.BestBlockHeight(){//...sm..BlockRequestWorker(peer.Key,bestHeight)}}

可以看到,首先是从众多的peers中,找到最合适的那个。什么叫Best呢?看一下BestPeer()的定义:

netsync/peer.go#L266

func(ps*peerSet)BestPeer()(*p2p.Peer,uint64){//...for_,p:=rangeps.peers{ifbestPeer==nil||p.height>bestHeight{bestPeer,bestHeight=p.swPeer,p.height}}returnbestPeer,bestHeight}

其实就是持有区块链数据最长的那个。

找到了BestPeer之后,就调用sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)方法,从这里,正式进入BlockKeeper -- 也就是本文的主角 -- 的世界。

BlockKeeper

blockKeeper.BlockRequestWorker的逻辑比较复杂,它包含了:

根据自己持有的区块数据来计算需要同步的数据

向前面找到的最佳节点发送数据请求

拿到对方发过来的区块数据

对数据进行处理

广播新状态

处理各种出错情况,等等

由于本文中只关注“发送请求”,所以一些与之关系不大的逻辑我会忽略掉,留待以后再讲。

在“发送请求”这里,实际也包含了两种情形,一种简单的,一种复杂的:

简单的:假设不存在分叉,则直接检查本地高度最高的区块,然后请求下一个区块

复杂的:考虑分叉的情况,则当前本地的区块可能就存在分叉,那么到底应该请求哪个区块,就需要慎重考虑

由于第2种情况对于本文来说过于复杂(因为需要深刻理解比原链中分叉的处理逻辑),所以在本文中将把问题简化,只考虑第1种。而分叉的处理,将放在以后讲解。

下面是把blockKeeper.BlockRequestWorker中的代码简化成了只包含第1种情况:

netsync/block_keeper.go#L72

func(bk*blockKeeper)BlockRequestWorker(peerIDstring,maxPeerHeightuint64)error{num:=bk.chain.BestBlockHeight()+1reqNum:=uint64(0)reqNum=num//...bkPeer,ok:=bk.peers.Peer(peerID)swPeer:=bkPeer.getPeer()//...block,err:=bk.BlockRequest(peerID,reqNum)//...}

在这种情况下,我们可以认为bk.chain.BestBlockHeight()中的Best,指的是本地持有的不带分叉的区块链高度最高的那个。(需要提醒的是,如果存在分叉情况,则Best不一定是高度最高的那个)

那么我们就可以直接向最佳peer请求下一个高度的区块,它是通过bk.BlockRequest(peerID, reqNum)实现的:

netsync/block_keeper.go#L152

func(bk*blockKeeper)BlockRequest(peerIDstring,heightuint64)(*types.Block,error){varblock*types.Blockiferr:=bk.blockRequest(peerID,height);err!=nil{returnnil,errReqBlock}//...for{select{casependingResponse:=<-bk.pendingProcessCh:block=pendingResponse.block//...returnblock,nil//...}}}

在上面简化后的代码中,主要分成了两个部分。一个是发送请求bk.blockRequest(peerID, height),这是本文的重点;它下面的for-select部分,已经是在等待并处理对方节点的返回数据了,这部分我们今天先略过不讲。

bk.blockRequest(peerID, height)这个方法,从逻辑上又可以分成两部分:

构造出请求的信息

把信息发送给对方节点

构造出请求的信息

bk.blockRequest(peerID, height)经过一连串的方法调用之后,使用height构造出了一个BlockRequestMessage对象,代码如下:

netsync/block_keeper.go#L148

func(bk*blockKeeper)blockRequest(peerIDstring,heightuint64)error{returnbk.peers.requestBlockByHeight(peerID,height)}

netsync/peer.go#L332

func(ps*peerSet)requestBlockByHeight(peerIDstring,heightuint64)error{peer,ok:=ps.Peer(peerID)//...returnpeer.requestBlockByHeight(height)}

netsync/peer.go#L73

func(p*peer)requestBlockByHeight(heightuint64)error{msg:=&BlockRequestMessage{Height:height}p.swPeer.TrySend(BlockchainChannel,struct{BlockchainMessage}{msg})returnnil}

到这里,终于构造出了所需要的BlockRequestMessage,其实主要就是把height告诉peer。

然后,通过Peer的TrySend()把该信息发出去。

发送请求

在TrySend中,主要是通过github.com/tendermint/go-wire库将其序列化,再发送给对方。看起来应该是很简单的操作吧,先预个警,还是挺绕的。

当我们进入TrySend()后:

p2p/peer.go#L242

func(p*Peer)TrySend(chIDbyte,msginterface{})bool{if!p.IsRunning(){returnfalse}returnp.mconn.TrySend(chID,msg)}

发现它把锅丢给了p.mconn.TrySend方法,那么mconn是什么?chID又是什么?

mconn是MConnection的实例,它是从哪儿来的?它应该在之前的某个地方初始化了,否则我们没法直接调用它。所以我们先来找到它初始化的地方。

经过一番寻找,发现原来是在前一篇之后,即比原节点与另一个节点完成了身份验证之后,具体的位置在Switch类启动的地方。

我们这次直接从Swtich的OnStart作为起点:

p2p/switch.go#L186

func(sw*Switch)OnStart()error{//...//Startlistenersfor_,listener:=rangesw.listeners{gosw.listenerRoutine(listener)}returnnil}

p2p/switch.go#L498

func(sw*Switch)listenerRoutine(lListener){for{inConn,ok:=<-l.Connections()//...err:=sw.addPeerWithConnectionAndConfig(inConn,sw.peerConfig)//...}}

p2p/switch.go#L645

func(sw*Switch)addPeerWithConnectionAndConfig(connnet.Conn,config*PeerConfig)error{//...peer,err:=newInboundPeerWithConfig(conn,sw.reactorsByCh,sw.chDescs,sw.StopPeerForError,sw.nodePrivKey,config)//...}

p2p/peer.go#L87

funcnewInboundPeerWithConfig(connnet.Conn,reactorsByChmap[byte]Reactor,chDescs[]*ChannelDescriptor,onPeerErrorfunc(*Peer,interface{}),ourNodePrivKeycrypto.PrivKeyEd25519,config*PeerConfig)(*Peer,error){returnnewPeerFromConnAndConfig(conn,false,reactorsByCh,chDescs,onPeerError,ourNodePrivKey,config)}

p2p/peer.go#L91

funcnewPeerFromConnAndConfig(rawConnnet.Conn,outboundbool,reactorsByChmap[byte]Reactor,chDescs[]*ChannelDescriptor,onPeerErrorfunc(*Peer,interface{}),ourNodePrivKeycrypto.PrivKeyEd25519,config*PeerConfig)(*Peer,error){conn:=rawConn//...ifconfig.AuthEnc{//...conn,err=MakeSecretConnection(conn,ourNodePrivKey)//...}//KeyandNodeInfoaresetafterHandshakep:=&Peer{outbound:outbound,conn:conn,config:config,Data:cmn.NewCMap(),}p.mconn=createMConnection(conn,p,reactorsByCh,chDescs,onPeerError,config.MConfig)p.BaseService=*cmn.NewBaseService(nil,"Peer",p)returnp,nil}

终于找到了。上面方法中的MakeSecretConnection就是与对方节点交换公钥并进行身份验证的地方,下面的p.mconn = createMConnection(...)就是创建mconn的地方。

继续进去:

p2p/peer.go#L292

funccreateMConnection(connnet.Conn,p*Peer,reactorsByChmap[byte]Reactor,chDescs[]*ChannelDescriptor,onPeerErrorfunc(*Peer,interface{}),config*MConnConfig)*MConnection{onReceive:=func(chIDbyte,msgBytes[]byte){reactor:=reactorsByCh[chID]ifreactor==nil{ifchID==PexChannel{return}else{cmn.PanicSanity(cmn.Fmt("Unknownchannel%X",chID))}}reactor.Receive(chID,p,msgBytes)}onError:=func(rinterface{}){onPeerError(p,r)}returnNewMConnectionWithConfig(conn,chDescs,onReceive,onError,config)}

原来mconn是MConnection的实例,它是通过NewMConnectionWithConfig创建的。

看了上面的代码,发现这个MConnectionWithConfig与普通的net.Conn并没有太大的区别,只不过是当收到了对方发来的数据后,会根据指定的chID调用相应的Reactor的Receive方法来处理。所以它起到了将数据分发给Reactor的作用。

为什么需要这样的分发操作呢?这是因为,在比原中,节点之间交换数据,有多种不同的方式:

一种是规定了详细的数据交互协议(比如有哪些信息类型,分别代表什么意思,什么情况下发哪个,如何应答等),在ProtocolReactor中实现,它对应的chID是BlockchainChannel,值为byte(0x40)

另一种使用了与BitTorrent类似的文件共享协议,叫PEX,在PEXReactor中实现,它对应的chID是PexChannel,值为byte(0x00)

所以节点之间发送信息的时候,需要知道对方发过来的数据对应的是哪一种方式,然后转交给相应的Reactor去处理。

在比原中,前者是主要的方式,后者起到辅助作用。我们目前的文章中涉及到的都是前者,后者将在以后专门研究。

p.mconn.TrySend

当我们知道了p.mconn.TrySend中的mconn是什么,并且在什么时候初始化以后,下面就可以进入它的TrySend方法了。

p2p/connection.go#L243

func(c*MConnection)TrySend(chIDbyte,msginterface{})bool{//...channel,ok:=c.channelsIdx[chID]//...ok=channel.trySendBytes(wire.BinaryBytes(msg))ifok{//WakeupsendRoutineifnecessaryselect{casec.send<-struct{}{}:default:}}returnok}

可以看到,它找到相应的channel后(在这里应该是ProtocolReactor对应的channel),调用channel的trySendBytes方法。在发送数据的时候,使用了github.com/tendermint/go-wire库,将msg序列化为二进制数组。

p2p/connection.go#L602

func(ch*Channel)trySendBytes(bytes[]byte)bool{select{casech.sendQueue<-bytes:atomic.AddInt32(&ch.sendQueueSize,1)returntruedefault:returnfalse}}

原来它是把要发送的数据,放到了该channel对应的sendQueue中,交由别人来发送。具体是由谁来发送,我们马上要就找到它。

细心的同学会发现,Channel除了trySendBytes方法外,还有一个sendBytes(在本文中没有用上):

p2p/connection.go#L589

func(ch*Channel)sendBytes(bytes[]byte)bool{select{casech.sendQueue<-bytes:atomic.AddInt32(&ch.sendQueueSize,1)returntruecase<-time.After(defaultSendTimeout):returnfalse}}

它们两个的区别是,前者尝试把待发送数据bytes放入ch.sendQueue时,如果能放进去,则返回true,否则马上失败,返回false,所以它是非阻塞的。而后者,如果放不进去(sendQueue已满,那边还没处理完),则等待defaultSendTimeout(值为10秒),然后才会失败。另外,sendQueue的容量默认为1。

感谢各位的阅读,以上就是“BlockKeeper的逻辑是什么”的内容了,经过本文的学习后,相信大家对BlockKeeper的逻辑是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

域名注册
购买VPS主机

您或许对下面这些文章有兴趣:                    本月吐槽辛苦排行榜

看贴要回贴有N种理由!看帖不回贴的后果你懂得的!


评论内容 (*必填):
(Ctrl + Enter提交)   

部落快速搜索栏

各类专题梳理

网站导航栏

X
返回顶部