Zookeeper客户端curator使用详解

    技术2025-05-27  19

    简介

    curator是Netflix公司开源的一个Zookeeper客户端框架,curator框架在Zookeeper原生的API接口上进行了封装,屏蔽了Zookeeper原生客户端非常底层的细节开发,使得使用更加方便。并且还提供了Zookeeper多种应用场景的封装,比如分布式锁服务、集群领导选举等场景实现的封装,还实现了Fluent风格的链式调用API,是最好用,最流行的Zookeeper客户端。

    原生Zookeeper的不足:

    连接对象异步创建,需要开发人员自行编码等待。连接没有会话超时自动重连机制。Watcher一次注册只生效一次。不支持递归创建树形节点。

    curator特点:

    设有Session超时重连机制。Watcher重复注册机制。简化开发API。遵循fluent风格API。提供Zookeeper常用的场景封装实现。

    依赖:

    <!--这个包是curator提供的对各种常用场景的封装实现,比如分布式锁、集群master选举等--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency> <!--这个包是curator提供对Zookeeper的基本操作封装--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency>

    实战demo:

    curator对Zookeeper的基本操作是通过CuratorFramework接口来定义的。

    curator会话连接超时重试策略

    curator提供了会话连接超时的重试策略,用RetryPolicy接口定义。 常用的会话连接超时重试策略实现有:

    //重试N次,当会话超时出现后,curator会每间隔sleepMsBetweenRetries毫秒时间重试一次,共重试n次。 RetryNTimes(int n, int sleepMsBetweenRetries) //重试一次,是RetryNTimes的一种特殊实现,相当于RetryNTimes(1, int sleepMsBetweenRetries), // 会话超时后sleepMsBetweenRetry毫秒后进行一次连接重试,仅重试一次。 public RetryOneTime(int sleepMsBetweenRetry) //在maxElapsedTimeMs毫秒时间内,每隔sleepMsBetweenRetries重试一次。 //比如maxElapsedTimeMs=10000 sleepMsBetweenRetries=3000 ,表示在10秒内,每隔3秒重试一次,理论上是重试3次。 //重试次数有上限,是int类型的最大值次数。 public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) //衰减重试,baseSleepTimeMs是基础衰减时间,maxRetries是最大重试次数。 //重试时间间隔 = baseSleepTimeMs*math.max(1,random.nextInt(1<<(retryCount+1))) //retryCount从0开始 //比如ExponentialBackoffRetry(1000,5), //那么第一次的重试时间间隔=1000*math.max(1,random.nextInt(1<<(1))),重试的时间间隔随着重试的次数增大而增大,衰减重试,但是retryCount有最大值,就是当retryCount>29时,curator会把他设置成29。因为1最多左移30位。最大间隔时间也有限制,默认int的最大值,也可以通过另一个构造函数来设置。 //只会重试maxRetries次。 public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) //maxSleepMs最大间隔时间,当衰减重试计算的间隔时间比它大的,就设置为该时间间隔。 public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

    创建客户端会话,实际上就是创建CuratorFramework对象。有两种创建风格,普通风格跟fluent风格。

    public class CuratorSessionDemo{ //Zookeeper服务器地址,集群的话多个用逗号分隔 private static final String SERVER_STRING = "192.168.18.137:2181"; //父级节点路径 private static final String PREFIX_PATH = "curator"; private static RetryPolicy retryOneTimePolicy = new RetryOneTime(3000); private static RetryPolicy retryThreeTimePolicy = new RetryNTimes(3,3000); private static RetryPolicy retryUntilElapsed = new RetryUntilElapsed(10000,3000); private static RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000,3); private static List<AuthInfo> authInfoList = new ArrayList<>(); static { AuthInfo authInfo = new AuthInfo("digest","admin:123456".getBytes()); authInfoList.add(authInfo); } @Test public CuratorFramework getClientWithNormalStyle(){ /** * 普通方式创建Zookeeper的curator客户端 *参数一 Zookeeper服务器地址,多个用逗号分隔 * 参数二 客户端的会话超时 单位毫秒 * 参数三: 客户端连接到服务器的连接超时时间 单位毫秒 * 参数四: 会话超时重连策略 */ CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(SERVER_STRING,5000, 10000,exponentialBackoffRetry); //上面仅创建了客户端。必须要使用start方法来进行连接 curatorFramework.start(); return curatorFramework; } public CuratorFramework getClientWithFluentStyle(){ CuratorFramework curatorFramework = //创建一个curatorFramework构造器 CuratorFrameworkFactory.builder() //设置Zookeeper服务器地址,多个用逗号分隔 .connectString(SERVER_STRING) //设置连接超时时间 .connectionTimeoutMs(10000) //设置会话超时时间 .sessionTimeoutMs(5000) //设置数据压缩的压缩器 .compressionProvider(new GzipCompressionProvider()) //设置会话超时重连策略 .retryPolicy(exponentialBackoffRetry) //添加认证 .authorization(authInfoList) //设置该客户端是否只进行非事务操作 .canBeReadOnly(false) //设置默认的数据,当创建节点时不提供节点数据的话,就使用该数据 .defaultData("".getBytes()) //设置节点路径的父节点路径 .namespace(PREFIX_PATH) //构建curatorFramework对象 .build(); //上面仅创建了客户端。必须要使用start方法来进行连接 curatorFramework.start(); return curatorFramework; } }
    常用API(不包括Watcher机制):
    public class CuratorOperationDemo { //同步创建节点 @Test public void createNodeSync() throws Exception { CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); Stat stat = new Stat(); //返回创建节点的路径 String path = curatorFramework //创建节点构建器CreateBuilder .create() //把创建的节点的元信息保存在stat对象中 .storingStatIn(stat) //递归创建 .creatingParentsIfNeeded() //创建节点的类型,这里选的是创建持久化节点。 .withMode(CreateMode.PERSISTENT) //设置节点权限 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //创建节点的路径和数据 .forPath("/node1", "node1".getBytes()); System.out.println(path); curatorFramework.close(); } //异步创建节点 @Test public void createNodeASync() throws Exception { CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); //创建一个固定的线程数的线程池 Executor executor = Executors.newFixedThreadPool(1); Stat stat = new Stat(); //返回创建节点的路径 String path = curatorFramework //创建节点构建器CreateBuilder .create() //把创建的节点的元信息保存在stat对象中 .storingStatIn(stat) //递归创建 .creatingParentsIfNeeded() //创建节点的类型,这里选的是创建持久化节点。 .withMode(CreateMode.PERSISTENT) //设置节点权限 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .inBackground(new BackgroundCallback() { @Override /** * 节点创建操作成功的回调函数 * client 客户端 * event 事件上下文 */ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("path = " + event.getPath()); System.out.println("context = " + event.getContext()); System.out.println("type = " + event.getType()); } },"删除",executor) //创建节点的路径和数据 .forPath("/node3", "node3".getBytes()); System.out.println(path); TimeUnit.SECONDS.sleep(10); curatorFramework.close(); } //设置节点数据 @Test public void setNodeData() throws Exception { CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); //返回设置节点后的节点元信息 Stat stat = curatorFramework.setData() //对数据进行压缩在设置,如果这里设置了压缩,获取节点数据时就要进行相应的解压缩操作。 .compressed() .withVersion(-1) .forPath("/node3", "node33333".getBytes()); System.out.println(stat); curatorFramework.close(); } //删除节点 @Test public void deleteNode() throws Exception{ CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); curatorFramework.delete() //递归删除子节点 .deletingChildrenIfNeeded() .withVersion(-1) .forPath("/node3"); curatorFramework.close(); } //获取节点数据 @Test public void getNodeData() throws Exception{ CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); Stat stat = new Stat(); //返回数据 byte[] bytes = curatorFramework.getData() //解压缩,如果设置数据时使用了压缩,获取数据时要进行相应的解压缩进行数据还原 .decompressed() //获取节点元信息存在stat对象中 .storingStatIn(stat) //添加监听器 .usingWatcher(new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.getPath()); } }) .forPath("/node3"); curatorFramework.close(); } //获取和设置节点的权限列表 @Test public void aclDemo() throws Exception { CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); //设置acl Stat stat = curatorFramework.setACL() .withVersion(-1) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node3"); //获取节点acl List<ACL> acls = curatorFramework.getACL().forPath("/node3"); System.out.println(acls); curatorFramework.close(); } //获取子节点列表 @Test public void getNodeChildren() throws Exception { CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); List<String> children = curatorFramework.getChildren() .forPath("/node3"); System.out.println(children); curatorFramework.close(); } //检查节点是否存在 @Test public void checkExist() throws Exception { CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); //返回null表示节点不存在,反正存在 Stat stat = curatorFramework.checkExists() //当父节点不存在时创建父节点 .creatingParentsIfNeeded() //添加监听 .usingWatcher(new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.getPath()); } }) .forPath("/node3"); System.out.println(stat); curatorFramework.close(); } }

    异步操作的inBackground方法详解: 以上的操作都有异步模式,上面在创建节点时演示了一下,其他操作也差不多。

    public interface Backgroundable<T> { //此方法有5个重载 //啥也不设置 public T inBackground(); //提供一个上下文对象,其实这个用的也不多。 public T inBackground(Object context); //传入一个回调对象BackgroundCallback,会在操作执行完成后执行该回调方法。 public T inBackground(BackgroundCallback callback); //传入一个回调对象BackgroundCallback和一个上下文对象context,会在操作执行完成后执行该回调方法。 public T inBackground(BackgroundCallback callback, Object context); //传入一个回调对象BackgroundCallback和一个线程池对象,会用该线程池里面的线程进行相关操作,比如异步创建节点是使用这个线程池中的线程异步创建的。 public T inBackground(BackgroundCallback callback, Executor executor); //传入一个回调器、上下文对象、线程池。 public T inBackground(BackgroundCallback callback, Object context, Executor executor); } //回调器 public interface BackgroundCallback { /** * 回调方法,我们就是通过实现该接口复写自己的回调方法 * client CuratorFramework 客户端对象 * event事件上下文 */ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception; } //CuratorEvent事件上下文,里面的值不是每个事件类型都会有值,某些值只有特定的事件触发才会有。 public interface CuratorEvent { /** * @return event type 事件类型 */ public CuratorEventType getType(); /** * 操作执行的返回状态码可以用于判断操作有没有执行成功,与原生API回调的rc参数一致 */ public int getResultCode(); /** * 节点路径 */ public String getPath(); /** * 节点上下文参数,就是inBackground方法传递的context。 */ public Object getContext(); /** * 节点的元信息 */ public Stat getStat(); /** * 节点的数据 */ public byte[] getData(); public String getName(); /** * 节点的子节点 */ public List<String> getChildren(); /** * 节点的权限列表 */ public List<ACL> getACLList(); public List<CuratorTransactionResult> getOpResults(); /** * 节点的监听事件 */ public WatchedEvent getWatchedEvent(); } //curator的事件类型,会在调用不同的方法就会触发相关的事件 public enum CuratorEventType { /** * create方法事件 */ CREATE, /** *delete方法事件 */ DELETE, /** * checkExists方法的事件 */ EXISTS, /** * getData方法事件 */ GET_DATA, /** * setData方法事件 */ SET_DATA, /** * getChildren方法事件 */ CHILDREN, /** * sync方法事件 */ SYNC, /** * getACL方法事件 */ GET_ACL, /** * setACL方法事件 */ SET_ACL, /** * transaction方法事件 */ TRANSACTION, /** * getConfig方法事件 */ GET_CONFIG, /** * reconfig方法事件 */ RECONFIG, /** * watched方法事件 */ WATCHED, /** * watches方法事件 */ REMOVE_WATCHES, /** * close方法事件 */ CLOSING }
    curator的事务操作

    curator还能进行事务操作,保证一段代码的原子性。这是curator独有的。

    @Test public void transactionNode() throws Exception{ CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); //CuratorOp是对每个操作的抽象,这里创建一个新增节点的操作 //transactionOp() 这个方法返回事务操作类型 目前有create 、 check 、delete、setData CuratorOp createOp = curatorFramework.transactionOp(). .create() .withMode(CreateMode.PERSISTENT) .forPath("node3", "555".getBytes()); //这里创建一个修改节点数据的操作 CuratorOp setDataOp = curatorFramework.transactionOp() .setData() .withVersion(-1) .forPath("node4", "666".getBytes()); //把操作添加进事务中,这两个操作就具有原子性了 //返回每个操作的结果封装成CuratorTransactionResult对象 List<CuratorTransactionResult> curatorTransactionResults = curatorFramework.transaction().forOperations(createOp, setDataOp); for (CuratorTransactionResult curatorTransactionResult:curatorTransactionResults){ System.out.println(curatorTransactionResult); } }
    curator的watcher实现:

    curator有三种watcher来做节点监听

    PathChildrenCache:监视一个路径下子节点的创建、删除、节点数据的更新。nodecache:监视一个节点的创建、删除、更新。treecache:pathcache + nodecache。缓存路径下所有子节点的数据。
    PathChildrenCachedemo
    @Test public void testPathCache() throws Exception { ExecutorService executorService = Executors.newSingleThreadExecutor(); CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); //构造器有多个重载,这里就实现参数比较多的一个 /** * 第一个参数 客户端对象 * 第二个参数 要节点的节点路径 * 第三个参数 是否缓存子节点数据 * 第四个参数 告知监听器节点数据是否是压缩数据 * 第五个参数 用于PathChildrenCache的后台线程的ExecutorService。此线程池应该是单线程的,否则缓存可能会看到不一致的结果。 */ PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework ,"/PathChildrenCache", true ,false ,executorService); //启动监听,必须要有这一步才能实现监听,有三种启动模式 //PathChildrenCache.StartMode.NORMAL 异步进行缓存初始化 //PathChildrenCache.StartMode.BUILD_INITIAL_CACHE 同步进行缓存初始化 并且会rebuild监听器缓存 //PathChildrenCache.StartMode.POST_INITIALIZED_EVENT 异步进行缓存初始化,并且会触发PathChildrenCacheEvent.Type.INITIALIZED事件 pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()){ case CHILD_ADDED:{ System.out.println("子节点被创建了"); System.out.println(event.getData()); System.out.println(event.getInitialData()); System.out.println(event.getType()); System.out.println(event.toString()); break; } case CHILD_REMOVED:{ System.out.println("子节点被删除了"); System.out.println(event.getData()); System.out.println(event.getInitialData()); System.out.println(event.getType()); System.out.println(event.toString()); break; } case CHILD_UPDATED:{ System.out.println("子节点被修改了"); System.out.println(event.getData()); System.out.println(event.getInitialData()); System.out.println(event.getType()); System.out.println(event.toString()); break; } default: break; } } }); //获取缓存中的子节点数据 System.out.println(pathChildrenCache.getCurrentData()); //创建子节点 String path = curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/PathChildrenCache/node4", "node4".getBytes()); TimeUnit.SECONDS.sleep(1); //修改子节点 Stat stat = curatorFramework.setData().forPath("/PathChildrenCache/node4", "node100".getBytes()); TimeUnit.SECONDS.sleep(1); //删除子节点 curatorFramework.delete().forPath("/PathChildrenCache/node4"); System.out.println("okokokokokok"); //阻塞主线程 TimeUnit.SECONDS.sleep(3000); }

    结果:

    NodeCache demo
    @Test public void testNodeCache() throws Exception { ExecutorService executorService = Executors.newSingleThreadExecutor(); CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); /** * 第一个参数 curator客户端 * 第二个参数 NodeCache * 第三个参数 节点数据是否被压缩过 */ NodeCache nodeCache = new NodeCache(curatorFramework,"/NodeCache5",false); //如果为true,就会在start前调用rebuild方法。 nodeCache.start(true); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { //节点的创建 删除 数据修改都会触发,但是这里却没有分事件类型 System.out.println("nodeChanged触发"); System.out.println("节点数据修改后的结果:" + nodeCache.getCurrentData()); System.out.println("节点路径: " + nodeCache.getPath()); } }); curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/NodeCache5","555".getBytes()); TimeUnit.SECONDS.sleep(1); curatorFramework.setData().forPath("/NodeCache5","NodeCache5".getBytes()); TimeUnit.SECONDS.sleep(1); curatorFramework.delete().forPath("/NodeCache5"); TimeUnit.SECONDS.sleep(3000); }

    结果:

    TreeCache

    这个是最强大的Watcher

    @Test public void testTree() throws Exception { CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); TreeCache treeCache = new TreeCache(curatorFramework,"/treeCache"); treeCache.start(); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { switch (event.getType()){ case NODE_ADDED:{ System.out.println("节点"+ event.getData().getPath() +"被创建了"); System.out.println("新增的节点的数据:" + new String(event.getData().getData())); break; } case NODE_REMOVED:{ System.out.println("节点"+ event.getData().getPath() +"被删除了"); System.out.println("删除的节点的数据:" + new String(event.getData().getData())); break; } case NODE_UPDATED:{ System.out.println("节点"+ event.getData().getPath() +"数据被修改了"); System.out.println("修改后的节点的数据:" + new String(event.getData().getData())); break; } } } }); //创建节点 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/treeCache","555".getBytes()); TimeUnit.SECONDS.sleep(1); //修改节点数据 curatorFramework.setData().forPath("/treeCache","treeCache".getBytes()); //创建节点的子节点 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/treeCache/treeCache1","555".getBytes()); //创建节点的子节点 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/treeCache/treeCache2","555".getBytes()); //创建节点的子节点的子节点 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/treeCache/treeCache1/treeCache1.1","555".getBytes()); //修改子节点的数据 curatorFramework.setData().forPath("/treeCache/treeCache2","treeCache2".getBytes()); TimeUnit.SECONDS.sleep(1); //获取本节点treeCache的数据 ChildData treeCacheData = treeCache.getCurrentData("/treeCache"); System.out.println(treeCacheData.getPath() + "节点的数据是" + new String(treeCacheData.getData())); //获取子节点的数据treeCache2 ChildData treeCache2Data = treeCache.getCurrentData("/treeCache/treeCache2"); System.out.println(treeCache2Data.getPath() + "节点的数据是" + new String(treeCache2Data.getData())); //获取子节点的子节点treeCache1.1的数据 ChildData treeCache1_1Data = treeCache.getCurrentData("/treeCache/treeCache1/treeCache1.1"); System.out.println(treeCache1_1Data.getPath() + "节点的数据是" + new String(treeCache1_1Data.getData())); //获取本节点的子节点 Map<String, ChildData> currentChildren = treeCache.getCurrentChildren("/treeCache"); System.out.println("节点/treeCache的子节点列表============子节点列表"); for (Map.Entry<String,ChildData> entry: currentChildren.entrySet()){ System.out.println("key = " + entry.getKey() + "data = " + new String(entry.getValue().getData())); } //获取本节点的子节点的子节点列表 Map<String, ChildData> currentChildren1 = treeCache.getCurrentChildren("/treeCache/treeCache1"); System.out.println("节点/treeCache/treeCache1的子节点列表============子节点列表"); for (Map.Entry<String,ChildData> entry: currentChildren1.entrySet()){ System.out.println("key = " + entry.getKey() + "data = " + new String(entry.getValue().getData())); } //删除节点/treeCache及其子节点 curatorFramework.delete().deletingChildrenIfNeeded().forPath("/treeCache"); TimeUnit.SECONDS.sleep(3000); }

    结果: 后面的删除就不贴出来了 。 总结 : TreeCache能够监控 节点的新建/删除/数据修改 、节点的子节点的新建/删除/修改、节点的子节点的子节点的新建/删除/修改,还有能缓存节点及其子节点、子节点的子节点到本地,实属强大。

    Processed: 0.017, SQL: 9