SprongBoot链接Zookeeper集群&节点监听

    技术2026-04-09  10

    前提摘要

          搭建zookeeper集群服务,推荐博客:https://blog.csdn.net/qq_37936542/article/details/107096985

     

            zk1 -- 192.168.0.211:2181

            zk2 -- 192.168.0.212:2181

            zk3 -- 192.168.0.213:2181

     

    ♦ ♦  ZkClient依赖

    <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>

    ♦ ♦  链接zk集群

    // zk集群服务器地址 String serverUrl = "192.168.0.211:2181,192.168.0.212:2181,192.168.0.213:2181"; // 连接超时时间 int timeout = 5000; // 建立连接 ZkClient zk = new ZkClient(serverUrl, timeout);

    ♦ ♦  基操勿6

    import java.util.List; import org.I0Itec.zkclient.ZkClient; public class ZkOperater { public static void main(String[] args) throws Exception { ZkOperater sc = new ZkOperater(); // 获取zk链接 ZkClient zk = sc.getConnection(); // 创建持久节点 zk.createPersistent("/pit", "v1"); // 创建持久序列化节点 zk.createPersistentSequential("/pits", "v2"); // 创建临时节点 zk.createEphemeral("/erl", "v3"); // 创建临时序列化节点 zk.createEphemeralSequential("/erls", "v4"); // 获取节点数据 String v1 = zk.readData("/pit").toString(); System.out.println(v1); // 向节点写数据 zk.writeData("/pit", "v5"); // 获取节点下的所有子节点 List<String> childs = zk.getChildren("/"); for (String child : childs) { System.out.println(child); } } /** * 获取zk链接 * * @return */ public ZkClient getConnection() { // zk集群服务器地址 String serverUrl = "192.168.0.211:2181,192.168.0.212:2181,192.168.0.213:2181"; // 连接超时时间 int timeout = 5000; // 建立连接 ZkClient zk = new ZkClient(serverUrl, timeout); return zk; } }

    ♦ ♦  实现监听子节点变化   应用场景:监听服务器动态上下线

    import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; public class SyncServer { private static String nodePath = "/servers"; public static void main(String[] args) throws Exception { SyncServer ss = new SyncServer(); // 获取zk链接 ZkClient zk = ss.getConnection(); // 监听节点下子节点的变化 ss.childListener(zk, nodePath); // 让服务一直运行 System.in.read(); } /** * 获取zk链接 * * @return */ public ZkClient getConnection() { // zk集群服务器地址 String serverUrl = "192.168.0.211:2181,192.168.0.212:2181,192.168.0.213:2181"; // 连接超时时间 int timeout = 5000; // 建立连接 ZkClient zk = new ZkClient(serverUrl, timeout); return zk; } /** * 监听父节点下子节点的变化,经典案例:监听服务器动态上下线 * * @param zk */ public void childListener(ZkClient zk, String nodePath) { // 如果父节点不存在,创建父节点 if (!zk.exists(nodePath)) { zk.createPersistent(nodePath); } // 监听/server下子节点的变化 zk.subscribeChildChanges(nodePath, new IZkChildListener() { /** * parentPath:父节点 childs:所有子节点的集合 */ @Override public void handleChildChange(String parentPath, List<String> childs) throws Exception { System.out.println("================="); System.out.println("父节点:" + parentPath); for (String child : childs) { System.out.println("子节点:" + child); } } }); } }

    测试:链接客户端,在/servers下动态添加、删除节点,查看打印

     

    ♦ ♦  实现监听节点数据变化   应用场景:协同配置管理

    import java.util.UUID; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; public class SyncConfig { private static String configPath = "/config"; public static void main(String[] args) throws Exception { SyncConfig sc = new SyncConfig(); ZkClient zk = sc.getConnection(); // 创建配置节点 if (!zk.exists(configPath)) { zk.createPersistent(configPath); } // 定时改变节点数据 sc.changeNode(zk, configPath); // 监听节点数据变化 sc.dataListener(zk, configPath); // 让服务一直运行 System.in.read(); } /** * 获取zk链接 * * @return */ public ZkClient getConnection() { // zk集群服务器地址 String serverUrl = "192.168.0.211:2181,192.168.0.212:2181,192.168.0.213:2181"; // 连接超时时间 int timeout = 5000; // 建立连接 ZkClient zk = new ZkClient(serverUrl, timeout); // 自定义zk序列化 zk.setZkSerializer(new ISerializer()); return zk; } /** * 监听节点数据的变化,经典案例:协同配置 * * @param zk */ public void dataListener(ZkClient zk, String configPath) { // 监听/servers下子节点的变化 zk.subscribeDataChanges(configPath, new IZkDataListener() { // 监听数据被被删除 @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("节点数据被删除:" + dataPath); } // 监听节点数据被修改 @Override public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("节点数据被修改:" + dataPath); System.out.println("修改后的数据为:" + String.valueOf(data)); } }); } public void changeNode(ZkClient zk, String configPath) { new Thread(new Runnable() { @Override public void run() { while (true) { // 向节点写入数据 zk.writeData(configPath, UUID.randomUUID()); // 睡眠5秒 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }

    这里我们需要自定义zk序列化,不然写入数据会报:org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.EOFException

    import java.io.UnsupportedEncodingException; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; public class ISerializer implements ZkSerializer { @Override public byte[] serialize(Object obj) throws ZkMarshallingError { try { return String.valueOf(obj).getBytes("utf-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return null; } @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError { try { return new String(bytes, "utf-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return bytes; } }

    启动程序,查看打印

     

    ♦ ♦  案例:模拟实现服务器动态上下线

    功能图解:

     

    ServerA

    import org.I0Itec.zkclient.ZkClient; public class ServerA { private static String parentNodePath = "/servers"; public static void main(String[] args) throws Exception { ServerA A = new ServerA(); ZkClient zk = A.getConnection(); // 判断父节点是否存在,不存在则创建 if (!zk.exists(parentNodePath)) { zk.createPersistent(parentNodePath); } // 向/server节点下注册服务,值为服务器ip+端口号 // 节点类型是临时节点 zk.createEphemeral(parentNodePath + "/serverA", "ip1:port1"); System.out.println("服务A启动服务"); // 让程序一直运行 System.in.read(); } /** * 获取zk链接 * * @return */ public ZkClient getConnection() { // zk集群服务器地址 String serverUrl = "192.168.0.211:2181,192.168.0.212:2181,192.168.0.213:2181"; // 连接超时时间 int timeout = 5000; // 建立连接 ZkClient zk = new ZkClient(serverUrl, timeout); return zk; } }

     

    ServerB

    import org.I0Itec.zkclient.ZkClient; public class ServerB { private static String parentNodePath = "/servers"; public static void main(String[] args) throws Exception { ServerB B = new ServerB(); ZkClient zk = B.getConnection(); // 判断父节点是否存在,不存在则创建 if (!zk.exists(parentNodePath)) { zk.createPersistent(parentNodePath); } // 向/server节点下注册服务,值为服务器ip+端口号 // 节点类型是临时节点 zk.createEphemeral(parentNodePath + "/serverB", "ip2:port2"); System.out.println("服务B启动服务"); // 让程序一直运行 System.in.read(); } /** * 获取zk链接 * * @return */ public ZkClient getConnection() { // zk集群服务器地址 String serverUrl = "192.168.0.211:2181,192.168.0.212:2181,192.168.0.213:2181"; // 连接超时时间 int timeout = 5000; // 建立连接 ZkClient zk = new ZkClient(serverUrl, timeout); return zk; } }

    ServerC

    import org.I0Itec.zkclient.ZkClient; public class ServerC { private static String parentNodePath = "/servers"; public static void main(String[] args) throws Exception { ServerC C = new ServerC(); ZkClient zk = C.getConnection(); // 判断父节点是否存在,不存在则创建 if (!zk.exists(parentNodePath)) { zk.createPersistent(parentNodePath); } // 向/server节点下注册服务,值为服务器ip+端口号 // 节点类型是临时节点 zk.createEphemeral(parentNodePath + "/serverC", "ip3:port3"); System.out.println("服务C启动服务"); // 让程序一直运行 System.in.read(); } /** * 获取zk链接 * * @return */ public ZkClient getConnection() { // zk集群服务器地址 String serverUrl = "192.168.0.211:2181,192.168.0.212:2181,192.168.0.213:2181"; // 连接超时时间 int timeout = 5000; // 建立连接 ZkClient zk = new ZkClient(serverUrl, timeout); return zk; } }

     客户端 IClient

    import java.util.ArrayList; import java.util.List; import java.util.Random; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; public class IClient { private static String parentNodePath = "/servers"; private static List<String> servers = new ArrayList<String>(); public static void main(String[] args) throws Exception { IClient client = new IClient(); ZkClient zk = client.getConnection(); // 动态监听服务列表 client.childListener(zk, parentNodePath); // 一直调用服务 while (true) { client.getServer(); Thread.sleep(2000); } } /** * 获取zk链接 * * @return */ public ZkClient getConnection() { // zk集群服务器地址 String serverUrl = "192.168.0.211:2181,192.168.0.212:2181,192.168.0.213:2181"; // 连接超时时间 int timeout = 5000; // 建立连接 ZkClient zk = new ZkClient(serverUrl, timeout); return zk; } /** * 监听某节点下子节点的变化,经典案例:监听服务器动态上下线 * * @param zk */ public void childListener(ZkClient zk, String parentNodePath) { // 可用的服务列表 servers = zk.getChildren(parentNodePath); // 监听/servers下子节点的变化 zk.subscribeChildChanges(parentNodePath, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> childs) throws Exception { servers = childs; } }); } private void getServer() { // 模拟随机负载均衡算法 if (servers.size() > 0) { int index = new Random().nextInt(servers.size()); System.out.println("调用了服务:" + servers.get(index)); } else System.out.println("没有可提供的服务列表"); } }

    测试:依次启动服务A、B、C,最后启动IClient,查看打印发现A、B、C都有被调用

     关闭A服务,由于网络原因,这个监听过程可能需要几秒钟,然后观察打印,发现只有B、C服务调用

     

    本文到这里就结束了,作为记录,方便翻阅,若有错误还请指正 

    Processed: 0.010, SQL: 9