使用Netty + Zookeeper 简单搭建RPC

    技术2022-07-11  140

    目录

    项目介绍common-通用server-服务端client-客户端

    前方备注:由于这篇文章是去年七月写的,然后本来打算写过,就删了,但是感觉后面改善的也没有特别多,就是加了个负载均衡啥的,就又恢复了…,然后项目也放GitHub了,地址:simple-rpc,这篇文章做个参考还行。

    项目介绍

    前段时间看了《netty实战》这本书,简单的入了一个netty的门,然后想着dubbo RPC也是用netty,于是打算敲一个netty + zookeeper的简单的RPC框架。项目目前比较简单,使用kryo做序列化,netty通信,zookeeper做注册中心。后期可能会优化,比如加几种负载均衡策略

    下面看一下我这个项目的结构,比较简单,就三个模块,client-客户端,common-通用,server-服务端。下面吧!

    common-通用

    首先构造的就是通用包,其模块结构如图: 首先看rpc包下的,Request和Response: Response如下:

    package com.cccookie.rpc; import com.cccookie.enums.ResponseEnum; import lombok.Data; import lombok.experimental.Accessors; import java.io.Serializable; /** * @author ccCookie * @date 2020/6/28 */ @Data @Accessors(chain = true) public class Response implements Serializable { private static final long serialVersionUID = 1L; /** * 请求id, 表示回应的是哪条请求。 */ private String requestId; /** * 请求码。 */ private Integer code; /** * 请求message。 */ private String message; /** * 响应数据 */ private Object data; public static Response success(String requestId, Object data) { return new Response() .setRequestId(requestId) .setCode(ResponseEnum.SUCCESS.getCode()) .setMessage(ResponseEnum.SUCCESS.getMessage()) .setData(data); } public static Response fail(ResponseEnum responseEnum) { return new Response() .setCode(responseEnum.getCode()) .setMessage(responseEnum.getMessage()); } }

    Request:

    package com.cccookie.rpc; import cn.hutool.core.util.ObjectUtil; import com.cccookie.enums.ErrorMessageEnum; import com.cccookie.exception.RpcException; import lombok.Data; import lombok.experimental.Accessors; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** * 请求类 * * @author ccCookie * @date 2020/6/28 */ @Data @Accessors(chain = true) public class Request { /** * 请求的id。 */ private String requestId; /** * 请求的接口名。 */ private String interfaceName; /** * 请求的类名。 */ private String className; /** * 请求的方法名。 */ private String methodName; /** * 请求的方法名中的参数class。 */ private Class<?>[] parameterTypes; /** * 请求的方法名中的参数具体。 */ private Object[] parameters; public static class UnprocessedRequest { /** * 未处理的请求map,request id 为key, value 为 completableFuture。 */ private Map<String, CompletableFuture<Response>> unprocessedMap = new ConcurrentHashMap<>(); public void put(String requestId, CompletableFuture<Response> completableFuture) { unprocessedMap.put(requestId, completableFuture); } public void processAndComplete(Response response) { CompletableFuture<Response> completableFuture = unprocessedMap.remove(response.getRequestId()); if (ObjectUtil.isNotNull(completableFuture)) { completableFuture.complete(response); } else { throw new RpcException(ErrorMessageEnum.SERVICE_INVOCATION_FAILURE); } } } }

    Response就不讲了,Request里面有个静态内部类,叫UnprocessedRequest,字面意思为未处理的请求,只有一个参数,叫unprocessedMap ,其中key为请求的ID,value使用的是CompletableFuture,一个异步获取计算结果的类。Map使用的是ConcurrentHashMap,保证线程安全。两个方法的话也比较简洁明了,就不赘述了。

    下面再看看service、model、exception、enums包下的:

    SimpleModel:

    package com.cccookie.model; import lombok.Data; import lombok.experimental.Accessors; /** * @author ccCookie * @date 2020/6/28 */ @Data @Accessors(chain = true) public class SimpleModel { /** * 标题 */ private String title; /** * 描述 */ private String description; }

    SimpleService:

    package com.cccookie.service; import com.cccookie.model.SimpleModel; /** * @author ccCookie * @date 2020/6/28 */ public interface SimpleService { /** * 测试用。 * * @param simpleModel 一个简单的model * @return String * @author ccCookie * @date 2020/6/28 */ String test(SimpleModel simpleModel); }

    这个接口就是服务端提供具体的服务,客户端调用其服务,因此,其实现会在服务端出现。

    RpcException:

    package com.cccookie.exception; import com.cccookie.enums.ErrorMessageEnum; /** * @author ccCookie * @date 2020/6/28 */ public class RpcException extends RuntimeException { public RpcException(ErrorMessageEnum errorMessageEnum, String detail) { super(errorMessageEnum.getMessage() + ":" + detail); } public RpcException(String message) { super(message); } public RpcException(ErrorMessageEnum rpcErrorMessageEnum) { super(rpcErrorMessageEnum.getMessage()); } public RpcException(Exception e) { super(e.getMessage(), e.getCause()); } }

    ResponseEnum:

    package com.cccookie.enums; import lombok.AllArgsConstructor; import lombok.Getter; /** * @author ccCookie * @date 2020/6/28 */ @AllArgsConstructor @Getter public enum ResponseEnum { /** * 成功。 */ SUCCESS(200, "调用方法成功"), /** * 失败。 */ FAIL(500, "调用方法失败"); /** * 状态码。 */ private final int code; /** * 对应的信息。 */ private final String message; }

    ErrorMessageEnum:

    package com.cccookie.enums; import lombok.AllArgsConstructor; import lombok.Getter; /** * @author ccCookie * @date 2020/6/28 */ @AllArgsConstructor @Getter public enum ErrorMessageEnum { /** * 处理失败。 */ SERVICE_INVOCATION_FAILURE("处理失败"), /** * 通道错误。 */ CHANNEL_ERROR("通道错误"), /** * 找不到指定服务。 */ SERVICE_CAN_NOT_BE_FOUND("找不到指定的服务"), /** * 连接失败。 */ CLIENT_CONNECT_SERVER_FAILURE("连接服务端失败"), /** * 返回结果错误。 */ REQUEST_NOT_MATCH_RESPONSE("返回结果错误,返回不匹配!"); /** * 调用错误的消息。 */ private String message; }

    这些不解释了。

    再看factory包下的SingletonFactory:

    package com.cccookie.factory; import cn.hutool.core.util.ObjectUtil; import com.cccookie.exception.RpcException; import java.util.HashMap; import java.util.Map; /** * @author ccCookie * @date 2020/6/30 */ public class SingletonFactory { private volatile static Map<String, Object> factoryMap = new HashMap<>(); private SingletonFactory() { } public static <T> T getInstance(Class<T> tClass) { String className = tClass.getCanonicalName(); Object obj = factoryMap.get(className); // 如果不存在。 if (ObjectUtil.isNull(obj)) { // 同步锁住。 synchronized (tClass) { if (ObjectUtil.isNull(obj)) { try { obj = tClass.newInstance(); factoryMap.put(className, obj); } catch (Exception e) { throw new RpcException(e); } } } } return tClass.cast(obj); } }

    一个简单的单例工厂,根据 包名+ 类名 获取其唯一单例。

    下面看下kryo包下:

    Serialization接口:

    package com.cccookie.kryo; /** * 序列化接口。 * * @author ccCookie * @date 2020/6/28 */ public interface Serialization { /** * 序列化对象。 * * @param object 要序列化的对象。 * @return 序列化好的字节数组。 * @author ccCookie * @date 2020/6/28 */ byte[] serialize(Object object); /** * 反序列化对象。 * * @param bytes 反序列化的字节数组。 * @param clazz 将要反序列化的对象的class。 * @return 反序列化好的对象 * @author ccCookie * @date 2020/6/28 */ <T> T deserialize(byte[] bytes, Class<T> clazz); }

    一个序列化的接口。

    KryoSerialization:

    package com.cccookie.kryo.impl; import com.cccookie.kryo.Serialization; import com.cccookie.rpc.Request; import com.cccookie.rpc.Response; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; /** * 序列化实现类。 * * @author ccCookie * @date 2020/6/28 */ package com.cccookie.kryo.impl; import com.cccookie.enums.ErrorMessageEnum; import com.cccookie.exception.RpcException; import com.cccookie.kryo.Serialization; import com.cccookie.rpc.Request; import com.cccookie.rpc.Response; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; /** * 序列化实现类。 * * @author ccCookie * @date 2020/6/28 */ public class KryoSerialization implements Serialization { Kryo kryo; public KryoSerialization() { kryo = new Kryo(); } /** * 序列化对象。 * * @param object 要序列化的对象。 * @return 序列化好的自己数组。 * @author ccCookie * @date 2020/6/28 */ @Override public byte[] serialize(Object object) { try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Output output = new Output(byteArrayOutputStream)) { kryo.writeObject(output, object); return output.toBytes(); } catch (Exception e) { throw new RpcException("序列化失败"); } } /** * 反序列化对象。 * * @param bytes 反序列化的字节数组。 * @param clazz 将要反序列化的对象的class。 * @return 反序列化好的对象 * @author ccCookie * @date 2020/6/28 */ @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); Input input = new Input(byteArrayInputStream)) { Object object = kryo.readObject(input, clazz); return clazz.cast(object); } catch (Exception e) { throw new RpcException("反序列化失败"); } } }

    然后就是它的实现,序列化及反序列化对象。

    其下还有编码器及解码器。

    KryoEncoder 编码器:

    package com.cccookie.kryo.impl; import com.cccookie.kryo.Serialization; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import lombok.AllArgsConstructor; /** * 负责出去的消息,将对象转成字节。 * * @author ccCookie * @date 2020/6/28 */ @AllArgsConstructor public class KryoEncoder extends MessageToByteEncoder<Object> { private Serialization serializer; private Class<?> tClass; /** * 编码。 * * @param ctx ChannelHandlerContext 对象。 * @param msg 出去的数据。 * @param out 解码后的数据对象需要添加到其中。 */ @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { if (tClass.isInstance(msg)) { // 将对象转成byte。 byte[] body = serializer.serialize(msg); int dataLen = body.length; // 写入消息对应的字节数组长度。 out.writeInt(dataLen); // 将字节数组写入 ByteBuf 对象中。 out.writeBytes(body); } } }

    KryoDecoder 解码器:

    package com.cccookie.kryo.impl; import com.cccookie.kryo.Serialization; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.util.List; /** * 处理 进来的 消息,将消息格式转换为需要的业务对象。 * * @author ccCookie * @date 2020/6/28 */ @Slf4j @AllArgsConstructor public class KryoDecoder extends ByteToMessageDecoder { private Serialization serialization; private Class<?> aClass; /** * 解码 ByteBuf 对象 * * @param channelHandlerContext 解码器相关的ChannelHandlerContext 对象。 * @param byteBuf 入站数据。 * @param list 解码后的数据对象需要添加到其中。 */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) { int readableBytes = byteBuf.readableBytes(); if (readableBytes > 0) { // 标记当前readIndex的位置,以便后面重置readIndex的使用。 byteBuf.markReaderIndex(); // 读取消息的长度。 int dataLength = byteBuf.readInt(); // 遇到不合理的情况直接 return if (dataLength < 0) { log.error("序列化时,无效的字符数组!"); return; } // 如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex if (readableBytes < dataLength) { byteBuf.resetReaderIndex(); return; } // 至此,可以序列化了。 byte[] body = new byte[dataLength]; byteBuf.readBytes(body); // 将bytes数组转换为我们需要的对象 Object obj = serialization.deserialize(body, aClass); list.add(obj); log.info("字符串数组转对象成功。"); } } }

    kryo 包下主要就是序列化和编码解码器。

    最后就是zk包下的ZookeeperUtil了:

    package com.cccookie.zk; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import com.cccookie.exception.RpcException; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * zookeeper 工具类。 * * @author ccCookie * @date 2020/6/28 */ @Slf4j public class ZookeeperUtil { /** * 重试等待的时间, 单位:毫秒。 */ private static final int BASE_SLEEP_TIME = 1000; /** * 最多重试次数。 */ private static final int MAX_RETRIES = 3; /** * 会话超时时间。 */ private static final int SESSION_TIMEOUT = 30 * 1000; /** * 连接超时时间。 */ private static final int CONNECTION_TIMEOUT = 3 * 1000; /** * zookeeper 注册服务地址 */ private static final String SERVER_ADDRESS = "127.0.0.1:2181"; /** * 服务注册路径。 */ public static final String ZK_REGISTER_PATH = "/simple-rpc/"; /** * key: 接口名, value: 其提供服务的节点list。 */ private static Map<String, List<String>> serviceAddressMap = new ConcurrentHashMap<>(); /** * 以及注册的路径。 */ private static Set<String> registeredPathSet = ConcurrentHashMap.newKeySet(); private static CuratorFramework zkClient; static { RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); zkClient = CuratorFrameworkFactory.newClient(SERVER_ADDRESS, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy); zkClient.start(); } /** * 创建持久化节点。 * * @param path 节点路径 * @author ccCookie * @date 2020/6/29 */ public static void createNode(String path) { try { if (registeredPathSet.contains(path) || zkClient.checkExists().forPath(path) != null) { log.info("节点已经存在!"); } else { zkClient.create() .creatingParentsIfNeeded() // 此时模式选择持久化。 .withMode(CreateMode.PERSISTENT) .forPath(path); log.info("节点不存在,创建成功!"); registeredPathSet.add(path); } } catch (Exception e) { log.error("创建持久化节点出现异常,异常原因为:{}", e.getMessage()); throw new RpcException(e); } } /** * 获取所有提供服务的生产者的地址。 * * @param serviceName 服务名 * @return 其下所有子节点。 * @author ccCookie * @date 2020/6/29 */ public static List<String> getChildrenNodes(String serviceName) { List<String> childrenNodes = serviceAddressMap.get(serviceName); if (CollUtil.isEmpty(childrenNodes)) { String servicePath = ZK_REGISTER_PATH + serviceName; try { childrenNodes = zkClient.getChildren() .forPath(servicePath); serviceAddressMap.put(serviceName, childrenNodes); registerWatcher(serviceName); } catch (Exception e) { log.error("获取提供服务的zookeeper节点出现异常,异常原因为:{}", e.getMessage()); throw new RpcException(e); } } return childrenNodes; } /** * 注册监听指定节点。 * * @param serviceName 服务名称。 */ public static void registerWatcher(String serviceName) { String servicePath = ZK_REGISTER_PATH + serviceName; // 1. 创建一个PathChildrenCache PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true); PathChildrenCacheListener pathChildrenCacheListener = (client, event) -> { ChildData childData = event.getData(); if (ObjectUtil.isNotNull(childData)) { log.info("Path: " + childData.getPath()); log.info("Stat:" + childData.getStat()); log.info("Data: " + new String(childData.getData())); switch (event.getType()) { case CHILD_ADDED: log.info("正在新增子节点:" + childData.getPath()); // 获取子节点 List<String> list = client.getChildren().forPath(servicePath); serviceAddressMap.put(serviceName, list); break; case CHILD_UPDATED: log.info("正在更新子节点:" + childData.getPath()); break; case CHILD_REMOVED: log.info("子节点被删除"); break; case CONNECTION_LOST: log.info("连接丢失"); break; case CONNECTION_SUSPENDED: log.info("连接被挂起"); break; case CONNECTION_RECONNECTED: log.info("恢复连接"); break; default: break; } } }; // 2. 添加目录监听器 pathChildrenCache.getListenable() .addListener(pathChildrenCacheListener); try { // 3. 启动监听器 pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { log.error("启动监听器出现异常,异常原因为:{}", e.getMessage()); throw new RpcException(e); } } public static void clearRegistry() { registeredPathSet.forEach(o -> { try { zkClient.delete().guaranteed().forPath(o); } catch (Exception e) { log.error("清除zookeeper节点出现异常,异常原因为:{}", e.getMessage()); throw new RpcException(e); } }); log.info("服务端所有注册的服务都被清空:{}", registeredPathSet.toString()); } }

    这就是zookeeper的工具类,每个方法我都写了比较清楚的注释了。

    common 包就是共用的,因此这里没有业务逻辑相关的,里面的东西比较少,注释也很清楚,就不多说了。

    server-服务端

    下面来到服务端,首先看一下服务端的结构: 可以看到服务端东西不多。

    首先看一下config下的ThreadFactoryUtils:

    package com.cccookie.config; import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; import java.util.concurrent.*; /** * @author ccCookie * @date 2020/6/29 */ @Slf4j public class ThreadFactoryUtils { private ThreadFactoryUtils() { } /** * 核心线程数。 */ private static final int CORE_POOL_SIZE = 5; /** * 最大线程数。 */ private static final int MAXIMUM_POOL_SIZE = 10; /** * 存活时间。 */ private static final int KEEP_ALIVE_TIME = 60; /** * 单位。秒。 */ private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS; /** * 阻塞队列大小。 */ private static final int BLOCKING_QUEUE_CAPACITY = 100; /** * 线程池map。 */ private static Map<String, ExecutorService> threadPoolMap = new ConcurrentHashMap<>(); /** * 如果该名字对应的线程池不存在,则新建一个。 * * @param threadNamePrefix 线程池前缀 * @return 线程池 * @author ccCookie * @date 2020/7/1 */ public static ExecutorService createThreadPoolIfAbsent(String threadNamePrefix) { ExecutorService threadPool = threadPoolMap.computeIfAbsent(threadNamePrefix, k -> createThreadPool(threadNamePrefix)); // threadPool 被关闭, 就重新创建一个 if (threadPool.isShutdown() || threadPool.isTerminated()) { threadPoolMap.remove(threadNamePrefix); threadPool = createThreadPool(threadNamePrefix); threadPoolMap.put(threadNamePrefix, threadPool); } return threadPool; } /** * 创建线程池。如果没有名称前缀,则使用默认的。 * * @param threadNamePrefix 线程池名称前缀 * @return 构建好的线程池 * @author ccCookie * @date 2020/7/1 */ private static ExecutorService createThreadPool(String threadNamePrefix) { ThreadFactory threadFactory; if (StrUtil.isNotBlank(threadNamePrefix)) { threadFactory = new ThreadFactoryBuilder() .setNameFormat(threadNamePrefix + "-%d") .build(); } else { threadFactory = Executors.defaultThreadFactory(); } BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY); return new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, DEFAULT_TIME_UNIT, workQueue, threadFactory); } /** * 关闭所有线程池 */ public static void shutDownAllThreadPool() { log.info("call shutDownAllThreadPool method"); threadPoolMap.entrySet().parallelStream() .forEach(entry -> { ExecutorService executorService = entry.getValue(); executorService.shutdown(); log.info("关闭线程池:key为:{}, 成功与否为:{}", entry.getKey(), executorService.isTerminated()); try { executorService.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException ie) { log.error("线程池关闭失败!"); executorService.shutdownNow(); } }); } }

    这是一个线程池工具类。createThreadPoolIfAbsent 方法表示,如果不存在则新建一个线程池。其中线程池的核心线程大小和最大线程数我都没有设的很大,这个是可以酌情的。

    有关线程池的源码分析可以看我的另一篇博客 Java线程池源码分析。

    接下来看service包下的内容。

    ServiceProvider接口:

    package com.cccookie.service; /** * 服务提供接口。 * * @author ccCookie * @date 2020/6/29 */ public interface ServiceProvider { /** * 保存服务实例对象和服务实例对象实现的接口类的对应关系。 * * @param service 服务实例对象 * @param serviceClass 服务实例对象class * @author ccCookie * @date 2020/6/29 */ <T> void addServiceProvider(T service, Class<T> serviceClass); /** * 获取服务实例对象。 * * @param serviceName 服务实例对象实现的接口类的类名 * @return 服务实例对象 * @author ccCookie * @date 2020/6/29 */ Object getServiceProvider(String serviceName); }

    该接口有两个方法,其作用已经在注释中写清。

    下面看下它的实现类:

    package com.cccookie.service.impl; import cn.hutool.core.util.ObjectUtil; import com.cccookie.enums.ErrorMessageEnum; import com.cccookie.exception.RpcException; import com.cccookie.service.ServiceProvider; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * @author ccCookie * @date 2020/6/29 */ public class ServiceProviderImpl implements ServiceProvider { /** * 服务名和服务实体对应的关系。 */ private static Map<String, Object> serviceMap = new ConcurrentHashMap<>(); /** * 已经注册过的服务。 */ private static Set<String> registeredServiceSet = ConcurrentHashMap.newKeySet(); /** * 保存服务实例对象和服务实例对象实现的接口类的对应关系 * * @param service 服务实例对象 * @param serviceClass 服务实例对象class */ @Override public <T> void addServiceProvider(T service, Class<T> serviceClass) { String serviceName = serviceClass.getCanonicalName(); // 说明已经注册过。 if (registeredServiceSet.contains(serviceName)) { return; } // 还未注册,那么注册。 serviceMap.put(serviceName, service); registeredServiceSet.add(serviceName); } /** * 获取服务实例对象 * * @param serviceName 服务实例对象实现的接口类的类名 * @return 服务实例对象 */ @Override public Object getServiceProvider(String serviceName) { Object obj = serviceMap.get(serviceName); if (ObjectUtil.isNull(obj)) { throw new RpcException(ErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND); } return obj; } }

    接下来是 common 中SimpleService实现服务:

    SimpleServiceImpl:

    package com.cccookie.service.impl; import com.cccookie.model.SimpleModel; import com.cccookie.service.SimpleService; import lombok.extern.slf4j.Slf4j; /** * @author ccCookie * @date 2020/6/29 */ @Slf4j public class SimpleServiceImpl implements SimpleService { /** * 测试用。 * * @param simpleModel 一个简单的model * @return String * @author ccCookie * @date 2020/6/28 */ @Override public String test(SimpleModel simpleModel) { log.info("收到: {}.", simpleModel.getTitle()); String result = "title is " + simpleModel.getTitle() + ", description is " + simpleModel.getDescription(); log.info("返回: {}.", result); return result; } }

    接下来看handler包下,其下就一个Handler,会在netty启动时放入。

    RpcHandler:

    package com.cccookie.handler; import cn.hutool.core.util.ObjectUtil; import com.cccookie.config.ThreadFactoryUtils; import com.cccookie.enums.ErrorMessageEnum; import com.cccookie.exception.RpcException; import com.cccookie.rpc.Request; import com.cccookie.rpc.Response; import com.cccookie.service.ServiceProvider; import com.cccookie.service.impl.ServiceProviderImpl; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.Method; import java.util.concurrent.ExecutorService; /** * 自定义ChannelHandler处理来自客户端的请求。 * * @author ccCookie * @date 2020/6/30 */ @Slf4j public class RpcHandler extends SimpleChannelInboundHandler<Request> { private final static String THREAD_NAME_PREFIX = "server-handler-rpc-pool"; private final ExecutorService threadPool; private ServiceProvider serviceProvider = new ServiceProviderImpl(); public RpcHandler() { this.threadPool = ThreadFactoryUtils.createThreadPoolIfAbsent(THREAD_NAME_PREFIX); } /** * 当前channel从远端读取到数据,处理接收到的数据,每个传入的消息都会调用到。 */ @Override protected void channelRead0(ChannelHandlerContext ctx, Request request) { threadPool.execute(() -> { try { log.info("收到的消息为:{}", request); // 通过注册中心获取到目标类。 Object service = serviceProvider.getServiceProvider(request.getInterfaceName()); Method method = service.getClass() .getMethod(request.getMethodName(), request.getParameterTypes()); if (ObjectUtil.isNull(method)) { log.error("方法找不到,方法名为:{}", request.getMethodName()); throw new RpcException(ErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, "方法不存在!"); } // 调用method invoke,实现该方法。 Object res = method.invoke(service, request.getParameters()); log.info("服务方法调用成功!服务为:{}, 方法为:{}", request.getInterfaceName(), request.getMethodName()); log.info("结果为:{}", res); Channel channel = ctx.channel(); if (channel.isActive() && channel.isWritable()) { // 将产生的结果返回。 Response response = Response.success(res, request.getRequestId()); ctx.writeAndFlush(response) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { log.error("channel 不在活动中!"); } } catch (Exception e) { throw new RpcException(e); } }); } /** * 处理客户端消息发生异常的时候被调用 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("发生异常,捕获到异常信息为:{}", cause.getMessage()); ctx.close(); } }

    做了这么多铺垫终于到核心了。

    这个Handler继承自SimpleChannelInboundHandler, 接收的消息格式定义为Request。 当netty监听到有消息过来时,就会调用channelRead0方法了,因此在该方法中执行了逻辑,下面解释一下它的逻辑,也不绕,比较简单。

    首先到服务提供类中获取请求中携带的接口名,对应的实现类,因为这里我就一个接口,就是SimpleServiceImpl啦!接下来通过反射获取请求中携带的方法名及其参数类型获取对应的方法。找不到就抛异常,找得到就下一步。通过请求中携带的方法参数值,调用方法的invoke,获取方法执行结果。最后,将产生的结果给它返回去。

    再看下也是核心的连接netty的类,那就是server包下的RpcServer:

    package com.cccookie.server; import com.cccookie.config.ThreadFactoryUtils; import com.cccookie.handler.RpcHandler; import com.cccookie.kryo.impl.KryoDecoder; import com.cccookie.kryo.impl.KryoEncoder; import com.cccookie.kryo.impl.KryoSerialization; import com.cccookie.rpc.Request; import com.cccookie.rpc.Response; import com.cccookie.service.ServiceProvider; import com.cccookie.service.impl.ServiceProviderImpl; import com.cccookie.zk.ZookeeperUtil; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; /** * @author ccCookie * @date 2020/6/30 */ @Slf4j public class RpcServer { private String host; private int port; private KryoSerialization kryoSerializer; private ServiceProvider serviceProvider; public RpcServer(String host, int port) { this.host = host; this.port = port; kryoSerializer = new KryoSerialization(); serviceProvider = new ServiceProviderImpl(); } public <T> void registry(T service, Class<T> serviceClass) { InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port); // 根节点下注册 /子节点/服务。 String servicePath = ZookeeperUtil.ZK_REGISTER_PATH + serviceClass.getCanonicalName() + inetSocketAddress.toString(); ZookeeperUtil.createNode(servicePath); // 新增服务提供。 serviceProvider.addServiceProvider(service, serviceClass); // 启动netty。 startNetty(); } private void startNetty() { Runtime.getRuntime().addShutdownHook(new Thread(() -> { ZookeeperUtil.clearRegistry(); ThreadFactoryUtils.shutDownAllThreadPool(); })); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .option(ChannelOption.SO_BACKLOG, 1024) .channel(NioServerSocketChannel.class) // 当客户端第一次进行请求的时候才会进行初始化 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new KryoDecoder(kryoSerializer, Request.class)); ch.pipeline().addLast(new KryoEncoder(kryoSerializer, Response.class)); ch.pipeline().addLast(new RpcHandler()); } }) // 是否启用 Nagle 算法。该算法是尽可能的发送大数据快,减少网络传输。 .childOption(ChannelOption.TCP_NODELAY, true) // 是否开启 TCP 底层心跳机制 .childOption(ChannelOption.SO_KEEPALIVE, true); // 绑定端口,同步等待绑定成功 ChannelFuture cf = b.bind(host, port).sync(); // 等待服务端监听端口关闭 cf.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("启动服务时出现异常,异常信息为:{}:", e.getMessage()); } finally { log.info("关闭 bossGroup and workerGroup"); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

    这个我觉得有前面的铺垫算是水到渠成的事情了,里面都是netty 服务端应该需要的。

    提个醒,

    最后,用一个main将SimpleService具体的实现给注册进。

    package com.cccookie; import com.cccookie.server.RpcServer; import com.cccookie.service.SimpleService; import com.cccookie.service.impl.SimpleServiceImpl; /** * @author ccCookie * @date 2020/6/30 */ public class ServiceMain { public static void main(String[] args) { SimpleService simpleService = new SimpleServiceImpl(); RpcServer rpcServer = new RpcServer("127.0.0.1", 7777); rpcServer.registry(simpleService, SimpleService.class); } }

    至此,服务端完成。

    client-客户端

    最后看下客户端的结构。 首先看client包下的,其下有三个类。 ZkServiceDiscovery:

    package client; import com.cccookie.zk.ZookeeperUtil; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; /** * 服务发现 * * @author ccCookie * @date 2020/6/30 */ @Slf4j public class ZkServiceDiscovery { public InetSocketAddress lookupService(String serviceName) { // 由于比较简单,直接就拿第一个。 String serviceAddress = ZookeeperUtil.getChildrenNodes(serviceName).get(0); log.info("找到服务地址为:{}", serviceAddress); String[] socketAddressArray = serviceAddress.split(":"); // IP String host = socketAddressArray[0]; // 端口 int port = Integer.parseInt(socketAddressArray[1]); return new InetSocketAddress(host, port); } }

    ZookeeperUtil.getChildrenNodes(serviceName) 表示的是 获取该服务的所有节点,由于目前就一个,所以 直接get(0)。

    再来就是ClientTransport:

    package client; import cn.hutool.core.util.ObjectUtil; import com.cccookie.enums.ErrorMessageEnum; import com.cccookie.exception.RpcException; import com.cccookie.factory.SingletonFactory; import com.cccookie.rpc.Request; import com.cccookie.rpc.Response; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** * 客户端发送消息。 * * @author ccCookie * @date 2020/6/30 */ @Slf4j public class ClientTransport { private final ZkServiceDiscovery zkServiceDiscovery; private final Request.UnprocessedRequest unprocessedRequest; private final NettyClient nettyClient; private static Map<String, Channel> channelMap = new ConcurrentHashMap<>(); public ClientTransport() { zkServiceDiscovery = new ZkServiceDiscovery(); unprocessedRequest = SingletonFactory.getInstance(Request.UnprocessedRequest.class); nettyClient = SingletonFactory.getInstance(NettyClient.class); } /** * 发送消息。 */ @SneakyThrows public Response sendMessage(Request request) { // 找到服务。 InetSocketAddress inetSocketAddress = zkServiceDiscovery.lookupService(request.getInterfaceName()); // 根据地址获取channel。 Channel channel = this.getChannel(inetSocketAddress); // 由于可能是重连的channel,因此还是需要判断是否存在且活动着。 if (ObjectUtil.isNotNull(channel) && channel.isActive()) { // 构建完成值。 CompletableFuture<Response> resultFuture = unprocessedRequest.put(request.getRequestId()); channel.writeAndFlush(request) .addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { log.info("客户端发送消息为: {}", request); } else { future.channel().close(); resultFuture.completeExceptionally(future.cause()); log.error("客户端发送消息失败,原因为:", future.cause()); } }); return resultFuture.get(); } else { log.error("获取通道错误"); throw new RpcException(ErrorMessageEnum.CHANNEL_ERROR); } } /** * 获取channel。 */ private Channel getChannel(InetSocketAddress inetSocketAddress) { String inetSocketAddressStr = inetSocketAddress.toString(); Channel channel = channelMap.get(inetSocketAddressStr); if (ObjectUtil.isNotNull(channel)) { if (channel.isActive()) { return channel; } else { channelMap.remove(inetSocketAddressStr); } } channel = nettyClient.doConnect(inetSocketAddress); channelMap.put(inetSocketAddressStr, channel); return channel; } }

    该类主要就是发送消息。

    最后就是一个Netty的连接监听。 NettyClient:

    package client; import com.cccookie.enums.ErrorMessageEnum; import com.cccookie.exception.RpcException; import com.cccookie.kryo.impl.KryoDecoder; import com.cccookie.kryo.impl.KryoEncoder; import com.cccookie.kryo.impl.KryoSerialization; import com.cccookie.rpc.Request; import com.cccookie.rpc.Response; import handler.ClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; /** * @author ccCookie * @date 2020/7/1 */ @Slf4j public class NettyClient { private static Bootstrap bootstrap; private static EventLoopGroup eventLoopGroup; static { eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); KryoSerialization kryoSerializer = new KryoSerialization(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) // 连接的超时时间,超过这个时间还是建立不上的话则代表连接失败 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 是否开启 TCP 底层心跳机制 .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) { // 出去的消息会编码成 Request,进来的消息会 解码为 Response。 // 进来的消息顺着 流, 出去的消息反着流。 socketChannel.pipeline().addLast(new KryoDecoder(kryoSerializer, Response.class)); socketChannel.pipeline().addLast(new KryoEncoder(kryoSerializer, Request.class)); socketChannel.pipeline().addLast(new ClientHandler()); } }); } /** * 建立连接。 * * @param inetSocketAddress 地址 */ @SneakyThrows public Channel doConnect(InetSocketAddress inetSocketAddress) { CompletableFuture<Channel> completableFuture = new CompletableFuture<>(); bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { log.info("客户端连接成功!"); completableFuture.complete(future.channel()); } else { log.error("客户端连接服务端失败!"); this.close(); throw new RpcException(ErrorMessageEnum.CLIENT_CONNECT_SERVER_FAILURE, "客户端连接失败!"); } }); return completableFuture.get(); } public void close() { eventLoopGroup.shutdownGracefully(); } }

    handler包下的ClientHandler,也就是上面 的pipeline下的addLast的Handler:

    package handler; import com.cccookie.factory.SingletonFactory; import com.cccookie.rpc.Request; import com.cccookie.rpc.Response; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; /** * @author ccCookie * @date 2020/7/1 */ @Slf4j public class ClientHandler extends SimpleChannelInboundHandler<Object> { private final Request.UnprocessedRequest unprocessedRequest; public ClientHandler() { this.unprocessedRequest = SingletonFactory.getInstance(Request.UnprocessedRequest.class); } /** * 处理接收到的消息。 * * @param ctx 连接 channelHandler 和 ChannelPipeline 的。 * @param msg 接收到的参数 * @author ccCookie * @date 2020/7/1 */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { try { log.info("客户端接收消息为:{}", msg); Response response = (Response) msg; unprocessedRequest.processAndComplete(response); } finally { ReferenceCountUtil.release(msg); } } /** * 处理客户端消息发生异常的时候被调用 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("客户端捕获异常为:{}", cause.getMessage()); ctx.close(); } }

    下面是一个proxy包下的内容: ClientProxy:

    package proxy; import client.ClientTransport; import cn.hutool.core.util.ObjectUtil; import com.cccookie.enums.ErrorMessageEnum; import com.cccookie.exception.RpcException; import com.cccookie.rpc.Request; import com.cccookie.rpc.Response; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.UUID; /** * 动态代理类。 * * @author ccCookie * @date 2020/7/1 */ @Slf4j public class ClientProxy implements InvocationHandler { /** * 用于发送请求给服务端,对应socket和netty两种实现方式 */ private final ClientTransport clientTransport; public ClientProxy(ClientTransport clientTransport) { this.clientTransport = clientTransport; } /** * 通过 Proxy.newProxyInstance() 方法获取某个类的代理对象 */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this); } @Override public Object invoke(Object proxy, Method method, Object[] args) { log.info("invoke 方法:{}", method.getName()); Request request = new Request() .setRequestId(UUID.randomUUID().toString()) .setParameters(args) .setInterfaceName(method.getDeclaringClass().getName()) .setParameterTypes(method.getParameterTypes()) .setMethodName(method.getName()); Response response = clientTransport.sendMessage(request); if (ObjectUtil.isNull(response)) { log.error("未接收到消息。请求ID为:{}, 接口名为:{}", request.getRequestId(), request.getInterfaceName()); throw new RpcException(ErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE); } else if (!request.getRequestId().equals(response.getRequestId())) { log.error("接收到消息,但请求ID不一致。请求ID为:{}, 返回结果ID为:{}", request.getRequestId(), response.getRequestId()); throw new RpcException(ErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE); } return response.getData(); } }

    该动态代理类继承自InvocationHandler,到时候代理出的对象调用方法时,就会调用invoke。

    最后就是ClientMain:

    import client.ClientTransport; import com.cccookie.model.SimpleModel; import com.cccookie.service.SimpleService; import lombok.extern.slf4j.Slf4j; import proxy.ClientProxy; /** * @author ccCookie * @date 2020/7/1 */ @Slf4j public class ClientMain { public static void main(String[] args) { ClientTransport clientTransport = new ClientTransport(); ClientProxy clientProxy = new ClientProxy(clientTransport); // 动态代理生成HelloService SimpleService simpleService = clientProxy.getProxy(SimpleService.class); String result = simpleService.test(new SimpleModel() .setTitle("ccCookie") .setDescription("this is ccCookie")); log.info(result); } }

    最后的运行结果: 最后完美实现啦,撒花! 最后,谢谢观看。本人才疏学浅,如有错误之处,欢迎指正,共同进步。

    Processed: 0.023, SQL: 9