通知/协调机制通常有两种方式。
总的来说,利用Zookeeper的watcher注册和异步通知功能,通知的发送者创建一个节点,并将通知的数据写入该节点;通知的接收者则对该节点注册watch,当节点变化时,就算作通知的到来。
通过上面的说明,其实实现还是非常容易的。看下关键的几个地方:
g_monitor_child
:变量等于0表示只监控节点本身,等于1表示监控所有子节点。
show_notify(zh,g_path);
:打印接收到的通知
show_list(zh,g_path);
:打印所有子节点及其数据
再来看监控函数:
/*
 * Watcher callback: reacts to events on the monitored path.
 * The `path` and `watcherCtx` arguments are not used; the globally
 * configured g_path is always re-read instead.
 */
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)
{
    /* Only events delivered in the connected state are handled. */
    if (state != ZOO_CONNECTED_STATE) {
        return;
    }

    if (g_monitor_child == 0) {
        /* Node mode: the data of the node itself changed. */
        if (type == ZOO_CHANGED_EVENT) {
            show_notify(zh, g_path);
        }
    } else if (g_monitor_child == 1) {
        /* Child mode: either the number of children changed or the data
         * of a child changed; both refresh the full listing. */
        if (type == ZOO_CHILD_EVENT || type == ZOO_CHANGED_EVENT) {
            show_list(zh, g_path);
        }
    }
}
下面是完整的代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "zookeeper.h"
#include "zookeeper_log.h"
/* ZooKeeper server address in "ip:port" form; overwritten by the -s option. */
char g_host[512]= "172.17.0.36:2181";
/* Path of the znode to monitor; overwritten by the -p option. */
char g_path[512]= "/Notify";
/* 0: monitor only the node itself; 1: monitor its child nodes (set by -c). */
int g_monitor_child = 0;
/* Watcher callback: dispatches data-changed and child-list-changed events. */
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
/* Prints the notification data read from the monitored node. */
void show_notify(zhandle_t *zkhandle,const char *path);
/* Prints every child node of the given path together with its data. */
void show_list(zhandle_t *zkhandle,const char *path);
/* Prints the command-line help text. */
void print_usage();
/* Parses the command line into g_host, g_path and g_monitor_child. */
void get_option(int argc,const char* argv[]);
/********** utilities **********/

/*
 * Write the command-line help text (synopsis, option list and an example
 * invocation) to standard output.
 */
void print_usage()
{
    /* None of these lines contains a conversion specifier, so they are
     * emitted verbatim. */
    static const char *const usage_lines[] = {
        "Usage : [notify] [-h] [-c] [-p path][-s ip:port] \n",
        " -h Show help\n",
        " -p path\n",
        " -c monitor the child nodes\n",
        " -s zookeeper server ip:port\n",
        "For example:\n",
        "notify -s172.17.0.36:2181 -p /Notify\n",
    };
    size_t line;

    for (line = 0; line < sizeof usage_lines / sizeof usage_lines[0]; ++line) {
        fputs(usage_lines[line], stdout);
    }
}
/*
 * Parse the command-line options into the global settings.
 *
 *   -h        print the usage text and exit
 *   -c        monitor the child nodes (sets g_monitor_child to 1)
 *   -s host   ZooKeeper server "ip:port", copied into g_host
 *   -p path   znode path to monitor, copied into g_path
 *
 * An unknown option, or an option that lacks its argument, prints the
 * usage text followed by a diagnostic and terminates with exit(-1).
 * Arguments longer than the destination buffer are truncated.
 */
void get_option(int argc,const char* argv[])
{
    /* Both are provided by getopt(); declared here so the function does
     * not depend on which feature macros <unistd.h> was included with. */
    extern char *optarg;
    extern int optopt;
    int optch;
    /* The leading ':' makes getopt() return ':' (instead of '?') when an
     * option argument is missing and silences its built-in messages, so
     * the "need parameter" branch below is actually reachable. */
    const char optstring[] = ":hcp:s:";

    while ((optch = getopt(argc, (char * const *)argv, optstring)) != -1) {
        switch (optch) {
        case 'h':
            print_usage();
            exit(-1);
        case '?':
            print_usage();
            printf("unknown parameter: %c\n", optopt);
            exit(-1);
        case ':':
            print_usage();
            printf("need parameter: %c\n", optopt);
            exit(-1);
        case 'c':
            g_monitor_child = 1;
            break;
        case 's':
            /* snprintf always NUL-terminates; strncpy does not when the
             * source fills the whole buffer. */
            snprintf(g_host, sizeof(g_host), "%s", optarg);
            break;
        case 'p':
            snprintf(g_path, sizeof(g_path), "%s", optarg);
            break;
        default:
            break;
        }
    }
}
/*
 * Global watcher callback passed to zookeeper_init().
 *
 * Only events received in the connected state are handled:
 *   - node mode  (g_monitor_child == 0): a data change prints the node data;
 *   - child mode (g_monitor_child == 1): a child-list change or a data
 *     change prints the full child listing.
 * The `path` and `watcherCtx` arguments are not used; the globally
 * configured g_path is always re-read instead.
 */
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)
{
    if (state != ZOO_CONNECTED_STATE) {
        return;
    }

    if (g_monitor_child == 0) {
        if (type == ZOO_CHANGED_EVENT) {
            show_notify(zh, g_path);
        }
    } else if (g_monitor_child == 1) {
        if (type == ZOO_CHILD_EVENT || type == ZOO_CHANGED_EVENT) {
            show_list(zh, g_path);
        }
    }
}
/*
 * Read the data stored at `path` and print it as a notification.
 *
 * The read is issued with the watch flag set, so the handle's watcher is
 * asked to fire again on the next change of the node.
 *
 * zkhandle: ZooKeeper session handle.
 * path:     znode whose data is printed.
 *
 * A failed read is reported on stderr; nothing is returned to the caller.
 */
void show_notify(zhandle_t *zkhandle,const char *path)
{
    char notify_buffer[1024] = {0};
    /* Keep one byte in reserve so the data can always be NUL-terminated. */
    int notify_len = (int)sizeof(notify_buffer) - 1;
    int ret = zoo_get(zkhandle, path, 1, notify_buffer, &notify_len, NULL);

    if (ret != ZOK) {
        fprintf(stderr,"failed to get the data of path %s!\n", path);
        return;
    }

    /* zoo_get() stores raw bytes without a terminator and reports a
     * length of -1 for a node that has no data. */
    if (notify_len < 0) {
        notify_len = 0;
    }
    notify_buffer[notify_len] = '\0';
    printf("Notice:%s\n", notify_buffer);
}
/*
 * Print every child of `path` together with the data stored in it.
 *
 * Both the child listing and each child read are issued with the watch
 * flag set, so the handle's watcher is asked to fire on later changes.
 *
 * zkhandle: ZooKeeper session handle.
 * path:     parent znode whose children are listed.
 *
 * Failures are reported on stderr; a child that cannot be read is skipped
 * and the remaining children are still printed.
 */
void show_list(zhandle_t *zkhandle,const char *path)
{
    /* Zero-initialised so nothing indeterminate is ever read or freed. */
    struct String_vector children = {0};
    /* The root path already ends in '/', so no extra separator is needed. */
    const char *sep = (path[0] == '/' && path[1] == '\0') ? "" : "/";
    int i;
    int ret = zoo_get_children(zkhandle, path, 1, &children);

    if (ret != ZOK) {
        fprintf(stderr,"failed to get the children of path %s!\n",path);
        return;
    }

    printf("--------------\n");
    for (i = 0; i < children.count; ++i) {
        char child_path[512];
        char notify_buffer[1024];
        /* The length is an in/out argument of zoo_get(), so it has to be
         * reset to the buffer capacity for every child; one byte is kept
         * in reserve for the terminator. */
        int notify_len = (int)sizeof(notify_buffer) - 1;
        int written = snprintf(child_path, sizeof(child_path), "%s%s%s",
                               path, sep, children.data[i]);

        if (written < 0 || (size_t)written >= sizeof(child_path)) {
            fprintf(stderr,"child path too long: %s%s%s\n",
                    path, sep, children.data[i]);
            continue;
        }

        ret = zoo_get(zkhandle, child_path, 1, notify_buffer, &notify_len, NULL);
        if (ret != ZOK) {
            fprintf(stderr,"failed to get the data of path %s!\n",child_path);
            continue;
        }

        /* zoo_get() does not terminate the data and reports -1 for a
         * node that has no data. */
        if (notify_len < 0) {
            notify_len = 0;
        }
        notify_buffer[notify_len] = '\0';
        printf("%s:%s\n",children.data[i],notify_buffer);
    }

    /* Release the child names and the array that holds them. */
    for (i = 0; i < children.count; ++i) {
        free(children.data[i]);
        children.data[i] = NULL;
    }
    free(children.data);
    children.data = NULL;
    children.count = 0;
}
/*
 * Entry point of the notify demo.
 *
 * Parses the options, connects to the ZooKeeper server, makes sure the
 * monitored node exists (creating it with the data "1.0" if necessary),
 * prints the current state once and then waits for a key press while the
 * watcher prints further notifications.
 *
 * Returns 0 after a normal shutdown and EXIT_FAILURE when the connection
 * or the monitored node cannot be set up.
 */
int main(int argc, const char *argv[])
{
    int timeout = 30000;
    char path_buffer[512];
    int bufferlen=sizeof(path_buffer);
    zhandle_t* zkhandle;
    int ret;

    /* Limit client logging to warnings so it does not drown the output. */
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    get_option(argc,argv);

    zkhandle = zookeeper_init(g_host,zktest_watcher_g, timeout, 0, (char *)"Notify Test", 0);
    if (zkhandle == NULL) {
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");
        exit(EXIT_FAILURE);
    }

    ret = zoo_exists(zkhandle,g_path,0,NULL);
    if (ret != ZOK) {
        ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"),
                         &ZOO_OPEN_ACL_UNSAFE,0,
                         path_buffer,bufferlen);
        if (ret != ZOK) {
            fprintf(stderr,"failed to create the path %s!\n",g_path);
        } else {
            printf("create path %s successfully!\n",g_path);
        }
    }

    /* Without the node no watch can be registered, so waiting for
     * notifications would never produce anything: stop here and report
     * the failure through the exit status. */
    if (ret != ZOK) {
        zookeeper_close(zkhandle);
        return EXIT_FAILURE;
    }

    /* Print the current state once; these reads also register the watches. */
    if (g_monitor_child == 0) {
        show_notify(zkhandle,g_path);
    } else if (g_monitor_child == 1) {
        show_list(zkhandle,g_path);
    }

    /* Block until the user presses a key; notifications are printed by
     * the watcher in the meantime. */
    getchar();
    zookeeper_close(zkhandle);
    return 0;
}
程序有3个参数选项:
-s
:指定Zookeeper服务器的ip:port
-p
:指定要监控的路径,默认为/Notify
-c
:使用此项表示监控子节点列表
如
notify -s172.17.0.36:2181 -p /Notify
当你在客户端修改数据的时候,程序就能收到对应的通知了。
原文:http://blog.csdn.net/qq910894904/article/details/40833981