From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> sd_check_join_cb doesn't check the joining node now. sd_accept_handler is a better name. Also, this renames join_request to join and join_response to accept for each cluster driver. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- include/shepherd.h | 4 ++-- sheep/cluster.h | 8 +++---- sheep/cluster/corosync.c | 53 ++++++++++++++++++++++----------------------- sheep/cluster/local.c | 26 ++++++++++------------ sheep/cluster/shepherd.c | 8 +++---- sheep/cluster/zookeeper.c | 20 ++++++++--------- sheep/group.c | 8 +++---- shepherd/shepherd.c | 4 ++-- 8 files changed, 64 insertions(+), 67 deletions(-) diff --git a/include/shepherd.h b/include/shepherd.h index fcb4655..9a4cf81 100644 --- a/include/shepherd.h +++ b/include/shepherd.h @@ -4,7 +4,7 @@ enum sph_cli_msg_type { /* messages sent by a cluster driver, received by shepherd */ SPH_CLI_MSG_JOIN = 0, - SPH_CLI_MSG_NEW_NODE_REPLY, + SPH_CLI_MSG_ACCEPT, SPH_CLI_MSG_NOTIFY, SPH_CLI_MSG_BLOCK, SPH_CLI_MSG_LEAVE, @@ -84,7 +84,7 @@ static inline const char *sph_cli_msg_to_str(enum sph_cli_msg_type msg) const char *desc; } msgs[] = { { SPH_CLI_MSG_JOIN, "SPH_CLI_MSG_JOIN" }, - { SPH_CLI_MSG_NEW_NODE_REPLY, "SPH_CLI_MSG_NEW_NODE_REPLY" }, + { SPH_CLI_MSG_ACCEPT, "SPH_CLI_MSG_ACCEPT" }, { SPH_CLI_MSG_NOTIFY, "SPH_CLI_MSG_NOTIFY" }, { SPH_CLI_MSG_BLOCK, "SPH_CLI_MSG_BLOCK" }, { SPH_CLI_MSG_LEAVE, "SPH_CLI_MSG_LEAVE" }, diff --git a/sheep/cluster.h b/sheep/cluster.h index efb7c1e..2db020c 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -52,7 +52,7 @@ struct cluster_driver { * * This function is used to join the cluster, and notifies a join * event to all the nodes. The copy of 'opaque' is passed to - * sd_check_join_cb() and sd_join_handler(). + * sd_accept_handler() and sd_join_handler(). * * sd_check_join_cb() is called on one of the nodes which already * paticipate in the cluster. 
If the content of 'opaque' is @@ -172,9 +172,9 @@ void sd_notify_handler(const struct sd_node *sender, void *msg, size_t msg_len); bool sd_block_handler(const struct sd_node *sender); int sd_reconnect_handler(void); void sd_update_node_handler(struct sd_node *); -void sd_check_join_cb(const struct sd_node *joining, - const struct sd_node *nodes, size_t nr_nodes, - void *opaque); +void sd_accept_handler(const struct sd_node *joining, + const struct sd_node *nodes, size_t nr_nodes, + void *opaque); void recalculate_vnodes(struct sd_node *nodes, int nr_nodes); #endif diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index fa81b55..6a58044 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -46,8 +46,8 @@ static size_t nr_majority; /* used for network partition detection */ /* event types which are dispatched in corosync_dispatch() */ enum corosync_event_type { - COROSYNC_EVENT_TYPE_JOIN_REQUEST, - COROSYNC_EVENT_TYPE_JOIN_RESPONSE, + COROSYNC_EVENT_TYPE_JOIN, + COROSYNC_EVENT_TYPE_ACCEPT, COROSYNC_EVENT_TYPE_LEAVE, COROSYNC_EVENT_TYPE_BLOCK, COROSYNC_EVENT_TYPE_NOTIFY, @@ -56,8 +56,8 @@ enum corosync_event_type { /* multicast message type */ enum corosync_message_type { - COROSYNC_MSG_TYPE_JOIN_REQUEST, - COROSYNC_MSG_TYPE_JOIN_RESPONSE, + COROSYNC_MSG_TYPE_JOIN, + COROSYNC_MSG_TYPE_ACCEPT, COROSYNC_MSG_TYPE_LEAVE, COROSYNC_MSG_TYPE_NOTIFY, COROSYNC_MSG_TYPE_BLOCK, @@ -281,28 +281,28 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent) int idx; switch (cevent->type) { - case COROSYNC_EVENT_TYPE_JOIN_REQUEST: + case COROSYNC_EVENT_TYPE_JOIN: if (is_master(&this_node) < 0) return false; if (!cevent->msg) - /* we haven't receive JOIN_REQUEST yet */ + /* we haven't receive JOIN yet */ return false; if (cevent->callbacked) - /* check_join() must be called only once */ + /* sd_accept_handler() must be called only once */ return false; build_node_list(cpg_nodes, nr_cpg_nodes, entries); - 
sd_check_join_cb(&cevent->sender.node, entries, nr_cpg_nodes, - cevent->msg); - send_message(COROSYNC_MSG_TYPE_JOIN_RESPONSE, &cevent->sender, + sd_accept_handler(&cevent->sender.node, entries, nr_cpg_nodes, + cevent->msg); + send_message(COROSYNC_MSG_TYPE_ACCEPT, &cevent->sender, cpg_nodes, nr_cpg_nodes, cevent->msg, cevent->msg_len); cevent->callbacked = true; return false; - case COROSYNC_EVENT_TYPE_JOIN_RESPONSE: + case COROSYNC_EVENT_TYPE_ACCEPT: add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); nr_cpg_nodes++; @@ -384,13 +384,13 @@ static void __corosync_dispatch(void) /* update join status */ if (!join_finished) { switch (cevent->type) { - case COROSYNC_EVENT_TYPE_JOIN_REQUEST: + case COROSYNC_EVENT_TYPE_JOIN: if (self_elect) { join_finished = true; nr_cpg_nodes = 0; } break; - case COROSYNC_EVENT_TYPE_JOIN_RESPONSE: + case COROSYNC_EVENT_TYPE_ACCEPT: if (cpg_node_equal(&cevent->sender, &this_node)) { join_finished = true; @@ -410,7 +410,7 @@ static void __corosync_dispatch(void) return; } else { switch (cevent->type) { - case COROSYNC_MSG_TYPE_JOIN_REQUEST: + case COROSYNC_MSG_TYPE_JOIN: case COROSYNC_MSG_TYPE_BLOCK: return; default: @@ -469,9 +469,9 @@ static void cdrv_cpg_deliver(cpg_handle_t handle, sd_dprintf("%d", cmsg->type); switch (cmsg->type) { - case COROSYNC_MSG_TYPE_JOIN_REQUEST: - cevent = update_event(COROSYNC_EVENT_TYPE_JOIN_REQUEST, - &cmsg->sender, cmsg->msg, cmsg->msg_len); + case COROSYNC_MSG_TYPE_JOIN: + cevent = update_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender, + cmsg->msg, cmsg->msg_len); if (!cevent) break; @@ -536,13 +536,13 @@ static void cdrv_cpg_deliver(cpg_handle_t handle, queue_event(cevent); break; - case COROSYNC_MSG_TYPE_JOIN_RESPONSE: - cevent = update_event(COROSYNC_EVENT_TYPE_JOIN_REQUEST, - &cmsg->sender, cmsg->msg, cmsg->msg_len); + case COROSYNC_MSG_TYPE_ACCEPT: + cevent = update_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender, + cmsg->msg, cmsg->msg_len); if (!cevent) break; - cevent->type = 
COROSYNC_EVENT_TYPE_JOIN_RESPONSE; + cevent->type = COROSYNC_EVENT_TYPE_ACCEPT; cevent->nr_nodes = cmsg->nr_nodes; memcpy(cevent->nodes, cmsg->nodes, sizeof(*cmsg->nodes) * cmsg->nr_nodes); @@ -610,8 +610,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, /* dispatch leave_handler */ for (i = 0; i < left_list_entries; i++) { int master; - cevent = find_event(COROSYNC_EVENT_TYPE_JOIN_REQUEST, - left_sheep + i); + cevent = find_event(COROSYNC_EVENT_TYPE_JOIN, left_sheep + i); if (cevent) { /* the node left before joining */ list_del(&cevent->list); @@ -647,7 +646,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, /* dispatch join_handler */ for (i = 0; i < joined_list_entries; i++) { cevent = xzalloc(sizeof(*cevent)); - cevent->type = COROSYNC_EVENT_TYPE_JOIN_REQUEST; + cevent->type = COROSYNC_EVENT_TYPE_JOIN; cevent->sender = joined_sheep[i]; queue_event(cevent); } @@ -658,7 +657,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, * all other members, because events are ordered. 
*/ for (i = 0; i < member_list_entries; i++) { - cevent = find_event(COROSYNC_EVENT_TYPE_JOIN_REQUEST, + cevent = find_event(COROSYNC_EVENT_TYPE_JOIN, &member_sheep[i]); if (!cevent) { sd_dprintf("Not promoting because member is " @@ -702,8 +701,8 @@ retry: this_node.node = *myself; - ret = send_message(COROSYNC_MSG_TYPE_JOIN_REQUEST, &this_node, NULL, 0, - opaque, opaque_len); + ret = send_message(COROSYNC_MSG_TYPE_JOIN, &this_node, NULL, 0, opaque, + opaque_len); return ret; } diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c index 707224c..8757996 100644 --- a/sheep/cluster/local.c +++ b/sheep/cluster/local.c @@ -61,8 +61,8 @@ static bool lnode_eq(const struct local_node *a, const struct local_node *b) } enum local_event_type { - EVENT_JOIN_REQUEST = 1, - EVENT_JOIN_RESPONSE, + EVENT_JOIN = 1, + EVENT_ACCEPT, EVENT_LEAVE, EVENT_GATEWAY, EVENT_BLOCK, @@ -259,7 +259,7 @@ static int add_event(enum local_event_type type, struct local_node *lnode, ev.nr_lnodes = get_nodes(ev.lnodes); switch (type) { - case EVENT_JOIN_REQUEST: + case EVENT_JOIN: ev.lnodes[ev.nr_lnodes] = *lnode; ev.nr_lnodes++; break; @@ -277,7 +277,7 @@ static int add_event(enum local_event_type type, struct local_node *lnode, n = xlfind(lnode, ev.lnodes, ev.nr_lnodes, lnode_cmp); n->node = lnode->node; break; - case EVENT_JOIN_RESPONSE: + case EVENT_ACCEPT: abort(); } @@ -341,8 +341,7 @@ static int local_join(const struct sd_node *myself, this_node.pid = getpid(); this_node.gateway = false; - return add_event_lock(EVENT_JOIN_REQUEST, &this_node, opaque, - opaque_len); + return add_event_lock(EVENT_JOIN, &this_node, opaque, opaque_len); } static int local_leave(void) @@ -409,14 +408,13 @@ static bool local_process_event(void) if (ev->callbacked) return false; /* wait for unblock event */ - if (ev->type == EVENT_JOIN_RESPONSE && - lnode_eq(&this_node, &ev->sender)) { + if (ev->type == EVENT_ACCEPT && lnode_eq(&this_node, &ev->sender)) { sd_dprintf("join Sheepdog"); joined = true; } if 
(!joined) { - if (ev->type == EVENT_JOIN_REQUEST && + if (ev->type == EVENT_JOIN && lnode_eq(&this_node, &ev->sender)) { struct local_node lnodes[SD_MAX_NODES]; @@ -432,18 +430,18 @@ static bool local_process_event(void) } switch (ev->type) { - case EVENT_JOIN_REQUEST: + case EVENT_JOIN: /* nodes[nr_nodes - 1] is a sender, so don't include it */ assert(node_eq(&ev->sender.node, &nodes[nr_nodes - 1])); - sd_check_join_cb(&ev->sender.node, nodes, nr_nodes - 1, - ev->buf); - ev->type = EVENT_JOIN_RESPONSE; + sd_accept_handler(&ev->sender.node, nodes, nr_nodes - 1, + ev->buf); + ev->type = EVENT_ACCEPT; msync(ev, sizeof(*ev), MS_SYNC); shm_queue_notify(); return false; - case EVENT_JOIN_RESPONSE: + case EVENT_ACCEPT: sd_join_handler(&ev->sender.node, nodes, nr_nodes, ev->buf); break; case EVENT_LEAVE: diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c index 28ffc66..ed79452 100644 --- a/sheep/cluster/shepherd.c +++ b/sheep/cluster/shepherd.c @@ -114,13 +114,13 @@ retry: * FIXME: member change events must be ordered with nonblocked * events */ - sd_check_join_cb(&join->new_node, NULL, 0, join->opaque); + sd_accept_handler(&join->new_node, NULL, 0, join->opaque); /* FIXME: join->master_elected is needed? 
*/ assert(join->master_elected); is_master = true; - snd.type = SPH_CLI_MSG_NEW_NODE_REPLY; + snd.type = SPH_CLI_MSG_ACCEPT; snd.body_len = join_len; ret = writev2(sph_comm_fd, &snd, join, join_len); @@ -325,10 +325,10 @@ static void msg_new_node(struct sph_msg *rcv) } /* FIXME: member change events must be ordered with nonblocked events */ - sd_check_join_cb(&join->new_node, nodes, nr_nodes, join->opaque); + sd_accept_handler(&join->new_node, nodes, nr_nodes, join->opaque); memset(&snd, 0, sizeof(snd)); - snd.type = SPH_CLI_MSG_NEW_NODE_REPLY; + snd.type = SPH_CLI_MSG_ACCEPT; snd.body_len = rcv->body_len; ret = writev2(sph_comm_fd, &snd, join, rcv->body_len); diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 3fa22d0..f450790 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -41,8 +41,8 @@ free(*(strs)->data)) enum zk_event_type { - EVENT_JOIN_REQUEST = 1, - EVENT_JOIN_RESPONSE, + EVENT_JOIN = 1, + EVENT_ACCEPT, EVENT_LEAVE, EVENT_BLOCK, EVENT_UNBLOCK, @@ -406,7 +406,7 @@ static int push_join_response(struct zk_event *ev) char path[MAX_NODE_STR_LEN]; int len; - ev->type = EVENT_JOIN_RESPONSE; + ev->type = EVENT_ACCEPT; ev->nr_nodes = nr_sd_nodes; memcpy(zk_event_sd_nodes(ev), sd_nodes, nr_sd_nodes * sizeof(struct sd_node)); @@ -597,7 +597,7 @@ static int add_join_event(void *msg, size_t msg_len) assert(len <= SD_MAX_EVENT_BUF_SIZE); ev.id = get_uniq_id(); - ev.type = EVENT_JOIN_REQUEST; + ev.type = EVENT_JOIN; ev.sender = this_node; ev.msg_len = msg_len; ev.buf_len = len; @@ -836,7 +836,7 @@ static int zk_unblock(void *msg, size_t msg_len) return add_event(EVENT_UNBLOCK, &this_node, msg, msg_len); } -static void zk_handle_join_request(struct zk_event *ev) +static void zk_handle_join(struct zk_event *ev) { sd_dprintf("sender: %s", node_to_str(&ev->sender.node)); if (!uatomic_is_true(&is_master)) { @@ -845,7 +845,7 @@ static void zk_handle_join_request(struct zk_event *ev) return; } - sd_check_join_cb(&ev->sender.node, 
sd_nodes, nr_sd_nodes, ev->buf); + sd_accept_handler(&ev->sender.node, sd_nodes, nr_sd_nodes, ev->buf); push_join_response(ev); sd_dprintf("I'm the master now"); @@ -880,12 +880,12 @@ static void init_node_list(struct zk_event *ev) watch_all_nodes(); } -static void zk_handle_join_response(struct zk_event *ev) +static void zk_handle_accept(struct zk_event *ev) { char path[MAX_NODE_STR_LEN]; int rc; - sd_dprintf("JOIN RESPONSE"); + sd_dprintf("ACCEPT"); if (node_eq(&ev->sender.node, &this_node.node)) /* newly joined node */ init_node_list(ev); @@ -1002,8 +1002,8 @@ static void zk_handle_update_node(struct zk_event *ev) } static void (*const zk_event_handlers[])(struct zk_event *ev) = { - [EVENT_JOIN_REQUEST] = zk_handle_join_request, - [EVENT_JOIN_RESPONSE] = zk_handle_join_response, + [EVENT_JOIN] = zk_handle_join, + [EVENT_ACCEPT] = zk_handle_accept, [EVENT_LEAVE] = zk_handle_leave, [EVENT_BLOCK] = zk_handle_block, [EVENT_UNBLOCK] = zk_handle_unblock, diff --git a/sheep/group.c b/sheep/group.c index 87746d7..8b52198 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -776,13 +776,13 @@ void sd_notify_handler(const struct sd_node *sender, void *data, } /* - * Check whether the joining nodes can join the sheepdog cluster. + * Accept the joining node and pass the cluster info to it. * * Note that 'nodes' doesn't contain 'joining'. 
*/ -void sd_check_join_cb(const struct sd_node *joining, - const struct sd_node *nodes, size_t nr_nodes, - void *opaque) +void sd_accept_handler(const struct sd_node *joining, + const struct sd_node *nodes, size_t nr_nodes, + void *opaque) { struct join_message *jm = opaque; char str[MAX_NODE_STR_LEN]; diff --git a/shepherd/shepherd.c b/shepherd/shepherd.c index d294cb5..64795da 100644 --- a/shepherd/shepherd.c +++ b/shepherd/shepherd.c @@ -377,7 +377,7 @@ purge_current_sheep: remove_sheep(sheep); } -static void sph_handle_new_node_reply(struct sph_msg *msg, struct sheep *sheep) +static void sph_handle_accept(struct sph_msg *msg, struct sheep *sheep) { int fd = sheep->fd, removed = 0; ssize_t rbytes, wbytes; @@ -627,7 +627,7 @@ fwd_leave_failed: static void (*msg_handlers[])(struct sph_msg*, struct sheep *) = { [SPH_CLI_MSG_JOIN] = sph_handle_join, - [SPH_CLI_MSG_NEW_NODE_REPLY] = sph_handle_new_node_reply, + [SPH_CLI_MSG_ACCEPT] = sph_handle_accept, [SPH_CLI_MSG_NOTIFY] = sph_handle_notify, [SPH_CLI_MSG_BLOCK] = sph_handle_block, [SPH_CLI_MSG_LEAVE] = sph_handle_leave, -- 1.7.9.5