Java API Operations
6.1 Native API
1) Maven dependency
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
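2) Common operations
(1) Obtain the client object
private ZooKeeper zooKeeper;

@Before
public void getClient() throws Exception {
    // Arguments: connection string, session timeout in milliseconds, default watcher (none here).
    zooKeeper = new ZooKeeper("192.168.134.99:2181", 2000, null);
}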
(2) Read a node's value
@Test
public void getData() throws Exception {
    // No watcher and no Stat output are requested.
    byte[] data = zooKeeper.getData("/ba/123", null, null);
    System.out.println(new String(data));
}
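Note that `new String(data)` decodes with the platform default charset, so the same node bytes can read differently on machines with different defaults. A minimal sketch of a fix, assuming the writer stores UTF-8 text; `ZnodeText` and its two methods are hypothetical helpers, not part of the ZooKeeper API:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of ZooKeeper: pins the charset used for node payloads.
class ZnodeText {
    // Encode text into the byte[] form that create/setData expect.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a byte[] as returned by getData; a null array is treated as an empty string.
    static String decode(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Two CJK characters (written as escapes) followed by ASCII text.
        byte[] payload = encode("\u8282\u70b9-123");
        System.out.println(payload.length);                             // prints 10
        System.out.println(decode(payload).equals("\u8282\u70b9-123")); // prints true
    }
}
```

With such a helper, the test above would call `ZnodeText.decode(data)` instead of `new String(data)`.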
(3) List a node's children
@Test
public void getChild() throws Exception {
    List<String> children = zooKeeper.getChildren("/ba/gjf", null);
    children.forEach(a -> System.out.println(a));
}
(4) Create a node
@Test
public void createNode() throws Exception {
    byte[] bytes = "12312312".getBytes();
    // Open ACL, persistent node; returns the path that was actually created.
    String s = zooKeeper.create("/ba/123", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    System.out.println(s);
}
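The native `create` call fails with `KeeperException.NoNodeException` when the parent node (`/ba` here) does not exist, so any missing ancestors have to be created first, from the top down. The sketch below only computes that order; `ZnodePaths.ancestors` is a hypothetical helper, and each returned path would still need its own `exists` and `create` calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ZooKeeper.
class ZnodePaths {
    // Returns the ancestors of an absolute node path, outermost first,
    // excluding the root "/" and the path itself.
    static List<String> ancestors(String path) {
        if (!path.startsWith("/") || (path.length() > 1 && path.endsWith("/"))) {
            throw new IllegalArgumentException("not an absolute node path: " + path);
        }
        List<String> result = new ArrayList<>();
        int slash = path.indexOf('/', 1);
        while (slash != -1) {
            result.add(path.substring(0, slash));
            slash = path.indexOf('/', slash + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(ancestors("/ba/123"));      // prints [/ba]
        System.out.println(ancestors("/ba/gjf/leaf")); // prints [/ba, /ba/gjf]
    }
}
```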
(5) Update a node's value
@Test
public void setData() throws Exception {
    // Version -1 matches any version of the node.
    Stat stat = zooKeeper.setData("/ba/123", "123123".getBytes(), -1);
    System.out.println(stat);
}
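The last argument of `setData` is the expected version: `-1` matches any version, while any other value lets the write succeed only if it equals the node's current data version. The class below is an in-memory simulation of that rule, not ZooKeeper code. The `VersionedNode` name and its methods are hypothetical, and it throws `IllegalStateException` where ZooKeeper would raise `KeeperException.BadVersionException`.

```java
// In-memory simulation of versioned writes; hypothetical, not ZooKeeper code.
class VersionedNode {
    private byte[] data = new byte[0];
    private int version = 0;

    // Applies the write when expectedVersion is -1 or equals the current version
    // and returns the new version; otherwise rejects the write.
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                "bad version: expected " + expectedVersion + " but was " + version);
        }
        data = newData.clone();
        version++;
        return version;
    }

    synchronized byte[] getData() {
        return data.clone();
    }

    synchronized int getVersion() {
        return version;
    }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode();
        int v1 = node.setData("a".getBytes(), -1); // unconditional write
        int v2 = node.setData("b".getBytes(), v1); // conditional on the version just read
        System.out.println(v1 + " " + v2);         // prints "1 2"
        try {
            node.setData("c".getBytes(), v1);      // stale version, rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Passing a real version instead of `-1` is what prevents two clients from silently overwriting each other's updates.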
(6) Check whether a node exists
@Test
public void exist() throws Exception {
    // exists returns null when the node is absent.
    Stat stat = zooKeeper.exists("/ba/13", false);
    if (stat == null) {
        System.out.println("The node does not exist!");
    } else {
        System.out.println("The node exists!");
    }
}
6.2 ZkClient API
1) Maven dependency
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.8</version>
</dependency>
2) Core API
ZkClient client = new ZkClient("ip:port");
3) Common operations
package test;

import org.I0Itec.zkclient.ZkClient;

import java.util.List;

public class Main {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("192.168.134.5:2181");
        // List the children of the root node.
        List<String> children = zkClient.getChildren("/");
        children.forEach(a -> System.out.println(a));
    }
}
(1) Create a node
@Test
public void createNode() {
    String result = zkClient.create("/ba/001", "001", CreateMode.PERSISTENT);
    System.out.println(result);
}
(2) Read a node
@Test
public void getData() {
    Object readData = zkClient.readData("/ba/001");
    System.out.println(readData.toString());
}
(3) Update a node's value
@Test
public void setData() {
    zkClient.writeData("/ba/001", "123123");
}
(4) Delete a node
@Test
public void delNode() {
    boolean deleted = zkClient.delete("/ba/001");
    if (deleted) {
        System.out.println("Deleted successfully!");
    } else {
        System.out.println("Delete failed or the node does not exist!");
    }
}
(5) Check whether a node exists
@Test
public void exist() {
    boolean exists = zkClient.exists("/ba/001");
    if (exists) {
        System.out.println("The node already exists!");
    } else {
        System.out.println("The node does not exist!");
    }
}
(6) List child nodes
@Test
public void getChild() {
    List<String> children = zkClient.getChildren("/ba");
    for (String child : children) {
        System.out.println(child);
    }
}
6.3 Curator API
1) Maven dependency
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.7.1</version>
</dependency>
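2) Common operations
(1) Obtain the client object
private CuratorFramework curatorFramework;

@Before
public void getClient() {
    // Arguments: base sleep time in milliseconds, maximum number of retries.
    // The original passed 1000 as the retry count; Curator caps this value, so a small count is used here.
    ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 3);
    curatorFramework = CuratorFrameworkFactory.newClient("192.168.134.99:2181", backoffRetry);
    curatorFramework.start();
}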
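A retry policy such as `ExponentialBackoffRetry` spaces retries further apart after each failure. The sketch below shows the general idea of capped exponential backoff. It is not Curator's implementation (which also randomizes the delay), and the `Backoff` class, its `delayMs` method, and the 30-second cap are assumptions made for illustration.

```java
// Hypothetical sketch of capped exponential backoff; not Curator's implementation.
class Backoff {
    // Delay before retry number retryCount (0-based):
    // baseSleepMs doubled retryCount times, never exceeding maxSleepMs.
    static long delayMs(long baseSleepMs, int retryCount, long maxSleepMs) {
        if (baseSleepMs <= 0 || maxSleepMs <= 0 || retryCount < 0) {
            throw new IllegalArgumentException("arguments must be positive, retryCount non-negative");
        }
        long delay = Math.min(baseSleepMs, maxSleepMs);
        for (int i = 0; i < retryCount && delay < maxSleepMs; i++) {
            // Doubling would pass the cap (or overflow), so clamp instead.
            delay = (delay > maxSleepMs / 2) ? maxSleepMs : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        StringBuilder line = new StringBuilder();
        for (int retry = 0; retry <= 5; retry++) {
            line.append(delayMs(1000, retry, 30000)).append(' ');
        }
        System.out.println(line.toString().trim()); // prints "1000 2000 4000 8000 16000 30000"
    }
}
```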
(2) Create a node
@Test
public void createNode() throws Exception {
    String s = curatorFramework.create()
            .withMode(CreateMode.PERSISTENT)
            .forPath("/ba/002", "123".getBytes());
    System.out.println(s);
}
(3) Read a node
@Test
public void getData() throws Exception {
    byte[] bytes = curatorFramework.getData().forPath("/ba/002");
    System.out.println(new String(bytes));
}
(4) Update a node's value
@Test
public void setData() {
    try {
        curatorFramework.setData().forPath("/ba/001", "123123".getBytes());
        System.out.println("Update succeeded!");
    } catch (KeeperException.NoNodeException e) {
        System.out.println("Update failed: the node does not exist!");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
(5) List child nodes
@Test
public void getChild() throws Exception {
    curatorFramework.getChildren().forPath("/ba")
            .forEach(child -> System.out.println(child));
}
(6) Delete a node
@Test
public void delNode() throws Exception {
    curatorFramework.delete().forPath("/ba/002");
}
6.4 Watcher Interface
1) Implementing the Watcher interface with the native API
This example uses the native Apache ZooKeeper Java API.
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
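A ZooKeeper watch lets a client receive notifications from the ZooKeeper server and handle those events as they occur. The ZooKeeper Java API provides a public interface named Watcher, which a client's event-handler class must implement in order to receive event notifications from the server. In code, an application that uses such a client handles these events by registering a callback object with the client.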
package test;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.UUID;

// Writes a fresh UUID to the watched node every five seconds.
public class DataUpdater {
    private static String hostPort = "192.168.134.5:2181";
    private static String zooDataPath = "/ba/gjf";
    ZooKeeper zk;

    public DataUpdater() throws IOException {
        // Let a connection failure propagate instead of leaving zk null.
        zk = new ZooKeeper(hostPort, 2000, null);
    }

    public void run() throws InterruptedException, KeeperException {
        while (true) {
            String uuid = UUID.randomUUID().toString();
            byte[] zooData = uuid.getBytes();
            // Version -1 matches any version of the node.
            zk.setData(zooDataPath, zooData, -1);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of spinning.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DataUpdater dataUpdater = new DataUpdater();
        dataUpdater.run();
    }
}
package test;

import org.apache.zookeeper.*;

// Prints the node's data every time it changes.
public class DataWatcher implements Watcher, Runnable {
    ZooKeeper zooKeeper;

    public DataWatcher() {
        try {
            zooKeeper = new ZooKeeper("192.168.134.5:2181", 2000, this);
            // exists() also registers this object as a watcher on the node.
            if (zooKeeper.exists("/ba/gjf", this) == null) {
                zooKeeper.create("/ba/gjf", "get".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        DataWatcher dataWatcher = new DataWatcher();
        dataWatcher.run();
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            System.out.println("The data has changed");
            try {
                // Passing this again re-registers the one-shot watch.
                byte[] data = zooKeeper.getData("/ba/gjf", this, null);
                String s = new String(data);
                System.out.println(s);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void run() {
        try {
            // Block the main thread so the process keeps receiving events.
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
2) Watching with ZkClient
@Test
public void watcherInterface() throws Exception {
    zkClient.subscribeDataChanges("/ba/gjf", new IZkDataListener() {
        @Override
        public void handleDataChange(String dataPath, Object data) throws Exception {
            System.out.println("Node path: " + dataPath);
            System.out.println("Node data: " + data.toString());
        }

        @Override
        public void handleDataDeleted(String dataPath) throws Exception {
            // Deletions are not handled in this example.
        }
    });
    // Keep the test alive so notifications can arrive.
    Thread.sleep(Integer.MAX_VALUE);
}
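3) Watching with the Curator API
The native ZooKeeper API supports event listening by registering a Watcher, but a Watcher notification is one-shot, so the Watcher has to be registered again after every event, which is tedious. Curator introduces Cache to listen for ZooKeeper server-side events. Cache wraps ZooKeeper event listening and re-registers the watch automatically, which removes much of the boilerplate the native API requires.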
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.1</version>
</dependency>
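The one-shot behaviour described above can be illustrated without a server. The class below is an in-memory simulation, not ZooKeeper or Curator code; `OneShotWatchRegistry`, `Rewatching`, and their methods are hypothetical names. A watcher is removed before it is invoked, so it sees later changes only if it registers itself again, which is the bookkeeping that Curator's caches take over.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// In-memory simulation of one-shot watches; hypothetical, not ZooKeeper code.
class OneShotWatchRegistry {
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

    // Registers a watcher that is notified of the next change to path only.
    void watch(String path, Consumer<String> watcher) {
        watchers.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Simulates a data change: pending watchers are removed first, then invoked once.
    void change(String path) {
        List<Consumer<String>> pending = watchers.remove(path);
        if (pending != null) {
            pending.forEach(w -> w.accept(path));
        }
    }

    // A watcher that re-registers itself on every notification.
    static class Rewatching implements Consumer<String> {
        private final OneShotWatchRegistry registry;
        int count = 0;

        Rewatching(OneShotWatchRegistry registry) {
            this.registry = registry;
        }

        @Override
        public void accept(String path) {
            count++;
            registry.watch(path, this);
        }
    }

    public static void main(String[] args) {
        OneShotWatchRegistry registry = new OneShotWatchRegistry();

        List<String> seen = new ArrayList<>();
        registry.watch("/ba/gjf", seen::add);
        registry.change("/ba/gjf");
        registry.change("/ba/gjf");        // not observed: the watch is already gone
        System.out.println(seen.size());   // prints 1

        Rewatching rewatching = new Rewatching(registry);
        registry.watch("/ba/gjf", rewatching);
        registry.change("/ba/gjf");
        registry.change("/ba/gjf");
        System.out.println(rewatching.count); // prints 2
    }
}
```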
(1) NodeCache
Watches data changes on a specified node.
ExecutorService pool = Executors.newFixedThreadPool(2);
NodeCache nodeCache = new NodeCache(curatorFramework, "/ba/gjf", false);
// true: build the initial cache before start() returns.
nodeCache.start(true);
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        // getCurrentData() returns null once the node has been deleted.
        if (nodeCache.getCurrentData() != null) {
            System.out.println(new String(nodeCache.getCurrentData().getData()));
        }
    }
}, pool);
(2) PathChildrenCache
Watches changes to all child nodes under a specified node.
PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, "/ba", true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(
    new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                throws Exception {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED: " + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                    break;
                default:
                    break;
            }
        }
    },
    pool
);
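Conceptually, `CHILD_ADDED` and `CHILD_REMOVED` events amount to the difference between two successive child listings. The sketch below computes that difference from two lists of the kind `getChildren` returns. It is not how `PathChildrenCache` is implemented, and the `ChildDiff` class and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: derives added and removed children from two listings.
class ChildDiff {
    // Children present in after but not in before.
    static Set<String> added(List<String> before, List<String> after) {
        Set<String> result = new TreeSet<>(after);
        result.removeAll(before);
        return result;
    }

    // Children present in before but not in after.
    static Set<String> removed(List<String> before, List<String> after) {
        return added(after, before);
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("001", "002");
        List<String> after = Arrays.asList("002", "003");
        System.out.println("CHILD_ADDED: " + added(before, after));     // prints CHILD_ADDED: [003]
        System.out.println("CHILD_REMOVED: " + removed(before, after)); // prints CHILD_REMOVED: [001]
    }
}
```

Polling and diffing like this misses data updates on existing children, which is one reason a watch-based cache is preferable.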