At Tue, 15 Nov 2011 09:14:12 +0900, MORITA Kazutaka wrote:
>
> This adds initial support for the Accord cluster driver.
>
> Usage:
> $ sheep /store -c accord:[accord server address]
>
> TODO:
> - use asynchronous Accord APIs
> - use watch notification instead of loop and sleep
> - use transaction instead of global distributed lock
>
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> ---
> configure.ac | 11 +
> sheep/Makefile.am | 9 +-
> sheep/cluster/accord.c | 651 ++++++++++++++++++++++++++++++++++++++++++++++++
> 3 files changed, 669 insertions(+), 2 deletions(-)
> create mode 100644 sheep/cluster/accord.c

Applied.

Kazutaka

> > diff --git a/configure.ac b/configure.ac > index c406dd5..d8db4c5 100644 > --- a/configure.ac > +++ b/configure.ac > @@ -187,6 +187,11 @@ AC_ARG_ENABLE([zookeeper], > [ enable_zookeeper="no" ],) > AM_CONDITIONAL(BUILD_ZOOKEEPER, test x$enable_zookeeper = xyes) > > +AC_ARG_ENABLE([accord], > + [ --enable-accord : build accord cluster driver ],, > + [ enable_accord="no" ],) > +AM_CONDITIONAL(BUILD_ACCORD, test x$enable_accord = xyes) > + > AC_ARG_WITH([initddir], > [ --with-initddir=DIR : path to init script directory.
], > [ INITDDIR="$withval" ], > @@ -247,6 +252,12 @@ if test "x${enable_zookeeper}" = xyes; then > PACKAGE_FEATURES="$PACKAGE_FEATURES zookeeper" > fi > > +if test "x${enable_accord}" = xyes; then > + PKG_CHECK_MODULES([libacrd],[libacrd]) > + AC_DEFINE_UNQUOTED([HAVE_ACCORD], 1, [have accord]) > + PACKAGE_FEATURES="$PACKAGE_FEATURES accord" > +fi > + > # extra warnings > EXTRA_WARNINGS="" > > diff --git a/sheep/Makefile.am b/sheep/Makefile.am > index d86898b..bb4b8ff 100644 > --- a/sheep/Makefile.am > +++ b/sheep/Makefile.am > @@ -19,7 +19,8 @@ MAINTAINERCLEANFILES = Makefile.in > > AM_CFLAGS = > > -INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $(libcfg_CFLAGS) > +INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include \ > + $(libcpg_CFLAGS) $(libcfg_CFLAGS) $(libacrd_CFLAGS) > > sbin_PROGRAMS = sheep > > @@ -31,8 +32,12 @@ endif > if BUILD_ZOOKEEPER > sheep_SOURCES += cluster/zookeeper.c > endif > +if BUILD_ACCORD > +sheep_SOURCES += cluster/accord.c > +endif > > -sheep_LDADD = $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread > +sheep_LDADD = ../lib/libsheepdog.a -lpthread \ > + $(libcpg_LIBS) $(libcfg_LIBS) $(libacrd_LIBS) > sheep_DEPENDENCIES = ../lib/libsheepdog.a > > > diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c > new file mode 100644 > index 0000000..337f631 > --- /dev/null > +++ b/sheep/cluster/accord.c > @@ -0,0 +1,651 @@ > +/* > + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation. > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public License version > + * 2 as published by the Free Software Foundation. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
> + */ > +#include <stdio.h> > +#include <string.h> > +#include <unistd.h> > +#include <netdb.h> > +#include <search.h> > +#include <assert.h> > +#include <pthread.h> > +#include <sys/eventfd.h> > +#include <accord/accord.h> > + > +#include "cluster.h" > +#include "work.h" > + > +#define MAX_EVENT_BUF_SIZE (64 * 1024) > + > +#define BASE_FILE "/sheepdog" > +#define LOCK_FILE BASE_FILE "/lock" > +#define QUEUE_FILE BASE_FILE "/queue" > + > +enum acrd_event_type { > + EVENT_JOIN = 1, > + EVENT_LEAVE, > + EVENT_NOTIFY, > +}; > + > +struct acrd_event { > + enum acrd_event_type type; > + struct sheepdog_node_list_entry sender; > + > + size_t buf_len; > + uint8_t buf[MAX_EVENT_BUF_SIZE]; > + > + size_t nr_nodes; /* the number of sheeps */ > + struct sheepdog_node_list_entry nodes[SD_MAX_NODES]; > + uint64_t ids[SD_MAX_NODES]; > + > + enum cluster_join_result join_result; > + > + void (*block_cb)(void *arg); > + > + int blocked; /* set non-zero when sheep must block this event */ > + int callbacked; /* set non-zero if sheep already called block_cb() */ > +}; > + > +static struct sheepdog_node_list_entry this_node; > +static uint64_t this_id; > + > + > +/* misc functions */ > + > +struct acrd_path_list_entry { > + char *path; > + > + struct list_head list; > +}; > + > +static void acrd_list_cb(struct acrd_handle *ah, const char *path, void *arg) > +{ > + struct acrd_path_list_entry *entry = malloc(sizeof(*entry)); > + struct list_head *head = arg; > + > + entry->path = strdup(path); > + list_add_tail(&entry->list, head); > +} > + > +static void for_each_acrd_file(struct acrd_handle *ah, const char *parent, > + void (*func)(struct acrd_handle *ah, > + const char *path, void *arg), > + void *arg) > +{ > + LIST_HEAD(path_list); > + struct acrd_path_list_entry *entry; > + struct acrd_listcb listcb = { > + .cb = acrd_list_cb, > + .arg = &path_list, > + }; > + > + acrd_list(ah, parent, 0, &listcb); > + > + while (!list_empty(&path_list)) { > + entry = 
list_first_entry(&path_list, typeof(*entry), list); > + > + func(ah, entry->path, arg); > + > + list_del(&entry->list); > + free(entry->path); > + free(entry); > + } > +} > + > +static void __acrd_del(struct acrd_handle *ah, const char *path, void *arg) > +{ > + acrd_del(ah, path, 0); > +} > + > + > +/* Accord-based lock */ > + > +static void acrd_lock(struct acrd_handle *ah) > +{ > + int rc; > +again: > + rc = acrd_write(ah, LOCK_FILE, &this_id, sizeof(this_id), 0, > + ACRD_FLAG_CREATE | ACRD_FLAG_EXCL); > + if (rc == ACRD_SUCCESS) > + return; > + else if (rc == ACRD_ERR_EXIST) { > + dprintf("retry\n"); > + usleep(10000); /* FIXME: use acrd notification */ > + goto again; > + } else > + panic("failed to create a lock file\n"); > +} > + > +static void acrd_unlock(struct acrd_handle *ah) > +{ > + int rc; > + > + rc = acrd_del(ah, LOCK_FILE, 0); > + if (rc != ACRD_SUCCESS) > + panic("failed to release lock\n"); > +} > + > + > +/* Accord-based queue */ > + > +static int queue_start_pos; > +static int queue_end_pos; > + > +static int acrd_queue_empty(struct acrd_handle *ah) > +{ > + int rc; > + char path[256]; > + uint32_t count = 0; > + > + sprintf(path, QUEUE_FILE "/%d", queue_start_pos); > + > + rc = acrd_read(ah, path, NULL, &count, 0, 0); > + if (rc == ACRD_SUCCESS) > + return 0; > + > + return 1; > +} > + > +static void acrd_queue_push(struct acrd_handle *ah, struct acrd_event *ev) > +{ > + int rc; > + char path[256]; > +again: > + queue_end_pos++; > + sprintf(path, "%s/%d", QUEUE_FILE, queue_end_pos); > + rc = acrd_write(ah, path, ev, sizeof(*ev), 0, > + ACRD_FLAG_CREATE | ACRD_FLAG_EXCL); > + if (rc == ACRD_ERR_EXIST) > + goto again; > + > + assert(rc == ACRD_SUCCESS); > + > + if (queue_start_pos < 0) { > + /* the first pushed data should be EVENT_JOIN */ > + assert(ev->type == EVENT_JOIN); > + queue_start_pos = queue_end_pos; > + } > +} > + > +static int acrd_queue_push_back(struct acrd_handle *ah, struct acrd_event *ev) > +{ > + int rc; > + char path[256]; > 
+ > + queue_start_pos--; > + > + if (ev) { > + /* update the last popped data */ > + sprintf(path, QUEUE_FILE "/%d", queue_start_pos); > + rc = acrd_write(ah, path, ev, sizeof(*ev), 0, 0); > + assert(rc == ACRD_SUCCESS); > + } > + > + return 0; > +} > + > +static int acrd_queue_pop(struct acrd_handle *ah, struct acrd_event *ev) > +{ > + int rc; > + char path[256]; > + uint32_t len; > + > + if (acrd_queue_empty(ah)) > + return -1; > + > + sprintf(path, QUEUE_FILE "/%d", queue_start_pos); > + len = sizeof(*ev); > + rc = acrd_read(ah, path, ev, &len, 0, 0); > + assert(rc == ACRD_SUCCESS); > + > + queue_start_pos++; > + > + return 0; > +} > + > + > +/* Accord driver APIs */ > + > +static struct acrd_handle *ahandle; > +static int efd; > + > +static struct work_queue *acrd_wq; > + > +static struct cdrv_handlers acrd_hdlrs; > +static enum cluster_join_result (*acrd_check_join_cb)( > + struct sheepdog_node_list_entry *joining, void *opaque); > + > +/* get node list from the last pushed data */ > +static size_t get_nodes(struct acrd_handle *ah, > + struct sheepdog_node_list_entry *nodes, > + uint64_t *ids) > +{ > + int rc; > + struct acrd_event ev; > + char path[256]; > + uint32_t len; > +again: > + len = sizeof(ev); > + sprintf(path, "%s/%d", QUEUE_FILE, queue_end_pos); > + rc = acrd_read(ah, path, &ev, &len, 0, 0); > + if (rc == ACRD_SUCCESS) { > + /* find the latest event */ > + queue_end_pos++; > + goto again; > + } > + > + queue_end_pos--; > + > + memcpy(nodes, ev.nodes, sizeof(ev.nodes)); > + memcpy(ids, ev.ids, sizeof(ev.ids)); > + > + return ev.nr_nodes; > +} > + > +static int add_event(struct acrd_handle *ah, enum acrd_event_type type, > + struct sheepdog_node_list_entry *node, void *buf, > + size_t buf_len, void (*block_cb)(void *arg)) > +{ > + int idx; > + struct sheepdog_node_list_entry *n; > + uint64_t *i; > + struct acrd_event ev; > + > + acrd_lock(ah); > + > + ev.type = type; > + ev.sender = *node; > + ev.buf_len = buf_len; > + if (buf) > + memcpy(ev.buf, 
buf, buf_len); > + > + ev.nr_nodes = get_nodes(ah, ev.nodes, ev.ids); > + > + switch (type) { > + case EVENT_JOIN: > + ev.blocked = 1; > + ev.nodes[ev.nr_nodes] = *node; > + ev.ids[ev.nr_nodes] = this_id; /* must be local node */ > + ev.nr_nodes++; > + break; > + case EVENT_LEAVE: > + n = lfind(node, ev.nodes, &ev.nr_nodes, sizeof(*n), node_cmp); > + if (!n) > + goto out; > + idx = n - ev.nodes; > + i = ev.ids + idx; > + > + ev.nr_nodes--; > + memmove(n, n + 1, sizeof(*n) * (ev.nr_nodes - idx)); > + memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx)); > + break; > + case EVENT_NOTIFY: > + ev.blocked = !!block_cb; > + ev.block_cb = block_cb; > + break; > + } > + > + acrd_queue_push(ah, &ev); > +out: > + acrd_unlock(ah); > + return 0; > +} > + > +static int get_addr(uint8_t *bytes) > +{ > + int ret; > + char name[INET6_ADDRSTRLEN]; > + struct addrinfo hints, *res, *res0; > + > + gethostname(name, sizeof(name)); > + > + memset(&hints, 0, sizeof(hints)); > + > + hints.ai_socktype = SOCK_STREAM; > + ret = getaddrinfo(name, NULL, &hints, &res0); > + if (ret) > + exit(1); > + > + for (res = res0; res; res = res->ai_next) { > + if (res->ai_family == AF_INET) { > + struct sockaddr_in *addr; > + addr = (struct sockaddr_in *)res->ai_addr; > + > + if (((char *) &addr->sin_addr)[0] == 127) > + continue; > + > + memset(bytes, 0, 12); > + memcpy(bytes + 12, &addr->sin_addr, 4); > + break; > + } else if (res->ai_family == AF_INET6) { > + struct sockaddr_in6 *addr; > + uint8_t localhost[16] = { 0, 0, 0, 0, 0, 0, 0, 0, > + 0, 0, 0, 0, 0, 0, 0, 1 }; > + > + addr = (struct sockaddr_in6 *)res->ai_addr; > + > + if (memcmp(&addr->sin6_addr, localhost, 16) == 0) > + continue; > + > + memcpy(bytes, &addr->sin6_addr, 16); > + break; > + } else > + dprintf("unknown address family\n"); > + } > + > + if (res == NULL) { > + eprintf("failed to get address info\n"); > + return -1; > + } > + > + freeaddrinfo(res0); > + > + return 0; > +} > + > +static void find_queue_end(struct acrd_handle *ah, 
const char *path, void *arg) > +{ > + int max; > + > + sscanf(path, QUEUE_FILE "/%d", &max); > + if (max > *(int *)arg) > + *(int *)arg = max; > +} > + > +static pthread_mutex_t start_lock = PTHREAD_MUTEX_INITIALIZER; > +static pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER; > + > +/* protect queue_start_pos */ > +static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; > + > +static int need_cleanup; > + > +static void acrd_join_fn(struct acrd_handle *ah, const uint64_t *member_list, > + size_t member_list_entries, uint64_t nodeid, void *arg) > +{ > + static int init = 0; > + > + if (!init) { > + this_id = nodeid; > + > + if (member_list_entries == 1) > + need_cleanup = 1; > + > + pthread_mutex_lock(&start_lock); > + pthread_cond_signal(&start_cond); > + pthread_mutex_unlock(&start_lock); > + > + init = 1; > + } > +} > + > +struct acrd_leave_info { > + struct acrd_handle *ah; > + uint64_t left_nodeid; > + struct work work; > +}; > + > +static void __acrd_leave(struct work *work, int idx) > +{ > + struct acrd_leave_info *info = container_of(work, typeof(*info), work); > + struct acrd_handle *ah = info->ah; > + int i; > + size_t nr_nodes; > + uint64_t ids[SD_MAX_NODES]; > + struct sheepdog_node_list_entry nodes[SD_MAX_NODES]; > + struct acrd_tx *atx; > + > + pthread_mutex_lock(&queue_lock); > + > + /* unlock if left node is locking one */ > + atx = acrd_tx_init(ah); > + acrd_tx_cmp(atx, LOCK_FILE, &info->left_nodeid, > + sizeof(info->left_nodeid), 0); > + acrd_tx_del(atx, LOCK_FILE, 0); > + acrd_tx_commit(atx, 0); > + acrd_tx_close(atx); > + > + /* check the failed node */ > + nr_nodes = get_nodes(ah, nodes, ids); > + > + for (i = 0; i < nr_nodes; i++) { > + if (ids[i] == info->left_nodeid) { > + add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0, > + NULL); > + break; > + } > + } > + > + pthread_mutex_unlock(&queue_lock); > +} > + > +static void __acrd_leave_done(struct work *work, int idx) > +{ > + struct acrd_leave_info *info = container_of(work, 
typeof(*info), work); > + > + free(info); > +} > + > +static void acrd_leave_fn(struct acrd_handle *ah, const uint64_t *member_list, > + size_t member_list_entries, uint64_t nodeid, void *arg) > +{ > + struct acrd_leave_info *info; > + static int left; > + > + if (nodeid == this_id) { > + left = 1; > + close(efd); > + } > + > + if(left) > + return; > + > + info = zalloc(sizeof(*info)); > + if (!info) > + panic("failed to allocate memory"); > + > + info->ah = ah; > + info->left_nodeid = nodeid; > + info->work.fn = __acrd_leave; > + info->work.done = __acrd_leave_done; > + > + /* we cannot call accord APIs in the callback... */ > + queue_work(acrd_wq, &info->work); > +} > + > +static void acrd_watch_fn(struct acrd_handle *ah, struct acrd_watch_info *info, > + void *arg) > +{ > + eventfd_t value = 1; > + > + eventfd_write(efd, value); > +} > + > +static int accord_init(struct cdrv_handlers *handlers, const char *option, > + uint8_t *myaddr) > +{ > + acrd_hdlrs = *handlers; > + if (!option) { > + eprintf("specify one of the accord servers.\n"); > + eprintf("e.g. 
sheep /store -c accord:127.0.0.1\n"); > + return -1; > + } > + > + pthread_mutex_lock(&start_lock); > + > + ahandle = acrd_init(option, 9090, acrd_join_fn, acrd_leave_fn, NULL); > + if (!ahandle) { > + eprintf("failed to connect to accrd server %s\n", option); > + return -1; > + } > + > + if (get_addr(myaddr) < 0) > + return -1; > + > + efd = eventfd(0, EFD_NONBLOCK); > + if (efd < 0) { > + eprintf("failed to create an event fd: %m\n"); > + return -1; > + } > + > + acrd_wq = init_work_queue(1); > + > + pthread_cond_wait(&start_cond, &start_lock); > + pthread_mutex_unlock(&start_lock); > + > + if (need_cleanup) > + for_each_acrd_file(ahandle, BASE_FILE, __acrd_del, NULL); > + else { > + queue_start_pos = -1; > + queue_end_pos = -1; > + for_each_acrd_file(ahandle, QUEUE_FILE, find_queue_end, > + &queue_end_pos); > + } > + > + acrd_add_watch(ahandle, QUEUE_FILE, ACRD_EVENT_PREFIX | ACRD_EVENT_ALL, > + acrd_watch_fn, NULL); > + > + return efd; > +} > + > +static int accord_join(struct sheepdog_node_list_entry *myself, > + enum cluster_join_result (*check_join_cb)( > + struct sheepdog_node_list_entry *joining, > + void *opaque), > + void *opaque, size_t opaque_len) > +{ > + this_node = *myself; > + acrd_check_join_cb = check_join_cb; > + > + return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL); > +} > + > +static int accord_leave(void) > +{ > + return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, NULL); > +} > + > +static int accord_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg)) > +{ > + return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb); > +} > + > +static void acrd_block(struct work *work, int idx) > +{ > + struct acrd_event ev; > + > + pthread_mutex_lock(&queue_lock); > + > + acrd_queue_pop(ahandle, &ev); > + > + ev.block_cb(ev.buf); > + ev.blocked = 0; > + > + acrd_queue_push_back(ahandle, &ev); > + > + pthread_mutex_unlock(&queue_lock); > +} > + > +static void acrd_block_done(struct work *work, 
int idx) > +{ > +} > + > +static int accord_dispatch(void) > +{ > + int ret; > + eventfd_t value; > + struct acrd_event ev; > + enum cluster_join_result res; > + static struct work work = { > + .fn = acrd_block, > + .done = acrd_block_done, > + }; > + > + dprintf("read event\n"); > + ret = eventfd_read(efd, &value); > + if (ret < 0) > + return 0; > + > + pthread_mutex_lock(&queue_lock); > + > + ret = acrd_queue_pop(ahandle, &ev); > + if (ret < 0) > + goto out; > + > + switch (ev.type) { > + case EVENT_JOIN: > + if (ev.blocked) { > + if (node_cmp(&ev.nodes[0], &this_node) == 0) { > + res = acrd_check_join_cb(&ev.sender, ev.buf); > + ev.join_result = res; > + ev.blocked = 0; > + > + acrd_queue_push_back(ahandle, &ev); > + > + if (res == CJ_RES_MASTER_TRANSFER) { > + eprintf("failed to join sheepdog cluster: " > + "please retry when master is up\n"); > + exit(1); > + } > + } else > + acrd_queue_push_back(ahandle, NULL); > + > + goto out; > + } > + > + if (ev.join_result == CJ_RES_MASTER_TRANSFER) { > + /* FIXME: This code is tricky, but Sheepdog assumes that */ > + /* nr_nodes = 1 when join_result = MASTER_TRANSFER... 
*/ > + ev.nr_nodes = 1; > + ev.nodes[0] = this_node; > + ev.ids[0] = this_id; > + acrd_queue_push_back(ahandle, &ev); > + acrd_queue_pop(ahandle, &ev); > + } > + > + acrd_hdlrs.join_handler(&ev.sender, ev.nodes, ev.nr_nodes, > + ev.join_result, ev.buf); > + break; > + case EVENT_LEAVE: > + acrd_hdlrs.leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); > + break; > + case EVENT_NOTIFY: > + if (ev.blocked) { > + if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) { > + queue_work(acrd_wq, &work); > + > + ev.callbacked = 1; > + > + acrd_queue_push_back(ahandle, &ev); > + } else > + acrd_queue_push_back(ahandle, NULL); > + > + goto out; > + } > + > + acrd_hdlrs.notify_handler(&ev.sender, ev.buf, ev.buf_len); > + break; > + } > +out: > + pthread_mutex_unlock(&queue_lock); > + > + return 0; > +} > + > +struct cluster_driver cdrv_accord = { > + .name = "accord", > + > + .init = accord_init, > + .join = accord_join, > + .leave = accord_leave, > + .notify = accord_notify, > + .dispatch = accord_dispatch, > +}; > + > +cdrv_register(cdrv_accord); > -- > 1.7.2.5 > > -- > sheepdog mailing list > sheepdog at lists.wpkg.org > http://lists.wpkg.org/mailman/listinfo/sheepdog |