主页 > imtoken官方网站 > 以太坊源码解读(十)广播与同步(一)protocolManager及其手

以太坊源码解读(十)广播与同步(一)protocolManager及其手

imtoken官方网站 2023-12-01 05:07:37

前面提到的ProtocolManager从字面上看就是一个协议管理器,负责p2p通信协议的管理。 它连接p2p逻辑层peer和顶层peer之间的调用,将协议从顶层传递给逻辑层,然后从逻辑层获取消息传递给顶层。

以太坊经典和以太坊_以太坊挖坑流程图_买以太坊流程

1、fastSync指定同步模式,fastSync模式通过atmoic.LoadUint32(&pm.fastSync) == 1或0开启或关闭;

2.acceptTxs是节点是否接受交易的阀门。 只有当 pm.acceptTxs == 1 时,节点才会接受交易。 该操作只有在同步结束后才会开始,即同步期间节点不会接受交易;

3. SubProtocols是以太坊的通信协议,通常只有一个值,即eth63。 以太坊中指定了一个常量ProtocolVersion = [eth63, eth62],它指定了以太坊的两个版本,这两个版本的ProtocolLength分别为[17, 8]。

4.下载器是从远程网络节点获取哈希和块的下载器。

5. fetcher 收集网络中其他以太坊节点发送的同步通知,并进行验证,并进行相应的处理。

6. Peers 是经过验证和信任的通信节点的集合。

1、protocolManager的初始化和启动:NewProtocolManager和Start()

func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, 
mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, 
chaindb ethdb.Database) (*ProtocolManager, error) {
	// Create the protocol manager with the base fields
	manager := &ProtocolManager{
		networkID:   networkID,
		eventMux:    mux,
		txpool:      txpool,
		blockchain:  blockchain,
		chainconfig: config,
		peers:       newPeerSet(),
		newPeerCh:   make(chan *peer),
		noMorePeers: make(chan struct{}),
		txsyncCh:    make(chan *txsync),
		quitSync:    make(chan struct{}),
	}
	// Figure out whether to allow fast sync or not
	if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
		log.Warn("Blockchain not empty, fast sync disabled")
		mode = downloader.FullSync
	}
	if mode == downloader.FastSync {
		manager.fastSync = uint32(1)
	}
	// Initiate a sub-protocol for every implemented version we can handle

买以太坊流程_以太坊挖坑流程图_以太坊经典和以太坊

manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { // Skip protocol version if incompatible with the mode of operation if mode == downloader.FastSync && version < eth63 { continue } // Compatible; initialise the sub-protocol version := version // Closure for the run manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ Name: ProtocolName, Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(int(version), p, rw) select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() return manager.handle(peer) case <-manager.quitSync: return p2p.DiscQuitting } }, NodeInfo: func() interface{} { return manager.NodeInfo() }, PeerInfo: func(id discover.NodeID) interface{} { if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { return p.Info() } return nil }, })

以太坊经典和以太坊_买以太坊流程_以太坊挖坑流程图

} if len(manager.SubProtocols) == 0 { return nil, errIncompatibleConfig } // Construct the different synchronisation mechanisms manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) } heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() } inserter := func(blocks types.Blocks) (int, error) { // If fast sync is running, deny importing weird blocks if atomic.LoadUint32(&manager.fastSync) == 1 { log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) return 0, nil } atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import return manager.blockchain.InsertChain(blocks) } manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) return manager, nil }

NewProtocolManager()的逻辑如下:

1、初始化protocolManager对象,包括事件多路复用、区块链、交易池和节点等;

2. 启动 fastSync 或 fullSync。 以太坊启动时默认的同步方式是fastSync模式。 该方法直接下载区块头、区块体和状态,不执行交易,直到最后64个区块都采用fullSync模式。 这种方法 模式不下载状态,而是验证并执行每笔交易,消耗CPU;

3、配置以太坊协议为SubProtocols,设置以太坊的Run()方法,等待p2p.peer模块的调用;

4.新建下载器;

5、定义validator、heighter、inserter三个函数,辅助校验和插入新区块,并以此为基础创建新的fetcher。

node.Start() 将启动以太坊协议,ethereum.Start() 将执行 protocolManager.Start():

以太坊经典和以太坊_以太坊挖坑流程图_买以太坊流程

func (pm *ProtocolManager) Start(maxPeers int) {
	pm.maxPeers = maxPeers
	// broadcast transactions
	pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
	pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
	go pm.txBroadcastLoop()
	// broadcast mined blocks
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	go pm.minedBroadcastLoop()
	// start sync handlers
	go pm.syncer()
	go pm.txsyncLoop()
}

ProtocolManager.Start()启动了四个go例程,分别是交易订阅广播例程(txBroadcastLoop)、挖矿订阅例程(minedBroadcastLoop)、节点定时同步例程(syncer)和交易同步例程(txsyncLoop)。

以太坊挖坑流程图_以太坊经典和以太坊_买以太坊流程

1. 为新交易创建一个订阅频道,并启动一个goroutine 进行交易广播。 广播一个新出现的交易对象。 txBroadcastLoop()会在txCh通道的接收端继续等待。 一旦它收到关于新交易的事件,它会立即调用 BroadcastTx() 函数向那些还没有交易对象的相邻个体广播。

2.创建挖矿订阅频道,并启动挖矿广播goroutine。 广播新开采的区块。 minedBroadcastLoop()继续等待个体新挖出的区块事件,然后立即广播给相邻的需要它的个体。 当没有新的挖矿区块事件被订阅时,该函数将等待结束返回。 有趣的是,minedBroadcastLoop() 在收到一个新挖出的区块事件后,会连续两次调用 BroadcastBlock()。 这两个调用仅在一个 bool 参数 @propagate 上有所不同。 当该参数为真时,将整个新区块依次发送给一小部分相邻区块; 为false时,只将新区块的Hash值和Number发送给所有相邻链表。

3. pm.syncer() 启动同步goroutine,定期与网络中的其他节点同步,并处理来自网络节点的相关通知。 定期执行整个区块链与相邻个体的强制同步。 syncer()先启动fetcher成员,然后进入死循环。 在每个循环中,它将对相邻对等点列表中的“最佳”对等点执行区块范围的链同步。 发起上述同步的原因有两个:如果有新注册(加入)的相邻个体,当整个peer列表的个数大于5时才会发起; 如果没有新的 peer 到达,它会每隔 10s 发起一次。 这里所谓的“最优”是指节点维护的区块链的最高TotalDifficulty(td)。 由于Td是整条链中从创世块到最新头块的Difficulty值之和,Td值最高的意味着它的区块链是最新的,整条区块链与这样一个peer同步,显然以太坊挖坑流程图,变化量是最小的,也就是“最优”。

4. pm.txsyncLoop() 启动交易同步goroutine,将新交易均匀同步到网络节点。 ProtocolManager 以 ethereum.Start() 开始,txsyncLoop() at s:=

peer.broadcase() 是每个Peer连接的广播函数。 它只广播3种消息:transaction、complete block、block Hash,表示节点只会主动广播这3种数据,其余数据同步通过request-response完成。

2.回调函数handle()交给其他peer

func (pm *ProtocolManager) handle(p *peer) error {
	// Ignore maxPeers if this is a trusted peer
	if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
		return p2p.DiscTooManyPeers
	}
	p.Log().Debug("Ethereum peer connected", "name", p.Name())
	// Execute the Ethereum handshake
	var (

以太坊经典和以太坊_以太坊挖坑流程图_买以太坊流程

genesis = pm.blockchain.Genesis() head = pm.blockchain.CurrentHeader() hash = head.Hash() number = head.Number.Uint64() td = pm.blockchain.GetTd(hash, number) ) if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil { p.Log().Debug("Ethereum handshake failed", "err", err) return err } if rw, ok := p.rw.(*meteredMsgReadWriter); ok { rw.Init(p.version) } // Register the peer locally if err := pm.peers.Register(p); err != nil { p.Log().Error("Ethereum peer registration failed", "err", err) return err } defer pm.removePeer(p.id) // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { return err } // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { p.Log().Debug("Ethereum message handling failed", "err", err) return err

以太坊经典和以太坊_以太坊挖坑流程图_买以太坊流程

} } }

handle() 函数为新对等点执行以下操作:

p.Handshakes() 握手,与peer节点沟通自己的区块链状态; p.rw 初始化一个读写通道,用于与 peer peer 进行数据传输。 p.peers.Register() 注册peer peer并存入peer列表; 只有当 handle() 函数退出时,peer 才会从列表中删除。 Downloader成员注册这个新的peer; Downloader 会自己维护一个相邻节点的列表。 调用syncTransactions(),将当前txpool中新积累的tx对象组装成一个txsync{}对象,推送到内部通道txsyncCh。 在无限循环中启动 handleMsg()。 当peer peer 发送任何msg 时, handleMsg() 可以捕获相应类型的消息并在自己这边进行处理。

以太坊的通信基于以太坊协议,它是Service的实现体,其构造函数收集在Node中的map[reflect.Type]Service中。 节点将协议交给服务器对象。 Node.Start()时,Server执行Start()以太坊挖坑流程图,也就是执行以太坊的Start()。 然后会新建一个protocolManager,将以太坊协议交给后者,实现协议接口的实现:

manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
			Name:    ProtocolName,
			Version: version,
			Length:  ProtocolLengths[i],
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				peer := manager.newPeer(int(version), p, rw)
				select {
				case manager.newPeerCh <- peer:
					manager.wg.Add(1)
					defer manager.wg.Done()
					return manager.handle(peer)
				case <-manager.quitSync:
					return p2p.DiscQuitting
				}
			},
			NodeInfo: func() interface{} {
				return manager.NodeInfo()
			},
			PeerInfo: func(id discover.NodeID) interface{} {
				if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
					return p.Info()
				}
				return nil
			},
		})

在Server.Start()中开启一个单独的线程(listenLoop())来监听某个端口是否有活跃的IP连接; 另一个单独的线程启动 run() 函数来处理在无限循环新对象中接收到的任何新消息。 在run()函数中,如果远程peer发送连接请求(new p2p.conn{}),调用Server.newPeer()生成一个新的peer对象,将所有的Server.Protocols交给peer。

handle()在ProtocolManager对象创建时包含在p2p.Protocol对象的Run方法中,一起交给新的peer对象,负责与peer peer通信,向peer peer发起请求并等待接收对端的处理信息,所以是Callback。

以太坊经典和以太坊_以太坊挖坑流程图_买以太坊流程