[Sheepdog] [PATCH 2/4] cluster: add local cluster driver
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Sat Oct 29 11:56:49 CEST 2011
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/Makefile.am | 2 +-
sheep/cluster/local.c | 474 +++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 475 insertions(+), 1 deletions(-)
create mode 100644 sheep/cluster/local.c
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 85652dd..745fdde 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -24,7 +24,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
sbin_PROGRAMS = sheep
sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
- cluster/corosync.c
+ cluster/corosync.c cluster/local.c
sheep_LDADD = $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
sheep_DEPENDENCIES = ../lib/libsheepdog.a
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
new file mode 100644
index 0000000..890228d
--- /dev/null
+++ b/sheep/cluster/local.c
@@ -0,0 +1,474 @@
+/*
+ * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/signalfd.h>
+#include <sys/file.h>
+#include <search.h>
+#include <signal.h>
+#include <fcntl.h>
+#include <assert.h>
+
+#include "cluster.h"
+#include "event.h"
+#include "work.h"
+
+#define MAX_EVENTS 500
+#define MAX_EVENT_BUF_SIZE (64 * 1024)
+
+const char *shmfile = "/tmp/sheepdog_shm";
+static int shmfd;
+static int sigfd;
+static int event_pos;
+static struct sheepdog_node_list_entry this_node;
+
+static struct work_queue *local_block_wq;
+
+static struct cdrv_handlers lhdlrs;
+static enum cluster_join_result (*local_check_join_cb)(
+ struct sheepdog_node_list_entry *joining, void *opaque);
+
+enum local_event_type {
+ EVENT_JOIN = 1,
+ EVENT_LEAVE,
+ EVENT_NOTIFY,
+};
+
+struct local_event {
+ enum local_event_type type;
+ struct sheepdog_node_list_entry sender;
+
+ size_t buf_len;
+ uint8_t buf[MAX_EVENT_BUF_SIZE];
+
+ size_t nr_nodes; /* the number of sheep processes */
+ struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
+ pid_t pids[SD_MAX_NODES];
+
+ enum cluster_join_result join_result;
+
+ void (*block_cb)(void *arg);
+
+ int blocked; /* set non-zero when sheep must block this event */
+ int callbacked; /* set non-zero if sheep already called block_cb() */
+};
+
+
+/* shared memory queue */
+
+struct shm_queue {
+ uint64_t chksum;
+
+ int pos;
+ struct local_event events[MAX_EVENTS];
+} *shm_queue;
+
+static void shm_queue_lock(void)
+{
+ flock(shmfd, LOCK_EX);
+}
+
+static void shm_queue_unlock(void)
+{
+ flock(shmfd, LOCK_UN);
+}
+
+static int shm_queue_empty(void)
+{
+ return event_pos == shm_queue->pos;
+}
+
+static size_t get_nodes(struct sheepdog_node_list_entry *n, pid_t *p)
+{
+ struct local_event *ev;
+
+ ev = shm_queue->events + shm_queue->pos;
+
+ if (n)
+ memcpy(n, ev->nodes, sizeof(ev->nodes));
+ if (p)
+ memcpy(p, ev->pids, sizeof(ev->pids));
+
+ return ev->nr_nodes;
+}
+
+static int process_exists(pid_t pid)
+{
+ return kill(pid, 0) == 0;
+}
+
+static struct local_event *shm_queue_peek(void)
+{
+ if (shm_queue_empty())
+ return NULL;
+
+ return shm_queue->events + (event_pos + 1) % MAX_EVENTS;
+}
+
+static void shm_queue_push(struct local_event *ev)
+{
+ shm_queue->pos = (shm_queue->pos + 1) % MAX_EVENTS;
+ shm_queue->events[shm_queue->pos] = *ev;
+
+ msync(shm_queue->events + shm_queue->pos, sizeof(*ev), MS_SYNC);
+ msync(&shm_queue->pos, sizeof(shm_queue->pos), MS_SYNC);
+}
+
+static struct local_event *shm_queue_pop(void)
+{
+ if (shm_queue_empty())
+ return NULL;
+
+ event_pos = (event_pos + 1) % MAX_EVENTS;
+
+ return shm_queue->events + event_pos;
+}
+
+static uint64_t shm_queue_calc_chksum(void)
+{
+ return fnv_64a_buf(shm_queue->events + shm_queue->pos,
+ sizeof(*shm_queue->events), FNV1A_64_INIT);
+}
+
+static void shm_queue_set_chksum(void)
+{
+ shm_queue->chksum = shm_queue_calc_chksum();
+ msync(&shm_queue->chksum, sizeof(shm_queue->chksum), MS_SYNC);
+}
+
+static void shm_queue_notify(void)
+{
+ int i;
+ size_t nr;
+ pid_t pids[SD_MAX_NODES];
+
+ shm_queue_set_chksum();
+
+ nr = get_nodes(NULL, pids);
+
+ for (i = 0; i < nr; i++)
+ kill(pids[i], SIGUSR1);
+}
+
+static int is_shm_queue_valid(void)
+{
+ int i;
+ size_t nr;
+ pid_t pids[SD_MAX_NODES];
+
+ if (shm_queue->chksum != shm_queue_calc_chksum()) {
+ dprintf("invalid shm queue\n");
+ return 0;
+ }
+
+ nr = get_nodes(NULL, pids);
+
+ if (nr == 0)
+ return 1;
+
+ for (i = 0; i < nr; i++)
+ if (process_exists(pids[i]))
+ return 1;
+
+ return 0;
+}
+
+static void shm_queue_init(void)
+{
+ int ret;
+
+ shmfd = open(shmfile, O_CREAT | O_RDWR, 0644);
+ assert(shmfd >= 0);
+
+ shm_queue_lock();
+
+ ret = ftruncate(shmfd, sizeof(*shm_queue));
+ assert(ret == 0);
+
+ shm_queue = mmap(0, sizeof(*shm_queue),
+ PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
+ assert(shm_queue != MAP_FAILED);
+
+ if (is_shm_queue_valid())
+ event_pos = shm_queue->pos;
+ else {
+ /* initialize shared memory */
+ event_pos = 0;
+ memset(shm_queue, 0, sizeof(*shm_queue));
+ ret = ftruncate(shmfd, 0);
+ assert(ret == 0);
+ ret = ftruncate(shmfd, sizeof(*shm_queue));
+ assert(ret == 0);
+
+ shm_queue_set_chksum();
+ }
+
+ shm_queue_unlock();
+}
+
+static void add_event(enum local_event_type type,
+ struct sheepdog_node_list_entry *node, void *buf,
+ size_t buf_len, void (*block_cb)(void *arg))
+{
+ int idx;
+ struct sheepdog_node_list_entry *n;
+ pid_t *p;
+ struct local_event ev = {
+ .type = type,
+ .sender = *node,
+ };
+
+ ev.buf_len = buf_len;
+ if (buf)
+ memcpy(ev.buf, buf, buf_len);
+
+ ev.nr_nodes = get_nodes(ev.nodes, ev.pids);
+
+ switch (type) {
+ case EVENT_JOIN:
+ ev.blocked = 1;
+ ev.nodes[ev.nr_nodes] = *node;
+ ev.pids[ev.nr_nodes] = getpid(); /* must be local node */
+ ev.nr_nodes++;
+ break;
+ case EVENT_LEAVE:
+ n = lfind(node, ev.nodes, &ev.nr_nodes, sizeof(*n), node_cmp);
+ if (!n)
+ panic("internal error\n");
+ idx = n - ev.nodes;
+ p = ev.pids + idx;
+
+ ev.nr_nodes--;
+ memmove(n, n + 1, sizeof(*n) * (ev.nr_nodes - idx));
+ memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
+ break;
+ case EVENT_NOTIFY:
+ ev.blocked = !!block_cb;
+ ev.block_cb = block_cb;
+ break;
+ }
+
+ shm_queue_push(&ev);
+
+ shm_queue_notify();
+}
+
+static void check_pids(void *arg)
+{
+ int i;
+ size_t nr;
+ struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
+ pid_t pids[SD_MAX_NODES];
+
+ shm_queue_lock();
+
+ nr = get_nodes(nodes, pids);
+
+ for (i = 0; i < nr; i++)
+ if (!process_exists(pids[i]))
+ add_event(EVENT_LEAVE, nodes + i, NULL, 0, NULL);
+
+ shm_queue_unlock();
+
+ add_timer(arg, 1);
+}
+
+
+/* Local driver APIs */
+
+static int local_init(struct cdrv_handlers *handlers, uint8_t *myaddr)
+{
+ sigset_t mask;
+ static struct timer t = {
+ .callback = check_pids,
+ .data = &t,
+ };
+
+ lhdlrs = *handlers;
+
+ /* set 127.0.0.1 */
+ memset(myaddr, 0, 16);
+ myaddr[12] = 127;
+ myaddr[15] = 1;
+
+ shm_queue_init();
+
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGUSR1);
+ sigprocmask(SIG_BLOCK, &mask, NULL);
+
+ sigfd = signalfd(-1, &mask, SFD_NONBLOCK);
+ if (sigfd < 0) {
+ eprintf("failed to create a signal fd, %m\n");
+ return 1;
+ }
+
+ add_timer(&t, 1);
+
+ local_block_wq = init_work_queue(1);
+
+ return sigfd;
+}
+
+static int local_join(struct sheepdog_node_list_entry *myself,
+ enum cluster_join_result (*check_join_cb)(
+ struct sheepdog_node_list_entry *joining,
+ void *opaque),
+ void *opaque, size_t opaque_len)
+{
+ this_node = *myself;
+ local_check_join_cb = check_join_cb;
+
+ shm_queue_lock();
+
+ add_event(EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
+
+ shm_queue_unlock();
+
+ return 0;
+}
+
+static int local_leave(void)
+{
+ shm_queue_lock();
+
+ add_event(EVENT_LEAVE, &this_node, NULL, 0, NULL);
+
+ shm_queue_unlock();
+
+ return 0;
+}
+
+static int local_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
+{
+ shm_queue_lock();
+
+ add_event(EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
+
+ shm_queue_unlock();
+
+ return 0;
+}
+
+static void local_block(struct work *work, int idx)
+{
+ struct local_event *ev;
+
+ shm_queue_lock();
+
+ ev = shm_queue_peek();
+
+ ev->block_cb(ev->buf);
+ ev->blocked = 0;
+ msync(ev, sizeof(*ev), MS_SYNC);
+
+ shm_queue_notify();
+
+ shm_queue_unlock();
+}
+
+static void local_block_done(struct work *work, int idx)
+{
+}
+
+static int local_dispatch(void)
+{
+ int ret;
+ struct signalfd_siginfo siginfo;
+ struct local_event *ev;
+ enum cluster_join_result res;
+ static struct work work = {
+ .fn = local_block,
+ .done = local_block_done,
+ };
+
+ dprintf("read siginfo\n");
+ ret = read(sigfd, &siginfo, sizeof(siginfo));
+ assert(ret == sizeof(siginfo));
+
+ shm_queue_lock();
+
+ ev = shm_queue_peek();
+ if (!ev)
+ goto out;
+
+ switch (ev->type) {
+ case EVENT_JOIN:
+ if (ev->blocked) {
+ if (node_cmp(&ev->nodes[0], &this_node) == 0) {
+ res = local_check_join_cb(&ev->sender, ev->buf);
+ ev->join_result = res;
+ ev->blocked = 0;
+ msync(ev, sizeof(*ev), MS_SYNC);
+
+ shm_queue_notify();
+
+ if (res == CJ_RES_MASTER_TRANSFER) {
+ eprintf("Restart me after master is up.\n");
+ shm_queue_unlock();
+ exit(1);
+ }
+ }
+ goto out;
+ }
+
+ if (ev->join_result == CJ_RES_MASTER_TRANSFER) {
+ /* FIXME: This code is tricky, but Sheepdog assumes that */
+ /* nr_nodes = 1 when join_result = MASTER_TRANSFER... */
+ ev->nr_nodes = 1;
+ ev->nodes[0] = this_node;
+ ev->pids[0] = getpid();
+
+ shm_queue_set_chksum();
+ }
+
+ lhdlrs.join_handler(&ev->sender, ev->nodes, ev->nr_nodes,
+ ev->join_result, ev->buf);
+ break;
+ case EVENT_LEAVE:
+ lhdlrs.leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
+ break;
+ case EVENT_NOTIFY:
+ if (ev->blocked) {
+ if (node_cmp(&ev->sender, &this_node) == 0) {
+ if (!ev->callbacked) {
+ queue_work(local_block_wq, &work);
+
+ ev->callbacked = 1;
+ }
+ }
+ goto out;
+ }
+
+ lhdlrs.notify_handler(&ev->sender, ev->buf, ev->buf_len);
+ break;
+ }
+
+ shm_queue_pop();
+out:
+ shm_queue_unlock();
+
+ return 0;
+}
+
+struct cluster_driver cdrv_local = {
+ .name = "local",
+
+ .init = local_init,
+ .join = local_join,
+ .leave = local_leave,
+ .notify = local_notify,
+ .dispatch = local_dispatch,
+};
+
+cdrv_register(cdrv_local);
--
1.7.2.5
More information about the sheepdog
mailing list