现在有许多框架等在使用Protocol Buffers。gRPC也是基于Protocol Buffers。 Protocol Buffers 目前有2和3两个版本号。
在gRPC中推荐使用proto3版本。
新建并编辑reco.proto
syntax = "proto3"; message UserRequest { string user_id=1; int32 channel_id=2; int32 article_num=3; int64 time_stamp=4; } message Track { string click=1; string collect=2; string share=3; string read=4; } message Article { int64 article_id=1; Track track=2; } message ArticleResponse { string exposure=1; int64 time_stamp=2; repeated Article recommends=3; } service UserRecommend { rpc user_recommend(UserRequest) returns(ArticleResponse) {} }会自动生成如下两个rpc调用辅助代码模块:
reco_pb2.py 保存根据接口定义文件中的数据类型生成的python类
reco_pb2_grpc.py 保存根据接口定义文件中的服务方法类型生成的python调用RPC方法
新建并编辑server.py
# -*- coding: utf-8 -*- import reco_pb2 import reco_pb2_grpc import grpc from concurrent.futures import ThreadPoolExecutor import time # rpc接口定义中服务对应成Python的类 class UserRecommendService(reco_pb2_grpc.UserRecommendServicer): # 在接口定义的同名方法中补全,被调用时应该执行的逻辑 def user_recommend(self, request, context): # request是调用的请求数据对象 user_id = request.user_id channel_id = request.channel_id article_num = request.article_num time_stamp = request.time_stamp response = reco_pb2.ArticleResponse() # 手动构造推荐结果,后续对接真实推荐代码 response.exposure = 'exposure param' response.time_stamp = round(time.time() * 1000) recommends = [] for i in range(article_num): article = reco_pb2.Article() article.track.click = 'click param {}'.format(i + 1) article.track.collect = 'collect param {}'.format(i + 1) article.track.share = 'share param {}'.format(i + 1) article.track.read = 'read param {}'.format(i + 1) article.article_id = i + 1 recommends.append(article) response.recommends.extend(recommends) # 最终要返回一个调用结果 return response def serve(): """ rpc服务端启动方法 """ # 创建一个rpc服务器 server = grpc.server(ThreadPoolExecutor(max_workers=10)) # 向服务器中添加被调用的服务方法 reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendService(), server) # 微服务器绑定ip地址和端口 server.add_insecure_port('127.0.0.1:8888') # 启动rpc服务 server.start() # start()不会阻塞,此处需要加上循环睡眠 防止程序退出 while True: time.sleep(10) if __name__ == '__main__': serve()新建并编辑client.py
# -*- coding: utf-8 -*- import grpc import reco_pb2 import reco_pb2_grpc import time def feed_articles(stub): # 构建rpc调用的调用参数 user_request = reco_pb2.UserRequest() user_request.user_id = '1' user_request.channel_id = 1 user_request.article_num = 10 user_request.time_stamp = round(time.time() * 1000) # 通过stub进行方法调用,并接收调用返回值 ret = stub.user_recommend(user_request) print('ret={}'.format(ret)) def run(): """ rpc客户端调用的方法 """ # 使用with语句连接rpc服务器 with grpc.insecure_channel('127.0.0.1:8888') as channel: # 创建调用rpc远端服务的辅助对象stub stub = reco_pb2_grpc.UserRecommendStub(channel) # 通过stub进行rpc调用 feed_articles(stub) if __name__ == '__main__': run()1、先运行服务端python server.py
2、再运行客户端python client.py
3、结果:
ret=exposure: "exposure param" time_stamp: 1593676322597 recommends { article_id: 1 track { click: "click param 1" collect: "collect param 1" share: "share param 1" read: "read param 1" } } 。。。省略。。。 recommends { article_id: 10 track { click: "click param 10" collect: "collect param 10" share: "share param 10" read: "read param 10" } }