package com.carelink.rpc.registry.client.util;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import com.carelink.rpc.registry.client.ZkClient;
import com.carelink.rpc.registry.client.ZkConfig;
import com.carelink.rpc.registry.client.factory.ZkClientServiceFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
public class RpcRegisteryService {
public static final int PROCESS = Runtime.getRuntime().availableProcessors();
private RpcRegisteryService() {
}
private static class SingletonHolder {
static final RpcRegisteryService instance = new RpcRegisteryService();
}
public static RpcRegisteryService instance() {
return SingletonHolder.instance;
}
private CuratorFramework zkclient = null;
public CuratorFramework getZkclient() {
return zkclient;
}
private void setZkclient(CuratorFramework zkclient) {
this.zkclient = zkclient;
}
/**
* 连接ZK 创建初始
*
* @param address
* 地址
* @param timeout
* 超时时间
* @param namespace
* 命名空间
* @param group
* 组
* @param groupVal
* 组节点值
* @param node
* 节点
* @param nodeVal
* 节点值
*/
public void connectZookeeper(
String address,
int timeout,
String namespace,
String group,
String groupVal,
String node,
String nodeVal) {
if (getZkclient() != null) {
return;
}
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
builder.connectString(address).connectionTimeoutMs(timeout).sessionTimeoutMs(timeout)
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 2000));
if (!Strings.isNullOrEmpty(namespace)) {
builder.namespace(namespace);
}
setZkclient(builder.build());
RpcConnectionStateListener listener = new RpcConnectionStateListener(group, groupVal, node, nodeVal);
getZkclient().getConnectionStateListenable().addListener(listener);
getZkclient().start();
// 注入
startRegisterServer(group, groupVal, node, nodeVal);
}
private void startRegisterServer(String group, String groupVal, String node, String nodeVal) {
registerGroup(group, groupVal);
registerNode(group, node, nodeVal);
}
public boolean registerGroup(String group, String groupVal) {
return createNode("/" + group, groupVal, CreateMode.PERSISTENT);// 创建持久的
}
public boolean registerNode(String group, String node, String nodeVal) {
return createNode("/" + group + "/" + node, nodeVal, CreateMode.EPHEMERAL_SEQUENTIAL); // 创建临时的
}
// 设置路径更改监听
public void listenerPathChildren(String group,PathChildrenCacheListener listener) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(PROCESS * 2);
@SuppressWarnings("resource")
PathChildrenCache childrenCache = new PathChildrenCache(getZkclient(), "/" + group, true);
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(listener, pool);
}
/**
* 创建节点
*
* @param nodeName
* @param value
* @param createMode
* @return
* @throws Exception
*/
public boolean createNode(String nodeName, String value, CreateMode createMode) {
boolean suc = false;
if (getZkclient() == null) {
return suc;
}
try {
Stat stat = getZkclient().checkExists().forPath(nodeName);
if (stat == null) {
String opResult = null;
// 节点判断值为不为空
if (Strings.isNullOrEmpty(value)) {
opResult = getZkclient().create().creatingParentsIfNeeded().withMode(createMode).forPath(nodeName);
} else {
opResult = getZkclient().create().creatingParentsIfNeeded().withMode(createMode).forPath(nodeName,
value.getBytes(Charsets.UTF_8));
}
suc = Objects.equal(nodeName, opResult);
}
} catch (Exception e) {
System.out.println("create node fail! path: " + nodeName + " value: " + value + " CreateMode: "
+ createMode.name());
e.printStackTrace();
return suc;
}
return suc;
}
public void destory() {
if(getZkclient()==null){
return;
}
getZkclient().close();
}
/**
* 删除节点
*
* @param node
* @return
*/
public boolean deleteNode(String node) {
if (getZkclient() == null) {
return false;
}
try {
Stat stat = getZkclient().checkExists().forPath(node);
if (stat != null) {
getZkclient().delete().deletingChildrenIfNeeded().forPath(node);
}
return true;
} catch (Exception e) {
System.out.println("delete node fail! path: " + node);
return false;
}
}
/**
* 获取指定节点下的子节点路径和值
* @param node
* @return
*/
public Map<String, String> getChildrenDetail(String node) {
if (getZkclient() == null) {
return null;
}
Map<String, String> map = Maps.newHashMap();
try {
GetChildrenBuilder childrenBuilder = getZkclient().getChildren();
List<String> children = childrenBuilder.forPath(node);
GetDataBuilder dataBuilder = getZkclient().getData();
if (children != null) {
for (String child : children) {
String propPath = ZKPaths.makePath(node, child);
map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8));
}
}
} catch (Exception e) {
System.out.println("get node chilren list fail! path: " + node);
return null;
}
return map;
}
class RpcConnectionStateListener implements ConnectionStateListener{
private String group;
@SuppressWarnings("unused")
private String groupVal;
private String node;
private String nodeVal;
public RpcConnectionStateListener(String group, String groupVal,String node,String nodeVal) {
this.groupVal = groupVal;
this.group = group;
this.node = node;
this.nodeVal = nodeVal;
}
@Override
public void stateChanged(CuratorFramework cf, ConnectionState state) {
if(state == ConnectionState.LOST){
//重新注册
while(true){
//只需要注册节点,组已经是持久的//
if(registerNode(group, node, nodeVal)){
break;
}
}
}
}
}
//获取本机IP
public static String getLocalHost(String type){
InetAddress addr = null;
try {
addr = InetAddress.getLocalHost();
if("address".equals(type)){
return addr.getHostName().toString();//获得本机名称
}
return addr.getHostAddress().toString();//获得本机IP
} catch (UnknownHostException e) {
e.printStackTrace();
return "";
}
}
public static void main(String[] args) throws Exception {
String address = "192.168.200.34:2181,192.168.200.44:2181,192.168.200.64:2181";
System.out.println(ZkConfig.forView());
System.out.println(getLocalHost("ip"));
ZkClient.startZkRegistery(address,5000);
int i = 0;
while(true){
try {
System.out.println( ZkClientServiceFactory.getRpcClientServcie(0).getServersByGroup(ZkConfig.getZkServerGroup()).size());
System.out.println( ZkClientServiceFactory.getRpcClientServcie(0).getServersByGroup(ZkConfig.getZkServerGroup()));
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
if(i>10000){
break;
}
}
}
}
package com.carelink.rpc.registry.server.util;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import com.carelink.rpc.registry.server.ZkConfig;
import com.carelink.rpc.registry.server.ZkService;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
public class RpcRegisteryService {
private RpcRegisteryService() {
}
private static class SingletonHolder {
static final RpcRegisteryService instance = new RpcRegisteryService();
}
public static RpcRegisteryService instance() {
return SingletonHolder.instance;
}
private CuratorFramework zkclient = null;
private CuratorFramework getZkclient() {
return zkclient;
}
private void setZkclient(CuratorFramework zkclient) {
this.zkclient = zkclient;
}
/**
* 连接ZK 创建初始
*
* @param address
* 地址
* @param timeout
* 超时时间
* @param namespace
* 命名空间
* @param group
* 组
* @param groupVal
* 组节点值
* @param node
* 节点
* @param nodeVal
* 节点值
*/
public void connectZookeeper(
String address,
int timeout,
String namespace,
String group,
String groupVal,
String node,
String nodeVal) {
if (getZkclient() != null) {
return;
}
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
// ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
// RetryNTimes:指定最大重试次数的重试策略
// RetryOneTime:仅重试一次
// RetryUntilElapsed:一直重试直到达到规定的时
builder.connectString(address).connectionTimeoutMs(timeout).sessionTimeoutMs(timeout)
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 2000));
if (!Strings.isNullOrEmpty(namespace)) {
builder.namespace(namespace);
}
setZkclient(builder.build());
RpcConnectionStateListener listener = new RpcConnectionStateListener(group, groupVal, node, nodeVal);
getZkclient().getConnectionStateListenable().addListener(listener);
getZkclient().start();
// 注入服务
startRegisterServer(group, groupVal, node, nodeVal);
}
private void startRegisterServer(String group, String groupVal, String node, String nodeVal) {
registerGroup(group, groupVal);
registerNode(group, node, nodeVal);
}
public boolean registerGroup(String group, String groupVal) {
return createNode("/" + group, groupVal, CreateMode.PERSISTENT);// 创建持久的
}
public boolean registerNode(String group, String node, String nodeVal) {
return createNode("/" + group + "/" + node, nodeVal, CreateMode.EPHEMERAL_SEQUENTIAL); // 创建临时的
}
/**
* 创建节点
*
* @param nodeName
* @param value
* @param createMode
* @return
* @throws Exception
*/
public boolean createNode(String nodeName, String value, CreateMode createMode) {
boolean suc = false;
if (getZkclient() == null) {
return suc;
}
try {
Stat stat = getZkclient().checkExists().forPath(nodeName);
if (stat == null) {
String opResult = null;
// 节点判断值为不为空
if (Strings.isNullOrEmpty(value)) {
opResult = getZkclient().create().creatingParentsIfNeeded().withMode(createMode).forPath(nodeName);
} else {
opResult = getZkclient().create().creatingParentsIfNeeded().withMode(createMode).forPath(nodeName,
value.getBytes(Charsets.UTF_8));
}
suc = Objects.equal(nodeName, opResult);
}
} catch (Exception e) {
System.out.println("create node fail! path: " + nodeName + " value: " + value + " CreateMode: "
+ createMode.name());
e.printStackTrace();
return suc;
}
return suc;
}
public void destory() {
if(getZkclient()==null){
return;
}
getZkclient().close();
}
/**
* 删除节点
*
* @param node
* @return
*/
public boolean deleteNode(String node) {
if (getZkclient() == null) {
return false;
}
try {
Stat stat = getZkclient().checkExists().forPath(node);
if (stat != null) {
getZkclient().delete().deletingChildrenIfNeeded().forPath(node);
}
return true;
} catch (Exception e) {
System.out.println("delete node fail! path: " + node);
return false;
}
}
/**
* 获取指定节点下的子节点路径和值
* @param node
* @return
*/
public Map<String, String> getChildrenDetail(String node) {
if (getZkclient() == null) {
return null;
}
Map<String, String> map = Maps.newHashMap();
try {
GetChildrenBuilder childrenBuilder = getZkclient().getChildren();
List<String> children = childrenBuilder.forPath(node);
GetDataBuilder dataBuilder = getZkclient().getData();
if (children != null) {
for (String child : children) {
String propPath = ZKPaths.makePath(node, child);
map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8));
}
}
} catch (Exception e) {
System.out.println("get node chilren list fail! path: " + node);
return null;
}
return map;
}
class RpcConnectionStateListener implements ConnectionStateListener{
private String group;
@SuppressWarnings("unused")
private String groupVal;
private String node;
private String nodeVal;
public RpcConnectionStateListener(String group, String groupVal,String node,String nodeVal) {
this.groupVal = groupVal;
this.group = group;
this.node = node;
this.nodeVal = nodeVal;
}
@Override
public void stateChanged(CuratorFramework cf, ConnectionState state) {
if(state == ConnectionState.LOST){
//重新注册服务
while(true){
//只需要注册节点,组已经是持久的
if(registerNode(group, node, nodeVal)){
break;
}
}
}
}
}
public static String getLocalHost(String type){
InetAddress addr = null;
try {
addr = InetAddress.getLocalHost();
if("address".equals(type)){
return addr.getHostName().toString();//获得本机名称
}
return addr.getHostAddress().toString();//获得本机IP
} catch (UnknownHostException e) {
e.printStackTrace();
return "";
}
}
public static void main(String[] args) {
ZkConfig.readConfig();
System.out.println(ZkConfig.forView());
System.out.println(getLocalHost("ip"));
ZkService.startZkRegistery();
int i = 0;
while(true){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
if(i>10){
break;
}
}
ZkService.stopZkRegistery();
}
}
// Original article: http://my.oschina.net/exit/blog/494501