主页 > imtoken官方网站 > 以太坊源码解读(十)广播与同步(一)protocolManager及其手
以太坊源码解读(十)广播与同步(一)protocolManager及其手
前面提到的ProtocolManager从字面上看就是一个协议管理器,负责p2p通信协议的管理。 它连接p2p逻辑层peer和顶层peer之间的调用,将协议从顶层传递给逻辑层,然后从逻辑层获取消息传递给顶层。
1、fastSync指定同步模式,fastSync模式通过atmoic.LoadUint32(&pm.fastSync) == 1或0开启或关闭;
2.acceptTxs是节点是否接受交易的阀门。 只有当 pm.acceptTxs == 1 时,节点才会接受交易。 该操作只有在同步结束后才会开始,即同步期间节点不会接受交易;
3. SubProtocols是以太坊的通信协议,通常只有一个值,即eth63。 以太坊中指定了一个常量ProtocolVersion = [eth63, eth62],它指定了以太坊的两个版本,这两个版本的ProtocolLength分别为[17, 8]。
4.下载器是从远程网络节点获取哈希和块的下载器。
5. fetcher 收集网络中其他以太坊节点发送的同步通知,并进行验证,并进行相应的处理。
6. Peers 是经过验证和信任的通信节点的集合。
1、protocolManager的初始化和启动:NewProtocolManager和Start()
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain,
chaindb ethdb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
mode = downloader.FullSync
}
if mode == downloader.FastSync {
manager.fastSync = uint32(1)
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
if mode == downloader.FastSync && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
version := version // Closure for the run
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
if len(manager.SubProtocols) == 0 {
return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
}
heighter := func() uint64 {
return blockchain.CurrentBlock().NumberU64()
}
inserter := func(blocks types.Blocks) (int, error) {
// If fast sync is running, deny importing weird blocks
if atomic.LoadUint32(&manager.fastSync) == 1 {
log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
return manager.blockchain.InsertChain(blocks)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
return manager, nil
}
NewProtocolManager()的逻辑如下:
1、初始化protocolManager对象,包括事件多路复用、区块链、交易池和节点等;
2. 启动 fastSync 或 fullSync。 以太坊启动时默认的同步方式是fastSync模式。 该方法直接下载区块头、区块体和状态,不执行交易,直到最后64个区块都采用fullSync模式。 这种方法 模式不下载状态,而是验证并执行每笔交易,消耗CPU;
3、配置以太坊协议为SubProtocols,设置以太坊的Run()方法,等待p2p.peer模块的调用;
4.新建下载器;
5、定义validator、heighter、inserter三个函数,辅助校验和插入新区块,并以此为基础创建新的fetcher。
node.Start() 将启动以太坊协议,ethereum.Start() 将执行 protocolManager.Start():
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
}
ProtocolManager.Start()启动了四个go例程,分别是交易订阅广播例程(txBroadcastLoop)、挖矿订阅例程(minedBroadcastLoop)、节点定时同步例程(syncer)和交易同步例程(txsyncLoop)。
1. 为新交易创建一个订阅频道,并启动一个goroutine 进行交易广播。 广播一个新出现的交易对象。 txBroadcastLoop()会在txCh通道的接收端继续等待。 一旦它收到关于新交易的事件,它会立即调用 BroadcastTx() 函数向那些还没有交易对象的相邻个体广播。
2.创建挖矿订阅频道,并启动挖矿广播goroutine。 广播新开采的区块。 minedBroadcastLoop()继续等待个体新挖出的区块事件,然后立即广播给相邻的需要它的个体。 当没有新的挖矿区块事件被订阅时,该函数将等待结束返回。 有趣的是,minedBroadcastLoop() 在收到一个新挖出的区块事件后,会连续两次调用 BroadcastBlock()。 这两个调用仅在一个 bool 参数 @propagate 上有所不同。 当该参数为真时,将整个新区块依次发送给一小部分相邻区块; 为false时,只将新区块的Hash值和Number发送给所有相邻链表。
3. pm.syncer() 启动同步goroutine,定期与网络中的其他节点同步,并处理来自网络节点的相关通知。 定期执行整个区块链与相邻个体的强制同步。 syncer()先启动fetcher成员,然后进入死循环。 在每个循环中,它将对相邻对等点列表中的“最佳”对等点执行区块范围的链同步。 发起上述同步的原因有两个:如果有新注册(加入)的相邻个体,当整个peer列表的个数大于5时才会发起; 如果没有新的 peer 到达,它会每隔 10s 发起一次。 这里所谓的“最优”是指节点维护的区块链的最高TotalDifficulty(td)。 由于Td是整条链中从创世块到最新头块的Difficulty值之和,Td值最高的意味着它的区块链是最新的,整条区块链与这样一个peer同步,显然以太坊挖坑流程图,变化量是最小的,也就是“最优”。
4. pm.txsyncLoop() 启动交易同步goroutine,将新交易均匀同步到网络节点。 ProtocolManager 以 ethereum.Start() 开始,txsyncLoop() at s:=
peer.broadcase() 是每个Peer连接的广播函数。 它只广播3种消息:transaction、complete block、block Hash,表示节点只会主动广播这3种数据,其余数据同步通过request-response完成。
2.回调函数handle()交给其他peer
func (pm *ProtocolManager) handle(p *peer) error {
// Ignore maxPeers if this is a trusted peer
if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}
p.Log().Debug("Ethereum peer connected", "name", p.Name())
// Execute the Ethereum handshake
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
)
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}
}
}
handle() 函数为新对等点执行以下操作:
p.Handshakes() 握手,与peer节点沟通自己的区块链状态; p.rw 初始化一个读写通道,用于与 peer peer 进行数据传输。 p.peers.Register() 注册peer peer并存入peer列表; 只有当 handle() 函数退出时,peer 才会从列表中删除。 Downloader成员注册这个新的peer; Downloader 会自己维护一个相邻节点的列表。 调用syncTransactions(),将当前txpool中新积累的tx对象组装成一个txsync{}对象,推送到内部通道txsyncCh。 在无限循环中启动 handleMsg()。 当peer peer 发送任何msg 时, handleMsg() 可以捕获相应类型的消息并在自己这边进行处理。
以太坊的通信基于以太坊协议,它是Service的实现体,其构造函数收集在Node中的map[reflect.Type]Service中。 节点将协议交给服务器对象。 Node.Start()时,Server执行Start()以太坊挖坑流程图,也就是执行以太坊的Start()。 然后会新建一个protocolManager,将以太坊协议交给后者,实现协议接口的实现:
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
在Server.Start()中开启一个单独的线程(listenLoop())来监听某个端口是否有活跃的IP连接; 另一个单独的线程启动 run() 函数来处理在无限循环新对象中接收到的任何新消息。 在run()函数中,如果远程peer发送连接请求(new p2p.conn{}),调用Server.newPeer()生成一个新的peer对象,将所有的Server.Protocols交给peer。
handle()在ProtocolManager对象创建时包含在p2p.Protocol对象的Run方法中,一起交给新的peer对象,负责与peer peer通信,向peer peer发起请求并等待接收对端的处理信息,所以是Callback。