源代码地址:https://github.com/yelvmiaoyue/simple-rpc-framework
主要技术点设计:
注册中心:使用 MySQL注册服务信息序列化手段:hessian(本想用fastjson,然而对于泛型的处理实在太麻烦了)桩的生成方式:调用时使用模板和反射动态生成 java源代码并编译加载通信协议:大概会用netty自带的LengthFieldBasedFrameDecoder来处理粘包拆包吧可插拔设计:SPI。注册中心、服务提供者都有使用到,方便后期替换不同的注册中心和服务实现类项目基础结构:
service-interface:服务模块,包含其中用到的实体类client:服务消费者,作为netty客户端server:SpringBoot 项目(方便使用一些配置化的参数),作为netty服务端rpc-framework:rpc 实现模块需要注意的是,这里设计成单向的调用关系只是为了方便,实际项目中每个项目既会调用其他项目的服务,也会提供自己的服务,是同时作为客户端和服务端的。
这里贴两个网上找的图,就按这个顺序一点点实现
第一第二步分别是服务端注册服务,和客户端订阅服务。那就先写注册中心。
还要提供一个公共方法给客户端和服务端,来获取注册中心实例,另写了一个公共接口服务。
public class RpcCommonService { /** * 获取注册中心引用 * * @param uri 连接地址 * @return */ public NameService getNameService(URI uri) { NameService nameService = ServiceLoaderUtils.load(NameService.class); nameService.init(uri.toString()); return nameService; } }这里用了个SPI工具类来加载注册中心实现类,代码省略。注册中心用的是 MySQL实现。
CREATE TABLE `nameservice` ( `id` int(11) NOT NULL AUTO_INCREMENT, `service_name` varchar(255) NOT NULL, `uri` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB在注册中心实现中,有一个Map 存放本地缓存,key为serviceName,value是服务提供者的 List<URI>。lookupService 方法返回时会从 list 里选择一个,为了方便就用了随机的算法做负载均衡。后面会把负载均衡的动作放到实际调用之前,显得更合理。
public URI lookupService(String serviceName) { //查本地缓存 List<URI> uris = localCache.get(serviceName); if (CollectionUtils.isEmpty(uris)) { //查数据库,更新缓存 synchronized (lock) { //二次判断,减轻数据库压力 uris = localCache.get(serviceName); if (CollectionUtils.isEmpty(uris)) { localCache = this.getAllServices(); } } } uris = localCache.get(serviceName); if (CollectionUtils.isEmpty(uris)) { return null; } else { return this.loadBalance(uris); } } private URI loadBalance(List<URI> uris) { //随机算法 return uris.get(ThreadLocalRandom.current().nextInt(uris.size())); }客户端整个调用的流程设计如下:先拿到公共接口实例,再去拿注册中心实例,再获得目标服务的stub实例,调用stub的方法发出请求,调用远程实现类的真实实现。
这里test1 方法中调用 add 之前的代码的作用就相当于dubbo 中的 @Reference
public static void main(String[] args) throws Exception { RpcCommonService rpcCommonService = new RpcCommonService(); NameService nameService = rpcCommonService.getNameService(new URI(NAMESERVICE_URI)); test1(nameService); } private static void test1(NameService nameService) { String serviceName = HelloService.class.getCanonicalName(); // step 1 URI uri = nameService.lookupService(serviceName); log.info("本次调用服务:{},地址:{}", serviceName, uri); // step 2 HelloService helloService = RpcCommonService.getStub(uri, HelloService.class); // step 3 Integer result = helloService.add(1, 2); log.info("收到响应: {}", result); }现在step 1 已经实现,下面来实现 step 2 。
private static Map<Class<?>, AbstractStub> stubMap = new ConcurrentHashMap<>(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final Lock readLock = readWriteLock.readLock(); private final Lock writeLock = readWriteLock.writeLock(); private final String NAMESERVICE_URI = "jdbc:mysql://localhost:3306/study?user=root&password=123456"; private final NameService nameService = getNameService(URI.create(NAMESERVICE_URI)); /** * 生成目标stub * * @param serviceName stub实现的接口class对象 * @param <T> 接口类型 */ public <T> T getStub(Class<T> serviceName) { if (serviceName == null) { log.error("service对象为空。"); return null; } AbstractStub stub; readLock.lock(); try { stub = stubMap.get(serviceName); } finally { readLock.unlock(); } if (stub == null) { writeLock.lock(); try { //再次验证 stub = stubMap.get(serviceName); if (stub == null) { String service = serviceName.getCanonicalName(); List<URI> uris = nameService.lookupService(service); if (CollectionUtils.isEmpty(uris)) { log.error("目标服务当前不可达。"); return null; } ServiceInfo serviceInfo = new ServiceInfo(uris); stub = StubFactory.createStub(serviceInfo, serviceName); stubMap.put(serviceName, stub); } } finally { writeLock.unlock(); } } return (T) stub; }这里的代码已经经过了重构,把 lookup的动作也放到 getStub 中,这样HelloService helloService = rpcCommonService.getStub(HelloService.class);这一句代码的作用就可以等效于 @Reference了。
这里用读写锁控制并发,生成过的 stub 会放在 stubMap 中,如果是新的服务,先拿到写锁,然后查询服务提供地址,再去生成对应的 stub。
StubFactory 的 createStub 方法,就是实际通过模板和反射生成 stub 实例的地方了。
public class StubFactory { private static final Logger log = LoggerFactory.getLogger(StubFactory.class); private final static String CLASS_TEMPLATE = "package priv.patrick.rpc.stub;\n" + "\n" + "public class %s extends AbstractStub implements %s {\n" + "%s \n" + "}"; private final static String METHOD_TEMPLATE = " @Override\n" + " public %s %s( %s ) {\n" + "%s \n" + " return invoke(\n" + " new RpcRequest(\n" + " \"%s\",\n" + " \"%s\",\n" + " arguments\n" + " )\n" + " );\n" + " }\n"; private final static String ARGUMENTS_TEMPLATE = " Argument[] arguments = new Argument[%d];\n" + "%s\n"; private final static String ARGUMENT_TEMPLATE = " arguments[%d] =new Argument();\n" + " arguments[%d].setType(%s);\n" + " arguments[%d].setValue(arg%d);\n"; public static AbstractStub createStub(ServiceInfo serviceInfo, Class<?> serviceName) { try { //模板类名 String stubSimpleName = serviceName.getSimpleName() + "Stub"; //模板类实现的接口名 String interfaceName = serviceName.getName(); //模板类全路径 String stubFullName = "priv.patrick.rpc.stub." + stubSimpleName; StringBuilder methodSources = new StringBuilder(); Method[] methods = serviceName.getMethods(); //循环填充方法模板 for (Method method : methods) { String returnType = method.getReturnType().getTypeName(); String methodName = method.getName(); StringBuilder parameters = new StringBuilder(); StringBuilder arguments = new StringBuilder(); Class<?>[] parameterTypes = method.getParameterTypes(); for (int i = 0; i < parameterTypes.length; i++) { String name = parameterTypes[i].getName(); //形参列表 parameters.append(name).append(" arg").append(i).append(","); //请求参数 String argument = String.format(ARGUMENT_TEMPLATE, i, i, name + ".class", i, i); arguments.append(argument); } //最后删掉个逗号 if (parameters.length() > 0) { parameters.deleteCharAt(parameters.length() - 1); } String argumentSource = String.format(ARGUMENTS_TEMPLATE, parameterTypes.length, arguments); String methodSource = String.format(METHOD_TEMPLATE, returnType, methodName, parameters, argumentSource, interfaceName, methodName); methodSources.append(methodSource); } String source = String.format(CLASS_TEMPLATE, stubSimpleName, interfaceName, methodSources); if (log.isDebugEnabled()) { log.debug(source); } // 编译源代码 JavaStringCompiler compiler = new JavaStringCompiler(); Map<String, byte[]> results = compiler.compile(stubSimpleName + ".java", source); // 加载编译好的类 Class<?> clazz = compiler.loadClass(stubFullName, results); AbstractStub stubInstance = (AbstractStub) clazz.newInstance(); stubInstance.setServiceInfo(serviceInfo); return stubInstance; } catch (Exception e) { log.error("stub生成失败,{}", e.toString()); return null; } } }由于接口有任意个方法,方法有任意个参数,所以这里用了多个模板,用 StringBuilder 的 append 方法拼接多个同级对象,然后再用 String.format 方法组装到上级对象里。
stub 的所有方法里,都是调用远程实例的真实方法,所以要有一个父类 AbstractStub,调用 invoke 方法发送请求,用一个 RpcRequest 封装所有的请求信息。
public class RpcRequest implements Serializable { private String interfaceName; private String methodName; private Argument[] arguments; } public class Argument implements Serializable { private Class<?> type; private Object value; }Argument 是请求方法的参数,这里的 value 用了 Object,不知道后面有没有问题,想着就算是基本类型也会自动装箱,大概可以吧。
至此, stub 就已经生成了,后面就要做请求操作了。
调用 stub 方法时,实则调用的是 AbstractStub.invoke 方法,在这里要拿到 Channel 对象做发送操作。我是设计成 stub 实例持有该服务所有的可用地址 List<URI>,在invoke 的时候做负载均衡,用实际选出的 URI 去全局 Channel Map里拿到对应的 Channel 对象,再进行发送。
Channel channel = RpcCommonService.getChannel(this.loadBalance(uris)); channel.writeAndFlush(rpcRequest);在 RpcCommonService 中用一个 getChannel 方法,实现上和 getStub 一样用读写锁同步,用 Map<URI,Channel>存放。代码参考上面 getStub方法。
如果 map 里还没有生成对应的 Channel ,调用 NettyClient 的 createChannel方法。
public synchronized Channel createChannel(InetSocketAddress address) { if(bootstrap==null){ this.init(); } ChannelFuture channelFuture = bootstrap.connect(address).addListener((ChannelFutureListener) future -> { if(!future.isSuccess()){ throw new RuntimeException("无法连接到目标地址"+address.toString()); } }); Channel channel = channelFuture.channel(); if(channel==null || !channel.isActive()){ throw new RuntimeException("无法连接到目标地址"+address.toString()); } channels.add(channel); return channel; }这样 stub 就拿到了对应的 Channel,通过 writeAndFlush就可以发送请求了,但是考虑如何拿到服务器的响应。这里用 CompletableFuture 实现,发送时创建一个包含请求 id 的 CompletableFuture 对象,创建一个全局的对象持有目前已发送尚未得到响应的请求的 Map<Integer , CompletableFuture> ,key 为请求 id,value 为future。当服务器响应到达时,处理器会选出对应的请求,调用 CompletableFuture.complete 方法完成对 future 的等待。
CompletableFuture<T> result = new CompletableFuture<>(); this.pendingRequest.put(new ResponseFuture(rpcRequest.getRequestId(), result)); channel.writeAndFlush(rpcRequest).addListener(future -> { if (!future.isSuccess()) { result.completeExceptionally(future.cause()); channel.close(); } }); try { return result.get(); } catch (Exception e) { throw new RuntimeException("调用异常:" + rpcRequest + "." + e.toString()); }netty client 端的处理器链如下:
ch.pipeline().addLast(new LengthFieldPrepender(2, 0, false)) .addLast(new Encoder()) .addLast(new LengthFieldBasedFrameDecoder(32767, 0, 2, 0, 2)) .addLast(new Decoder()) .addLast(new ResponseHandler(pendingRequest));server 端的处理器链如下:
ch.pipeline().addLast(new LengthFieldPrepender(2, 0, false)) .addLast(new Encoder()) .addLast(new LengthFieldBasedFrameDecoder(32767, 0, 2, 0, 2)) .addLast(new Decoder()) .addLast(new RequestHandler(serviceMap));两端使用变长协议处理粘包半包、然后通过自定义的 Encoder / Decoder,内部通过 hessian完成序列化和反序列化。
public class SerializeUtils { private SerializeUtils() { } public static byte[] serialize(Object input) throws IOException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream); hessianOutput.writeObject(input); return byteArrayOutputStream.toByteArray(); } public static <T> T deserialize(byte[] input) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(input); HessianInput hessianInput = new HessianInput(byteArrayInputStream); return (T) hessianInput.readObject(); } } protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { byte[] msg = new byte[in.readableBytes()]; in.readBytes(msg); Object deserialize = SerializeUtils.deserialize(msg); out.add(deserialize); } protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { out.writeBytes(SerializeUtils.serialize(msg)); }服务器端的主处理器继承 SimpleChannelInboundHandler 类,只接收 RpcRequest 对象,收到请求时,通过反射调用实际服务。
@Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { RpcResponse response = this.handle(request); ctx.writeAndFlush(response); } private RpcResponse handle(RpcRequest request) { RpcResponse response = new RpcResponse(); response.setId(request.getId()); Object instance = serviceMap.get(request.getInterfaceName()); if (instance == null) { return null; } try { Class<?>[] types = Arrays.stream(request.getArguments()).map(Argument::getType).toArray(Class<?>[]::new); Method method = instance.getClass().getMethod(request.getMethodName(), types); Object[] args = Arrays.stream(request.getArguments()).map(Argument::getValue).toArray(Object[]::new); Object result = method.invoke(instance, args); response.setResponse(result); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { return null; } return response; }客户端接收到响应时,通过 id 从等待队列中获取对应的请求future,把响应设置进去进行 complete,到此双方的交互流程就完成了。
@Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception { CompletableFuture<Object> future = pendingRequest.remove(rpcResponse.getId()); if (null != future) { future.complete(rpcResponse.getResponse()); } }服务端增加读超时机制,使用 IdleStateHandler 实现。
ch.pipeline().addLast(new IdleStateHandler(10,0,0)) // 10 秒没收到客户端信息就触发 读空闲事件当读空闲触发时,会触发 ChannelInboundHandlerAdapter.userEventTriggered 方法,在里面可以自定义操作,这里就直接关闭channel。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { System.out.println("关闭channel"); ctx.channel().close(); } } else { super.userEventTriggered(ctx, evt); } }此时,启动服务端和客户端,等待十秒后服务端会跳出 “关闭channel”,之后客户端再发送请求会报 java.nio.channels.ClosedChannelException
如果在客户端建立连接时,新建一个心跳任务,则服务端不会关闭 chnnel。
public void channelActive(ChannelHandlerContext ctx) throws Exception { new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(() -> { ctx.writeAndFlush("ping"); }, 1, 5, TimeUnit.SECONDS); }