[Sheepdog] [PATCH] serialize all cpg events
FUJITA Tomonori
fujita.tomonori at lab.ntt.co.jp
Tue Apr 20 08:59:35 CEST 2010
There is a bug where deliver and confchg events are not processed in
the order in which they arrive.
- link all the cpg events (deliver and confchg) to sys->cpg_event_siblings.
- process the queued events serially, one at a time.
Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
---
collie/collie.h | 4 +-
collie/group.c | 421 ++++++++++++++++++++++++++-----------------------------
2 files changed, 203 insertions(+), 222 deletions(-)
diff --git a/collie/collie.h b/collie/collie.h
index 734b1a4..164f700 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -80,7 +80,9 @@ struct cluster_info {
int nr_sobjs;
- struct list_head work_deliver_siblings;
+ struct list_head cpg_event_siblings;
+ struct cpg_event *cur_cevent;
+ unsigned int cpg_event_work_flags;
};
struct cluster_info *sys;
diff --git a/collie/group.c b/collie/group.c
index d8ebad9..6b2e532 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -77,9 +77,6 @@ struct vdi_op_message {
struct work_deliver {
struct message_header *msg;
-
- struct work work;
- struct list_head work_deliver_list;
};
struct work_confch {
@@ -92,8 +89,22 @@ struct work_confch {
unsigned long *failed_vdis;
int nr_failed_vdis;
+ int first_cpg_node;
+};
+
+enum cpg_event_type {
+ CPG_EVENT_CONCHG,
+ CPG_EVENT_DELIVER,
+};
+
+struct cpg_event {
+ enum cpg_event_type ctype;
+ struct list_head cpg_event_list;
- struct work work;
+ union {
+ struct work_confch c;
+ struct work_deliver d;
+ };
};
#define print_node_list(node_list) \
@@ -753,61 +764,17 @@ out:
req->done(req);
}
-static void handle_join(struct work_deliver *w)
+static void __sd_deliver(struct cpg_event *cevent)
{
- struct message_header *m;
- struct vm *vm;
- struct sheepdog_vm_list_entry *e;
- int i, nr = 2000;
- char *buf;
-
- buf = malloc(sizeof(*m) + sizeof(*e) * nr);
- m = (struct message_header *)buf;
- e = (struct sheepdog_vm_list_entry *)(buf + sizeof(*m));
-
- i = 0;
- m->state = DM_CONT;
- m->pid = ((struct join_message *)w->msg)->pid;
- m->nodeid = ((struct join_message *)w->msg)->nodeid;
-
- vprintf(SDOG_DEBUG "%u %u\n", m->pid, m->nodeid);
-
- list_for_each_entry(vm, &sys->vm_list, list) {
- *e = vm->ent;
- vprintf(SDOG_DEBUG "%d %s\n", i, e->name);
- e++;
- i++;
-
- if (!(i % nr)) {
- m->msg_length = sizeof(*m) + i * sizeof(*e);
- send_message(sys->handle, m);
- e = (struct sheepdog_vm_list_entry *)(buf + sizeof(*m));
- i = 0;
- }
- }
-
- if (i) {
- m->msg_length = sizeof(*m) + i * sizeof(*e);
- vprintf(SDOG_DEBUG "%d %d\n", i, m->msg_length);
- send_message(sys->handle, m);
- }
-
- m = w->msg;
- join((struct join_message *)m);
- m->state = DM_FIN;
- send_message(sys->handle, m);
-}
-
-static void __sd_deliver(struct work *work, int idx)
-{
- struct work_deliver *w = container_of(work, struct work_deliver, work);
+ struct work_deliver *w = &cevent->d;
struct message_header *m = w->msg;
char name[128];
struct node *node;
- dprintf("op: %d, state: %u, size: %d, from: %s\n",
+ dprintf("op: %d, state: %u, size: %d, from: %s, pid: %d\n",
m->op, m->state, m->msg_length,
- addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
+ addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
+ m->pid);
if (m->op == SD_MSG_JOIN) {
uint32_t nodeid = ((struct join_message *)m)->nodeid;
@@ -823,24 +790,7 @@ static void __sd_deliver(struct work *work, int idx)
node->ent = m->from;
}
- if (m->state == DM_INIT) {
- if (!is_master())
- return;
-
- switch (m->op) {
- case SD_MSG_JOIN:
- handle_join(w);
- break;
- case SD_MSG_VDI_OP:
- vdi_op((struct vdi_op_message *)m);
- m->state = DM_FIN;
- send_message(sys->handle, m);
- break;
- default:
- eprintf("unknown message %d\n", m->op);
- break;
- }
- } else if (m->state == DM_FIN) {
+ if (m->state == DM_FIN) {
switch (m->op) {
case SD_MSG_JOIN:
update_cluster_info((struct join_message *)m);
@@ -855,27 +805,35 @@ static void __sd_deliver(struct work *work, int idx)
}
}
-static void __sd_deliver_done(struct work *work, int idx)
+static void __sd_deliver_done(struct cpg_event *cevent)
{
- struct work_deliver *w, *n = NULL;
+ struct work_deliver *w = &cevent->d;
struct message_header *m;
+ char name[128];
- w = container_of(work, struct work_deliver, work);
m = w->msg;
- list_del(&w->work_deliver_list);
-
- /*
- * for the master node, when I finished one message, if I have
- * pending messages, I need to perform the first of them now.
- *
- * for the non master nodes, when I get one finished message,
- * if I can forget it.
- */
- if (m->state == DM_FIN && !list_empty(&sys->work_deliver_siblings)) {
+ dprintf("op: %d, state: %u, size: %d, from: %s\n",
+ m->op, m->state, m->msg_length,
+ addr_to_str(name, sizeof(name), m->from.addr,
+ m->from.port));
- n = list_first_entry(&sys->work_deliver_siblings,
- struct work_deliver, work_deliver_list);
+ if (m->state == DM_INIT && is_master()) {
+ switch (m->op) {
+ case SD_MSG_JOIN:
+ join((struct join_message *)m);
+ m->state = DM_FIN;
+ send_message(sys->handle, m);
+ break;
+ case SD_MSG_VDI_OP:
+ vdi_op((struct vdi_op_message *)m);
+ m->state = DM_FIN;
+ send_message(sys->handle, m);
+ break;
+ default:
+ eprintf("unknown message %d\n", m->op);
+ break;
+ }
}
/*
@@ -887,142 +845,51 @@ static void __sd_deliver_done(struct work *work, int idx)
start_recovery(sys->epoch, NULL, 0);
free(w->msg);
- free(w);
-
- if (!n)
- return;
-
- if (is_master())
- queue_work(dobj_queue, &n->work);
- else {
- char name[128];
- m = n->msg;
-
- dprintf("op: %d, state: %u, size: %d, from: %s\n",
- m->op, m->state, m->msg_length,
- addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
-
- list_del(&n->work_deliver_list);
- free(n->msg);
- free(n);
- }
+ free(cevent);
}
+static void start_cpg_event_work(void);
+
static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
+ struct cpg_event *cevent;
struct work_deliver *w;
struct message_header *m = msg;
char name[128];
- dprintf("op: %d, state: %u, size: %d, from: %s\n",
+ dprintf("op: %d, state: %u, size: %d, from: %s, nodeid: %u, pid: %u\n",
m->op, m->state, m->msg_length,
- addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
-
- if (m->state == DM_CONT) {
- struct sheepdog_vm_list_entry *e;
- int nr, i;
- struct vm *vm;
-
- dprintf("op: %d, state: %u, size: %d, from: %s\n",
- m->op, m->state, m->msg_length,
- addr_to_str(name, sizeof(name), m->from.addr, m->from.port));
+ addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
+ nodeid, pid);
- if (is_master())
- return;
-
- vprintf(SDOG_DEBUG "%u %u %u %u\n",
- m->nodeid, m->pid, sys->this_nodeid, sys->this_pid);
-
- if (sys->this_nodeid != m->nodeid ||
- sys->this_pid != m->pid)
- return;
-
- /* This is my JOIN message. */
- vprintf(SDOG_DEBUG "we update the vm list\n");
-
- nr = (m->msg_length - sizeof(*m)) / sizeof(*e);
- e = (struct sheepdog_vm_list_entry *)((char *)msg + sizeof(*m));
-
- for (i = 0; i < nr; i++) {
- vm = zalloc(sizeof(*vm));
- if (!vm)
- break;
-
- vm->ent = e[i];
- vprintf(SDOG_DEBUG "%d, got %s\n", i, e[i].name);
- list_add(&vm->list, &sys->vm_list);
- }
+ cevent = zalloc(sizeof(*cevent));
+ if (!cevent)
return;
- }
- w = zalloc(sizeof(*w));
- if (!w)
- return;
+ cevent->ctype = CPG_EVENT_DELIVER;
+ w = &cevent->d;
+
+ vprintf(SDOG_DEBUG "allow new deliver, %p\n", cevent);
w->msg = zalloc(msg_len);
if (!w->msg)
return;
memcpy(w->msg, msg, msg_len);
- INIT_LIST_HEAD(&w->work_deliver_list);
-
- w->work.fn = __sd_deliver;
- w->work.done = __sd_deliver_done;
-
- if (is_master()) {
- if (m->state == DM_INIT) {
- int run = 0;
-
- /*
- * I can broadcast this message if there is no
- * outstanding messages.
- */
- if (list_empty(&sys->work_deliver_siblings))
- run = 1;
-
- list_add_tail(&w->work_deliver_list,
- &sys->work_deliver_siblings);
- if (run) {
- vprintf(SDOG_DEBUG "%u\n", pid);
- queue_work(dobj_queue, &w->work);
- } else
- vprintf(SDOG_DEBUG "%u\n", pid);
- return;
- } else
- /*
- * must be blocked until the message with
- * m->state == DM_INIT is completely finished
- * (__sd_deliver_done is called)
- */
- w->work.attr = WORK_ORDERED;
- } else {
- if (m->state == DM_INIT) {
- list_add_tail(&w->work_deliver_list,
- &sys->work_deliver_siblings);
-
- /*
- * non master nodes just links it to
- * work_deliver_siblings.
- */
- return;
- } else
- /*
- * __sd_deliver_done() frees requests on
- * work_deliver_siblings in order.
- */
- w->work.attr = WORK_ORDERED;
- }
+ if (m->state == DM_FIN)
+ list_add(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+ else
+ list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
- queue_work(dobj_queue, &w->work);
+ start_cpg_event_work();
}
-static void __sd_confch(struct work *work, int idx)
+static void __sd_confch(struct cpg_event *cevent)
{
- struct work_confch *w = container_of(work, struct work_confch, work);
+ struct work_confch *w = &cevent->c;
struct node *node;
int i;
- int init = 0;
const struct cpg_address *member_list = w->member_list;
size_t member_list_entries = w->member_list_entries;
@@ -1035,7 +902,7 @@ static void __sd_confch(struct work *work, int idx)
sys->this_nodeid == member_list[0].nodeid &&
sys->this_pid == member_list[0].pid){
sys->synchronized = 1;
- init = 1;
+ w->first_cpg_node = 1;
}
if (list_empty(&sys->cpg_node_list)) {
@@ -1111,7 +978,7 @@ static void __sd_confch(struct work *work, int idx)
}
}
- if (init) {
+ if (w->first_cpg_node) {
struct join_message msg;
/*
@@ -1133,9 +1000,20 @@ static void __sd_confch(struct work *work, int idx)
return;
}
- for (i = 0; i < joined_list_entries; i++) {
- if (sys->this_nodeid == joined_list[i].nodeid &&
- sys->this_pid == joined_list[i].pid) {
+ print_node_list(&sys->sd_node_list);
+}
+
+static void __sd_confch_done(struct cpg_event *cevent)
+{
+ struct work_confch *w = &cevent->c;
+ int i;
+
+ if (w->first_cpg_node)
+ goto skip_join;
+
+ for (i = 0; i < w->joined_list_entries; i++) {
+ if (sys->this_nodeid == w->joined_list[i].nodeid &&
+ sys->this_pid == w->joined_list[i].pid) {
struct join_message msg;
msg.header.op = SD_MSG_JOIN;
@@ -1155,16 +1033,7 @@ static void __sd_confch(struct work *work, int idx)
}
}
- if (left_list_entries == 0)
- return;
-
- print_node_list(&sys->sd_node_list);
-}
-
-static void __sd_confch_done(struct work *work, int idx)
-{
- struct work_confch *w = container_of(work, struct work_confch, work);
-
+skip_join:
/* FIXME: worker threads can't call start_recovery */
if (w->left_list_entries) {
if (w->left_list_entries > 1)
@@ -1176,7 +1045,112 @@ static void __sd_confch_done(struct work *work, int idx)
free(w->left_list);
free(w->joined_list);
free(w->failed_vdis);
- free(w);
+ free(cevent);
+}
+
+static struct work cpg_event_work;
+
+#define CPG_EVENT_WORK_FLAG_RUNNING (1U << 0)
+#define CPG_EVENT_WORK_FLAG_SUSPENDED (1U << 1)
+
+
+void cpg_event_fn(struct work *w, int idx)
+{
+ struct cpg_event *cevent = sys->cur_cevent;
+
+ vprintf(SDOG_DEBUG "%p, %d %x\n", cevent, cevent->ctype,
+ sys->cpg_event_work_flags);
+
+ /*
+ * we can't touch sys->cpg_event_siblings because of a race
+ * with sd_deliver() and sd_confch()...
+ */
+
+ switch (cevent->ctype) {
+ case CPG_EVENT_CONCHG:
+ __sd_confch(cevent);
+ break;
+ case CPG_EVENT_DELIVER:
+ vprintf(SDOG_DEBUG "%d\n", cevent->d.msg->state);
+ __sd_deliver(cevent);
+ break;
+ default:
+ vprintf(SDOG_ERR "unknown event %d\n", cevent->ctype);
+ }
+}
+
+void cpg_event_done(struct work *w, int idx)
+{
+ struct cpg_event *cevent;
+
+ if (!sys->cur_cevent)
+ vprintf(SDOG_ERR "bug\n");
+
+ cevent = sys->cur_cevent;
+ sys->cur_cevent = NULL;
+
+ vprintf(SDOG_DEBUG "%p\n", cevent);
+
+ if (sys->cpg_event_work_flags & CPG_EVENT_WORK_FLAG_SUSPENDED) {
+ sys->cpg_event_work_flags &= ~CPG_EVENT_WORK_FLAG_RUNNING;
+ return;
+ }
+
+ switch (cevent->ctype) {
+ case CPG_EVENT_CONCHG:
+ __sd_confch_done(cevent);
+ break;
+ case CPG_EVENT_DELIVER:
+ if (cevent->d.msg->state == DM_INIT)
+ sys->cpg_event_work_flags |= CPG_EVENT_WORK_FLAG_SUSPENDED;
+ __sd_deliver_done(cevent);
+ break;
+ default:
+ vprintf(SDOG_ERR "unknown event %d\n", cevent->ctype);
+ }
+
+ vprintf(SDOG_DEBUG "free %p\n", cevent);
+
+ sys->cpg_event_work_flags &= ~CPG_EVENT_WORK_FLAG_RUNNING;
+
+ if (!list_empty(&sys->cpg_event_siblings))
+ start_cpg_event_work();
+}
+
+/* can be called only by the main process */
+static void start_cpg_event_work(void)
+{
+ struct cpg_event *cevent;
+
+ if (sys->cpg_event_work_flags & CPG_EVENT_WORK_FLAG_RUNNING)
+ return;
+
+ if (list_empty(&sys->cpg_event_siblings))
+ vprintf(SDOG_ERR "bug\n");
+
+ cevent = list_first_entry(&sys->cpg_event_siblings,
+ struct cpg_event, cpg_event_list);
+
+ if (sys->cpg_event_work_flags & CPG_EVENT_WORK_FLAG_SUSPENDED) {
+ if (cevent->ctype != CPG_EVENT_DELIVER)
+ return;
+ if (cevent->d.msg->state != DM_FIN)
+ return;
+ }
+
+ list_del(&cevent->cpg_event_list);
+ sys->cur_cevent = cevent;
+
+ sys->cpg_event_work_flags &= ~CPG_EVENT_WORK_FLAG_SUSPENDED;
+ sys->cpg_event_work_flags |= CPG_EVENT_WORK_FLAG_RUNNING;
+
+ INIT_LIST_HEAD(&cpg_event_work.w_list);
+ /* we should remove this */
+ cpg_event_work.attr = WORK_ORDERED;
+ cpg_event_work.fn = cpg_event_fn;
+ cpg_event_work.done = cpg_event_done;
+
+ queue_work(dobj_queue, &cpg_event_work);
}
static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
@@ -1187,6 +1161,7 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
const struct cpg_address *joined_list,
size_t joined_list_entries)
{
+ struct cpg_event *cevent;
struct work_confch *w = NULL;
int i, size;
@@ -1202,9 +1177,14 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
if (sys->status == SD_STATUS_SHUTDOWN || sys->status == SD_STATUS_INCONSISTENT_EPOCHS)
return;
- w = zalloc(sizeof(*w));
- if (!w)
- return;
+ cevent = zalloc(sizeof(*cevent));
+ if (!cevent)
+ return; /* should die */
+
+ cevent->ctype = CPG_EVENT_CONCHG;
+ w = &cevent->c;
+
+ vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
size = sizeof(struct cpg_address) * member_list_entries;
w->member_list = zalloc(size);
@@ -1227,11 +1207,9 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
memcpy(w->joined_list, joined_list, size);
w->joined_list_entries = joined_list_entries;
- w->work.fn = __sd_confch;
- w->work.done = __sd_confch_done;
- w->work.attr = WORK_ORDERED;
+ list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
+ start_cpg_event_work();
- queue_work(dobj_queue, &w->work);
return;
err:
if (!w)
@@ -1356,7 +1334,8 @@ join_retry:
INIT_LIST_HEAD(&sys->cpg_node_list);
INIT_LIST_HEAD(&sys->vm_list);
INIT_LIST_HEAD(&sys->pending_list);
- INIT_LIST_HEAD(&sys->work_deliver_siblings);
+
+ INIT_LIST_HEAD(&sys->cpg_event_siblings);
cpg_context_set(cpg_handle, sys);
cpg_fd_get(cpg_handle, &fd);
--
1.6.5
More information about the sheepdog
mailing list