C++集成zookeeper客户端

zookeeper是Java下常用的分布式应用程序协调服务。因接触到Java业务系统的一些架构,比如流行的微服务架构等等都采用zookeeper做服务发现。某些系统服务可以采用其他语言如C\C++实现来提升性能或者实现某些特殊功能。

本文为C++集成zookeeper客户端的开发调研。

开始

zookeeper的C\C++开发客户端源代码和zookeeper在一起,从官网下载zookeeper包,解压后C\C++客户端在src\c目录中。网上说多线程程序链接zookeeper_mt,单线程zookeeper_st,实测在Windows环境下只有zookeeper一个库,请注意。

数据模型

zookeeper表现为一个分层的文件系统目录树结构(不同于文件系统的是,节点可以有自己的数据,而文件系统中的目录节点只有子节点)。

监视WATCHES

zookeeper中所有的读操作都可以设置监视,监视事件可以理解为一次性触发器。其中:

  • 一次性触发

    当设置监视的数据发生改变时,该监视事件会被发送到客户端,例如,如果客户端调用了getData(“/znode1”,true) 并且稍后/znode1节点上的数据发生了改变或者被删除了,客户端将会获取到/znode1发生变化的监视事件,而如果/znode1再一次发生了变化,除非客户端再次对/znode1设置监视,否则客户端不会收到事件通知。

  • 发送至客户端

    zookeeper客户端和服务端是通过socket进行通信的,由于网络存在故障,所以监视事件很有可能不会成功地到达客户端,监视事件是异步发送至监视者的,zookeeper本身提供了保序性(ordering guarantee):即客户端只有首先看到了监视事件后,才会感知到它所设置监视的znode发生了变化(a client will never see a hange for which it has set a watch until it first sees the watch event)。网络延迟或者其他因素可能导致不同的客户端在不同的时刻感知某一监视事件,但是不同的客户端所看到的一切具有一致的顺序。

  • 被监视的数据

    这意味着znode节点本身具有不同的改变方式。你也可以想象Zookeeper维护了两条监视链表:数据监视和子节点监视(data watches and child watches) getData() and exists()设置数据监视,getChildren()设置子节点监视。 或者,你也可以想象zookeeper设置的不同监视返回不同的数据,getData()和exists()返回znode节点的相关信息,而getChildren()返回子节点列表。因此,setData()会触发设置在某一节点上所设置的数据监视(假定数据设置成功),而一次成功的create()操作则会出发当前节点上所设置的数据监视以及父节点的子节点监视。一次成功的delete()操作将会触发当前节点的数据监视和子节点监视事件,同时也会触发该节点父节点的child watch。

zookeeper中的监视是轻量级的,因此容易设置、维护和分发。当客户端与zookeeper服务器端失去联系时,客户端并不会收到监视事件的通知,只有当客户端重新连接后,若在必要的情况下,以前注册的监视会重新被注册并触发,对于开发人员来说 这通常是透明的。只有一种情况会导致监视事件的丢失,即:通过exists()设置了某个znode节点的监视,但是如果某个客户端在此znode节点被创建和删除的时间间隔内与zookeeper服务器失去了联系,该客户端即使稍后重新连接zookeeper服务器后也得不到事件通知。

连接状态Stat相关的常量

  • ZOO_EXPIRED_SESSION_STATE
  • ZOO_AUTH_FAILED_STATE
  • ZOO_CONNECTING_STATE
  • ZOO_ASSOCIATING_STATE
  • ZOO_CONNECTED_STATE

与监视类型相关的常量

  • ZOO_CREATED_EVENT; //节点被创建(此前该节点不存在),通过zoo_exists()设置监视。
  • ZOO_DELETED_EVENT; //节点被删除,通过zoo_exists()和zoo_get()设置监视。
  • ZOO_CHANGED_EVENT; //节点发生变化,通过zoo_exists()和zoo_get()设置监视。
  • ZOO_CHILD_EVENT; //子节点事件,通过zoo_get_children()和zoo_get_children2()设置监视。
  • ZOO_SESSION_EVENT; //会话丢失
  • ZOO_NOTWATCHING_EVENT; //监视被移除。

开发

初始化

/*const char* host = "127.0.0.1:2181,127.0.0.1:2182," 
"127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";*/
const char* host = "192.168.58.80:2181";
int timeout = 30000;
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
// 初始化 zookeeper ,倒数第二个参数没啥用,
// 但是可以作为内部的参数传递,对应 zktest_watcher_g 函数最后一个参数
zkhandle = zookeeper_init(host,
zktest_watcher_g, timeout, 0, "hello zookeeper.", 0);
if (zkhandle == NULL) {
fprintf(stderr, "Error when connecting to zookeeper servers...\n");
exit(EXIT_FAILURE);
}

其中 zktest_watcher_g 是监视回调函数,格式如下

void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);

watcherCtx 设置的值,见前面代码注释;

释放

使用结束后释放

zookeeper_close(zkhandle);

监视数据

一般在初始化的回调中会产生一些回调事件,如连接状态等,监视数据我们就在连接成功之后触发,且目录必须是节点创建之后才可以监听。

void zktest_watcher_g(zhandle_t *zh, int type, int state,
const char *path, void *watcherCtx) {
printf("Something happened.\n");
printf("type: %d\t", type);
printf("state: %d\t", state);
printf("path: %s\t", path);
printf("watcherCtx: %s\n", (char *) watcherCtx);

//事件类型
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_CONNECTED_STATE) {
printf("--------------ZOO_CONNECTED_STATE----------------\n");
int ret = zoo_aexists(zkhandle, "/xyz", 1, zktest_stat_completion, "aexists");

//我们真实的情况应该是监听一个目录
zoo_awget_children2(zkhandle, "/xyz", zktest_watcher_g,
"childwatchctx", zktest_string_completion2, "childwatch");
}
} else if (type == ZOO_CREATED_EVENT || type == ZOO_DELETED_EVENT ||
type == ZOO_CHANGED_EVENT) {
printf("---ZOO_CREATED_EVENT|ZOO_DELETED_EVENT|ZOO_CHANGED_EVENT---\n");
//zoo_aexists 可以监听 ZOO_CREATED_EVENT;ZOO_DELETED_EVENT;ZOO_CHANGED_EVENT
int ret = zoo_aexists(zkhandle, "/xyz", 1, zktest_stat_completion, "aexists");

///!!!必须在监听的目录创建之后才可以监听
if (type == ZOO_CREATED_EVENT) {
zoo_awget_children2(zkhandle, "/xyz", zktest_watcher_g,
"childwatchctx", zktest_string_completion2, "childwatch");
}
} else if (type == ZOO_CHILD_EVENT) {
printf("--------------ZOO_CHILD_EVENT----------------\n");
//zoo_get_children() 和 zoo_get_children2() 监视的消息

//测试此处貌似还必须继续监听才行
zoo_awget_children2(zkhandle, "/xyz", zktest_watcher_g,
"childwatchctx", zktest_string_completion2, "childwatch");
} else if (type == ZOO_SESSION_EVENT) {
//会话丢失
printf("--------------ZOO_SESSION_EVENT----------------\n");
} else if (type == ZOO_NOTWATCHING_EVENT) {
//监视被移除
printf("--------------ZOO_NOTWATCHING_EVENT----------------\n");
}
}

响应的回调函数实现如下,特别注意各个数据之间的流程(在回调中根据返回码确认是否继续监听),数据的监听采用exist,之后get,节点则用zoo_awget_children2监听节点变化,之后根据变化再获取值,各处注意之处可看注释及日志输出说明。

void zktest_dump_stat(const struct Stat *stat) {
if (!stat) {
return;
}

fprintf(stderr, "version=%x\taversion=%x"
"\tephemeralOwner = %llx\n",
(unsigned int) stat->version, (unsigned int) stat->aversion,
stat->ephemeralOwner);
}

void zktest_string_completion2(int rc, const struct String_vector *name,
const struct Stat *stat, const void *data) {
fprintf(stderr, "zktest_string_completion2: rc = %d\n", rc);
if (rc == 0) {
for (int i = 0; i < name->count; i++) {
fprintf(stderr, "name = %s\n", name->data[i]);
}
}

zktest_dump_stat(stat);
}

void zktest_data_completion(int rc, const char *value, int value_len,
const struct Stat *stat, const void *data) {
if (rc == ZOK) {
//value和value_len就是取出来的值
std::string buf(value, value_len);
fprintf(stderr, "%s: rc = %d valle=%s\n",
(char *) data, rc, buf.c_str());
} else {
fprintf(stderr, "%s: rc = %d valle=%s\n",
(char *) data, rc, value);
}
}

void zktest_watcher_g(zhandle_t *zh, int type, int state,
const char *path, void *watcherCtx);

void zktest_stat_completion(int rc, const struct Stat *stat, const void *data) {
fprintf(stderr, "%s: rc = %d Stat:\n", (char *) data, rc);


if (rc == ZNONODE) {
//数据被删除了
} else if (rc == ZOK) {
zktest_dump_stat(stat);
//监听到变化就取值
int ret = zoo_aget(zkhandle, "/xyz", NULL, zktest_data_completion, "mydata");
} else {
//其他错误
}
}

测试

zookeeper安装目录bin下有客户端zkCli.cmd (linux下为zkClient.sh ),启动之后默认连接本地服务,可使用connect命令连接远程。
上述例子(可见完整代码)测试,可先启动程序,再执行创建命令,在zookeeper中创建目录必须层层创建,比如例子中/xyz/目录下的/xyz/abc,首先建立/xyz节点(且实际测试必须赋值,网上提供的例子均不需要赋值)

create /xyz "anyvalue"

当然可通过set修改值

set /xyz "newvalue"

创建目录(子节点)

create /xyz/abc "nodevalue"

修改节点

set /xyz/abc "newvalue"

删除节点

rmr /xyz

依次操作可查看程序日志输出,同时也可先创建节点再启动程序进行修改等操作观察日志输出。即可了解zookeeper客户端的回调机制。

最近的文章

用微信做现网告警

一般对于比较重要的现网监控,都有专人维护。但是个别规模很小切偶发问题几率很小的业务,用现有的邮件告警实时性比较低,一个是因为非办公地点很少登录邮件,另外一个就是垃圾邮件比较多。而短信和微信就比较方便了,不过短信一般需要收费,我们可以使用免费的微信实现实时告警消息。 实现业务是一个Spring的Web …

技术 继续阅读
更早的文章

网页自适应显示WebP图片

生命不息,折腾不止。 起因最近上传了个背景图,加载的非常慢,才发现上传了个1M多的图。优化手段无非就是做做图片裁剪已经质量压缩,网上确实也有不少的分析压缩的方法,但基本都是jpeg和png等。偶然发现最近流行的WebP格式,又是对图片压缩的一大进步,可以大幅优化流量。 WebP就不多做介绍了,有兴趣 …

技术 继续阅读