环境配置
三个节点: node1:2181 node1:2181 node1:2181 具体配置请参考大数据集群之Hadoop集群(HA)
实现说明
运用zk的事件和临时序列化节点实现。
当一个线程需要获取锁时,先zk上创建一个临时序列化节点,并监听事件创建节点后,判断自己是否为唯一的节点,如果是则直接获取锁,如果不是,获取所有的兄弟节点并从小到大排序判断自己是不是序列号最小的节点(最早注册),如果是则获取锁。如果均不是则等待。当一个节点拿锁并执行完任务后,会将自己节点从zk上删除,此时其他节点将会收到有节点删除的事件信息。收到事件信息后,获取所有的当前节点并进行从小到大进行排序判断自己是否为最小节点,如果是则获取锁,执行任务。
实现
启动所有的zk节点。
使用zkCli.sh登陆zk并创建父节点
zkCli
.sh # 登陆zk服务
create
/test
'' # 创建
/test节点
create
/test
/root
'' # 创建
/test
/root节点
Java代码如下
package com
.yc
.zookeeper
.project
.zkDome
.lockpro
;
import java
.util
.Collections
;
import java
.util
.List
;
import java
.util
.Random
;
import java
.util
.concurrent
.CountDownLatch
;
import org
.apache
.zookeeper
.CreateMode
;
import org
.apache
.zookeeper
.KeeperException
;
import org
.apache
.zookeeper
.WatchedEvent
;
import org
.apache
.zookeeper
.Watcher
;
import org
.apache
.zookeeper
.ZooDefs
;
import org
.apache
.zookeeper
.ZooKeeper
;
public class DistributedClient {
private static final String root
= "/test/root";
private static final String lock
= "lock_";
private String thisPath
;
private static final int sessionTimeout
= 5000;
private String connectString
= "node1:2181,node1:2181,node1:2181";
private ZooKeeper zooKeeper
;
public void doTask() throws InterruptedException
, KeeperException
{
System
.out
.println(Thread
.currentThread().getName() + "获取到锁,开始执行任务...");
try {
Thread
.sleep(new Random().nextInt(1000));
} catch (InterruptedException e
) {
e
.printStackTrace();
} finally {
zooKeeper
.delete(root
+ "/" + this.thisPath
, -1);
}
System
.out
.println(Thread
.currentThread().getName() + "任务执行完毕,解除锁...");
}
public void lock() {
try {
zooKeeper
= new ZooKeeper(connectString
, sessionTimeout
, new Watcher() {
@Override
public void process(WatchedEvent event
) {
if (event
.getState() == Event
.KeeperState
.SyncConnected
) {
if (event
.getType() == Event
.EventType
.NodeDeleted
) {
System
.out
.println("有节点" + event
.getPath() + ":被删除了");
try {
List
<String> children1
= zooKeeper
.getChildren(root
, false);
Collections
.sort(children1
);
int index
= children1
.indexOf(thisPath
);
if (index
== 0) {
doTask();
}
} catch (KeeperException e
) {
e
.printStackTrace();
} catch (InterruptedException e
) {
e
.printStackTrace();
}
}
}
}
});
this.thisPath
= zooKeeper
.create(root
+ "/" + lock
, new byte[0], ZooDefs
.Ids
.OPEN_ACL_UNSAFE
,
CreateMode
.EPHEMERAL_SEQUENTIAL
);
System
.out
.println(Thread
.currentThread().getName() + "create " + this.thisPath
);
this.thisPath
= thisPath
.substring(thisPath
.lastIndexOf("/") + 1);
List
<String> children
= zooKeeper
.getChildren(root
, false);
if (children
.size() == 1) {
if (children
.get(0).equals(this.thisPath
)) {
doTask();
}
} else {
Collections
.sort(children
);
if (children
.get(0).equals(this.thisPath
)) {
doTask();
}
}
} catch (Exception e
) {
e
.printStackTrace();
}
}
public static void main(String
[] args
) throws InterruptedException
{
for (int i
= 0; i
< 10; i
++) {
new Thread(new Runnable() {
@Override
public void run() {
DistributedClient c
= new DistributedClient();
try {
Thread
.sleep(new Random().nextInt(1000));
} catch (InterruptedException e
) {
e
.printStackTrace();
}
c
.lock();
}
}).start();
}
Thread
.sleep(60000);
}
}
源码已上传至Github:点击跳转
另外还有一个使用zookeeper模拟Hadoop 的DataNode 在 NameNode上注册的功能,有兴趣的同学也可以看看:点击跳转