上篇文章——Hyperledger Fabric从源码分析事件机制(一),我从 cmd 中添加--waitForEvent参数去监听事件这一入口出发,分析了DeliverClient这一侧的相关源代码,接下来这篇文章将会解析DeliverServer这一侧的相关源代码,准备好上车了兄弟们。
事件相关的pb.go文件在protos/peer/events.pb.go中,关于DeliverServer的部分在576行:
// DeliverServer is the server API for Deliver service. type DeliverServer interface { // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of block replies is received Deliver(Deliver_DeliverServer) error // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of **filtered** block replies is received DeliverFiltered(Deliver_DeliverFilteredServer) error } // 通过这个函数注册DeliverServer func RegisterDeliverServer(s *grpc.Server, srv DeliverServer) { s.RegisterService(&_Deliver_serviceDesc, srv) } // Deliver流的Handler func _Deliver_Deliver_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(DeliverServer).Deliver(&deliverDeliverServer{stream}) } type Deliver_DeliverServer interface { Send(*DeliverResponse) error Recv() (*common.Envelope, error) grpc.ServerStream } type deliverDeliverServer struct { grpc.ServerStream } func (x *deliverDeliverServer) Send(m *DeliverResponse) error { return x.ServerStream.SendMsg(m) } func (x *deliverDeliverServer) Recv() (*common.Envelope, error) { m := new(common.Envelope) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // DeliverFiltered流的Handler func _Deliver_DeliverFiltered_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(DeliverServer).DeliverFiltered(&deliverDeliverFilteredServer{stream}) } type Deliver_DeliverFilteredServer interface { Send(*DeliverResponse) error Recv() (*common.Envelope, error) grpc.ServerStream } type deliverDeliverFilteredServer struct { grpc.ServerStream } func (x *deliverDeliverFilteredServer) Send(m *DeliverResponse) error { return x.ServerStream.SendMsg(m) } func (x *deliverDeliverFilteredServer) Recv() (*common.Envelope, error) { m := new(common.Envelope) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // grpc服务对象 var _Deliver_serviceDesc = grpc.ServiceDesc{ ServiceName: "protos.Deliver", HandlerType: (*DeliverServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "Deliver", Handler: _Deliver_Deliver_Handler, ServerStreams: true, ClientStreams: true, }, { StreamName: "DeliverFiltered", Handler: _Deliver_DeliverFiltered_Handler, ServerStreams: true, ClientStreams: true, }, }, Metadata: "peer/events.proto", }看看RegisterDeliverServer()这个方法在哪被调用了
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider) pb.RegisterDeliverServer(peerServer.Server(), abServer)上述代码在peer/node/start.go的274行。
因此破案了,在peer节点启动的时候,启动了DeliverServer。其中主要是abServer这个对象实现了DeliverServer的相关接口,那么来看下NewDeliverEventsServer这个方法吧,生成了一个DeliverEventsServer对象,在core/peer/deliverevents.go的132行:
// NewDeliverEventsServer creates a peer.Deliver server to deliver block and // filtered block events func NewDeliverEventsServer(mutualTLS bool, policyCheckerProvider PolicyCheckerProvider, chainManager deliver.ChainManager, metricsProvider metrics.Provider) peer.DeliverServer { // mutualTLS是TLS相关的一些东西 // policyCheckerProvider主要是做ACL的一些检测,暂时不关心 // chainManager传进来的是一个空的DeliverChainManager对象 // metricsProvider是指标相关的东西 timeWindow := viper.GetDuration("peer.authentication.timewindow") if timeWindow == 0 { defaultTimeWindow := 15 * time.Minute logger.Warningf("`peer.authentication.timewindow` not set; defaulting to %s", defaultTimeWindow) timeWindow = defaultTimeWindow } metrics := deliver.NewMetrics(metricsProvider) // 最终返回的是一个server对象 return &server{ // NewHandler创建了一个deliver.Handler对象 dh: deliver.NewHandler(chainManager, timeWindow, mutualTLS, metrics, false), policyCheckerProvider: policyCheckerProvider, } }这个server对象最终实现了Deliver和DeliverFiltered接口,实现了DeliverServer。
上篇文章中说到,cmd 最终建立的是DeliverFiltered流,因此最终会走到server的 DeliverFiltered方法,看下这个方法的实现,在core/peer/deliverevents.go的101行:
// Deliver sends a stream of blocks to a client after commitment func (s *server) DeliverFiltered(srv peer.Deliver_DeliverFilteredServer) error { logger.Debugf("Starting new DeliverFiltered handler") defer dumpStacktraceOnPanic() // getting policy checker based on resources.Event_FilteredBlock resource name deliverServer := &deliver.Server{ Receiver: srv, PolicyChecker: s.policyCheckerProvider(resources.Event_FilteredBlock), ResponseSender: &filteredBlockResponseSender{ Deliver_DeliverFilteredServer: srv, }, } // 执行Handle方法处理 return s.dh.Handle(srv.Context(), deliverServer) }最终执行的是Handle()方法,看一下,在common/deliver/deliver.go的152行:
// Handle receives incoming deliver requests. func (h *Handler) Handle(ctx context.Context, srv *Server) error { addr := util.ExtractRemoteAddress(ctx) logger.Debugf("Starting new deliver loop for %s", addr) // 指标相关 h.Metrics.StreamsOpened.Add(1) defer h.Metrics.StreamsClosed.Add(1) // 死循环从客户端获取消息 for { logger.Debugf("Attempting to read seek info message from %s", addr) envelope, err := srv.Recv() if err == io.EOF { logger.Debugf("Received EOF from %s, hangup", addr) return nil } if err != nil { logger.Warningf("Error reading from %s: %s", addr, err) return err } // 主要的执行函数deliverBlocks status, err := h.deliverBlocks(ctx, srv, envelope) if err != nil { return err } // 响应statusResponse err = srv.SendStatusResponse(status) if status != cb.Status_SUCCESS { return err } if err != nil { logger.Warningf("Error sending to %s: %s", addr, err) return err } // 等待新的SeekInfo信息 logger.Debugf("Waiting for new SeekInfo from %s", addr) } }主要逻辑都在deliverBlocks()中,在common/deliver/deliver.go的194行:
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) { addr := util.ExtractRemoteAddress(ctx) // 获取消息的payload,出错就返回Status_BAD_REQUEST状态 payload, err := utils.UnmarshalPayload(envelope.Payload) if err != nil { logger.Warningf("Received an envelope from %s with no payload: %s", addr, err) return cb.Status_BAD_REQUEST, nil } // 如果payload.Header为空就返回Status_BAD_REQUEST状态 if payload.Header == nil { logger.Warningf("Malformed envelope received from %s with bad header", addr) return cb.Status_BAD_REQUEST, nil } // 获取channelHeader,出错就返回Status_BAD_REQUEST状态 chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { logger.Warningf("Failed to unmarshal channel header from %s: %s", addr, err) return cb.Status_BAD_REQUEST, nil } // 验证ChannelHeader,出错就返回Status_BAD_REQUEST状态 err = h.validateChannelHeader(ctx, chdr) if err != nil { logger.Warningf("Rejecting deliver for %s due to envelope validation error: %s", addr, err) return cb.Status_BAD_REQUEST, nil } // 获取通道信息,也就是链信息,这块不展开了 chain := h.ChainManager.GetChain(chdr.ChannelId) if chain == nil { // 通道找不到,就返回Status_NOT_FOUND状态 logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId) return cb.Status_NOT_FOUND, nil } labels := []string{ "channel", chdr.ChannelId, // 这里filtered为true "filtered", strconv.FormatBool(isFiltered(srv)), } // 指标相关 h.Metrics.RequestsReceived.With(labels...).Add(1) defer func() { labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS)) h.Metrics.RequestsCompleted.With(labels...).Add(1) }() // 获取SeekInfo,出错就返回Status_BAD_REQUEST状态 seekInfo := &ab.SeekInfo{} if err = proto.Unmarshal(payload.Data, seekInfo); err != nil { logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err) return cb.Status_BAD_REQUEST, nil } erroredChan := chain.Errored() if seekInfo.ErrorResponse == ab.SeekInfo_BEST_EFFORT { // 在SeekInfo_BEST_EFFORT时(表示尽力而为),设置erroredChan = nil // 这样下面就会走default分支,也就是什么都不做 erroredChan = nil } select { case <-erroredChan: // 如果出错,返回Status_SERVICE_UNAVAILABLE状态 logger.Warningf("[channel: %s] Rejecting deliver request for %s because of consenter error", chdr.ChannelId, addr) return cb.Status_SERVICE_UNAVAILABLE, nil default: } // 创建一个SessionAccessControl对象,出错就返回Status_BAD_REQUEST状态 accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, h.ExpirationCheckFunc) if err != nil { logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err) return cb.Status_BAD_REQUEST, nil } // 评估ACL(访问控制权限)相关的一些东西 // 如果没有权限,就会返回Status_FORBIDDEN状态 if err := accessControl.Evaluate(); err != nil { logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err) return cb.Status_FORBIDDEN, nil } // 如果seekInfo的Start或者是Stop为nil,返回Status_BAD_REQUEST状态 if seekInfo.Start == nil || seekInfo.Stop == nil { logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop) return cb.Status_BAD_REQUEST, nil } logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr) // 这里应该是获取通道账本的迭代器,用来迭代访问区块 // 迭代器从seekInfo.Start开始,number为起始块号 // 这块逻辑跟账本有关,就不展开了 cursor, number := chain.Reader().Iterator(seekInfo.Start) defer cursor.Close() var stopNum uint64 // 判断seekInfo.Stop的类型 switch stop := seekInfo.Stop.Type.(type) { case *ab.SeekPosition_Oldest: // 如果seekInfo.Stop的类型是SeekPosition_Oldest // 那就就将stopNum设置为迭代器迭代的起始块号 stopNum = number case *ab.SeekPosition_Newest: // 如果seekInfo.Stop的类型是SeekPosition_Start // 如果seekInfo.Start的类型也是SeekPosition_Start,那就就将stopNum设置为迭代器迭代的起始块号 // 否则,stopNum设置为当前账本高度-1 // 注意需要-1,否则可能会返回多个块,可能预期的只是一个块 if proto.Equal(seekInfo.Start, seekInfo.Stop) { stopNum = number break } stopNum = chain.Reader().Height() - 1 case *ab.SeekPosition_Specified: // 如果seekInfo.Stop的类型是SeekPosition_Specified // 那么就将stopNum设置为指定的number // 如果指定的stopnum比迭代器起始块的高度还要小,那么就报错 stopNum = stop.Specified.Number if stopNum < number { logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum) return cb.Status_BAD_REQUEST, nil } } // 死循环 for { // 如果seekInfo.Behavior是SeekInfo_FAIL_IF_NOT_READY // 表示如果块没有准备好久返回错误 if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY { if number > chain.Reader().Height()-1 { // 如果当前迭代起始块大于当前区块高度-1,表示找不到,返回Status_NOT_FOUND状态 return cb.Status_NOT_FOUND, nil } } // 定义两个变量 var block *cb.Block var status cb.Status // 迭代结束通道,发送迭代完成通知 iterCh := make(chan struct{}) go func() { // 不停的迭代,调用next block, status = cursor.Next() close(iterCh) }() select { case <-ctx.Done(): // 表示在区块取回之间上下午退出了,返回Status_INTERNAL_SERVER_ERROR状态 logger.Debugf("Context canceled, aborting wait for next block") return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved") case <-erroredChan: // TODO, today, the only user of the errorChan is the orderer consensus implementations. If the peer ever reports // this error, we will need to update this error message, possibly finding a way to signal what error text to return. // 这块看不太懂这个erroredChan啥意思,好像是共识有关的,给排序节点用的,返回Status_SERVICE_UNAVAILABLE状态 logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error") return cb.Status_SERVICE_UNAVAILABLE, nil case <-iterCh: // Iterator has set the block and status vars // 到这表示迭代完了 } if status != cb.Status_SUCCESS { // 迭代完status不为Status_SUCCESS则返回对应的status logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status) return status, nil } // increment block number to support FAIL_IF_NOT_READY deliver behavior // 这个值++为下一个循环开始的判断做准备 // 因为number++以后,number > chain.Reader().Height()-1应该就会成立了 // 这个时候FAIL_IF_NOT_READY 的行为会返回 number++ // 继续再评估一下ACL if err := accessControl.Evaluate(); err != nil { logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err) return cb.Status_FORBIDDEN, nil } logger.Debugf("[channel: %s] Delivering block [%d] for (%p) for %s", chdr.ChannelId, block.Header.Number, seekInfo, addr) // 将这次迭代拿到的区块返回,发送失败返回Status_INTERNAL_SERVER_ERROR状态 if err := srv.SendBlockResponse(block); err != nil { logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err) return cb.Status_INTERNAL_SERVER_ERROR, err } //指标设置 h.Metrics.BlocksSent.With(labels...).Add(1) //如果block的区块好和stopnum一样,那就要停止了 //所以一般如果要无限循环的话,stopnum一般就设置为math.MaxInt64了 //这一点在上一篇文章中提到过 if stopNum == block.Header.Number { break } } logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo) // 完成之后还是要返回一个Status_SUCCESS状态 return cb.Status_SUCCESS, nil }代码很长,不过好在没有很多需要跳转的地方。需要注意的一个地方是,发送消息给DeliverClient的时候,只有两种响应类型:
// ResponseSender defines the interface a handler must implement to send // responses. type ResponseSender interface { SendStatusResponse(status cb.Status) error SendBlockResponse(block *cb.Block) error }一般拿到一个区块的时候,就会执行SendBlockResponse(),最终返回的响应类型是DeliverResponse_FilteredBlock。在返回一个status时,包括各种错误,或是Status_SUCCESS,都会执行SendStatusResponse(),最终返回的响应类型是DeliverResponse_Status。返回status就代表DeliverServer这边的逻辑处理结束了,需要通知DeliverClient,DeliverClient具体怎么处理就看各自实现了。
最后看下SendBlockResponse()方法,它是最终返回一个事件响应的方法,代码在core/peer/deliverevents.go的79行:
type blockEvent common.Block // SendBlockResponse generates deliver response with block message func (fbrs *filteredBlockResponseSender) SendBlockResponse(block *common.Block) error { // blockEvent主要就是用了另一个类型要包装一下commmon.block类型 // 以便可以自定义一个toFilteredBlock方法 b := blockEvent(*block) // 这块主要就是过滤区块了 filteredBlock, err := b.toFilteredBlock() if err != nil { logger.Warningf("Failed to generate filtered block due to: %s", err) // 出错调用SendStatusResponse返回错误状态 return fbrs.SendStatusResponse(common.Status_BAD_REQUEST) } // 创建响应类型DeliverResponse,type是DeliverResponse_FilteredBlock response := &peer.DeliverResponse{ Type: &peer.DeliverResponse_FilteredBlock{FilteredBlock: filteredBlock}, } return fbrs.Send(response) }来看下toFilteredBlock()方法,在core/peer/deliverevents.go的157行:
func (block *blockEvent) toFilteredBlock() (*peer.FilteredBlock, error) { // 创建要返回的FilteredBlock对象,赋值Number字段为区块高度 filteredBlock := &peer.FilteredBlock{ Number: block.Header.Number, } // 拿到区块元数据中的common.BlockMetadataIndex_TRANSACTIONS_FILTER部分的内容, // TxValidationFlags是一个[]uint8类型,这里是做了一个类型转化 txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) // block.Data.Data就是每条交易数据 // txIndex是下标,ebytes是一个[]byte类型,是Envelope类型的序列化内容 for txIndex, ebytes := range block.Data.Data { var env *common.Envelope var err error // 当前交易为空,continue if ebytes == nil { logger.Debugf("got nil data bytes for tx index %d, "+ "block num %d", txIndex, block.Header.Number) continue } // 获取env对象 env, err = utils.GetEnvelopeFromBlock(ebytes) if err != nil { logger.Errorf("error getting tx from block, %s", err) continue } // 获取env的payload,这里出错直接返回,而不是continue payload, err := utils.GetPayload(env) if err != nil { return nil, errors.WithMessage(err, "could not extract payload from envelope") } // payload.Header为空continue if payload.Header == nil { logger.Debugf("transaction payload header is nil, %d, block num %d", txIndex, block.Header.Number) continue } // 获取channelHeader chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { return nil, err } // 设置filteredBlock的ChannelID filteredBlock.ChannelId = chdr.ChannelId // 创建FilteredTransaction对象 // 包括交易ID,头部类型,以及交易验证码(这个东西还不太懂) filteredTransaction := &peer.FilteredTransaction{ Txid: chdr.TxId, Type: common.HeaderType(chdr.Type), TxValidationCode: txsFltr.Flag(txIndex), } // 如果交易类型是HeaderType_ENDORSER_TRANSACTION if filteredTransaction.Type == common.HeaderType_ENDORSER_TRANSACTION { // 那么就获取交易的具体数据,得到peer.Transaction对象tx tx, err := utils.GetTransaction(payload.Data) if err != nil { return nil, errors.WithMessage(err, "error unmarshal transaction payload for block event") } // 将tx.Actions(类型为[]*TransactionAction)转化为transactionActions类型(是[]*TransactionAction的别名) // 本质上是拿到Actions,事件就存储在交易的Acitons字段中 // 来看下toFilteredActions方法,过滤事件去了 filteredTransaction.Data, err = transactionActions(tx.Actions).toFilteredActions() if err != nil { logger.Errorf(err.Error()) return nil, err } } // 设置filteredBlock的FilteredTransactions字段 filteredBlock.FilteredTransactions = append(filteredBlock.FilteredTransactions, filteredTransaction) } // 最终返回filteredBlock return filteredBlock, nil }看下toFilteredActions()方法,在core/peer/deliverevents.go的222行:
type FilteredTransactionActions struct { ChaincodeActions []*FilteredChaincodeAction XXX_NoUnkeyedLiteral struct{} XXX_unrecognized []byte XXX_sizecache int32 } type FilteredChaincodeAction struct { // ChaincodeEvent就是链码事件结构体,这个在之前的文章中有提到 ChaincodeEvent *ChaincodeEvent XXX_NoUnkeyedLiteral struct{} XXX_unrecognized []byte XXX_sizecache int32 } func (ta transactionActions) toFilteredActions() (*peer.FilteredTransaction_TransactionActions, error) { // 创建一个FilteredTransactionActions对象 transactionActions := &peer.FilteredTransactionActions{} // 本质上就是遍历交易的Actions字段,每个action是Action类型 for _, action := range ta { // 获取action的payload字段 chaincodeActionPayload, err := utils.GetChaincodeActionPayload(action.Payload) if err != nil { return nil, errors.WithMessage(err, "error unmarshal transaction action payload for block event") } // chaincodeActionPayload.Action为空跳过 if chaincodeActionPayload.Action == nil { logger.Debugf("chaincode action, the payload action is nil, skipping") continue } // 获取chaincodeActionPayload.Action.ProposalResponsePayload // 是一个ProposalResponsePayload类型 propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload) if err != nil { return nil, errors.WithMessage(err, "error unmarshal proposal response payload for block event") } // 获取propRespPayload.Extension字段,事件就存储在其中 // 该类型是一个ChaincodeAction类型 caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension) if err != nil { return nil, errors.WithMessage(err, "error unmarshal chaincode action for block event") } // 获取caPayload.Events字段,该类型就是一个ChaincodeEvents类型 // 到这里就拿到事件了 ccEvent, err := utils.GetChaincodeEvents(caPayload.Events) if err != nil { return nil, errors.WithMessage(err, "error unmarshal chaincode event for block event") } // 获取链码ID if ccEvent.GetChaincodeId() != "" { // 其实就是一个事件的拷贝操作 // 注意这里没有把事件的payload拷贝过去 filteredAction := &peer.FilteredChaincodeAction{ ChaincodeEvent: &peer.ChaincodeEvent{ TxId: ccEvent.TxId, ChaincodeId: ccEvent.ChaincodeId, EventName: ccEvent.EventName, // 没有拷贝payload字段 }, } // 加上这个事件 transactionActions.ChaincodeActions = append(transactionActions.ChaincodeActions, filteredAction) } } // 最终返回结果 return &peer.FilteredTransaction_TransactionActions{ TransactionActions: transactionActions, }, nil }好了到这里就分析完了。细心的朋友到这里其实就会发现一个问题,在做事件拷贝的时候,这一部分并没有将事件的payload字段拷贝过去,这就是DeliverFiltered这个服务做的,它最终把事件的部分内容返回回去了,不包括事件的payload信息,也就是事件的具体内容,只包含了TxId,ChaincodeId,EventName三个事件字段信息。
既然这样,那么Deliver服务做的应该就是完整的事件信息了,可以来看一下,代码在core/peer/deliverevents.go的52行:
// SendBlockResponse generates deliver response with block message func (brs *blockResponseSender) SendBlockResponse(block *common.Block) error { response := &peer.DeliverResponse{ // 直接将整个block返回 Type: &peer.DeliverResponse_Block{Block: block}, } return brs.Send(response) }这里甚至是直接将整个block返回了,都不带过滤事件的,看这个要从block中获取事件的操作应该要有DeliverClient来做了,这样就可以拿到事件的具体信息。
这个问题其实在protos/peer/events.pb.go中已经说明了:
type DeliverResponse_Block struct { // 这里直接返回 Block *common.Block `protobuf:"bytes,2,opt,name=block,proto3,oneof"` } type DeliverResponse_FilteredBlock struct { // 这里过滤事件后返回 FilteredBlock *FilteredBlock `protobuf:"bytes,3,opt,name=filtered_block,json=filteredBlock,proto3,oneof"` }很多SDK提供了类似BLOCK_FILTERED的标志来让用户选择是注册DeliverFiltered还是Deliver服务,针对不同的类型做出不同的措施。
建议事件如果只是做一个通知的作用,采用DeliverFiltered服务会更轻量,如果需要了解事件中的具体内容,则必须使用Deliver服务