区块链教程Fabric1.0源代码分析流言算法Gossip服务端二

  区块链教程Fabric1.0源代码分析流言算法Gossip服务端二

Fabric 1.0源代码笔记 之 gossip(流言算法) #GossipServer(Gossip服务端)

5.2、commImpl结构体方法

//conn.serviceConnection(),启动连接服务
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error
//return &proto.Empty{}
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error)func (c *commImpl) GetPKIid() common.PKIidType
//向指定节点发送消息
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
//探测远程节点是否有响应,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Probe(remotePeer *RemotePeer) error
//握手验证远程节点,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error)
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage
func (c *commImpl) PresumedDead() <-chan common.PKIidType
func (c *commImpl) CloseConn(peer *RemotePeer)
func (c *commImpl) Stop()//创建并启动gRPC Server,以及注册GossipServer实例
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
//将GossipServer实例注册至peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
func extractRemoteAddress(stream stream) string
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error) 
//创建gRPC Server,grpc.NewServer(serverOpts...)
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte)//创建与服务端连接
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)
//向指定节点发送消息
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage)
//return atomic.LoadInt32(&c.stopping) == int32(1)
func (c *commImpl) isStopping() bool
func (c *commImpl) emptySubscriptions()
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error)
func (c *commImpl) disconnect(pkiID common.PKIidType)
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error)
//代码在gossip/comm/comm_impl.go

5.2.1、func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)

创建并启动gRPC Server,以及注册GossipServer实例

func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {var ll net.Listenervar s *grpc.Servervar certHash []byteif len(dialOpts) == 0 {//peer.gossip.dialTimeout,gRPC连接拨号的超时dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}}if port > 0 {//创建gRPC Server,grpc.NewServer(serverOpts...)s, ll, secureDialOpts, certHash = createGRPCLayer(port)}commInst := &commImpl{selfCertHash:   certHash,PKIID:          idMapper.GetPKIidOfCert(peerIdentity),idMapper:       idMapper,logger:         util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),peerIdentity:   peerIdentity,opts:           dialOpts,secureDialOpts: secureDialOpts,port:           port,lsnr:           ll,gSrv:           s,msgPublisher:   NewChannelDemultiplexer(),lock:           &sync.RWMutex{},deadEndpoints:  make(chan common.PKIidType, 100),stopping:       int32(0),exitChan:       make(chan struct{}, 1),subscriptions:  make([]chan proto.ReceivedMessage, 0),}commInst.connStore = newConnStore(commInst, commInst.logger)if port > 0 {commInst.stopWG.Add(1)go func() {defer commInst.stopWG.Done()s.Serve(ll) //启动gRPC Server}()//commInst注册到gRPC Serverproto.RegisterGossipServer(s, commInst)}return commInst, nil
}//代码在gossip/comm/comm_impl.go

5.2.2、func NewCommInstance(s grpc.Server, cert tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error)

将GossipServer实例注册至peerServer

func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error) {dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))//构造commImplcommInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)if cert != nil {inst := commInst.(*commImpl)inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])}proto.RegisterGossipServer(s, commInst.(*commImpl))return commInst, nil
}//代码在gossip/comm/comm_impl.go

//创建与服务端连接

5.2.3、func (c commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (connection, error)

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {var err errorvar cc *grpc.ClientConnvar stream proto.Gossip_GossipStreamClientvar pkiID common.PKIidTypevar connInfo *proto.ConnectionInfovar dialOpts []grpc.DialOptiondialOpts = append(dialOpts, c.secureDialOpts()...)dialOpts = append(dialOpts, grpc.WithBlock())dialOpts = append(dialOpts, c.opts...)cc, err = grpc.Dial(endpoint, dialOpts...)cl := proto.NewGossipClient(cc)if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {cc.Close()return nil, err}ctx, cf := context.WithCancel(context.Background())stream, err = cl.GossipStream(ctx)connInfo, err = c.authenticateRemotePeer(stream)pkiID = connInfo.IDconn := newConnection(cl, cc, stream, nil)conn.pkiID = pkiIDconn.info = connInfoconn.logger = c.loggerconn.cancel = cfh := func(m *proto.SignedGossipMessage) {c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{conn:                conn,lock:                conn,SignedGossipMessage: m,connInfo:            connInfo,})}conn.handler = hreturn conn, nil
}
//代码在gossip/comm/comm_impl.go

6、connectionStore和connection结构体及方法

6.1、connection结构体及方法

type connection struct {cancel       context.CancelFuncinfo         *proto.ConnectionInfooutBuff      chan *msgSendinglogger       *logging.Logger                 // loggerpkiID        common.PKIidType                // pkiID of the remote endpointhandler      handler                         // function to invoke upon a message receptionconn         *grpc.ClientConn                // gRPC connection to remote endpointcl           proto.GossipClient              // gRPC stub of remote endpointclientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpointserverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpointstopFlag     int32                           // indicates whether this connection is in process of stoppingstopChan     chan struct{}                   // a method to stop the server-side gRPC call from a different go-routinesync.RWMutex                                 // synchronizes access to shared variables
}//构造connection
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection
//关闭connection
func (conn *connection) close()
//atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
func (conn *connection) toDie() bool
//conn.outBuff <- m,其中m为msgSending{envelope: msg.Envelope,onErr: onErr,}
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error))
//go conn.readFromStream(errChan, msgChan)、go conn.writeToStream(),同时msg := <-msgChan,conn.handler(msg)
func (conn *connection) serviceConnection() error
//循环不间断从conn.outBuff取数据,然后stream.Send(m.envelope)
func (conn *connection) writeToStream()
//循环不间断envelope, err := stream.Recv()、msg, err := envelope.ToGossipMessage()、msgChan <- msg
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage)
//获取conn.serverStream
func (conn *connection) getStream() stream
//代码在gossip/comm/conn.go

6.2、connectionStore结构体及方法

type connectionStore struct {logger           *logging.Logger          // loggerisClosing        bool                     // whether this connection store is shutting downconnFactory      connFactory              // creates a connection to remote peersync.RWMutex                              // synchronize access to shared variablespki2Conn         map[string]*connection   //connection map, key为pkiID,value为connectiondestinationLocks map[string]*sync.RWMutex //mapping between pkiIDs and locks,// used to prevent concurrent connection establishment to the same remote endpoint
}//构造connectionStore
func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore
//从connection map中获取连接,如无则创建并启动连接,并写入connection map中
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)
//连接数量
func (cs *connectionStore) connNum() int
//关闭指定连接
func (cs *connectionStore) closeConn(peer *RemotePeer)
//关闭所有连接
func (cs *connectionStore) shutdown()
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection
//注册连接
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection
//关闭指定连接
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) 
//代码在gossip/comm/conn.go

6.2.1、func (cs connectionStore) getConnection(peer RemotePeer) (*connection, error)

func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {cs.RLock()isClosing := cs.isClosingcs.RUnlock()pkiID := peer.PKIIDendpoint := peer.Endpointcs.Lock()destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]if !hasConnected {destinationLock = &sync.RWMutex{}cs.destinationLocks[string(pkiID)] = destinationLock}cs.Unlock()destinationLock.Lock()cs.RLock()//从connection map中获取conn, exists := cs.pki2Conn[string(pkiID)]if exists {cs.RUnlock()destinationLock.Unlock()return conn, nil}cs.RUnlock()//创建连接createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)destinationLock.Unlock()conn = createdConnectioncs.pki2Conn[string(createdConnection.pkiID)] = conngo conn.serviceConnection() //启动连接的消息接收处理、以及向对方节点发送消息return conn, nil
}
//代码在gossip/comm/conn.go

7、ChannelDeMultiplexer结构体及方法(多路复用器)

type ChannelDeMultiplexer struct {channels []*channellock     *sync.RWMutexclosed   int32
}//构造ChannelDeMultiplexer
func NewChannelDemultiplexer() *ChannelDeMultiplexer
//atomic.LoadInt32(&m.closed) == int32(1)
func (m *ChannelDeMultiplexer) isClosed() bool
//关闭
func (m *ChannelDeMultiplexer) Close() 
//添加通道
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} 
//挨个通道发送消息
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) 

转载于:https://blog.51cto.com/14041296/2311323

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/253181.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一段H264数据的分析

&#xfeff;&#xfeff;目录(?)[-] 分析00 00 00 01 67 42 00 1E 99 A0 B1 31 00 00 00 01分析00 00 00 01 68 CE 38 80 00 00 00 01 分析00 00 00 01 67 42 00 1E 99 A0 B1 31 00 00 00 01 H264的数据流分为两种&#xff0c;一种是NAL UNIT stream(RTP),一种是 bits stream…

海华模组:WIFI、BT、SoC模组列表

各种模块广泛应用于网络摄像头、智能机器人、儿童故事机、词典笔、智能音箱、智能家电等需要实现无线联网设备的消费类电子产品。 模块化有很大的有点&#xff1a;集成设计、减少调试工作&#xff0c;避开开发盲区、加速将产品推向市场&#xff01; 下面介绍下海华各类通讯模…

JAVA-初步认识-第七章-构造函数和一般函数的区别

一. 构造函数是对象一创建&#xff0c;就被调用了。(调用这个词很特殊&#xff0c;是涉及到实体时&#xff0c;才会有调用的过程) 还有一点想说的是&#xff0c;构造函数的声明应该是固定的&#xff0c;不然没法随着对象的创建一起执行&#xff0c;必须是类名括号的形式。 二. …

深入理解哈希表

转自&#xff1a;https://bestswifter.com/hashtable/ 这篇文章由一个简单的问题引出: 有两个字典&#xff0c;分别存有 100 条数据和 10000 条数据&#xff0c;如果用一个不存在的 key 去查找数据&#xff0c;在哪个字典中速度更快&#xff1f; 有些计算机常识的读者都会立刻回…

Linux服务器ftp+httpd部署

一、ftp安装 1、安装vsftpd 命令&#xff1a;yum -y install vsftpd 2、修改ftp配置文件 命令&#xff1a;vim /etc/vsftpd/vsftpd.conf 3、按i进入insert模式后&#xff0c;按以下要求修改 anonymous_enableYES 改为anonymous_enableNO chroot_local_userYES #去掉前面的注释 …

高清网络摄像机主流芯片方案之安霸、TI和海思对比

高清网络视频监控发展到今天&#xff0c;市场也开始进入真正的高清时代&#xff0c;诸多有实力的高清摄像机厂家的产品线也逐渐完善起来&#xff0c;高清网络视频监控的配套产品有更加丰富和成熟。与此同时困扰很多人的高清网络摄像机与后端平台或者与后端NVR互联互通的问题也在…

ios审核4.3被拒,快速通过IOS4.3问题

最近有许多开发者遇到了因为审核条款 4.3&#xff08;后文统一简称 4.3&#xff09;审核条款 4.3&#xff08;后文统一简称 4.3&#xff09;&#xff0c;这种情况 常见于大家上传重复应用的时候&#xff0c;因为App Store 已经有了很多相似的应用 而被打回&#xff0c;今天我们…

正基模组:WIFI/BT/GPS/FM模组列表

各种模块广泛应用于网络摄像头、智能机器人、儿童故事机、词典笔、智能音箱、智能家电等需要实现无线联网设备的消费类电子产品。 模组由于其特性&#xff0c;给终端硬件开发带来巨大的便利性和实用性&#xff0c;具体小结如下&#xff1a; Feature特点:1. 模块均采用邮票孔形…

计算机网络基础教程---强烈推荐!来自锐捷官方网站

一、计算机网络基础教程 说明&#xff1a;每个教程的时间大约为6分钟&#xff0c;以问题为导向&#xff0c;以项目为驱动。1、第一章 IPV4地址介绍 http://www.ruijie.com.cn/fw/zxpx/4092、第二章 TCP/IP协议簇介绍 http://www.ruijie.com.cn/fw/zxpx/4103、第三章 ARP协议工作…

杨幂掐点祝福唐嫣,打破不和传言,情感营销还能这么玩?

发现今天的蜂蜜泡水特别地甜&#xff0c;舍友说&#xff0c;同样地蜂蜜同样多的水泡出来的水有什么不一样&#xff0c;肯定是你心情变好了。说得好像也有道理&#xff0c;想想最近这么多甜蜜的事&#xff0c;一开始是颖宝结婚&#xff0c;不久唐嫣和罗晋也宣布结婚&#xff0c;…

RTP/RTCP协议介绍

1流媒体协议 当前在Internet上传输音频和视频等信息主要有两种方式&#xff1a;下载和流式传输。 下载情况下&#xff0c;用户需要先下载整个媒体文件到本地&#xff0c;然后才能播放媒体文件。流式传输是指传输之前首先对多媒体进行预处理(降低质量和高效压缩)&#xff0c;然后…

推荐一款软件(作业)

在过去&#xff0c;每当我遇见不认识的英文单词时我的解决方法是:查阅英汉词典&#xff0c;后来在我拥有手机之后&#xff0c;我的解决方法是&#xff1a;上网百度&#xff0c;而现在我的解决方法是&#xff1a;“有道翻译官”。是的&#xff0c;我要介绍的这款软件便是“有道翻…

网易有道最新力作 有道词典笔3 结构拆解

2020年12月1日&#xff0c;有道品牌推出了一款硬件新品&#xff0c;名叫有道词典笔3。 网易有道于2019年8月推出可以“一扫查词”的有道词典笔2代&#xff0c;搭载了OCR&#xff08;光学字符识别&#xff09;技术的产品&#xff0c;大大改变了传统的学习方式&#xff0c;查词效…

DataGridView动态添加新行的两种方法

简单介绍如何为DataGridView控件动态添加新行的两种方 法&#xff1a; 方法一&#xff1a; int indexthis.dataGridView1.Rows.Add();this.dataGridView1.Rows[index].Cells[0].Value "1"; this.dataGridView1.Rows[index].Cells[1].Value "2"; this.dat…

使用glew和glad 新建窗口

一、添加头文件 首先&#xff0c;将头文件加到项目的.cpp文件中 1 #include <glad/glad.h> 2 #include <GLFW/glfw3.h> 注&#xff1a; 包含glad的头文件一定要在包含glfw的头文件之前使用。因为glad的头文件包含了正确的openGL头文件&#xff08;例如GL/gl.h&…

有道词典笔3新增功能扫读和点读是怎么集成的?

2020年12月1日&#xff0c;有道品牌推出了一款硬件新品&#xff0c;名叫有道词典笔3。 相对有道于2019年8月推出后来被称为“爆品”的有道词典笔2来说&#xff0c;有道3硬件最大最明显差别是屏幕变的更大了&#xff0c;同时增加了点读功能&#xff08;点读笔点读特定教材的功能…

RTP协议分析

RTP协议分析 一&#xff0e; RTP协议背景.......................................................................................................... 1 二&#xff0e; RTP协议原理及工作机制........................................................................…

mongodb 部署

安装mongodb-3.4 1&#xff09;将安装包上传至服务器 2&#xff09;对压缩文件进行解压 tar -zxvf mongodb-linux-x86_64-suse12-v3.4-latest.tar.gz 3&#xff09;把解压出来的文件修改一下名字&#xff0c;并挪到指定安装路径 sudo mv mongodb-linux-x86_64-suse12-3.4.6-22-…

如何选择一款优秀的儿童读写台灯?

如何选择一款优秀的儿童阅读台灯&#xff1f;除了品牌、外观、材质、价格等因素外&#xff0c;最关键的是技术参数。 先说结论&#xff0c;满足如下几点参数&#xff0c;当数优选&#xff1a; 1-光通量&#xff1a;500lm以上 2-显色指数&#xff1a;≥95 3-色温&#xff1a…

Python与操作系统有关的模块

Os模块Python的标准库中的os模块主要涉及普遍的操作系统功能。可以在Linux和Windows下运行&#xff0c;与平台无关。os.sep 可以取代操作系统特定的路径分割符。os.name字符串指示你正在使用的平台。比如对于Windows&#xff0c;它是’nt’&#xff0c;而对于Linux/Unix用户&am…