[Sheepdog] [PATCH] cluster: add zookeeper cluster driver

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Fri Nov 11 10:06:48 CET 2011


At Tue,  8 Nov 2011 23:45:43 +0900,
MORITA Kazutaka wrote:
> 
> This adds initial support for the ZooKeeper cluster driver.  To use
> this driver, please specify comma separated host:port pairs (each
> corresponding to a ZooKeeper server) to the driver option.
> 
> For example:
>   $ sheep /store -c zookeeper:host1:3000,host2:3000,host3:3000
> 
> TODO:
>  - use asynchronous ZooKeeper APIs
>  - use watch notification instead of loop and sleep
> 
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> ---
>  configure.ac              |   14 ++
>  sheep/Makefile.am         |    3 +
>  sheep/cluster/zookeeper.c |  557 +++++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 574 insertions(+), 0 deletions(-)
>  create mode 100644 sheep/cluster/zookeeper.c

Applied.

Kazutaka

> 
> diff --git a/configure.ac b/configure.ac
> index 5a0c9748..af15f8c 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -176,6 +176,11 @@ AC_ARG_ENABLE([corosync],
>  	[ enable_corosync="yes" ],)
>  AM_CONDITIONAL(BUILD_COROSYNC, test x$enable_corosync = xyes)
>  
> +AC_ARG_ENABLE([zookeeper],
> +	[  --enable-zookeeper      : build zookeeper cluster driver ],,
> +	[ enable_zookeeper="no" ],)
> +AM_CONDITIONAL(BUILD_ZOOKEEPER, test x$enable_zookeeper = xyes)
> +
>  AC_ARG_WITH([initddir],
>  	[  --with-initddir=DIR     : path to init script directory. ],
>  	[ INITDDIR="$withval" ],
> @@ -227,6 +232,15 @@ if test "x${enable_corosync}" = xyes; then
>  	PACKAGE_FEATURES="$PACKAGE_FEATURES corosync"
>  fi
>  
> +if test "x${enable_zookeeper}" = xyes; then
> +	AC_CHECK_LIB([zookeeper_mt], [zookeeper_init],,
> +		AC_MSG_ERROR(libzookeeper not found))
> +	AC_CHECK_HEADERS([zookeeper/zookeeper.h],,
> +		AC_MSG_ERROR(zookeeper.h header missing))
> +	AC_DEFINE_UNQUOTED([HAVE_ZOOKEEPER], 1, [have zookeeper])
> +	PACKAGE_FEATURES="$PACKAGE_FEATURES zookeeper"
> +fi
> +
>  # extra warnings
>  EXTRA_WARNINGS=""
>  
> diff --git a/sheep/Makefile.am b/sheep/Makefile.am
> index da8be16..d86898b 100644
> --- a/sheep/Makefile.am
> +++ b/sheep/Makefile.am
> @@ -28,6 +28,9 @@ sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
>  if BUILD_COROSYNC
>  sheep_SOURCES		+= cluster/corosync.c
>  endif
> +if BUILD_ZOOKEEPER
> +sheep_SOURCES		+= cluster/zookeeper.c
> +endif
>  
>  sheep_LDADD	  	= $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
>  sheep_DEPENDENCIES	= ../lib/libsheepdog.a
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> new file mode 100644
> index 0000000..3fb0744
> --- /dev/null
> +++ b/sheep/cluster/zookeeper.c
> @@ -0,0 +1,557 @@
> +/*
> + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +#include <stdio.h>
> +#include <string.h>
> +#include <unistd.h>
> +#include <netdb.h>
> +#include <search.h>
> +#include <assert.h>
> +#include <sys/eventfd.h>
> +#include <zookeeper/zookeeper.h>
> +
> +#include "cluster.h"
> +#include "work.h"
> +
> +#define MAX_EVENT_BUF_SIZE (64 * 1024)
> +
> +#define BASE_ZNODE "/sheepdog"
> +#define LOCK_ZNODE BASE_ZNODE "/lock"
> +#define QUEUE_ZNODE BASE_ZNODE "/queue"
> +#define MEMBER_ZNODE BASE_ZNODE "/member"
> +
> +/* iterate child znodes */
> +#define FOR_EACH_ZNODE(zh, parent, path, strs)			       \
> +	for (zoo_get_children(zh, parent, 1, strs),		       \
> +		     (strs)->data += (strs)->count;		       \
> +	     (strs)->count-- ?					       \
> +		     sprintf(path, "%s/%s", parent, *--(strs)->data) : \
> +		     (free((strs)->data), 0);			       \
> +	     free(*(strs)->data))
> +
> +enum zk_event_type {
> +	EVENT_JOIN = 1,
> +	EVENT_LEAVE,
> +	EVENT_NOTIFY,
> +};
> +
> +struct zk_event {
> +	enum zk_event_type type;
> +	struct sheepdog_node_list_entry sender;
> +
> +	size_t buf_len;
> +	uint8_t buf[MAX_EVENT_BUF_SIZE];
> +
> +	size_t nr_nodes; /* the number of sheeps */
> +	struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
> +
> +	enum cluster_join_result join_result;
> +
> +	void (*block_cb)(void *arg);
> +
> +	int blocked; /* set non-zero when sheep must block this event */
> +	int callbacked; /* set non-zero if sheep already called block_cb() */
> +};
> +
> +
> +/* ZooKeeper-based lock */
> +
> +static void zk_lock(zhandle_t *zh)
> +{
> +	int rc;
> +again:
> +	rc = zoo_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
> +			ZOO_EPHEMERAL, NULL, 0);
> +	if (rc == ZOK)
> +		return;
> +	else if (rc == ZNODEEXISTS) {
> +		dprintf("retry\n");
> +		usleep(10000); /* FIXME: use watch notification */
> +		goto again;
> +	} else
> +		panic("failed to create a lock znode\n");
> +}
> +
> +static void zk_unlock(zhandle_t *zh)
> +{
> +	int rc;
> +
> +	rc = zoo_delete(zh, LOCK_ZNODE, -1);
> +	if (rc != ZOK)
> +		panic("failed to release lock\n");
> +}
> +
> +
> +/* ZooKeeper-based queue */
> +
> +static int queue_pos;
> +
> +static int zk_queue_empty(zhandle_t *zh)
> +{
> +	int rc;
> +	char path[256];
> +
> +	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
> +
> +	rc = zoo_exists(zh, path, 1, NULL);
> +	if (rc == ZOK)
> +		return 0;
> +
> +	return 1;
> +}
> +
> +static void zk_queue_push(zhandle_t *zh, struct zk_event *ev)
> +{
> +	int rc;
> +	char path[256], buf[256];
> +
> +	sprintf(path, "%s/", QUEUE_ZNODE);
> +	rc = zoo_create(zh, path, (char *)ev, sizeof(*ev),
> +			&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
> +
> +	if (queue_pos < 0) {
> +		/* the first pushed data should be EVENT_JOIN */
> +		assert(ev->type == EVENT_JOIN);
> +		sscanf(buf, QUEUE_ZNODE "/%010d", &queue_pos);
> +
> +		/* watch */
> +		zoo_exists(zh, buf, 1, NULL);
> +	}
> +}
> +
> +static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
> +{
> +	int rc;
> +	char path[256];
> +
> +	queue_pos--;
> +
> +	if (ev) {
> +		/* update the last popped data */
> +		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
> +		rc = zoo_set(zh, path, (char *)ev, sizeof(*ev), -1);
> +	}
> +
> +	return 0;
> +}
> +
> +static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
> +{
> +	int rc, len;
> +	char path[256];
> +
> +	if (zk_queue_empty(zh))
> +		return -1;
> +
> +	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
> +	len = sizeof(*ev);
> +	rc = zoo_get(zh, path, 1, (char *)ev, &len, NULL);
> +
> +	/* watch next data */
> +	queue_pos++;
> +	sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
> +	zoo_exists(zh, path, 1, NULL);
> +
> +	return 0;
> +}
> +
> +static int is_zk_queue_valid(zhandle_t *zh)
> +{
> +	int rc, len;
> +	struct String_vector strs;
> +	uint64_t joined;
> +	char path[256];
> +
> +	FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) {
> +		len = sizeof(joined);
> +		rc = zoo_get(zh, path, 1, (char *)&joined, &len, NULL);
> +		assert(rc == ZOK);
> +
> +		if (joined)
> +			return 1;
> +	}
> +
> +	return 0;
> +}
> +
> +static void zk_queue_init(zhandle_t *zh)
> +{
> +	int rc;
> +	struct String_vector strs;
> +	char path[256];
> +
> +	zoo_create(zh, BASE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
> +	zoo_create(zh, QUEUE_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
> +	zoo_create(zh, MEMBER_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
> +
> +	zk_lock(zh);
> +
> +	queue_pos = -1;
> +
> +	if (!is_zk_queue_valid(zh)) {
> +		dprintf("clean zookeeper store\n");
> +
> +		FOR_EACH_ZNODE(zh, MEMBER_ZNODE, path, &strs) {
> +			rc = zoo_delete(zh, path, -1);
> +			assert(rc == ZOK);
> +		}
> +
> +		FOR_EACH_ZNODE(zh, QUEUE_ZNODE, path, &strs) {
> +			rc = zoo_delete(zh, path, -1);
> +			assert(rc == ZOK);
> +		}
> +	}
> +
> +	zk_unlock(zh);
> +}
> +
> +
> +/* ZooKeeper driver APIs */
> +
> +static zhandle_t *zhandle;
> +static int efd;
> +
> +static struct work_queue *zk_block_wq;
> +
> +static struct sheepdog_node_list_entry this_node;
> +
> +static struct cdrv_handlers zk_hdlrs;
> +static enum cluster_join_result (*zk_check_join_cb)(
> +	struct sheepdog_node_list_entry *joining, void *opaque);
> +
> +/* get node list from the last pushed data */
> +static size_t get_nodes(zhandle_t *zh, struct sheepdog_node_list_entry *nodes)
> +{
> +	int rc, len;
> +	struct zk_event ev;
> +	struct String_vector strs;
> +	char path[256], max[256] = "";
> +
> +	FOR_EACH_ZNODE(zh, QUEUE_ZNODE, path, &strs) {
> +		if (strcmp(max, path) < 0)
> +			strcpy(max, path);
> +	}
> +
> +	if (max[0] == '\0')
> +		return 0;
> +
> +	len = sizeof(ev);
> +	rc = zoo_get(zh, max, 1, (char *)&ev, &len, NULL);
> +	assert(rc == ZOK);
> +
> +	memcpy(nodes, ev.nodes, sizeof(ev.nodes));
> +
> +	return ev.nr_nodes;
> +}
> +
> +static int add_event(zhandle_t *zh, enum zk_event_type type,
> +		     struct sheepdog_node_list_entry *node, void *buf,
> +		     size_t buf_len, void (*block_cb)(void *arg))
> +{
> +	int idx;
> +	struct sheepdog_node_list_entry *n;
> +	struct zk_event ev;
> +
> +	zk_lock(zh);
> +
> +	ev.type = type;
> +	ev.sender = *node;
> +	ev.buf_len = buf_len;
> +	if (buf)
> +		memcpy(ev.buf, buf, buf_len);
> +
> +	ev.nr_nodes = get_nodes(zh, ev.nodes);
> +
> +	switch (type) {
> +	case EVENT_JOIN:
> +		ev.blocked = 1;
> +		ev.nodes[ev.nr_nodes] = *node;
> +		ev.nr_nodes++;
> +		break;
> +	case EVENT_LEAVE:
> +		n = lfind(node, ev.nodes, &ev.nr_nodes, sizeof(*n), node_cmp);
> +		if (!n)
> +			goto out;
> +		idx = n - ev.nodes;
> +
> +		ev.nr_nodes--;
> +		memmove(n, n + 1, sizeof(*n) * (ev.nr_nodes - idx));
> +		break;
> +	case EVENT_NOTIFY:
> +		ev.blocked = !!block_cb;
> +		ev.block_cb = block_cb;
> +		break;
> +	}
> +
> +	zk_queue_push(zh, &ev);
> +out:
> +	zk_unlock(zh);
> +
> +	return 0;
> +}
> +
> +static void watcher(zhandle_t *zh, int type, int state, const char *path, void* ctx)
> +{
> +	eventfd_t value = 1;
> +	char str[256];
> +	int ret, i;
> +	size_t nr_nodes;
> +	struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
> +
> +	if (type == ZOO_DELETED_EVENT) {
> +		ret = sscanf(path, MEMBER_ZNODE "/%[^\n]", str);
> +		if (ret != 1)
> +			goto out;
> +
> +		/* check the failed node */
> +		nr_nodes = get_nodes(zh, nodes);
> +		for (i = 0; i < nr_nodes; i++) {
> +			if (strcmp(str, node_to_str(nodes + i)) == 0) {
> +				add_event(zh, EVENT_LEAVE, nodes + i, NULL, 0,
> +					  NULL);
> +				goto out;
> +			}
> +		}
> +	}
> +out:
> +	eventfd_write(efd, value);
> +}
> +
> +static int get_addr(uint8_t *bytes)
> +{
> +	int ret;
> +	char name[INET6_ADDRSTRLEN];
> +	struct addrinfo hints, *res, *res0;
> +
> +	gethostname(name, sizeof(name));
> +
> +	memset(&hints, 0, sizeof(hints));
> +
> +	hints.ai_socktype = SOCK_STREAM;
> +	ret = getaddrinfo(name, NULL, &hints, &res0);
> +	if (ret)
> +		exit(1);
> +
> +	for (res = res0; res; res = res->ai_next) {
> +		if (res->ai_family == AF_INET) {
> +			struct sockaddr_in *addr;
> +			addr = (struct sockaddr_in *)res->ai_addr;
> +
> +			if (((char *) &addr->sin_addr)[0] == 127)
> +				continue;
> +
> +			memset(bytes, 0, 12);
> +			memcpy(bytes + 12, &addr->sin_addr, 4);
> +			break;
> +		} else if (res->ai_family == AF_INET6) {
> +			struct sockaddr_in6 *addr;
> +			uint8_t localhost[16] = { 0, 0, 0, 0, 0, 0, 0, 0,
> +						  0, 0, 0, 0, 0, 0, 0, 1 };
> +
> +			addr = (struct sockaddr_in6 *)res->ai_addr;
> +
> +			if (memcmp(&addr->sin6_addr, localhost, 16) == 0)
> +				continue;
> +
> +			memcpy(bytes, &addr->sin6_addr, 16);
> +			break;
> +		} else
> +			dprintf("unknown address family\n");
> +	}
> +
> +	if (res == NULL) {
> +		eprintf("failed to get address info\n");
> +		return -1;
> +	}
> +
> +	freeaddrinfo(res0);
> +
> +	return 0;
> +}
> +
> +static int zk_init(struct cdrv_handlers *handlers, const char *option,
> +		   uint8_t *myaddr)
> +{
> +	zk_hdlrs = *handlers;
> +	if (!option) {
> +		eprintf("specify comma separated host:port pairs, each corresponding to a zk server.\n");
> +		eprintf("e.g. sheep /store -c zookeeper:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002\n");
> +		return -1;
> +	}
> +
> +	zhandle = zookeeper_init(option, watcher, 10000, 0, NULL, 0);
> +	if (!zhandle) {
> +		eprintf("failed to connect to zk server %s\n", option);
> +		return -1;
> +	}
> +
> +	if (get_addr(myaddr) < 0)
> +		return -1;
> +
> +	zk_queue_init(zhandle);
> +
> +	efd = eventfd(0, EFD_NONBLOCK);
> +	if (efd < 0) {
> +		eprintf("failed to create an event fd: %m\n");
> +		return -1;
> +	}
> +
> +	zk_block_wq = init_work_queue(1);
> +
> +	return efd;
> +}
> +
> +static int zk_join(struct sheepdog_node_list_entry *myself,
> +		   enum cluster_join_result (*check_join_cb)(
> +			   struct sheepdog_node_list_entry *joining,
> +			   void *opaque),
> +		   void *opaque, size_t opaque_len)
> +{
> +	int rc;
> +	uint64_t joined;
> +	char path[256];
> +
> +	this_node = *myself;
> +	zk_check_join_cb = check_join_cb;
> +
> +	sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
> +	joined = 0;
> +	rc = zoo_create(zhandle, path, (char *)&joined, sizeof(joined),
> +			&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
> +	if (rc != ZOK)
> +		panic("failed to create an ephemeral znode\n");
> +
> +	return add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
> +}
> +
> +static int zk_leave(void)
> +{
> +	return add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
> +}
> +
> +static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
> +{
> +	return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
> +}
> +
> +static void zk_block(struct work *work, int idx)
> +{
> +	struct zk_event ev;
> +
> +	zk_queue_pop(zhandle, &ev);
> +
> +	ev.block_cb(ev.buf);
> +	ev.blocked = 0;
> +
> +	zk_queue_push_back(zhandle, &ev);
> +}
> +
> +static void zk_block_done(struct work *work, int idx)
> +{
> +}
> +
> +static int zk_dispatch(void)
> +{
> +	int ret, rc;
> +	char path[256];
> +	uint64_t joined;
> +	eventfd_t value;
> +	struct zk_event ev;
> +	enum cluster_join_result res;
> +	static struct work work = {
> +		.fn = zk_block,
> +		.done = zk_block_done,
> +	};
> +
> +	dprintf("read event\n");
> +	ret = eventfd_read(efd, &value);
> +	if (ret < 0)
> +		return 0;
> +
> +	ret = zk_queue_pop(zhandle, &ev);
> +	if (ret < 0)
> +		goto out;
> +
> +	switch (ev.type) {
> +	case EVENT_JOIN:
> +		if (ev.blocked) {
> +			if (node_cmp(&ev.nodes[0], &this_node) == 0) {
> +				res = zk_check_join_cb(&ev.sender, ev.buf);
> +				ev.join_result = res;
> +				ev.blocked = 0;
> +
> +				sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender));
> +				joined = 1;
> +				rc = zoo_set(zhandle, path, (char *)&joined, sizeof(joined), -1);
> +				assert(rc == ZOK);
> +
> +				zk_queue_push_back(zhandle, &ev);
> +
> +				if (res == CJ_RES_MASTER_TRANSFER) {
> +					eprintf("failed to join sheepdog cluster: "
> +						"please retry when master is up\n");
> +					exit(1);
> +				}
> +			} else
> +				zk_queue_push_back(zhandle, NULL);
> +
> +			goto out;
> +		}
> +
> +		if (ev.join_result == CJ_RES_MASTER_TRANSFER) {
> +			/* FIXME: This code is tricky, but Sheepdog assumes that */
> +			/* nr_nodes = 1 when join_result = MASTER_TRANSFER... */
> +			ev.nr_nodes = 1;
> +			ev.nodes[0] = this_node;
> +			zk_queue_push_back(zhandle, &ev);
> +			zk_queue_pop(zhandle, &ev);
> +		}
> +
> +		sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender));
> +		zoo_exists(zhandle, path, 1, NULL);
> +
> +		zk_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes,
> +				    ev.join_result, ev.buf);
> +		break;
> +	case EVENT_LEAVE:
> +		zk_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
> +		break;
> +	case EVENT_NOTIFY:
> +		if (ev.blocked) {
> +			if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
> +				queue_work(zk_block_wq, &work);
> +
> +				ev.callbacked = 1;
> +
> +				zk_queue_push_back(zhandle, &ev);
> +			} else
> +				zk_queue_push_back(zhandle, NULL);
> +
> +			goto out;
> +		}
> +
> +		zk_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len);
> +		break;
> +	}
> +out:
> +	return 0;
> +}
> +
> +struct cluster_driver cdrv_zookeeper = {
> +	.name       = "zookeeper",
> +
> +	.init       = zk_init,
> +	.join       = zk_join,
> +	.leave      = zk_leave,
> +	.notify     = zk_notify,
> +	.dispatch   = zk_dispatch,
> +};
> +
> +cdrv_register(cdrv_zookeeper);
> -- 
> 1.7.2.5
> 
> -- 
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog



More information about the sheepdog mailing list