RPC实现伪个性化推荐方案——python

    技术2022-07-20  76

    RPC实现伪个性化推荐方案——python

    一、相关概念

    RPC

    远程过程调用(英语:Remote Procedure Call,缩写为 RPC,也叫远程程序调用)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。

    GRPC

    gRPC是由Google公司开源的高性能RPC框架。gRPC支持多语言。gRPC原生使用C、Java、Go进行了三种实现,而C语言实现的版本进行封装后又支持C++、C#、Node、ObjC、Python、Ruby、PHP等开发语言gRPC支持多平台。支持的平台包括:Linux、Android、iOS、MacOS、WindowsgRPC的消息协议使用Google自家开源的Protocol Buffers协议机制(proto3) 序列化gRPC的传输使用HTTP/2标准,支持双向流和连接多路复用

    Protocol Buffers

    Protocol Buffers 是一种与语言无关,平台无关的可扩展机制,用于序列化结构化数据。使用Protocol Buffers 可以一次定义结构化的数据,然后可以使用特殊生成的源代码轻松地在各种数据流中使用各种语言编写和读取结构化数据。

    现在有许多框架等在使用Protocol Buffers。gRPC也是基于Protocol Buffers。 Protocol Buffers 目前有2和3两个版本号。

    在gRPC中推荐使用proto3版本。

    二、推荐系统接口定义

    接口名称: user_recommend

    调用参数:

    UserRequest: user_id # 用户id channel_id # 频道id article_num # 推荐的文章数量 time_stamp # 推荐的时间戳

    返回数据:

    ArticleResponse: expousre # 曝光埋点数据 time_stamp # 推荐的时间戳 recommends: # 推荐结果 article_id # 文章id track: # 关于文章的埋点数据 click # 用户点击行为的埋点参数 collect # 用户收藏的埋点参数 share # 用户分享的埋点参数 read # 用户进入文章详情的埋点参数

    使用Protobuf 定义接口

    新建并编辑reco.proto

    syntax = "proto3"; message UserRequest { string user_id=1; int32 channel_id=2; int32 article_num=3; int64 time_stamp=4; } message Track { string click=1; string collect=2; string share=3; string read=4; } message Article { int64 article_id=1; Track track=2; } message ArticleResponse { string exposure=1; int64 time_stamp=2; repeated Article recommends=3; } service UserRecommend { rpc user_recommend(UserRequest) returns(ArticleResponse) {} }

    三、代码生成

    安装protobuf编译器和grpc库

    pip install grpcio-tools

    编译生成代码

    python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. reco.proto -I表示搜索proto文件中被导入文件的目录–python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的数据类型–grpc_python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的服务类型

    会自动生成如下两个rpc调用辅助代码模块:

    reco_pb2.py 保存根据接口定义文件中的数据类型生成的python类

    reco_pb2_grpc.py 保存根据接口定义文件中的服务方法类型生成的python调用RPC方法

    四、服务端

    新建并编辑server.py

    # -*- coding: utf-8 -*- import reco_pb2 import reco_pb2_grpc import grpc from concurrent.futures import ThreadPoolExecutor import time # rpc接口定义中服务对应成Python的类 class UserRecommendService(reco_pb2_grpc.UserRecommendServicer): # 在接口定义的同名方法中补全,被调用时应该执行的逻辑 def user_recommend(self, request, context): # request是调用的请求数据对象 user_id = request.user_id channel_id = request.channel_id article_num = request.article_num time_stamp = request.time_stamp response = reco_pb2.ArticleResponse() # 手动构造推荐结果,后续对接真实推荐代码 response.exposure = 'exposure param' response.time_stamp = round(time.time() * 1000) recommends = [] for i in range(article_num): article = reco_pb2.Article() article.track.click = 'click param {}'.format(i + 1) article.track.collect = 'collect param {}'.format(i + 1) article.track.share = 'share param {}'.format(i + 1) article.track.read = 'read param {}'.format(i + 1) article.article_id = i + 1 recommends.append(article) response.recommends.extend(recommends) # 最终要返回一个调用结果 return response def serve(): """ rpc服务端启动方法 """ # 创建一个rpc服务器 server = grpc.server(ThreadPoolExecutor(max_workers=10)) # 向服务器中添加被调用的服务方法 reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendService(), server) # 微服务器绑定ip地址和端口 server.add_insecure_port('127.0.0.1:8888') # 启动rpc服务 server.start() # start()不会阻塞,此处需要加上循环睡眠 防止程序退出 while True: time.sleep(10) if __name__ == '__main__': serve()

    五、客户端

    新建并编辑client.py

    # -*- coding: utf-8 -*- import grpc import reco_pb2 import reco_pb2_grpc import time def feed_articles(stub): # 构建rpc调用的调用参数 user_request = reco_pb2.UserRequest() user_request.user_id = '1' user_request.channel_id = 1 user_request.article_num = 10 user_request.time_stamp = round(time.time() * 1000) # 通过stub进行方法调用,并接收调用返回值 ret = stub.user_recommend(user_request) print('ret={}'.format(ret)) def run(): """ rpc客户端调用的方法 """ # 使用with语句连接rpc服务器 with grpc.insecure_channel('127.0.0.1:8888') as channel: # 创建调用rpc远端服务的辅助对象stub stub = reco_pb2_grpc.UserRecommendStub(channel) # 通过stub进行rpc调用 feed_articles(stub) if __name__ == '__main__': run()

    六、运行与结果

    1、先运行服务端python server.py

    2、再运行客户端python client.py

    3、结果:

    ret=exposure: "exposure param" time_stamp: 1593676322597 recommends { article_id: 1 track { click: "click param 1" collect: "collect param 1" share: "share param 1" read: "read param 1" } } 。。。省略。。。 recommends { article_id: 10 track { click: "click param 10" collect: "collect param 10" share: "share param 10" read: "read param 10" } }
    Processed: 0.008, SQL: 9