剥开比原看代码07:比原节点收到“请求区块数据”的信息后如何应答?

作者:freewind

比原项目仓库:

Github地址:https://github.com/Bytom/bytom

Gitee地址:https://gitee.com/BytomBlockchain/bytom

在上一篇,我们知道了比原是如何把“请求区块数据”的信息BlockRequestMessage发送给peer节点的,那么本文研究的重点就是,当peer节点收到了这个信息,它将如何应答?

那么这个问题如果细分的话,也可以分为三个小问题:

  1. 比原节点是如何收到对方发过来的信息的?
  2. 收到BlockRequestMessage后,将会给对方发送什么样的信息?
  3. 这个信息是如何发送出去的?

我们先从第一个小问题开始。

比原节点是如何接收对方发过来的信息的?

如果我们在代码中搜索BlockRequestMessage,会发现只有在ProtocolReactor.Receive方法中针对该信息进行了应答。那么问题的关键就是,比原是如何接收对方发过来的信息,并且把它转交给ProtocolReactor.Receive的。

如果我们对前一篇《比原是如何把请求区块数据的信息发出去的》有印象的话,会记得比原在发送信息时,最后会把信息写入到MConnection.bufWriter中;与之相应的,MConnection还有一个bufReader,用于读取数据,它也是与net.Conn绑定在一起的:

p2p/connection.go#L114-L118

func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {mconn := &MConnection{conn:        conn,bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),

(其中minReadBufferSize的值为常量1024

所以,要读取对方发来的信息,一定会读取bufReader。经过简单的搜索,我们发现,它也是在MConnection.Start中启动的:

p2p/connection.go#L152-L159

func (c *MConnection) OnStart() error {// ...go c.sendRoutine()go c.recvRoutine()// ...
}

其中的c.recvRoutine()就是我们本次所关注的。它上面的c.sendRoutine是用来发送的,是前一篇文章中我们关注的重点。

继续c.recvRoutine()

p2p/connection.go#L403-L502

func (c *MConnection) recvRoutine() {// ...for {c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)// ...pktType := wire.ReadByte(c.bufReader, &n, &err)c.recvMonitor.Update(int(n))// ...switch pktType {// ...case packetTypeMsg:pkt, n, err := msgPacket{}, int(0), error(nil)wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)c.recvMonitor.Update(int(n))// ...channel, ok := c.channelsIdx[pkt.ChannelID]// ...msgBytes, err := channel.recvMsgPacket(pkt)// ...if msgBytes != nil {// ...c.onReceive(pkt.ChannelID, msgBytes)}// ...}}// ...
}

经过简化以后,这个方法分成了三块内容:

  1. 第一块就限制接收速率,以防止恶意结点突然发送大量数据把节点撑死。跟发送一样,它的限制是500K/s
  2. 第二块是从c.bufReader中读取出下一个数据包的类型。它的值目前有三个,两个跟心跳有关:packetTypePingpacketTypePong,另一个表示是正常的信息数据类型packetTypeMsg,也是我们需要关注的
  3. 第三块就是继续从c.bufReader中读取出完整的数据包,然后根据它的ChannelID找到相应的channel去处理它。ChannelID有两个值,分别是BlockchainChannelPexChannel,我们目前只需要关注前者即可,它对应的reactor是ProtocolReactor。当最后调用c.onReceive(pkt.ChannelID, msgBytes)时,读取的二进制数据msgBytes就会被ProtocolReactor.Receive处理

我们的重点是看第三块内容。首先是channel.recvMsgPacket(pkt),即通道是怎么从packet包里读取到相应的二进制数据的呢?

p2p/connection.go#L667-L682

func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {// ...ch.recving = append(ch.recving, packet.Bytes...)if packet.EOF == byte(0x01) {msgBytes := ch.recving// ...ch.recving = ch.recving[:0]return msgBytes, nil}return nil, nil
}

这个方法我去掉了一些错误检查和关于性能方面的注释,有兴趣的同学可以点接上方的源代码查看,这里就忽略了。

这段代码主要是利用了一个叫recving的通道,把packet中持有的字节数组加到它后面,然后再判断该packet是否代表整个信息结束了,如果是的话,则把ch.recving的内容完整返回,供调用者处理;否则的话,返回一个nil,表示还没拿完,暂时处理不了。在前一篇文章中关于发送数据的地方可以与这里对应,只不过发送方要麻烦的多,需要三个通道sendQueuesendingsend才能实现,这边接收方就简单了。

然后回到前面的方法MConnection.recvRoutine,我们继续看最后的c.onReceive调用。这个onReceive实际上是一个由别人赋值给该channel的一个函数,它位于MConnection创建的地方:

p2p/peer.go#L292-L310

func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {onReceive := func(chID byte, msgBytes []byte) {reactor := reactorsByCh[chID]if reactor == nil {if chID == PexChannel {return} else {cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))}}reactor.Receive(chID, p, msgBytes)}onError := func(r interface{}) {onPeerError(p, r)}return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}

逻辑也比较简单,就是当前面的c.onReceive(pkt.ChannelID, msgBytes)调用时,它会根据传入的chID找到相应的Reactor,然后执行其Receive方法。对于本文来说,就会进入到ProtocolReactor.Receive

那我们继续看ProtocolReactor.Receive:

netsync/protocol_reactor.go#L179-L247

func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {_, msg, err := DecodeMessage(msgBytes)// ...switch msg := msg.(type) {case *BlockRequestMessage:// ...
}

其中的DecodeMessage(...)就是把传入的二进制数据反序列化成一个BlockchainMessage对象,该对象是一个没有任何内容的interface,它有多种实现类型。我们在后面继续对该对象进行判断,如果它是BlockRequestMessage类型的信息,我们就会继续做相应的处理。处理的代码我在这里暂时省略了,因为它是属于下一个小问题的,我们先不考虑。

好像不知不觉我们就把第一个小问题的后半部分差不多搞清楚了。那么前半部分是什么?我们在前面说,读取bufReader的代码的起点是在MConnection.Start中,那么前半部分就是:比原从启动开始中,是在什么情况下怎样一步步走到MConnection.Start的呢?

好在前半部分的问题我们在前一篇文章《比原是如何把请求区块数据的信息发出去的》中进行了专门的讨论,这里就不讲了,有需要的话可以再过去看一下(可以先看最后“总结”那一小节)。

下面我们进入第二个小问题:

收到BlockRequestMessage后,将会给对方发送什么样的信息?

这里就是接着前面的ProtocolReactor.Receive继续向下讲了。首先我们再贴一下它的较完整的代码:

netsync/protocol_reactor.go#L179-L247

func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {_, msg, err := DecodeMessage(msgBytes)// ...switch msg := msg.(type) {case *BlockRequestMessage:var block *types.Blockvar err errorif msg.Height != 0 {block, err = pr.chain.GetBlockByHeight(msg.Height)} else {block, err = pr.chain.GetBlockByHash(msg.GetHash())}// ...response, err := NewBlockResponseMessage(block)// ...src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})// ...
}

可以看到,逻辑还是比较简单的,即根据对方发过来的BlockRequestMessage中指定的height或者hash信息,在本地的区块链数据中找到相应的block,组成BlockResponseMessage发过去就行了。

其中chain.GetBlockByHeight(...)chain.GetBlockByHash(...)如果详细说明的话,需要深刻理解区块链数据在比原节点中是如何保存的,我们在本文先不讲,等到后面专门研究。

在这里,我觉得我们只需要知道我们会查询区块数据并且构造出一个BlockResponseMessage,再通过BlockchainChannel这个通道发送出去就可以了。

最后一句代码中调用了src.TrySend方法,它是把信息向对方peer发送过去。(其中的src就是指的对方peer)

那么,它到底是怎么发送出去的呢?下面我们进入最后一个小问题:

这个BlockResponseMessage信息是如何发送出去的?

我们先看看peer.TrySend代码:

p2p/peer.go#L242-L247

func (p *Peer) TrySend(chID byte, msg interface{}) bool {if !p.IsRunning() {return false}return p.mconn.TrySend(chID, msg)
}

它在内部将会调用MConnection.TrySend方法,其中chIDBlockchainChannel,也就是它对应的Reactor是ProtocolReactor

再接着就是我们熟悉的MConnection.TrySend,由于它在前一篇文章中进行了全面的讲解,在本文就不提了,如果需要可以过去翻看一下。

那么今天的问题就算是解决啦。

到这里,我们总算能够完整的理解清楚,当我们向一个比原节点请求“区块数据”,我们这边需要怎么做,对方节点又需要怎么做了。

 

转载于:https://www.cnblogs.com/bytom/p/9355998.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/485585.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux连接外部库时候编译,交叉编译时如何使用外部库?

关于你的一般问题:为什么C库有效:C库是交叉工具链的一部分。这就是找到标题并且程序正确链接和运行的原因。对于其他一些非常基本的系统库(如libm和libstdc)也是如此(并非在每种情况下都依赖于工具链配置)。通常,在处理交叉开发时&#xff0c…

LVS入门篇(五)之LVS+Keepalived实战

一、实验架构和环境说明 (1)本次基于VMware Workstation搭建一个四台Linux(CentOS 7.4)系统所构成的一个服务器集群,其中两台负载均衡服务器(一台为主机,另一台为备机),另…

任正非深度剖析技术差距:我们为何憎恨化学?

文章来源:今日头条、化工技术宝典关于华为芯片难题和华为情结,近日觉得很有必要必须表达点什么,算是纪念2020这个寒冬,以表明国人还是有人是清醒的,哪怕是装睡着,但心中是明白的。2020年11月10日上午10点&a…

linux应用程序课程设计,linux操作系统与应用课程设计 .pdf

XXXXXXXX 学校Linux 操作系统与应用课程设计题 目 基于RHEL6.4 的ftp 服务器搭建院 (部) 信息工程学院班 级姓 名 XXX学 号 XXXXXXXXX指导教师2019 年 月 日1目录一、设计背景3二、设计目的3三、设计要求3四、软件设计44.1 搭建FTP 服务器44.2C 语言编程4五、调试运行45.1C 语言…

DBCC SHRINKFILE收缩日志/收缩数据库/收缩文件

DBCC SHRINKFILE 收缩相关数据库的指定数据文件或日志文件大小。 语法 DBCC SHRINKFILE ( { file_name | file_id } { [ ,target_size ] | [ , { EMPTYFILE | NOTRUNCATE | TRUNCATEONLY } ] } ) 参数 file_name 是已收缩文件的逻辑名称。文件…

Leetcode--2. 两数相加

给出两个 非空 的链表用来表示两个非负的整数。其中,它们各自的位数是按照 逆序 的方式存储的,并且它们的每个节点只能存储 一位 数字。 如果,我们将这两个数相加起来,则会返回一个新的链表来表示它们的和。 您可以假设除了数字…

【Brain】复旦类脑研究院:破解大脑奥秘,为实现人工智能自我思考奠定基础...

文章来源:新民晚报图说:复旦类脑研究院 采访对象供图理解大脑的结构与功能是21世纪最具挑战性的前沿科学问题,谁揭开大脑运作的神秘面纱,谁就在重大脑疾病防治和全球智能产业革命中抢占了先机。利用磁共振成像技术观察大脑内部结构&#xff0…

linux system函数传参,Linux系统调用例程system_call和参数传递

系统调用接口调用“int $Ox8O”指令进入内核并准各了相关参数后,剩下的工作就由系统调用例程来进行。Linux定义的系统调用 例程的入口为system_call。下面具体介绍system_call所做的工作。system_call是用汇编语言编写的,在i386体系中&#x…

Sci-Hub重生了,这回用上了分布式网络

来源:Python开发者在网站域名屡次被撤销之后, Sci-Hub 创始人 Alexandra Elbakyan 在分布式域名网络 Handshake 上注册了新的网站。现在,每个用户都可以直接通过服务门户和 NextDNS 直接访问 Sci-Hub。NextDNS:https://learn.name…

Leetcode--24. 两两交换链表中的结点

给定一个链表,两两交换其中相邻的节点,并返回交换后的链表。 你不能只是单纯的改变节点内部的值,而是需要实际的进行节点交换。 示例: 给定 1->2->3->4, 你应该返回 2->1->4->3. 提交的代码: /** * Definition for sin…

linux db2备份,db2实现备份

db2实现备份/var/db2/db2inst1/sqllib/db2profile;dbbackpath/db2/db2inst1/1;dblogpath/db2/NODE0000;mv $dbbackpath/file/*.gz $dbbackpath/all;mv $dbbackpath/logs/*.gz $dbbackpath/all;mv $dbbackpath/logs/*.tar $dbbackpath/all;db2 backup db urp_rs online to $dbba…

Dede更新提示DedeTag Engine Create File False的解决办法

第一种情况:列表、频道、文章等命名规则未填写或填写错误 此种情况较为少见,因为初级用户一般不会去修改这些东西,情况可以大致分为: 命名规则未填写(即为空)解决方法:只需填好相应的规则即可&a…

超级人工智能何时能实现?

来源:赛先生制版编辑 :Morgan撰文:斯图尔特罗素(加州大学伯克利分校计算机科学家,人类兼容人工智能中心主任)01近未来1997年5月3日,IBM制造的国际象棋计算机“深蓝”和国际象棋世界冠军加里卡斯…

Ajax:异步js和xml

如果通过之前的转发,或者重定向,很多问题没法解决 比如我给某个视频点个赞,你经过转发或者重定向,最后虽然点赞成功了,但页面刷新了,视频从头开始放了。 异步刷新:如果网页某一个地方需要修改&…

linux cache buffer区别,Linux buffer/cache异同

buffers与cached1)、异同点在Linux 操作系统中,当应用程序需要读取文件中的数据时,操作系统先分配一些内存,将数据从磁盘读入到这些内存中,然后再将数据分发给应用程序;当需要往文件中写 数据时,操作系统先…

sql中join与left-join图解区别

select a.* from YG_BRSYK a left join(SELECT DISTINCT SYXH,STUFF((SELECT 、MS FROM #lsb where SYXHt.SYXH FOR XML PATH()),1,1,) AS MSFROM #lsb as t) c on a.SYXHc.SYXH WHERE c.MS IS NOT NULL order by RYBQ --注:left join...on 为左关联,保…

很遗憾,自然语言理解是AI尚未攻克的领域

来源: Venture Beat作者: Pieter Buteneers编译: 科技行者短短几年之内,深度学习算法得到了长足发展,不仅在棋类游戏中击败了全球最顶尖的选手,也能够以等同于、甚至超越人类的准确率识别人脸。但事实证明,人类语言仍是一项独特且…

分层结构,协议,接口,服务

发送文件前的工作: 1. 发起通信的计算机必须将数据通信的通路进行激活 2. 要告诉网络如何识别目的主机 3. 发起通信的计算机要查明目的主机是否开机,并且网络连接正常 4. 发起通信的计算机要清楚,对方计算机中文件管理程序是否做好准备工…

linux怎么抓sip包,Ubuntu下使用Wireshark进行抓包分析(含SIP和RTP包)

遇到需要在Linux下抓包分析的问题,便用到了wireshark,非常强大的抓包分析软件,直接在系统里面安装,然后使用明亮抓包即可!我这里用的是Ubuntuserver版,执行安装:1、apt-get install wireshark安装成功后使用…

jsp+javabean实现购物车

采用Model1(jspjavabean) 实现DBHelper类创建实体类创建业务逻辑类(dao) DBHelper类的设计package util;import java.sql.Connection; import java.sql.DriverManager;public class DBHelper {private static final String drive…