In the file "tendermint/p2p/peer.go", we have the following code:
// peer implements Peer. // // Before using a peer, you will need to perform a handshake on connection. type peer struct { service.BaseService // raw peerConn and the multiplex connection peerConn mconn *tmconn.MConnection // peer's node info and the channel it knows about // channels = nodeInfo.Channels // cached to avoid copying nodeInfo in hasChannel nodeInfo NodeInfo channels []byte // User data Data *cmap.CMap metrics *Metrics metricsTicker *time.Ticker } type PeerOption func(*peer) func newPeer( pc peerConn, mConfig tmconn.MConnConfig, nodeInfo NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), options ...PeerOption, ) *peer { p := &peer{ peerConn: pc, nodeInfo: nodeInfo, channels: nodeInfo.(DefaultNodeInfo).Channels, Data: cmap.NewCMap(), metricsTicker: time.NewTicker(metricsTickerDuration), metrics: NopMetrics(), } p.mconn = createMConnection( pc.conn, p, reactorsByCh, chDescs, onPeerError, mConfig, ) p.BaseService = *service.NewBaseService(nil, "Peer", p) for _, option := range options { option(p) } return p } ........ func createMConnection( conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), config tmconn.MConnConfig, ) *tmconn.MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { // Note that its ok to panic here as it's caught in the conn._recover, // which does onPeerError. panic(fmt.Sprintf("Unknown channel %X", chID)) } labels := []string{ "peer_id", string(p.ID()), "chID", fmt.Sprintf("%#x", chID), } p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) reactor.Receive(chID, p, msgBytes) } onError := func(r interface{}) { onPeerError(p, r) } return tmconn.NewMConnectionWithConfig( conn, chDescs, onReceive, onError, config, ) }
In the file "tendermint/p2p/conn/connection.go", we have the following code:
type MConnection struct { service.BaseService conn net.Conn bufConnReader *bufio.Reader bufConnWriter *bufio.Writer sendMonitor *flow.Monitor recvMonitor *flow.Monitor send chan struct{} pong chan struct{} channels []*Channel channelsIdx map[byte]*Channel onReceive receiveCbFunc onError errorCbFunc errored uint32 config MConnConfig ........ } // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer. // After a whole message has been assembled, it's pushed to onReceive(). // Blocks depending on how the connection is throttled. // Otherwise, it never blocks. func (c *MConnection) recvRoutine() { defer c._recover() protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize) FOR_LOOP: for { ........ // Read packet type var packet tmp2p.Packet _n, err := protoReader.ReadMsg(&packet) ........ // Read more depending on packet type. switch pkt := packet.Sum.(type) { ........ case *tmp2p.Packet_PacketMsg: channelID := byte(pkt.PacketMsg.ChannelID) channel, ok := c.channelsIdx[channelID] if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil { err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID) c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err) c.stopForError(err) break FOR_LOOP } msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg) if err != nil { if c.IsRunning() { c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err) c.stopForError(err) } break FOR_LOOP } if msgBytes != nil { c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes) // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine c.onReceive(channelID, msgBytes) } default: ........ }
Conclusion:
The MConnection's recvRoutine continuously reads packets from the network. When it receives a packet of type Packet_PacketMsg, it passes the packet to the matching channel, which reassembles the whole message, with this call:
msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
Once a whole message has been assembled (that is, msgBytes is non-nil), it calls the onReceive callback:
c.onReceive(channelID, msgBytes)
onReceive is a callback field of the MConnection struct. It is initialized in "tendermint/p2p/peer.go", inside createMConnection. The callback first looks up the reactor that the channel ID corresponds to, and then calls that reactor's "Receive" function with the channel ID, the peer, and msgBytes as parameters:
reactor.Receive(chID, p, msgBytes)
This is how a reactor's Receive function gets messages from peers.
Top comments (0)