[sheepdog] [PATCH 2/2] cluster/local: handle leave events
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Tue Sep 18 10:43:49 CEST 2012
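A cluster driver needs to keep calling event handlers even after
leave_cluster is called, so that the node which has left can continue to
work as a gateway. The local driver removed a leaving node from its node
list, so that node no longer received event notifications. This patch
fixes it: on leave, the node is kept in the driver's list and marked as a
gateway, and gateway nodes are excluded from the member list that is
passed to the event handlers.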
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/cluster.h | 4 +-
sheep/cluster/local.c | 166 ++++++++++++++++++++++++++++++-------------------
2 files changed, 105 insertions(+), 65 deletions(-)
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 65b95a4..3e881d7 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -76,7 +76,9 @@ struct cluster_driver {
* Leave the cluster
*
* This function is used to leave the cluster, and notifies a
- * leave event to all the nodes.
+ * leave event to all the nodes. The cluster driver calls event
+ * handlers even after this function is called, so the left node can
+ * work as a gateway.
*
* Returns zero on success, -1 on error
*/
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 8a53abc..c7ef38e 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -15,7 +15,6 @@
#include <sys/mman.h>
#include <sys/signalfd.h>
#include <sys/file.h>
-#include <search.h>
#include <signal.h>
#include <fcntl.h>
#include <assert.h>
@@ -32,20 +31,41 @@ static int shmfd;
static int sigfd;
static int block_event_pos;
static int nonblock_event_pos;
-static struct sd_node this_node;
+static struct local_node this_node;
static bool joined;
+struct local_node {
+ struct sd_node node;
+ pid_t pid;
+ bool gateway;
+};
+
+static char *lnode_to_str(struct local_node *lnode)
+{
+ char *s = node_to_str(&lnode->node);
+
+ sprintf(s + strlen(s), " pid:%d", lnode->pid);
+
+ return s;
+}
+
+static bool lnode_eq(const struct local_node *a, const struct local_node *b)
+{
+ return node_eq(&a->node, &b->node);
+}
+
enum local_event_type {
EVENT_JOIN_REQUEST = 1,
EVENT_JOIN_RESPONSE,
EVENT_LEAVE,
+ EVENT_GATEWAY,
EVENT_BLOCK,
EVENT_NOTIFY,
};
struct local_event {
enum local_event_type type;
- struct sd_node sender;
+ struct local_node sender;
bool callbacked;
bool removed;
@@ -53,9 +73,8 @@ struct local_event {
size_t buf_len;
uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
- size_t nr_nodes; /* the number of sheep processes */
- struct sd_node nodes[SD_MAX_NODES];
- pid_t pids[SD_MAX_NODES];
+ size_t nr_lnodes; /* the number of sheep processes */
+ struct local_node lnodes[SD_MAX_NODES];
enum cluster_join_result join_result;
};
@@ -80,18 +99,16 @@ static void shm_queue_unlock(void)
flock(shmfd, LOCK_UN);
}
-static size_t get_nodes(struct sd_node *n, pid_t *p)
+static size_t get_nodes(struct local_node *n)
{
struct local_event *ev;
ev = shm_queue->nonblock_events + shm_queue->nonblock_event_pos;
if (n)
- memcpy(n, ev->nodes, sizeof(ev->nodes));
- if (p)
- memcpy(p, ev->pids, sizeof(ev->pids));
+ memcpy(n, ev->lnodes, sizeof(ev->lnodes));
- return ev->nr_nodes;
+ return ev->nr_lnodes;
}
static int process_exists(pid_t pid)
@@ -156,14 +173,13 @@ static void shm_queue_notify(void)
{
int i;
size_t nr;
- pid_t pids[SD_MAX_NODES];
- struct sd_node nodes[SD_MAX_NODES];
+ struct local_node lnodes[SD_MAX_NODES];
- nr = get_nodes(nodes, pids);
+ nr = get_nodes(lnodes);
for (i = 0; i < nr; i++) {
- dprintf("send signal to %s\n", node_to_str(nodes + i));
- kill(pids[i], SIGUSR1);
+ dprintf("send signal to %s\n", lnode_to_str(lnodes + i));
+ kill(lnodes[i].pid, SIGUSR1);
}
}
@@ -171,15 +187,15 @@ static int is_shm_queue_valid(void)
{
int i;
size_t nr;
- pid_t pids[SD_MAX_NODES];
+ struct local_node lnodes[SD_MAX_NODES];
- nr = get_nodes(NULL, pids);
+ nr = get_nodes(lnodes);
if (nr == 0)
return 1;
for (i = 0; i < nr; i++)
- if (process_exists(pids[i]))
+ if (process_exists(lnodes[i].pid))
return 1;
return 0;
@@ -218,39 +234,49 @@ static void shm_queue_init(void)
shm_queue_unlock();
}
-static void add_event(enum local_event_type type, struct sd_node *node,
+static struct local_node *find_lnode(struct local_node *key, size_t nr_lnodes,
+ struct local_node *lnodes)
+{
+ int i;
+
+ for (i = 0; i < nr_lnodes; i++)
+ if (lnode_eq(key, lnodes + i))
+ return lnodes + i;
+
+ panic("internal error\n");
+}
+
+static void add_event(enum local_event_type type, struct local_node *lnode,
void *buf, size_t buf_len)
{
int idx, i;
- struct sd_node *n;
- pid_t *p;
+ struct local_node *n;
struct local_event ev = {
.type = type,
- .sender = *node,
+ .sender = *lnode,
};
ev.buf_len = buf_len;
if (buf)
memcpy(ev.buf, buf, buf_len);
- ev.nr_nodes = get_nodes(ev.nodes, ev.pids);
+ ev.nr_lnodes = get_nodes(ev.lnodes);
switch (type) {
case EVENT_JOIN_REQUEST:
- ev.nodes[ev.nr_nodes] = *node;
- ev.pids[ev.nr_nodes] = getpid(); /* must be local node */
- ev.nr_nodes++;
+ ev.lnodes[ev.nr_lnodes] = *lnode;
+ ev.nr_lnodes++;
break;
case EVENT_LEAVE:
- n = lfind(node, ev.nodes, &ev.nr_nodes, sizeof(*n), node_id_cmp);
- if (!n)
- panic("internal error\n");
- idx = n - ev.nodes;
- p = ev.pids + idx;
-
- ev.nr_nodes--;
- memmove(n, n + 1, sizeof(*n) * (ev.nr_nodes - idx));
- memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
+ n = find_lnode(lnode, ev.nr_lnodes, ev.lnodes);
+ idx = n - ev.lnodes;
+
+ ev.nr_lnodes--;
+ memmove(n, n + 1, sizeof(*n) * (ev.nr_lnodes - idx));
+ break;
+ case EVENT_GATEWAY:
+ n = find_lnode(lnode, ev.nr_lnodes, ev.lnodes);
+ n->gateway = true;
break;
case EVENT_NOTIFY:
case EVENT_BLOCK:
@@ -259,9 +285,9 @@ static void add_event(enum local_event_type type, struct sd_node *node,
abort();
}
- dprintf("type = %d, sender = %s\n", ev.type, node_to_str(&ev.sender));
- for (i = 0; i < ev.nr_nodes; i++)
- dprintf("%d: %s\n", i, node_to_str(ev.nodes + i));
+ dprintf("type = %d, sender = %s\n", ev.type, lnode_to_str(&ev.sender));
+ for (i = 0; i < ev.nr_lnodes; i++)
+ dprintf("%d: %s\n", i, lnode_to_str(ev.lnodes + i));
shm_queue_push(&ev);
@@ -272,21 +298,20 @@ static void check_pids(void *arg)
{
int i;
size_t nr;
- struct sd_node nodes[SD_MAX_NODES];
- pid_t pids[SD_MAX_NODES];
+ struct local_node lnodes[SD_MAX_NODES];
struct local_event *ev;
shm_queue_lock();
- nr = get_nodes(nodes, pids);
+ nr = get_nodes(lnodes);
for (i = 0; i < nr; i++)
- if (!process_exists(pids[i])) {
- add_event(EVENT_LEAVE, nodes + i, NULL, 0);
+ if (!process_exists(lnodes[i].pid)) {
+ add_event(EVENT_LEAVE, lnodes + i, NULL, 0);
/* unblock blocking event if sender has gone */
ev = shm_queue_peek_block_event();
- if (node_eq(nodes + i, &ev->sender)) {
+ if (lnode_eq(lnodes + i, &ev->sender)) {
ev->removed = true;
msync(ev, sizeof(*ev), MS_SYNC);
}
@@ -303,7 +328,9 @@ static void check_pids(void *arg)
static int local_join(struct sd_node *myself,
void *opaque, size_t opaque_len)
{
- this_node = *myself;
+ this_node.node = *myself;
+ this_node.pid = getpid();
+ this_node.gateway = false;
shm_queue_lock();
@@ -318,7 +345,7 @@ static int local_leave(void)
{
shm_queue_lock();
- add_event(EVENT_LEAVE, &this_node, NULL, 0);
+ add_event(EVENT_GATEWAY, &this_node, NULL, 0);
shm_queue_unlock();
@@ -369,15 +396,23 @@ static bool local_process_event(void)
struct local_event *ev;
enum cluster_join_result res;
int i;
+ struct sd_node nodes[SD_MAX_NODES];
+ size_t nr_nodes;
ev = shm_queue_peek();
if (!ev)
return false;
- dprintf("type = %d, sender = %s\n", ev->type, node_to_str(&ev->sender));
+ dprintf("type = %d, sender = %s\n", ev->type,
+ lnode_to_str(&ev->sender));
dprintf("callbacked = %d, removed = %d\n", ev->callbacked, ev->removed);
- for (i = 0; i < ev->nr_nodes; i++)
- dprintf("%d: %s\n", i, node_to_str(ev->nodes + i));
+
+ nr_nodes = 0;
+ for (i = 0; i < ev->nr_lnodes; i++) {
+ dprintf("%d: %s\n", i, lnode_to_str(ev->lnodes + i));
+ if (!ev->lnodes[i].gateway)
+ nodes[nr_nodes++] = ev->lnodes[i].node;
+ }
if (ev->removed)
goto out;
@@ -385,19 +420,20 @@ static bool local_process_event(void)
if (ev->callbacked)
return false; /* wait for unblock event */
- if (ev->type == EVENT_JOIN_RESPONSE && node_eq(&this_node, &ev->sender)) {
+ if (ev->type == EVENT_JOIN_RESPONSE &&
+ lnode_eq(&this_node, &ev->sender)) {
dprintf("join Sheepdog\n");
joined = true;
}
if (!joined) {
if (ev->type == EVENT_JOIN_REQUEST &&
- node_eq(&this_node, &ev->sender)) {
- struct sd_node nodes[SD_MAX_NODES];
+ lnode_eq(&this_node, &ev->sender)) {
+ struct local_node lnodes[SD_MAX_NODES];
- get_nodes(nodes, NULL);
+ get_nodes(lnodes);
- if (!node_eq(&this_node, &nodes[0])) {
+ if (!lnode_eq(&this_node, &lnodes[0])) {
dprintf("wait for another node to accept this "
"node\n");
return false;
@@ -408,7 +444,7 @@ static bool local_process_event(void)
switch (ev->type) {
case EVENT_JOIN_REQUEST:
- res = sd_check_join_cb(&ev->sender, ev->buf);
+ res = sd_check_join_cb(&ev->sender.node, ev->buf);
ev->join_result = res;
ev->type = EVENT_JOIN_RESPONSE;
msync(ev, sizeof(*ev), MS_SYNC);
@@ -426,24 +462,26 @@ static bool local_process_event(void)
if (ev->join_result == CJ_RES_MASTER_TRANSFER) {
/* FIXME: This code is tricky, but Sheepdog assumes that */
/* nr_nodes = 1 when join_result = MASTER_TRANSFER... */
- ev->nr_nodes = 1;
- ev->nodes[0] = this_node;
- ev->pids[0] = getpid();
+ ev->nr_lnodes = 1;
+ ev->lnodes[0] = this_node;
+ nr_nodes = 1;
+ nodes[0] = this_node.node;
msync(ev, sizeof(*ev), MS_SYNC);
}
- sd_join_handler(&ev->sender, ev->nodes, ev->nr_nodes,
- ev->join_result, ev->buf);
+ sd_join_handler(&ev->sender.node, nodes, nr_nodes,
+ ev->join_result, ev->buf);
break;
case EVENT_LEAVE:
- sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
+ case EVENT_GATEWAY:
+ sd_leave_handler(&ev->sender.node, nodes, nr_nodes);
break;
case EVENT_BLOCK:
- ev->callbacked = sd_block_handler(&ev->sender);
+ ev->callbacked = sd_block_handler(&ev->sender.node);
msync(ev, sizeof(*ev), MS_SYNC);
return false;
case EVENT_NOTIFY:
- sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
+ sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
break;
}
out:
--
1.7.2.5
More information about the sheepdog
mailing list