Fabric 允许应用程序在调用链码的时候监听链码中设置的事件,在监听到相应的事件后做相应的处理过程。
Fabric1.4 中的事件机制与老版本的事件机制源码部分有所不同,用的是Deliver的方式,所以老版本的代码与 Fabric1.4 版本的代码会有所不同。
本文将从源码解析整个事件机制的处理流程的。
在之前的文章——Hyperledger Fabric从源码分析链码查询与调用,我有提到在ChaincodeInvokeOrQuery()函数中有对是否等待链码事件的处理,先来回顾一下:
func ChaincodeInvokeOrQuery(/*参数省略*/) (*pb.ProposalResponse, error) { // ........... if invoke { // invoke走这里 // .......... var dg *deliverGroup var ctx context.Context if waitForEvent { // 这个参数是事件相关,命令中可以设置--waitForEvent参数,表示是否监听事件 var cancelFunc context.CancelFunc ctx, cancelFunc = context.WithTimeout(context.Background(), waitForEventTimeout) defer cancelFunc() // 创建DeliverGroup dg = newDeliverGroup(deliverClients, peerAddresses, certificate, channelID, txid) // 连接所有peer节点上的deliver service err := dg.Connect(ctx) if err != nil { return nil, err } } // 将交易发送给排序节点 if err = bc.Send(env); err != nil { return proposalResp, errors.WithMessage(err, fmt.Sprintf("error sending transaction for %s", funcName)) } if dg != nil && ctx != nil { // 等待事件被记录到账本中 err = dg.Wait(ctx) if err != nil { return nil, err } } } } return proposalResp, nil }如果在执行peer chaincode invoke命令的时候设置了--waitForEvent参数,表示会等待并监听链码上的事件。函数中调用newDeliverGroup()创建了一个dg对象,看下这个函数,在peer/chaincode/common.go的557行:
type deliverGroup struct { Clients []*deliverClient // 所有背书节点的deliverClient Certificate tls.Certificate // tls证书 ChannelID string // 通道ID TxID string // txID mutex sync.Mutex Error error wg sync.WaitGroup } func newDeliverGroup(deliverClients []api.PeerDeliverClient, peerAddresses []string, certificate tls.Certificate, channelID string, txid string) *deliverGroup { // deliverClients表示所有连接背书节点的deliverClients // peerAddress表示所有背书节点的address // certificate表示tls证书 // channelID为通道ID,txid为交易id clients := make([]*deliverClient, len(deliverClients)) for i, client := range deliverClients { dc := &deliverClient{ Client: client, Address: peerAddresses[i], } clients[i] = dc } dg := &deliverGroup{ Clients: clients, Certificate: certificate, ChannelID: channelID, TxID: txid, } return dg }deliverClients参数在InitCmdFactory()中创建,这个函数我们之前只是简单介绍了一下,它是创建了一个链码命令工厂,其中保存了很多clients,其中就包括了DeliverClients,现在就来仔细看一下它是如何创建这些clients的,代码在peer/chaincode/common.go的349行:
// ChaincodeCmdFactory holds the clients used by ChaincodeCmd type ChaincodeCmdFactory struct { EndorserClients []pb.EndorserClient // 背书客户端,用于背书 DeliverClients []api.PeerDeliverClient // Deliver客户端,用于向peer的DeliverServer发送消息,主要就是事件 Certificate tls.Certificate // tls证书相关信息 Signer msp.SigningIdentity // 用于消息的签名 BroadcastClient common.BroadcastClient // 广播客户端,用于向orderer节点发送消息 } // InitCmdFactory init the ChaincodeCmdFactory with default clients func InitCmdFactory(cmdName string, isEndorserRequired, isOrdererRequired bool) (*ChaincodeCmdFactory, error) { var err error var endorserClients []pb.EndorserClient var deliverClients []api.PeerDeliverClient if isEndorserRequired { // 是否需要背书,一般使用peer chaincode package命令打包链码的时候该参数为false // 主要是验证peer连接的一些参数 if err = validatePeerConnectionParameters(cmdName); err != nil { return nil, errors.WithMessage(err, "error validating peer connection parameters") } // 如果没有指定peerAddresses,它的值默认为[]string{""},包含一个空字符串,也就是下面的循环只会执行一次 for i, address := range peerAddresses { var tlsRootCertFile string if tlsRootCertFiles != nil { // 如果没有指定tlsRootCertFiles,它的值默认为[]string{""} tlsRootCertFile = tlsRootCertFiles[i] } // 获取指定peer的背书客户端 endorserClient, err := common.GetEndorserClientFnc(address, tlsRootCertFile) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error getting endorser client for %s", cmdName)) } endorserClients = append(endorserClients, endorserClient) // 获取指定peer的deliver客户端 deliverClient, err := common.GetPeerDeliverClientFnc(address, tlsRootCertFile) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error getting deliver client for %s", cmdName)) } deliverClients = append(deliverClients, deliverClient) } if len(endorserClients) == 0 { // 如果endorserClients为长度为0则报错 return nil, errors.New("no endorser clients retrieved - this might indicate a bug") } } // 获取证书 certificate, err := common.GetCertificateFnc() if err != nil { return nil, errors.WithMessage(err, "error getting client cerificate") } // 获取签名者 signer, err := common.GetDefaultSignerFnc() if err != nil { return nil, errors.WithMessage(err, "error getting default signer") } var broadcastClient common.BroadcastClient // 如果需要发送交易给排序节点的操作,isOrdererRequired为空 // query的时候就会为false,invoke的时候就会为空 if isOrdererRequired { if len(common.OrderingEndpoint) == 0 { if len(endorserClients) == 0 { return nil, errors.New("orderer is required, but no ordering endpoint or endorser client supplied") } endorserClient := endorserClients[0] orderingEndpoints, err := common.GetOrdererEndpointOfChainFnc(channelID, signer, endorserClient) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error getting channel (%s) orderer endpoint", channelID)) } if len(orderingEndpoints) == 0 { return nil, errors.Errorf("no orderer endpoints retrieved for channel %s", channelID) } logger.Infof("Retrieved channel (%s) orderer endpoint: %s", channelID, orderingEndpoints[0]) // override viper env viper.Set("orderer.address", orderingEndpoints[0]) } broadcastClient, err = common.GetBroadcastClientFnc() if err != nil { return nil, errors.WithMessage(err, "error getting broadcast client") } } // 赋值成员,返回ChaincodeCmdFactory return &ChaincodeCmdFactory{ EndorserClients: endorserClients, DeliverClients: deliverClients, Signer: signer, BroadcastClient: broadcastClient, Certificate: certificate, }, nil }大体流程就是,为ChaincodeCmdFactory的各个成员赋值的过程,我们关心DeliverClients是如何赋值的,追踪过去找到相应的方法,在peer/common/peerclient.go的200行:
// InitFactory()中这么获取deliverClient deliverClient, err := common.GetPeerDeliverClientFnc(address, tlsRootCertFile) //GetPeerDeliverClientFnc是一个函数变量,在init的时候被赋值为GetPeerDeliverClient GetPeerDeliverClientFnc = GetPeerDeliverClient // GetPeerDeliverClient returns a new deliver client. If both the address and // tlsRootCertFile are not provided, the target values for the client are taken // from the configuration settings for "peer.address" and // "peer.tls.rootcert.file" func GetPeerDeliverClient(address, tlsRootCertFile string) (api.PeerDeliverClient, error) { var peerClient *PeerClient var err error // address是传进来的peer地址,当我们没有指定peeraddress参数时,传进来的address就是"" if address != "" { peerClient, err = NewPeerClientForAddress(address, tlsRootCertFile) } else { // 走这里 peerClient, err = NewPeerClientFromEnv() } if err != nil { return nil, err } return peerClient.PeerDeliver() }看下NewPeerClientFromEnv()方法,在peer/common/peerclient.go31行:
// NewPeerClientFromEnv creates an instance of a PeerClient from the global // Viper instance func NewPeerClientFromEnv() (*PeerClient, error) { // 从环境变量中获取address,override,clientConfig,具体实现在下面 address, override, clientConfig, err := configFromEnv("peer") if err != nil { return nil, errors.WithMessage(err, "failed to load config for PeerClient") } // 主要就是创建一个peerClient return newPeerClientForClientConfig(address, override, clientConfig) } // ClientConfig defines the parameters for configuring a GRPCClient instance type ClientConfig struct { // 安全相关参数(tls) SecOpts *SecureOptions // keepalive参数 KaOpts *KeepaliveOptions // client阻塞的时长 Timeout time.Duration // 连接是否是非阻塞的 AsyncConnect bool } // 这里就设置了两个参数,一个是SecOpts,一个是Timeout,其他都是默认值 func configFromEnv(prefix string) (address, override string, clientConfig comm.ClientConfig, err error) { // 获取CORE_PEER_ADDRESS环境变量,这个肯定会有 address = viper.GetString(prefix + ".address") // 获取CORE_PEER_TLS_SERVERHOSTOVERRIDE环境变量,这个可能没有 override = viper.GetString(prefix + ".tls.serverhostoverride") clientConfig = comm.ClientConfig{} // 获取CORE_PEER_CLIENT_CONNTIMEOUT环境变量,这个一般没有,用默认的 connTimeout := viper.GetDuration(prefix + ".client.connTimeout") if connTimeout == time.Duration(0) { // 默认的3S connTimeout = defaultConnTimeout } clientConfig.Timeout = connTimeout // 下面都是一些TLS相关的环境变量参数读取,读取证书等等 secOpts := &comm.SecureOptions{ // 都是从环境变量中获取 UseTLS: viper.GetBool(prefix + ".tls.enabled"), RequireClientCert: viper.GetBool(prefix + ".tls.clientAuthRequired")} if secOpts.UseTLS { caPEM, res := ioutil.ReadFile(config.GetPath(prefix + ".tls.rootcert.file")) if res != nil { err = errors.WithMessage(res, fmt.Sprintf("unable to load %s.tls.rootcert.file", prefix)) return } secOpts.ServerRootCAs = [][]byte{caPEM} } if secOpts.RequireClientCert { keyPEM, res := ioutil.ReadFile(config.GetPath(prefix + ".tls.clientKey.file")) if res != nil { err = errors.WithMessage(res, fmt.Sprintf("unable to load %s.tls.clientKey.file", prefix)) return } secOpts.Key = keyPEM certPEM, res := ioutil.ReadFile(config.GetPath(prefix + ".tls.clientCert.file")) if res != nil { err = errors.WithMessage(res, fmt.Sprintf("unable to load %s.tls.clientCert.file", prefix)) return } secOpts.Certificate = certPEM } clientConfig.SecOpts = secOpts // 返回clientConfig return }看下newPeerClientForClientConfig()方法,在peer/common/peerclient.go的85行:
func newPeerClientForClientConfig(address, override string, clientConfig comm.ClientConfig) (*PeerClient, error) { // 根据给入的配置,创建了GRPCClient对象 gClient, err := comm.NewGRPCClient(clientConfig) if err != nil { return nil, errors.WithMessage(err, "failed to create PeerClient from config") } // 创建了PeerClient对象 pClient := &PeerClient{ commonClient: commonClient{ GRPCClient: gClient, address: address, sn: override}} return pClient, nil }因此到这里NewPeerClientFromEnv()方法创建了一个peerClient,它从环境变量中获取相应的配置参数,创建了一个与peer节点进行通信的peerClient客户端,接下来调用peerClient.PeerDeliver()创建与peer节点的deliverServer通信的PeerDeliverClient,看下这个实现,在peer/common/peerclient.go的116行:
// PeerDeliver returns a client for the Deliver service for peer-specific use // cases (i.e. DeliverFiltered) func (pc *PeerClient) PeerDeliver() (api.PeerDeliverClient, error) { // 创建了一个grpc.ClientConn的连接对象 conn, err := pc.commonClient.NewConnection(pc.address, pc.sn) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("deliver client failed to connect to %s", pc.address)) } pbClient := pb.NewDeliverClient(conn) return &PeerDeliverClient{Client: pbClient}, nil }看下NewDeliverClient()方法,在protos/peer/events.pb.go的509行:
// DeliverClient is the client API for Deliver service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type DeliverClient interface { // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of block replies is received Deliver(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverClient, error) // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of **filtered** block replies is received DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverFilteredClient, error) } type deliverClient struct { cc *grpc.ClientConn } func NewDeliverClient(cc *grpc.ClientConn) DeliverClient { return &deliverClient{cc} }OK到这里我们就找了DeliverClient的相关的proto接口与对象了,DeliverClient可以调用Deliver与DeliverFiltered两个接口分别实现两个不同的流:
Deliver流:
// Deliver接口返回的流,实现了接口Send,Recv分别从流中获取消息 func (c *deliverClient) Deliver(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverClient, error) { stream, err := c.cc.NewStream(ctx, &_Deliver_serviceDesc.Streams[0], "/protos.Deliver/Deliver", opts...) if err != nil { return nil, err } x := &deliverDeliverClient{stream} return x, nil } type Deliver_DeliverClient interface { Send(*common.Envelope) error Recv() (*DeliverResponse, error) grpc.ClientStream } type deliverDeliverClient struct { grpc.ClientStream } func (x *deliverDeliverClient) Send(m *common.Envelope) error { return x.ClientStream.SendMsg(m) } func (x *deliverDeliverClient) Recv() (*DeliverResponse, error) { m := new(DeliverResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil }DeliverFiltered流:
// DeliverFiltered接口返回的流,实现了接口Send,Recv分别从流中获取消息 func (c *deliverClient) DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverFilteredClient, error) { stream, err := c.cc.NewStream(ctx, &_Deliver_serviceDesc.Streams[1], "/protos.Deliver/DeliverFiltered", opts...) if err != nil { return nil, err } x := &deliverDeliverFilteredClient{stream} return x, nil } type Deliver_DeliverFilteredClient interface { Send(*common.Envelope) error Recv() (*DeliverResponse, error) grpc.ClientStream } type deliverDeliverFilteredClient struct { grpc.ClientStream } func (x *deliverDeliverFilteredClient) Send(m *common.Envelope) error { return x.ClientStream.SendMsg(m) } func (x *deliverDeliverFilteredClient) Recv() (*DeliverResponse, error) { m := new(DeliverResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil }DeliverServer服务我们一会再回过头来看,现在先跟着刚才的思路走下去。
在PeerDeliver()方法中,我们通过NewDeliverClient()方法创建了一个DeliverClient,并赋值到PeerDeliverClient对象的成员变量中返回。
好了到这里InitCmdFactory()方法中就创建好了PeerDeliverClient,它最终提供了两个接口,最终在使用接口的时候再来看看:
// PeerDeliverClient defines the interface for a peer deliver client type PeerDeliverClient interface { // Deliver方法在源码中找调用发送没有地方在调用 Deliver(ctx context.Context, opts ...grpc.CallOption) (api.Deliver, error) DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (api.Deliver, error) }再回到ChaincodeInvokeOrQuery()中,我们通过newDeliverGroup()创建了一个DeliverGroup对象dg,其中包含了刚刚说的在InitCmdFactory()方法中就创建好了PeerDeliverClient。
接下来执行的下面这行代码:
// 创建DeliverGroup dg = newDeliverGroup(deliverClients, peerAddresses, certificate, channelID, txid) // 连接所有peer节点上的deliver service err := dg.Connect(ctx)来看下这个方法,在peer/chaincode/common.go的581行:
// Connect waits for all deliver clients in the group to connect to // the peer's deliver service, receive an error, or for the context // to timeout. An error will be returned whenever even a single // deliver client fails to connect to its peer func (dg *deliverGroup) Connect(ctx context.Context) error { // waitgroup Add deliverClients的长度,如果没有peeraddress参数默认就是一个peer dg.wg.Add(len(dg.Clients)) for _, client := range dg.Clients { // 建立起deliverClient与deliverServer的连接 go dg.ClientConnect(ctx, client) } readyCh := make(chan struct{}) // 每个ClientConnect执行完都会加waitgroup -1 // 这里就是执行wg.wait(),完了以后发送信号给readyCh通道 go dg.WaitForWG(readyCh) select { case <-readyCh: // 全部完成后判断有没有错,有一个出错就返回错误 if dg.Error != nil { err := errors.WithMessage(dg.Error, "failed to connect to deliver on all peers") return err } case <-ctx.Done(): // ctx设置了timeout,如果时间到了连接还没有全部建立完成就报错 err := errors.New("timed out waiting for connection to deliver on all peers") return err } return nil }接下来看下ClientConnect()方法,在peer/chaincode/common.go的606行:
// ClientConnect sends a deliver seek info envelope using the // provided deliver client, setting the deliverGroup's Error // field upon any error func (dg *deliverGroup) ClientConnect(ctx context.Context, dc *deliverClient) { // wg.Done defer dg.wg.Done() // 这里最终建立的是DeliverFiltered流,没有建立Deliver流 df, err := dc.Client.DeliverFiltered(ctx) if err != nil { err = errors.WithMessage(err, fmt.Sprintf("error connecting to deliver filtered at %s", dc.Address)) dg.setError(err) return } // CloseSend关闭流的发送方向,如果遇到错误时,将关闭流 // 看这意思是只会发送一条消息,也就是下面将要发送的这一条消息 defer df.CloseSend() dc.Connection = df // 创建消息envelope,一会看下这个函数 envelope := createDeliverEnvelope(dg.ChannelID, dg.Certificate) // 往流里发送数据 err = df.Send(envelope) if err != nil { err = errors.WithMessage(err, fmt.Sprintf("error sending deliver seek info envelope to %s", dc.Address)) dg.setError(err) return } }看下createDeliverEnvelope()方法,在peer/chaincode/common.go的701行:
// 该函数创建了DeliverClient发送的第一个消息,也是唯一一个消息 func createDeliverEnvelope(channelID string, certificate tls.Certificate) *pcommon.Envelope { var tlsCertHash []byte // 检查证书,计算证书Hash if len(certificate.Certificate) > 0 { tlsCertHash = util.ComputeSHA256(certificate.Certificate[0]) } // start最终表示的是区块寻址的开始高度 start := &ab.SeekPosition{ Type: &ab.SeekPosition_Newest{ Newest: &ab.SeekNewest{}, }, } // stop表示的是区块寻址的终止高度 stop := &ab.SeekPosition{ Type: &ab.SeekPosition_Specified{ Specified: &ab.SeekSpecified{ Number: math.MaxUint64, }, }, } // 寻址信息,包含Start,Stop,Behavior // 下面再来解析这几个参数的意思 seekInfo := &ab.SeekInfo{ Start: start, Stop: stop, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, } // 根据现有的信息创建一个签名的Envelope env, err := putils.CreateSignedEnvelopeWithTLSBinding( pcommon.HeaderType_DELIVER_SEEK_INFO, channelID, localmsp.NewSigner(), seekInfo, int32(0), uint64(0), tlsCertHash) if err != nil { logger.Errorf("Error signing envelope: %s", err) return nil } return env }来看下几个关键的几个结构体
SeekPosition表示寻找的位置
type SeekPosition struct { Type isSeekPosition_Type XXX_NoUnkeyedLiteral struct{} XXX_unrecognized []byte XXX_sizecache int32 }其中Type字段可以有三种类型:
SeekPosition_Newest表示最新的区块SeekPosition_Oldest表示最原始的区块SeekPosition_Specified表示指定的区块,需要指定区块高度SeekInfo表示寻找具体信息
// SeekInfo specifies the range of requested blocks to return // If the start position is not found, an error is immediately returned // Otherwise, blocks are returned until a missing block is encountered, then behavior is dictated // by the SeekBehavior specified. type SeekInfo struct { // 起始高度 Start *SeekPosition // 停止高度 Stop *SeekPosition // 寻找行为 Behavior SeekInfo_SeekBehavior // 错误响应 ErrorResponse SeekInfo_SeekErrorResponse XXX_NoUnkeyedLiteral struct{} XXX_unrecognized []byte XXX_sizecache int32 }来看下SeekInfo_SeekBehavior类型:
// If BLOCK_UNTIL_READY is specified, the reply will block until the requested blocks are available, // if FAIL_IF_NOT_READY is specified, the reply will return an error indicating that the block is not // found. To request that all blocks be returned indefinitely as they are created, behavior should be // set to BLOCK_UNTIL_READY and the stop should be set to specified with a number of MAX_UINT64 type SeekInfo_SeekBehavior int32 const ( // 阻塞直到请求的区块可用 SeekInfo_BLOCK_UNTIL_READY SeekInfo_SeekBehavior = 0 // 返回错误表示未找到指定的区块 SeekInfo_FAIL_IF_NOT_READY SeekInfo_SeekBehavior = 1 ) // 如果要求所有区块无限期地返回(不断地返回最新的区块)。应该设置SeekInfo_BLOCK_UNTIL_READY ,并且stop的SeekPosition应该设置为SeekPosition_Specified,并且指定的高度应该为MAX_UINT64在createDeliverEnvelope()函数中,创建的SeekInfo对象的Start参数为SeekPosition_Newest,Stop参数为SeekPosition_Specified,并且指定为MAX_UINT64高度,Behavior指定为SeekInfo_BLOCK_UNTIL_READY。这样的设置方式就可以表示一直返回最新的区块回来。
好了看下CreateSignedEnvelopeWithTLSBinding()方法,创建一条Deliver客户端与服务之间通信的消息,在protos/utils/txutils.go的69行:
// CreateSignedEnvelopeWithTLSBinding creates a signed envelope of the desired // type, with marshaled dataMsg and signs it. It also includes a TLS cert hash // into the channel header func CreateSignedEnvelopeWithTLSBinding(txType common.HeaderType, channelID string, signer crypto.LocalSigner, dataMsg proto.Message, msgVersion int32, epoch uint64, tlsCertHash []byte) (*common.Envelope, error) { // txType为HeaderType_DELIVER_SEEK_INFO // channelID为通道ID // signer为从msp中获取的签名者,用来签名 // dataMsg就是创建好的seek_info类型 // msgVersion为0,epoch为0, // tlsCertHash为证书hash // 创建ChannelHeader payloadChannelHeader := MakeChannelHeader(txType, msgVersion, channelID, epoch) payloadChannelHeader.TlsCertHash = tlsCertHash var err error // 创建SignatureHeader payloadSignatureHeader := &common.SignatureHeader{} if signer != nil { // 如果signer不为空,则是用signer创建一个SignatureHeader // 否则SignatureHeader为空 payloadSignatureHeader, err = signer.NewSignatureHeader() if err != nil { return nil, err } } // 序列化seek_info信息 data, err := proto.Marshal(dataMsg) if err != nil { return nil, errors.Wrap(err, "error marshaling") } // 序列化Payload paylBytes := MarshalOrPanic( &common.Payload{ Header: MakePayloadHeader(payloadChannelHeader, payloadSignatureHeader), Data: data, }, ) // 对payload签名 var sig []byte if signer != nil { sig, err = signer.Sign(paylBytes) if err != nil { return nil, err } } // 创建env对象并返回 env := &common.Envelope{ Payload: paylBytes, Signature: sig, } return env, nil }OK到这里ClientConnect()方法就解析完了,梳理一下:
与DeliverServer建立了DeliverFiltered流创建了第一个消息并发送发送完以后调用CloseSend()关闭发送流,不会再次发送信息其实这里完了以后,客户端就完成了建立流连接并发送消息的过程,那么接下来就是接收服务端消息的过程了。
回到ChaincodeInvokeOrQuery()函数中,有下面一段代码:
if dg != nil && ctx != nil { // wait for event that contains the txid from all peers err = dg.Wait(ctx) if err != nil { return nil, err } }等待并处理事件的就这么一行代码,dg.Wait(),看下这个方法,在peer/chaincode/common.go的629行:
// Wait waits for all deliver client connections in the group to // either receive a block with the txid, an error, or for the // context to timeout func (dg *deliverGroup) Wait(ctx context.Context) error { if len(dg.Clients) == 0 { return nil } //waitgroup++ dg.wg.Add(len(dg.Clients)) for _, client := range dg.Clients { // 等待事件 go dg.ClientWait(client) } readyCh := make(chan struct{}) //等待waitgroup go dg.WaitForWG(readyCh) select { case <-readyCh: if dg.Error != nil { //有一个客户端执行ClientWait失败就返回错误 err := errors.WithMessage(dg.Error, "failed to receive txid on all peers") return err } case <-ctx.Done(): // 超时还未搞定,就是等待超时,返回错误 err := errors.New("timed out waiting for txid on all peers") return err } return nil }看下ClientWait()方法,在peer/chaincode/common.go的657行:
// ClientWait waits for the specified deliver client to receive // a block event with the requested txid func (dg *deliverGroup) ClientWait(dc *deliverClient) { defer dg.wg.Done() for { // 死循环从流中获取消息 resp, err := dc.Connection.Recv() if err != nil { // Recv出错直接返回 err = errors.WithMessage(err, fmt.Sprintf("error receiving from deliver filtered at %s", dc.Address)) dg.setError(err) return } switch r := resp.Type.(type) { case *pb.DeliverResponse_FilteredBlock: // 如果是DeliverResponse_FilteredBlock这个类型 // filteredTransactions其实是监听到的区块中包含过滤事件的所有交易 filteredTransactions := r.FilteredBlock.FilteredTransactions for _, tx := range filteredTransactions { if tx.Txid == dg.TxID { // 仅仅输出一个INFO日志而已,这个如果我们加了--waitForEvent参数可以看到这个日志(如果设置了事件的话) // 这里仅仅是输出了当前这笔交易 logger.Infof("txid [%s] committed with status (%s) at %s", dg.TxID, tx.TxValidationCode, dc.Address) return } } case *pb.DeliverResponse_Status: // 如果是DeliverResponse_Status类型就报错 err = errors.Errorf("deliver completed with status (%s) before txid received", r.Status) dg.setError(err) return default: // 其他类型不支持,报错 err = errors.Errorf("received unexpected response type (%T) from %s", r, dc.Address) dg.setError(err) return } } }虽然ClientWait()用了一个死循环去接收事件,但是每个分支的处理都是return的,因此我们可以看到即使DeliverClient中设置的SeekInfo参数表示的是无限期返回区块,但是我们在命令行加上--waitForEvent参数的时候,会输出一个 INFO 日志就退出了。
这里插一嘴,Fabric 的各个语言的 SDK 中都有对事件部分的控制逻辑,当然不仅仅是像 cmd 这样简单处理,可以拿到的完整的事件对象,里面包含了事件的具体内容,有兴趣地去看看各个版本的 SDK 是如何使用 Fabric 的事件机制的就可以了
到这里,链码调用这一侧,也就是DeliverClient这一侧的逻辑基本就分析完了,下一篇文章——Hyperledger Fabric从源码分析事件机制(二),我们再看看DeliverServer这一侧的逻辑。