This is a preparation for blocking callback support. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/cluster/corosync.c | 100 ++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 92 insertions(+), 8 deletions(-) diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index 002e06f..e0f9a9c 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -22,6 +22,29 @@ static corosync_cfg_handle_t cfg_handle; static struct cdrv_handlers corosync_handlers; +static LIST_HEAD(corosync_event_list); + +enum corosync_event_type { + COROSYNC_EVENT_TYPE_JOIN, + COROSYNC_EVENT_TYPE_LEAVE, + COROSYNC_EVENT_TYPE_NOTIFY, +}; + +struct corosync_event { + enum corosync_event_type type; + + struct sheepid joined; + struct sheepid left; + struct sheepid members[SD_MAX_NODES]; + size_t nr_members; + + struct sheepid sender; + void *msg; + size_t msg_len; + + struct list_head list; +}; + static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr) { int ret, nr; @@ -68,17 +91,59 @@ static void cpg_addr_to_sheepid(const struct cpg_address *cpgs, } } +static void __corosync_dispatch(void) +{ + struct corosync_event *cevent; + + while (!list_empty(&corosync_event_list)) { + cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list); + + switch (cevent->type) { + case COROSYNC_EVENT_TYPE_JOIN: + corosync_handlers.join_handler(&cevent->joined, + cevent->members, + cevent->nr_members); + break; + case COROSYNC_EVENT_TYPE_LEAVE: + corosync_handlers.leave_handler(&cevent->left, + cevent->members, + cevent->nr_members); + break; + case COROSYNC_EVENT_TYPE_NOTIFY: + corosync_handlers.notify_handler(&cevent->sender, + cevent->msg, + cevent->msg_len); + break; + } + + list_del(&cevent->list); + free(cevent); + } +} + static void cdrv_cpg_deliver(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { - struct sheepid sender; + struct corosync_event *cevent; + + cevent = zalloc(sizeof(*cevent)); + if (!cevent) + panic("oom\n"); + cevent->msg = zalloc(msg_len); + if (!cevent->msg) + panic("oom\n"); + + cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; + nodeid_to_addr(nodeid, cevent->sender.addr); + cevent->sender.pid = pid; + memcpy(cevent->msg, msg, msg_len); + cevent->msg_len = msg_len; - nodeid_to_addr(nodeid, sender.addr); - sender.pid = pid; + list_add(&cevent->list, &corosync_event_list); - corosync_handlers.notify_handler(&sender, msg, msg_len); + __corosync_dispatch(); } static void cdrv_cpg_confchg(cpg_handle_t handle, @@ -90,6 +155,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, const struct cpg_address *joined_list, size_t joined_list_entries) { + struct corosync_event *cevent; int i; struct sheepid member_sheeps[SD_MAX_NODES]; struct sheepid joined_sheeps[SD_MAX_NODES]; @@ -111,23 +177,41 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, /* dispatch leave_handler */ for (i = 0; i < left_list_entries; i++) { + cevent = zalloc(sizeof(*cevent)); + if (!cevent) + panic("oom\n"); + sheepid_del(member_sheeps, member_list_entries, left_sheeps + i, 1); member_list_entries--; - corosync_handlers.leave_handler(left_sheeps + i, member_sheeps, - member_list_entries); + cevent->type = COROSYNC_EVENT_TYPE_LEAVE; + cevent->left = left_sheeps[i]; + memcpy(cevent->members, member_sheeps, sizeof(member_sheeps)); + cevent->nr_members = member_list_entries; + + list_add(&cevent->list, &corosync_event_list); } /* dispatch join_handler */ for (i = 0; i < joined_list_entries; i++) { + cevent = zalloc(sizeof(*cevent)); + if (!cevent) + panic("oom\n"); + sheepid_add(member_sheeps, member_list_entries, joined_sheeps, 1); member_list_entries++; - corosync_handlers.join_handler(joined_sheeps + i, member_sheeps, - member_list_entries); + cevent->type = COROSYNC_EVENT_TYPE_JOIN; + cevent->joined = joined_sheeps[i]; + memcpy(cevent->members, member_sheeps, sizeof(member_sheeps)); + cevent->nr_members = member_list_entries; + + list_add(&cevent->list, &corosync_event_list); } + + __corosync_dispatch(); } static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid) -- 1.7.2.5 |