This adds initial support for the ZooKeeper cluster driver. To use this driver, please specify comma separated host:port pairs (each corresponding to a ZooKeeper server) to the driver option. For example: $ sheep /store -c zookeeper:host1:3000,host2:3000,host3:3000 TODO: - use asynchronous ZooKeeper APIs - use watch notification instead of loop and sleep Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- configure.ac | 14 ++ sheep/Makefile.am | 3 + sheep/cluster/zookeeper.c | 557 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 574 insertions(+), 0 deletions(-) create mode 100644 sheep/cluster/zookeeper.c diff --git a/configure.ac b/configure.ac index 5a0c9748..af15f8c 100644 --- a/configure.ac +++ b/configure.ac @@ -176,6 +176,11 @@ AC_ARG_ENABLE([corosync], [ enable_corosync="yes" ],) AM_CONDITIONAL(BUILD_COROSYNC, test x$enable_corosync = xyes) +AC_ARG_ENABLE([zookeeper], + [ --enable-zookeeper : build zookeeper cluster driver ],, + [ enable_zookeeper="no" ],) +AM_CONDITIONAL(BUILD_ZOOKEEPER, test x$enable_zookeeper = xyes) + AC_ARG_WITH([initddir], [ --with-initddir=DIR : path to init script directory. ], [ INITDDIR="$withval" ], @@ -227,6 +232,15 @@ if test "x${enable_corosync}" = xyes; then PACKAGE_FEATURES="$PACKAGE_FEATURES corosync" fi +if test "x${enable_zookeeper}" = xyes; then + AC_CHECK_LIB([zookeeper_mt], [zookeeper_init],, + AC_MSG_ERROR(libzookeeper not found)) + AC_CHECK_HEADERS([zookeeper/zookeeper.h],, + AC_MSG_ERROR(zookeeper.h header missing)) + AC_DEFINE_UNQUOTED([HAVE_ZOOKEEPER], 1, [have zookeeper]) + PACKAGE_FEATURES="$PACKAGE_FEATURES zookeeper" +fi + # extra warnings EXTRA_WARNINGS="" diff --git a/sheep/Makefile.am b/sheep/Makefile.am index da8be16..d86898b 100644 --- a/sheep/Makefile.am +++ b/sheep/Makefile.am @@ -28,6 +28,9 @@ sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \ if BUILD_COROSYNC sheep_SOURCES += cluster/corosync.c endif +if BUILD_ZOOKEEPER +sheep_SOURCES += cluster/zookeeper.c +endif sheep_LDADD = $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread sheep_DEPENDENCIES = ../lib/libsheepdog.a diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c new file mode 100644 index 0000000..3fb0744 --- /dev/null +++ b/sheep/cluster/zookeeper.c @@ -0,0 +1,557 @@ +/* + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <netdb.h> +#include <search.h> +#include <assert.h> +#include <sys/eventfd.h> +#include <zookeeper/zookeeper.h> + +#include "cluster.h" +#include "work.h" + +#define MAX_EVENT_BUF_SIZE (64 * 1024) + +#define BASE_ZNODE "/sheepdog" +#define LOCK_ZNODE BASE_ZNODE "/lock" +#define QUEUE_ZNODE BASE_ZNODE "/queue" +#define MEMBER_ZNODE BASE_ZNODE "/member" + +/* iterate child znodes */ +#define FOR_EACH_ZNODE(zh, parent, path, strs) \ + for (zoo_get_children(zh, parent, 1, strs), \ + (strs)->data += (strs)->count; \ + (strs)->count-- ? \ + sprintf(path, "%s/%s", parent, *--(strs)->data) : \ + (free((strs)->data), 0); \ + free(*(strs)->data)) + +enum zk_event_type { + EVENT_JOIN = 1, + EVENT_LEAVE, + EVENT_NOTIFY, +}; + +struct zk_event { + enum zk_event_type type; + struct sheepdog_node_list_entry sender; + + size_t buf_len; + uint8_t buf[MAX_EVENT_BUF_SIZE]; + + size_t nr_nodes; /* the number of sheeps */ + struct sheepdog_node_list_entry nodes[SD_MAX_NODES]; + + enum cluster_join_result join_result; + + void (*block_cb)(void *arg); + + int blocked; /* set non-zero when sheep must block this event */ + int callbacked; /* set non-zero if sheep already called block_cb() */ +}; + + +/* ZooKeeper-based lock */ + +static void zk_lock(zhandle_t *zh) +{ + int rc; +again: + rc = zoo_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, + ZOO_EPHEMERAL, NULL, 0); + if (rc == ZOK) + return; + else if (rc == ZNODEEXISTS) { + dprintf("retry\n"); + usleep(10000); /* FIXME: use watch notification */ + goto again; + } else + panic("failed to create a lock znode\n"); +} + +static void zk_unlock(zhandle_t *zh) +{ + int rc; + + rc = zoo_delete(zh, LOCK_ZNODE, -1); + if (rc != ZOK) + panic("failed to release lock\n"); +} + + +/* ZooKeeper-based queue */ + +static int queue_pos; + +static int zk_queue_empty(zhandle_t *zh) +{ + int rc; + char path[256]; + + sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); + + rc = zoo_exists(zh, path, 1, NULL); + if (rc == ZOK) + return 0; + + return 1; +} + +static void zk_queue_push(zhandle_t *zh, struct zk_event *ev) +{ + int rc; + char path[256], buf[256]; + + sprintf(path, "%s/", QUEUE_ZNODE); + rc = zoo_create(zh, path, (char *)ev, sizeof(*ev), + &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf)); + + if (queue_pos < 0) { + /* the first pushed data should be EVENT_JOIN */ + assert(ev->type == EVENT_JOIN); + sscanf(buf, QUEUE_ZNODE "/%010d", &queue_pos); + + /* watch */ + zoo_exists(zh, buf, 1, NULL); + } +} + +static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev) +{ + int rc; + char path[256]; + + queue_pos--; + + if (ev) { + /* update the last popped data */ + sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); + rc = zoo_set(zh, path, (char *)ev, sizeof(*ev), -1); + } + + return 0; +} + +static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev) +{ + int rc, len; + char path[256]; + + if (zk_queue_empty(zh)) + return -1; + + sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); + len = sizeof(*ev); + rc = zoo_get(zh, path, 1, (char *)ev, &len, NULL); + + /* watch next data */ + queue_pos++; + sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); + zoo_exists(zh, path, 1, NULL); + + return 0; +} + +static int is_zk_queue_valid(zhandle_t *zh) +{ + int rc, len; + struct String_vector strs; + uint64_t joined; + char path[256]; + + FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) { + len = sizeof(joined); + rc = zoo_get(zh, path, 1, (char *)&joined, &len, NULL); + assert(rc == ZOK); + + if (joined) + return 1; + } + + return 0; +} + +static void zk_queue_init(zhandle_t *zh) +{ + int rc; + struct String_vector strs; + char path[256]; + + zoo_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); + zoo_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); + zoo_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0); + + zk_lock(zh); + + queue_pos = -1; + + if (!is_zk_queue_valid(zh)) { + dprintf("clean zookeeper store\n"); + + FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) { + rc = zoo_delete(zh, path, -1); + assert(rc == ZOK); + } + + FOR_EACH_ZNODE(zh, QUEUE_ZNODE, path, &strs) { + rc = zoo_delete(zh, path, -1); + assert(rc == ZOK); + } + } + + zk_unlock(zh); +} + + +/* ZooKeeper driver APIs */ + +static zhandle_t *zhandle; +static int efd; + +static struct work_queue *zk_block_wq; + +static struct sheepdog_node_list_entry this_node; + +static struct cdrv_handlers zk_hdlrs; +static enum cluster_join_result (*zk_check_join_cb)( + struct sheepdog_node_list_entry *joining, void *opaque); + +/* get node list from the last pushed data */ +static size_t get_nodes(zhandle_t *zh, struct sheepdog_node_list_entry *nodes) +{ + int rc, len; + struct zk_event ev; + struct String_vector strs; + char path[256], max[256] = ""; + + FOR_EACH_ZNODE(zh, QUEUE_ZNODE, path, &strs) { + if (strcmp(max, path) < 0) + strcpy(max, path); + } + + if (max[0] == '\0') + return 0; + + len = sizeof(ev); + rc = zoo_get(zh, max, 1, (char *)&ev, &len, NULL); + assert(rc == ZOK); + + memcpy(nodes, ev.nodes, sizeof(ev.nodes)); + + return ev.nr_nodes; +} + +static int add_event(zhandle_t *zh, enum zk_event_type type, + struct sheepdog_node_list_entry *node, void *buf, + size_t buf_len, void (*block_cb)(void *arg)) +{ + int idx; + struct sheepdog_node_list_entry *n; + struct zk_event ev; + + zk_lock(zh); + + ev.type = type; + ev.sender = *node; + ev.buf_len = buf_len; + if (buf) + memcpy(ev.buf, buf, buf_len); + + ev.nr_nodes = get_nodes(zh, ev.nodes); + + switch (type) { + case EVENT_JOIN: + ev.blocked = 1; + ev.nodes[ev.nr_nodes] = *node; + ev.nr_nodes++; + break; + case EVENT_LEAVE: + n = lfind(node, ev.nodes, &ev.nr_nodes, sizeof(*n), node_cmp); + if (!n) + goto out; + idx = n - ev.nodes; + + ev.nr_nodes--; + memmove(n, n + 1, sizeof(*n) * (ev.nr_nodes - idx)); + break; + case EVENT_NOTIFY: + ev.blocked = !!block_cb; + ev.block_cb = block_cb; + break; + } + + zk_queue_push(zh, &ev); +out: + zk_unlock(zh); + + return 0; +} + +static void watcher(zhandle_t *zh, int type, int state, const char *path, void* ctx) +{ + eventfd_t value = 1; + char str[256]; + int ret, i; + size_t nr_nodes; + struct sheepdog_node_list_entry nodes[SD_MAX_NODES]; + + if (type == ZOO_DELETED_EVENT) { + ret = sscanf(path, MEMBER_ZNODE "/%[^\n]", str); + if (ret != 1) + goto out; + + /* check the failed node */ + nr_nodes = get_nodes(zh, nodes); + for (i = 0; i < nr_nodes; i++) { + if (strcmp(str, node_to_str(nodes + i)) == 0) { + add_event(zh, EVENT_LEAVE, nodes + i, NULL, 0, + NULL); + goto out; + } + } + } +out: + eventfd_write(efd, value); +} + +static int get_addr(uint8_t *bytes) +{ + int ret; + char name[INET6_ADDRSTRLEN]; + struct addrinfo hints, *res, *res0; + + gethostname(name, sizeof(name)); + + memset(&hints, 0, sizeof(hints)); + + hints.ai_socktype = SOCK_STREAM; + ret = getaddrinfo(name, NULL, &hints, &res0); + if (ret) + exit(1); + + for (res = res0; res; res = res->ai_next) { + if (res->ai_family == AF_INET) { + struct sockaddr_in *addr; + addr = (struct sockaddr_in *)res->ai_addr; + + if (((char *) &addr->sin_addr)[0] == 127) + continue; + + memset(bytes, 0, 12); + memcpy(bytes + 12, &addr->sin_addr, 4); + break; + } else if (res->ai_family == AF_INET6) { + struct sockaddr_in6 *addr; + uint8_t localhost[16] = { 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 1 }; + + addr = (struct sockaddr_in6 *)res->ai_addr; + + if (memcmp(&addr->sin6_addr, localhost, 16) == 0) + continue; + + memcpy(bytes, &addr->sin6_addr, 16); + break; + } else + dprintf("unknown address family\n"); + } + + if (res == NULL) { + eprintf("failed to get address info\n"); + return -1; + } + + freeaddrinfo(res0); + + return 0; +} + +static int zk_init(struct cdrv_handlers *handlers, const char *option, + uint8_t *myaddr) +{ + zk_hdlrs = *handlers; + if (!option) { + eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n"); + eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n"); + return -1; + } + + zhandle = zookeeper_init(option, watcher, 10000, 0, NULL, 0); + if (!zhandle) { + eprintf("failed to connect to zk server %s\n", option); + return -1; + } + + if (get_addr(myaddr) < 0) + return -1; + + zk_queue_init(zhandle); + + efd = eventfd(0, EFD_NONBLOCK); + if (efd < 0) { + eprintf("failed to create an event fd: %m\n"); + return -1; + } + + zk_block_wq = init_work_queue(1); + + return efd; +} + +static int zk_join(struct sheepdog_node_list_entry *myself, + enum cluster_join_result (*check_join_cb)( + struct sheepdog_node_list_entry *joining, + void *opaque), + void *opaque, size_t opaque_len) +{ + int rc; + uint64_t joined; + char path[256]; + + this_node = *myself; + zk_check_join_cb = check_join_cb; + + sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself)); + joined = 0; + rc = zoo_create(zhandle, path, (char *)&joined, sizeof(joined), + &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0); + if (rc != ZOK) + panic("failed to create an ephemeral znode\n"); + + return add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL); +} + +static int zk_leave(void) +{ + return add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL); +} + +static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg)) +{ + return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb); +} + +static void zk_block(struct work *work, int idx) +{ + struct zk_event ev; + + zk_queue_pop(zhandle, &ev); + + ev.block_cb(ev.buf); + ev.blocked = 0; + + zk_queue_push_back(zhandle, &ev); +} + +static void zk_block_done(struct work *work, int idx) +{ +} + +static int zk_dispatch(void) +{ + int ret, rc; + char path[256]; + uint64_t joined; + eventfd_t value; + struct zk_event ev; + enum cluster_join_result res; + static struct work work = { + .fn = zk_block, + .done = zk_block_done, + }; + + dprintf("read event\n"); + ret = eventfd_read(efd, &value); + if (ret < 0) + return 0; + + ret = zk_queue_pop(zhandle, &ev); + if (ret < 0) + goto out; + + switch (ev.type) { + case EVENT_JOIN: + if (ev.blocked) { + if (node_cmp(&ev.nodes[0], &this_node) == 0) { + res = zk_check_join_cb(&ev.sender, ev.buf); + ev.join_result = res; + ev.blocked = 0; + + sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender)); + joined = 1; + rc = zoo_set(zhandle, path, (char *)&joined, sizeof(joined), -1); + assert(rc == ZOK); + + zk_queue_push_back(zhandle, &ev); + + if (res == CJ_RES_MASTER_TRANSFER) { + eprintf("failed to join sheepdog cluster: " + "please retry when master is up\n"); + exit(1); + } + } else + zk_queue_push_back(zhandle, NULL); + + goto out; + } + + if (ev.join_result == CJ_RES_MASTER_TRANSFER) { + /* FIXME: This code is tricky, but Sheepdog assumes that */ + /* nr_nodes = 1 when join_result = MASTER_TRANSFER... */ + ev.nr_nodes = 1; + ev.nodes[0] = this_node; + zk_queue_push_back(zhandle, &ev); + zk_queue_pop(zhandle, &ev); + } + + sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender)); + zoo_exists(zhandle, path, 1, NULL); + + zk_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes, + ev.join_result, ev.buf); + break; + case EVENT_LEAVE: + zk_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); + break; + case EVENT_NOTIFY: + if (ev.blocked) { + if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) { + queue_work(zk_block_wq, &work); + + ev.callbacked = 1; + + zk_queue_push_back(zhandle, &ev); + } else + zk_queue_push_back(zhandle, NULL); + + goto out; + } + + zk_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len); + break; + } +out: + return 0; +} + +struct cluster_driver cdrv_zookeeper = { + .name = "zookeeper", + + .init = zk_init, + .join = zk_join, + .leave = zk_leave, + .notify = zk_notify, + .dispatch = zk_dispatch, +}; + +cdrv_register(cdrv_zookeeper); -- 1.7.2.5 |