是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。 微服务常用更高效的rpc(远程过程调用协议)通信。
创建book.proto文件,并输入
syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本 package book; // 包名 // 包含id的一个请求消息 message BookRequest { int32 id = 1; } // 包含名称的响应消息 message BookResponse { string name = 1; } // 定义根据id获取名称的服务,对应go中的接口 service BookFun { rpc GetBookInfoByID (BookRequest) returns (BookResponse) {} }将.proto文件放到book文件夹下,并执行如下命令,生成book.pb.go文件,
protoc --go_out=plugins=grpc:. book.proto获取grpc开发工具
go get google.golang.org/grpc type BookEntity struct { } //实现GetBookInfoByID接口 func (b *BookEntity) GetBookInfoByID(ctx context.Context,requ *book.BookRequest) (*book.BookResponse, error){ resp := new(book.BookResponse) switch requ.Id { case 1: resp.Name = "西游记" default: resp.Name = "金瓶梅" } return resp,nil } func main(){ ls, _ := net.Listen("tcp", ":666") gs := grpc.NewServer() //将接口实现注册到grpc实例中 book.RegisterBookFunServer(gs,new(BookEntity)) gs.Serve(ls) }1.创建一个book.proto文件
syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本 package book; // 包名 // 包含id的一个请求消息 message BookRequest { int32 id = 1; } // 包含名称的响应消息 message BookResponse { string name = 1; } // 定义根据id获取名称的服务 service BookFun { rpc GetBookInfoByID (BookRequest) returns (BookResponse) {} }2.在文件目录下运行如下命令,生成book.pb.go, 需要提前安装protobuf并配置环境变量
protoc --go_out=plugins=grpc:. book.proto3.在service文件夹下创建go文件 并创建接口和实现方法
type BookInter interface { GetBookInfoByID(int32)string } type Book struct { } func(b * Book)GetBookInfoByID(id int32)string{ switch id { case 1: return "西游记" case 2: return "三国演义" case 3: return "水浒传" case 4: return "红楼梦" case 5: return "金瓶梅" } return "未知id" }4.在transport文件夹下实现endpoint的编解码方法
func DecodeBook(ctx context.Context,inter interface{}) (request interface{}, err error){ return inter,nil } func EncodeBook(ctx context.Context,inter interface{}) (response interface{}, err error){ return inter,nil }5.在epoint文件夹下, 声明创建endpoint的方法,该方法调用业务逻辑
func GetGrpcEndpointForGetBookIDS(inter service.BookInter)endpoint.Endpoint{ return func(ctx context.Context, request interface{}) (response interface{}, err error){ bookresp := new(book.BookResponse) bookresp.Name = inter.GetBookInfoByID(request.(*book.BookRequest).Id) return bookresp,nil } }6.声明一个结构体,并实现proto中的接口,在实现方法中调用endpoint的ServeGRPC方法。 kitgrpc 是 “github.com/go-kit/kit/transport/grpc”
//调用该方法将GetBookInfoByID方法的实现注册到gRPC服务中 func InitGRPCRouter(gs * grpc.Server) { book.RegisterBookFunServer(gs,NewGrpcBook()) } //创建实现接口的函数,此处实现了endpoint的创建,并赋值给结构体中的参数 func NewGrpcBook() book.BookFunServer{ b := &GrpcBook{} b.Handler = kitgrpc.NewServer(epoint.GetGrpcEndpointForGetBookIDS(new(service.Book)), transport.DecodeBook,transport.EncodeBook) return b } type GrpcBook struct { Handler kitgrpc.Handler } //实现proto中的接口 func (g * GrpcBook)GetBookInfoByID(ctx context.Context,request *book.BookRequest)(*book.BookResponse,error){ _,res ,err := g.Handler.ServeGRPC(ctx,request) return res.(*book.BookResponse),err }7.创建GRPC实例,并注册方法
func main(){ ls, _ := net.Listen("tcp", ":666") gs := grpc.NewServer() router.InitGRPCRouter(gs) gs.Serve(ls) }1.声明解压缩方法,创建一个endpioint实例 import kitgrpc “github.com/go-kit/kit/transport/grpc”
//创建一个grpc客户端 //第二个参数是接口名称 .proto文件中的 BookFun //第三个参数是方法名称 .proto文件中的 GetBookInfoByID func GetEndpoint(cc *grpc.ClientConn, serviceName string, method string,) *kitgrpc.Client{ //参数grpcReply是grpc返回值,实际上是取类型 return kitgrpc.NewClient(cc,serviceName,method,EncodeRequestBook,DecodeResponseBook,book.BookResponse{}) } //编码 func EncodeRequestBook (ctx context.Context,inter interface{}) (request interface{}, err error){ return inter,nil } //解码 func DecodeResponseBook(ctx context.Context,inter interface{}) (response interface{}, err error){ return inter,nil }2.创建grpc实例并调用
func main(){ conn,err := grpc.Dial(":666",grpc.WithInsecure() ) if err!=nil{ panic(err) } defer conn.Close() //创建endpoint,并指明grpc调用的接口名和方法名 client := GetEndpoint(conn,"book.BookFun", "GetBookInfoByID") resp,err:= client.Endpoint()(context.Background(),&book.BookRequest{Id:1}) if err!=nil{ fmt.Println(err) } fmt.Println(resp) }将服务运行起来,然后运行客户端,截图如下
拦截器在作用于每一个 RPC 调用,通常用来做日志,认证,metric 等等
interfactor 分为两种
unary interceptor 拦截 unary(一元) RPC 调用stream interceptor 处理 stream RPCUnaryServerInterceptor
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)unary会根据消息进行拦截,todo后,可以调用invoker方法继续执行,并可以在方法后添加一些滞后操作
err := invoker(ctx, method, req, reply, cc, opts...)StreamServerInteceptor源码
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) errorstream会根据流进行拦截, 通过调用streamer 可以获得 ClientStream, 包装ClientStream 并重载他的 RecvMsg 和 ·SendMsg 方法,即可做一些拦截处理了最后将包装好的 ClientStream 返回给客户 例子如下:(偷来的,原文)
type wrappedStream struct{ grpc.ClientStream } func (w *wrappedStream) RecvMsg(m interface{})error{ log.Printf("Receive a message (Type: %T)", m) return w.ClientStream.RecvMsg(m) } func (w *wrappedStream)SendMsg(m interface{})error{ log.Printf("Send a message (Type: %T)", m) return w.ClientStream.SendMsg(m) } func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption)(grpc.ClientStream, error){ // do soming // return ClientStream s , err := streamer(ctx, desc, cc, method, opts...) if err != nil{ return nil, err } return &wrappedStream{s}, nil }添加方法
s := grpc.NewServer( grpc.UnaryInterceptor(unaryInterceptor), grpc.StreamInterceptor(streamInterceptor) )链式拦截器 调用 grpc.NewServer 时,使用ChainUnaryInterceptor 和 ChainStreamInterceptor 可以设置链式的 interceptor
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption第一个 interceptor 是最外层的,最后一个为最内层 使用UnaryInterceptor, StreamInterceptor·添加的interceptor,总是最先执行
对应服务端
UnaryClientInterceptor
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) errorStreamClientInterceptor
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)添加方法
conn, err := grpc.Dial( grpc.WithUnaryInterceptor(unaryInterepotr), grpc.WithStreamInterceptor(streamInterceptor) )链式拦截器
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOptiongrpc能够通过ctx上下文携带元数据 实现方法是通过grpc包下的metadate包实现 “google.golang.org/grpc/metadata”
MD是元数据的基本结构如下,通过操作md实现在ctx上下文中添加提取数据
type MD map[string][]string创建MD
键不允许是“grpc-”开头,该开头已内部使用键以“-bin” 为后缀,这时,值会在传输前后以 base64 进行编解码,进行二进制存储提示: 如果对metadata进行修改,那么需要用拷贝的副本进行修改 (FromIncomingContext的注释) 方法是:func (md MD) Copy() MD func New(m map[string]string) MD func Pairs(kv ...string) MD //将若干个md连接到一起 func Join(mds ...MD) MD //从上下文中提取一个md //如果对metadata进行修改,那么需要用拷贝的副本进行修改 func FromIncomingContext(ctx context.Context) (md MD, ok bool) //创建一个md副本 func (md MD) Copy() MD md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"}) md = metadata.Pairs( "key1", "val1", "key1", "val1-2", "key2", "val2", "key-bin", string([]byte{96, 102}), )客户端
//向一个已有的ctx中拼接 func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context //根据MD创建一个新的ctx上下文 func NewOutgoingContext(ctx context.Context, md MD) context.Context服务器
server 端会把 metadata 分为 header 和 trailer 发送给 clientunary RPC 可以通过 CallOption grpc.SendHeader 和 grpc.SetTriler 来发送 header, trailer metadatastream RPC 则可以直接使用 ServerStream 接口的方法SetTriler 可以被调用多次,并且所有 metadata 会被合并,当 RPC 返回的时候, trailer metadata 会被发送 //unary rpc func SendHeader(ctx context.Context, md metadata.MD) error func SetTrailer(ctx context.Context, md metadata.MD) error //stream rpc func SendHeader(md metadata.MD) error func SetTrailer(md metadata.MD) error客户端
和服务器对应采用Header和Trailer方法接收, 它们返回CallOption,放在grpc调用的方法的最后一个参数 opts …grpc.CallOption上 var header,trailer metadata.MD bookclient := book.NewBookFunClient(conn) resp,err:=bookclient.GetBookInfoByID(context.Background(),&book.BookRequest{Id:1}, grpc.Header(&header), grpc.Trailer(&trailer))stream RPC 同理
stream, err := bookclient.GetBookInfoByID(ctx) // retrieve header header, err := stream.Header() // retrieve trailer trailer := stream.Trailer()服务端
服务端调用 FromIncomingContext 即可从 context 中接收 client 发送的 metadata func FromIncomingContext(ctx context.Context) (md MD, ok bool) func (g * GrpcBook)GetBookInfoByID(ctx context.Context,request *book.BookRequest)(*book.BookResponse,error){ //在次提示,如需修改md,请先copy md,ok := metadata.FromIncomingContext(ctx) //TODO }未完待续。。。