curator是Netflix公司开源的一个Zookeeper客户端框架,curator框架在Zookeeper原生的API接口上进行了封装,屏蔽了Zookeeper原生客户端非常底层的细节开发,使得使用更加方便。并且还提供了Zookeeper多种应用场景的封装,比如分布式锁服务、集群领导选举等场景实现的封装,还实现了Fluent风格的链式调用API,是最好用,最流行的Zookeeper客户端。
原生Zookeeper的不足:
连接对象异步创建,需要开发人员自行编码等待。连接没有会话超时自动重连机制。Watcher一次注册只生效一次。不支持递归创建树形节点。curator特点:
设有Session超时重连机制。Watcher重复注册机制。简化开发API。遵循fluent风格API。提供Zookeeper常用的场景封装实现。依赖:
<!--这个包是curator提供的对各种常用场景的封装实现,比如分布式锁、集群master选举等--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency> <!--这个包是curator提供对Zookeeper的基本操作封装--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency>curator对Zookeeper的基本操作是通过CuratorFramework接口来定义的。
curator提供了会话连接超时的重试策略,用RetryPolicy接口定义。 常用的会话连接超时重试策略实现有:
//重试N次,当会话超时出现后,curator会每间隔sleepMsBetweenRetries毫秒时间重试一次,共重试n次。 RetryNTimes(int n, int sleepMsBetweenRetries) //重试一次,是RetryNTimes的一种特殊实现,相当于RetryNTimes(1, int sleepMsBetweenRetries), // 会话超时后sleepMsBetweenRetry毫秒后进行一次连接重试,仅重试一次。 public RetryOneTime(int sleepMsBetweenRetry) //在maxElapsedTimeMs毫秒时间内,每隔sleepMsBetweenRetries重试一次。 //比如maxElapsedTimeMs=10000 sleepMsBetweenRetries=3000 ,表示在10秒内,每隔3秒重试一次,理论上是重试3次。 //重试次数有上限,是int类型的最大值次数。 public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) //衰减重试,baseSleepTimeMs是基础衰减时间,maxRetries是最大重试次数。 //重试时间间隔 = baseSleepTimeMs*math.max(1,random.nextInt(1<<(retryCount+1))) //retryCount从0开始 //比如ExponentialBackoffRetry(1000,5), //那么第一次的重试时间间隔=1000*math.max(1,random.nextInt(1<<(1))),重试的时间间隔随着重试的次数增大而增大,衰减重试,但是retryCount有最大值,就是当retryCount>29时,curator会把他设置成29。因为1最多左移30位。最大间隔时间也有限制,默认int的最大值,也可以通过另一个构造函数来设置。 //只会重试maxRetries次。 public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) //maxSleepMs最大间隔时间,当衰减重试计算的间隔时间比它大的,就设置为该时间间隔。 public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)创建客户端会话,实际上就是创建CuratorFramework对象。有两种创建风格,普通风格跟fluent风格。
public class CuratorSessionDemo{ //Zookeeper服务器地址,集群的话多个用逗号分隔 private static final String SERVER_STRING = "192.168.18.137:2181"; //父级节点路径 private static final String PREFIX_PATH = "curator"; private static RetryPolicy retryOneTimePolicy = new RetryOneTime(3000); private static RetryPolicy retryThreeTimePolicy = new RetryNTimes(3,3000); private static RetryPolicy retryUntilElapsed = new RetryUntilElapsed(10000,3000); private static RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000,3); private static List<AuthInfo> authInfoList = new ArrayList<>(); static { AuthInfo authInfo = new AuthInfo("digest","admin:123456".getBytes()); authInfoList.add(authInfo); } @Test public CuratorFramework getClientWithNormalStyle(){ /** * 普通方式创建Zookeeper的curator客户端 *参数一 Zookeeper服务器地址,多个用逗号分隔 * 参数二 客户端的会话超时 单位毫秒 * 参数三: 客户端连接到服务器的连接超时时间 单位毫秒 * 参数四: 会话超时重连策略 */ CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(SERVER_STRING,5000, 10000,exponentialBackoffRetry); //上面仅创建了客户端。必须要使用start方法来进行连接 curatorFramework.start(); return curatorFramework; } public CuratorFramework getClientWithFluentStyle(){ CuratorFramework curatorFramework = //创建一个curatorFramework构造器 CuratorFrameworkFactory.builder() //设置Zookeeper服务器地址,多个用逗号分隔 .connectString(SERVER_STRING) //设置连接超时时间 .connectionTimeoutMs(10000) //设置会话超时时间 .sessionTimeoutMs(5000) //设置数据压缩的压缩器 .compressionProvider(new GzipCompressionProvider()) //设置会话超时重连策略 .retryPolicy(exponentialBackoffRetry) //添加认证 .authorization(authInfoList) //设置该客户端是否只进行非事务操作 .canBeReadOnly(false) //设置默认的数据,当创建节点时不提供节点数据的话,就使用该数据 .defaultData("".getBytes()) //设置节点路径的父节点路径 .namespace(PREFIX_PATH) //构建curatorFramework对象 .build(); //上面仅创建了客户端。必须要使用start方法来进行连接 curatorFramework.start(); return curatorFramework; } }异步操作的inBackground方法详解: 以上的操作都有异步模式,上面在创建节点时演示了一下,其他操作也差不多。
public interface Backgroundable<T> { //此方法有5个重载 //啥也不设置 public T inBackground(); //提供一个上下文对象,其实这个用的也不多。 public T inBackground(Object context); //传入一个回调对象BackgroundCallback,会在操作执行完成后执行该回调方法。 public T inBackground(BackgroundCallback callback); //传入一个回调对象BackgroundCallback和一个上下文对象context,会在操作执行完成后执行该回调方法。 public T inBackground(BackgroundCallback callback, Object context); //传入一个回调对象BackgroundCallback和一个线程池对象,会用该线程池里面的线程进行相关操作,比如异步创建节点是使用这个线程池中的线程异步创建的。 public T inBackground(BackgroundCallback callback, Executor executor); //传入一个回调器、上下文对象、线程池。 public T inBackground(BackgroundCallback callback, Object context, Executor executor); } //回调器 public interface BackgroundCallback { /** * 回调方法,我们就是通过实现该接口复写自己的回调方法 * client CuratorFramework 客户端对象 * event事件上下文 */ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception; } //CuratorEvent事件上下文,里面的值不是每个事件类型都会有值,某些值只有特定的事件触发才会有。 public interface CuratorEvent { /** * @return event type 事件类型 */ public CuratorEventType getType(); /** * 操作执行的返回状态码可以用于判断操作有没有执行成功,与原生API回调的rc参数一致 */ public int getResultCode(); /** * 节点路径 */ public String getPath(); /** * 节点上下文参数,就是inBackground方法传递的context。 */ public Object getContext(); /** * 节点的元信息 */ public Stat getStat(); /** * 节点的数据 */ public byte[] getData(); public String getName(); /** * 节点的子节点 */ public List<String> getChildren(); /** * 节点的权限列表 */ public List<ACL> getACLList(); public List<CuratorTransactionResult> getOpResults(); /** * 节点的监听事件 */ public WatchedEvent getWatchedEvent(); } //curator的事件类型,会在调用不同的方法就会触发相关的事件 public enum CuratorEventType { /** * create方法事件 */ CREATE, /** *delete方法事件 */ DELETE, /** * checkExists方法的事件 */ EXISTS, /** * getData方法事件 */ GET_DATA, /** * setData方法事件 */ SET_DATA, /** * getChildren方法事件 */ CHILDREN, /** * sync方法事件 */ SYNC, /** * getACL方法事件 */ GET_ACL, /** * setACL方法事件 */ SET_ACL, /** * transaction方法事件 */ TRANSACTION, /** * getConfig方法事件 */ GET_CONFIG, /** * reconfig方法事件 */ RECONFIG, /** * watched方法事件 */ WATCHED, /** * watches方法事件 */ REMOVE_WATCHES, /** * close方法事件 */ CLOSING }curator还能进行事务操作,保证一段代码的原子性。这是curator独有的。
@Test public void transactionNode() throws Exception{ CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); //CuratorOp是对每个操作的抽象,这里创建一个新增节点的操作 //transactionOp() 这个方法返回事务操作类型 目前有create 、 check 、delete、setData CuratorOp createOp = curatorFramework.transactionOp(). .create() .withMode(CreateMode.PERSISTENT) .forPath("node3", "555".getBytes()); //这里创建一个修改节点数据的操作 CuratorOp setDataOp = curatorFramework.transactionOp() .setData() .withVersion(-1) .forPath("node4", "666".getBytes()); //把操作添加进事务中,这两个操作就具有原子性了 //返回每个操作的结果封装成CuratorTransactionResult对象 List<CuratorTransactionResult> curatorTransactionResults = curatorFramework.transaction().forOperations(createOp, setDataOp); for (CuratorTransactionResult curatorTransactionResult:curatorTransactionResults){ System.out.println(curatorTransactionResult); } }curator有三种watcher来做节点监听
PathChildrenCache:监视一个路径下子节点的创建、删除、节点数据的更新。nodecache:监视一个节点的创建、删除、更新。treecache:pathcache + nodecache。缓存路径下所有子节点的数据。结果:
结果:
这个是最强大的Watcher
@Test public void testTree() throws Exception { CuratorFramework curatorFramework = CuratorSessionDemo.getClientWithFluentStyle(); TreeCache treeCache = new TreeCache(curatorFramework,"/treeCache"); treeCache.start(); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { switch (event.getType()){ case NODE_ADDED:{ System.out.println("节点"+ event.getData().getPath() +"被创建了"); System.out.println("新增的节点的数据:" + new String(event.getData().getData())); break; } case NODE_REMOVED:{ System.out.println("节点"+ event.getData().getPath() +"被删除了"); System.out.println("删除的节点的数据:" + new String(event.getData().getData())); break; } case NODE_UPDATED:{ System.out.println("节点"+ event.getData().getPath() +"数据被修改了"); System.out.println("修改后的节点的数据:" + new String(event.getData().getData())); break; } } } }); //创建节点 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/treeCache","555".getBytes()); TimeUnit.SECONDS.sleep(1); //修改节点数据 curatorFramework.setData().forPath("/treeCache","treeCache".getBytes()); //创建节点的子节点 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/treeCache/treeCache1","555".getBytes()); //创建节点的子节点 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/treeCache/treeCache2","555".getBytes()); //创建节点的子节点的子节点 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/treeCache/treeCache1/treeCache1.1","555".getBytes()); //修改子节点的数据 curatorFramework.setData().forPath("/treeCache/treeCache2","treeCache2".getBytes()); TimeUnit.SECONDS.sleep(1); //获取本节点treeCache的数据 ChildData treeCacheData = treeCache.getCurrentData("/treeCache"); System.out.println(treeCacheData.getPath() + "节点的数据是" + new String(treeCacheData.getData())); //获取子节点的数据treeCache2 ChildData treeCache2Data = treeCache.getCurrentData("/treeCache/treeCache2"); System.out.println(treeCache2Data.getPath() + "节点的数据是" + new String(treeCache2Data.getData())); //获取子节点的子节点treeCache1.1的数据 ChildData treeCache1_1Data = treeCache.getCurrentData("/treeCache/treeCache1/treeCache1.1"); System.out.println(treeCache1_1Data.getPath() + "节点的数据是" + new String(treeCache1_1Data.getData())); //获取本节点的子节点 Map<String, ChildData> currentChildren = treeCache.getCurrentChildren("/treeCache"); System.out.println("节点/treeCache的子节点列表============子节点列表"); for (Map.Entry<String,ChildData> entry: currentChildren.entrySet()){ System.out.println("key = " + entry.getKey() + "data = " + new String(entry.getValue().getData())); } //获取本节点的子节点的子节点列表 Map<String, ChildData> currentChildren1 = treeCache.getCurrentChildren("/treeCache/treeCache1"); System.out.println("节点/treeCache/treeCache1的子节点列表============子节点列表"); for (Map.Entry<String,ChildData> entry: currentChildren1.entrySet()){ System.out.println("key = " + entry.getKey() + "data = " + new String(entry.getValue().getData())); } //删除节点/treeCache及其子节点 curatorFramework.delete().deletingChildrenIfNeeded().forPath("/treeCache"); TimeUnit.SECONDS.sleep(3000); }结果: 后面的删除就不贴出来了 。 总结 : TreeCache能够监控 节点的新建/删除/数据修改 、节点的子节点的新建/删除/修改、节点的子节点的子节点的新建/删除/修改,还有能缓存节点及其子节点、子节点的子节点到本地,实属强大。