Zookeeper java客户端ZkClient使用详解

    技术2024-02-18  67

    简介

    ZKClient是一个Zookeeper客户端框架,是对Zookeeper原生API的封装。使得使用更方便、功能更多。 查看之前必须要对Zookeeper的基本命令操作、Watch机制、Acl等有一定了解。 查看前必须要对原生API的基本使用有一定了解。 原生API可以查看以下两篇文章 Zookeeper之java api详解 Zookeeper之Watcher机制详解

    依赖:

    <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>

    ZkClient对Zookeeper的操作主要操作是通过org.I0Itec.zkclient.ZkClient类实现。

    api:

    //创建ZKClient客户端的构造器 //创建客户端并连接,参数是Zookeeper服务器的地址,与原生客户端的地址字符串一样,集群用逗号隔开。 public ZkClient(String serverstring) /** *创建客户端并连接 *zkServers Zookeeper服务器的地址 * connectionTimeout 设置连接超时,如果在设置时间内没能连接成功,就抛出异常。默认int类型最大值,单位毫秒。 **/ public ZkClient(String zkServers, int connectionTimeout) /** *创建客户端并连接 *zkServers Zookeeper服务器的地址 * sessionTimeout session超时,与原生客户端的作用一样。 * connectionTimeout 设置连接超时,如果在设置时间内没能连接成功,就抛出异常。默认int类型最大值,单位毫秒。 **/ public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) /** *创建客户端并连接 *zkServers Zookeeper服务器的地址 * sessionTimeout session超时,与原生客户端的作用一样。 * connectionTimeout 设置连接超时,如果在设置时间内没能连接成功,就抛出异常。默认int类型最大值,单位毫秒。 * ZkSerializer 自定义的序列化和反序列化器,默认使用java的对象序列化机制。 **/ public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) //创建节点的API /** *创建持久化节点 *path:节点路径 *没有设置节点数据data,数据时对象null。 */ public void createPersistent(String path) /** *创建持久化节点 *path:节点路径 *没有设置节点数据data,数据时对象null。 *createParents :是否递归创建父节点,true表示递归创建。 */ public void createPersistent(String path, boolean createParents) /** *创建持久化节点 *path:节点路径 *没有设置节点数据data,数据时对象null。 *createParents :是否递归创建父节点,true表示递归创建。 * acl 指定特定的权限 */ public void createPersistent(String path, boolean createParents, List<ACL> acl) /** *创建持久化节点,因为这里是专门有设置数据的,所以就不能递归创建多个节点。 *path:节点路径 *data:数据,如果创建ZKclient客户端时,没有指定序列化器,就会使用Java的对象序列化机制, *也就是说如果没有指定序列化器,该data对象就必须实现。否则就会抛出java.io.NotSerializableException异常。 */ public void createPersistent(String path, Object data) /** *创建持久化节点,因为这里是专门有设置数据的,所以就不能递归创建多个节点。 *path:节点路径 *data:数据,如果创建ZKclient客户端时,没有指定序列化器,就会使用Java的对象序列化机制, *也就是说如果没有指定序列化器,该data对象就必须实现。否则就会抛出java.io.NotSerializableException异常。 * acl 指定特定的权限 */ public void createPersistent(String path, Object data, List<ACL> acl) /** *创建临时节点 *path:节点路径 *没有设置节点数据data,数据时对象null。 */ public void createEphemeral(String path) /** *创建临时节点 *path:节点路径 *没有设置节点数据data,数据时对象null。 *acl 指定特定的权限 */ public void createEphemeral(String path, List<ACL> acl) /** *创建临时节点 *path:节点路径 *data 设置节点数据data。 */ public void createEphemeral(String path, Object data) /** *创建临时节点 *path:节点路径 *data 设置节点数据data。 *acl 指定特定的权限 */ public void createEphemeral(String path, Object data, List<ACL> acl) /** *创建持久化有序节点 *path 节点路径 *data 节点数据 * 返回值 String 创建的节点路径名,因为有序节点会在路径后面补后缀,所以要返回让我们知道实际的节点路径。 */ public String createPersistentSequential(String path, Object data) /** *创建持久化有序节点 *path 节点路径 *data 节点数据 *acl 设置特定权限 * 返回值 String 创建的节点路径名,因为有序节点会在路径后面补后缀,所以要返回让我们知道实际的节点路径。 */ public String createPersistentSequential(String path, Object data, List<ACL> acl) /** * 创建临时有序节点 *path 节点路径 *data 节点数据 * 返回值 String 创建的节点路径名,因为有序节点会在路径后面补后缀,所以要返回让我们知道实际的节点路径。 */ public String createEphemeralSequential(String path, Object data) /** * 创建临时有序节点 *path 节点路径 *data 节点数据 *acl 设置特定权限 * 返回值 String 创建的节点路径名,因为有序节点会在路径后面补后缀,所以要返回让我们知道实际的节点路径。 */ public String createEphemeralSequential(String path, Object data, List<ACL> acl) /** * 修改节点数据 * path 节点路径 * data 节点数据 * public void writeData(String path, Object object) /** * 修改节点数据 * path 节点路径 * datat节点数据 * expectedVersion 数据版本,乐观锁 */ public void writeData(String path, Object datat, int expectedVersion) /** * 修改节点数据并修改后的节点元信息 * path 节点路径 * datat 节点数据 * expectedVersion 数据版本,乐观锁 -1 该参数无效 */ public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) //删除节点 public boolean delete(String path) //删除节点 version 节点数据版本 乐观锁 public boolean delete(final String path, final int version) //递归删除节点。 public boolean deleteRecursive(String path) //查看节点数据,泛型方法。返回的数据要经过序列化器进行反序列化得到。 public <T> T readData(String path) /** * 查看节点数据,泛型方法。返回的数据要经过序列化器进行反序列化得到。 * returnNullIfPathNotExists 如果节点不存在时是否返回null ,true时返回null,false直接抛异常 */ public <T> T readData(String path, boolean returnNullIfPathNotExists) /** * 查看节点数据,泛型方法。返回的数据要经过序列化器进行反序列化得到。 * stat 把节点的元信息 赋值到该对象上。 */ public <T> T readData(String path, Stat stat) /** * 设置节点权限。 */ public void setAcl(final String path, final List<ACL> acl) //获取节点的子节点 public List<String> getChildren(String path) //添加授权,与原生API用法差不多 public void addAuthInfo(final String scheme, final byte[] auth) //查看节点是否存在,true存在 false不存在。 public boolean exists(String path) //关闭连接 public void close() 监听watch机制相关的API// //ZkClient把监听事件API分成三类,监听子节点变化,监听节点数据变化,监听连接状态变化。 //zkClient的监听API本身便带有重复监听效果 /** * 监听子节点变化 * path:监听节点 * listener 一个接口,我们要实现它来定义相关回调方法 */ public List<String> subscribeChildChanges(String path, IZkChildListener listener) /** * 监听节点数据变化 * path:监听节点 * listener 一个接口,我们要实现它来定义相关回调方法 */ public void subscribeDataChanges(String path, IZkDataListener listener) /** * 监听连接状态变化,也就是监听None类型 * listener 一个接口,我们要实现它来定义相关回调方法 */ public void subscribeStateChanges(IZkStateListener listener) /** * 撤销该客户端的所有监听 */ public void unsubscribeAll() /** * 撤销某节点的某个监听,子节点变化监听 * path 节点路径 * childListener 要撤销的监听器,此时参数不能跟上面的参数那样新建一个对象,那样没有意义,应该传入一个与上面的监听一样的对象 也就是 obj == obj2 */ public void unsubscribeChildChanges(String path, IZkChildListener childListener) /** * 撤销某节点的某个节点数据变化监听 * path 节点路径 * dataListener 要撤销的监听器,此时参数不能跟上面的参数那样新建一个对象,那样没有意义,应该传入一个与上面的监听一样的对象 也就是 obj == obj2 */ public void unsubscribeDataChanges(String path, IZkDataListener dataListener) /** * 撤销某个状态变化监听 * stateListener要撤销监听器,此时参数不能跟上面的参数那样新建一个对象,那样没有意义,应该传入一个与上面的监听一样的对象 也就是 obj == obj2 */ public void unsubscribeStateChanges(IZkStateListener stateListener)
    练习Demo
    public class ZkClientDemo { //Zookeeper服务器地址,集群的话多个用逗号分隔 private static final String SERVER_STRING = "192.168.18.137:2181"; //父级节点路径 private static final String PREFIX_PATH = "/zkclient"; //获取客户端连接 public static ZkClient getZkClient(){ ZkClient zkClient = new ZkClient(SERVER_STRING,30000); System.out.println("连接成功"); return zkClient; } //创建持久化节点 @Test public void createPersistentNode(){ ZkClient zkClient = getZkClient(); zkClient.createPersistent(PREFIX_PATH + "/node1"); zkClient.createPersistent(PREFIX_PATH + "/node2/node2.0",true); zkClient.createPersistent(PREFIX_PATH + "/node3",false, ZooDefs.Ids.OPEN_ACL_UNSAFE); zkClient.createPersistent(PREFIX_PATH + "/node4",new UserConfig("John",18)); zkClient.createPersistent(PREFIX_PATH + "/node5",new UserConfig("Mike",25), ZooDefs.Ids.OPEN_ACL_UNSAFE); zkClient.close(); } //创建临时节点 @Test public void createEphemeralNode(){ UserConfig John = new UserConfig("John",56); ZkClient zkClient = getZkClient(); zkClient.createEphemeral(PREFIX_PATH + "/node7"); zkClient.createEphemeral(PREFIX_PATH + "/node8", ZooDefs.Ids.OPEN_ACL_UNSAFE); zkClient.createEphemeral(PREFIX_PATH + "/node9",John); zkClient.createEphemeral(PREFIX_PATH + "/node10",John, ZooDefs.Ids.OPEN_ACL_UNSAFE); zkClient.close(); } //创建持久化有序节点 @Test public void createPersistentSeqNode(){ UserConfig John = new UserConfig("John",56); ZkClient zkClient = getZkClient(); String node1 = zkClient.createPersistentSequential(PREFIX_PATH + "/node8", John); String node2 = zkClient.createPersistentSequential(PREFIX_PATH + "/node9", John, ZooDefs.Ids.OPEN_ACL_UNSAFE); System.out.println(node1); System.out.println(node2); zkClient.close(); } //创建临时有序节点节点 @Test public void createEphemeralSeqNode(){ UserConfig John = new UserConfig("John",56); ZkClient zkClient = getZkClient(); String node1 = zkClient.createEphemeralSequential(PREFIX_PATH + "/node10", John); String node2 = zkClient.createEphemeralSequential(PREFIX_PATH + "/node11",John, ZooDefs.Ids.OPEN_ACL_UNSAFE); System.out.println(node1); System.out.println(node2); zkClient.close(); } //设置节点数据 @Test public void setNodeData(){ UserConfig John = new UserConfig("John",56); ZkClient zkClient = getZkClient(); zkClient.writeData(PREFIX_PATH + "/node10", John); zkClient.writeData(PREFIX_PATH + "/node10", John,5); zkClient.writeDataReturnStat(PREFIX_PATH + "/node10", John,5); zkClient.close(); } //删除节点 @Test public void deleteNode(){ ZkClient zkClient = getZkClient(); zkClient.delete(PREFIX_PATH + "/node10"); zkClient.delete(PREFIX_PATH + "/node10",5); zkClient.deleteRecursive(PREFIX_PATH + "/node2"); zkClient.close(); } //获取节点数据 @Test public void getNodeData(){ Stat stat = new Stat(); ZkClient zkClient = getZkClient(); UserConfig userConfig = zkClient.readData(PREFIX_PATH + "/node9"); UserConfig userConfig1 = zkClient.readData(PREFIX_PATH + "/node8",stat); UserConfig userConfig2 = zkClient.readData(PREFIX_PATH + "/node7",true); } //设置节点权限 @Test public void setNodeAcl(){ ZkClient zkClient = getZkClient(); zkClient.setAcl(PREFIX_PATH + "/node10", ZooDefs.Ids.OPEN_ACL_UNSAFE); zkClient.close(); } //获取节点的子节点 @Test public void getNodeChildren(){ ZkClient zkClient = getZkClient(); List<String> children = zkClient.getChildren(PREFIX_PATH + "/node10"); zkClient.addAuthInfo("digest","admin:123456".getBytes()); zkClient.close(); } //测试watch机制 @Test public void nodeWatch(){ //创建子节点变化监听器 IZkChildListener childListener = new IZkChildListener() { /** *子节点变化事件回调方法 * @param s 节点路径 * @param list 变化后的子节点列表 * @throws Exception */ @Override public void handleChildChange(String s, List<String> list) throws Exception { System.out.println(s); System.out.println(list); } }; //创建节点数据变化监听器 IZkDataListener dataListener = new IZkDataListener() { /** * 节点数据发生改变回调函数 * @param s 节点路径 * @param o 修改后的数据 * @throws Exception */ @Override public void handleDataChange(String s, Object o) throws Exception { System.out.println(s); System.out.println(o); } /** * 节点删除事件回调函数 * @param s * @throws Exception */ @Override public void handleDataDeleted(String s) throws Exception { System.out.println(s); } }; ZkClient zkClient = getZkClient(); zkClient.subscribeChildChanges(PREFIX_PATH + "/node10",childListener); zkClient.subscribeDataChanges(PREFIX_PATH + "/node10",dataListener); //对象必须一致 zkClient.unsubscribeChildChanges(PREFIX_PATH + "/node10",childListener); zkClient.unsubscribeDataChanges(PREFIX_PATH + "/node10",dataListener); } }
    Processed: 0.016, SQL: 9