Currently, Sheepdog has two node lists; sd_node_list and cpg_node_list. The former is used for consistent hashing and seen from users. The latter is managed in the cluster driver and notified in join_handler/leave_handler. But this design is too complex. We should move all the cluster management stuff into the cluster driver. Main changes of this patch are as follows: - make join process one phase Node joining was really complex; cpg_confchg() notifies the newly joining node, the node multicasts a SD_MSG_JOIN message, and the master node receives it and multicasts the response. Moreover, we couldn't allow any I/O events during two multicasting. This patch moves all of them into the cluster driver. - add check_join_cb() to the join_handler() arguments This callback is called on one of the Sheepdog nodes (e.g. in the case of the corosync driver, the master server will call this). check_join_cb() checks whether the joining node may join the cluster, and returns the result. - use sheepdog_node_list_entry in the arguments of join_handler()/leave_handler() We can use the notified node list for consistent hashing now. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- sheep/cluster.h | 83 +++----- sheep/cluster/corosync.c | 465 ++++++++++++++++++++++++++++++++------------ sheep/group.c | 488 ++++++++++++++++++++++++++-------------------- sheep/sheep_priv.h | 1 - 4 files changed, 643 insertions(+), 394 deletions(-) diff --git a/sheep/cluster.h b/sheep/cluster.h index 89d0566..11e2922 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -21,17 +21,26 @@ #include "sheep.h" #include "logger.h" -struct sheepid { - uint8_t addr[16]; - uint64_t pid; +enum cluster_join_result { + CJ_RES_SUCCESS, /* Success */ + CJ_RES_FAIL, /* Fail to join. The joining node has an invalidepoch. */ + CJ_RES_JOIN_LATER, /* Fail to join. The joining node should + * be added after the cluster start working. */ + CJ_RES_MASTER_TRANSFER, /* Transfer mastership. 
The joining + * node has a newer epoch, so this node + * will leave the cluster (restart later). */ }; struct cdrv_handlers { - void (*join_handler)(struct sheepid *joined, struct sheepid *members, - size_t nr_members); - void (*leave_handler)(struct sheepid *left, struct sheepid *members, + void (*join_handler)(struct sheepdog_node_list_entry *joined, + struct sheepdog_node_list_entry *members, + size_t nr_members, enum cluster_join_result result, + void *opaque); + void (*leave_handler)(struct sheepdog_node_list_entry *left, + struct sheepdog_node_list_entry *members, size_t nr_members); - void (*notify_handler)(struct sheepid *sender, void *msg, size_t msg_len); + void (*notify_handler)(struct sheepdog_node_list_entry *sender, + void *msg, size_t msg_len); }; struct cluster_driver { @@ -44,17 +53,26 @@ struct cluster_driver { * may be used with the poll(2) to monitor cluster events. On * error, returns -1. */ - int (*init)(struct cdrv_handlers *handlers, struct sheepid *myid); + int (*init)(struct cdrv_handlers *handlers, uint8_t *myaddr); /* * Join the cluster * * This function is used to join the cluster, and notifies a - * join event to all the nodes. + * join event to all the nodes. The copy of 'opaque' is + * passed to check_join_cb() and join_handler(). + * check_join_cb() is called on one of the nodes which already + * participate in the cluster. If the content of 'opaque' is + * changed in check_join_cb(), the updated 'opaque' must be + * passed to join_handler(). 
* * Returns zero on success, -1 on error */ - int (*join)(void); + int (*join)(struct sheepdog_node_list_entry *myself, + enum cluster_join_result (*check_join_cb)( + struct sheepdog_node_list_entry *joining, + void *opaque), + void *opaque, size_t opaque_len); /* * Leave the cluster @@ -112,54 +130,15 @@ static void __attribute__((constructor)) regist_ ## driver(void) { \ list_for_each_entry(driver, &cluster_drivers, list) -static inline int sheepid_find(struct sheepid *sheeps, size_t nr_sheeps, - struct sheepid *key) -{ - int i; - - for (i = 0; i < nr_sheeps; i++) { - if (memcmp(sheeps + i, key, sizeof(*key)) == 0) - return i; - } - return -1; -} - -static inline void sheepid_add(struct sheepid *sheeps1, size_t nr_sheeps1, - struct sheepid *sheeps2, size_t nr_sheeps2) -{ - memcpy(sheeps1 + nr_sheeps1, sheeps2, sizeof(*sheeps2) * nr_sheeps2); -} - -static inline void sheepid_del(struct sheepid *sheeps1, size_t nr_sheeps1, - struct sheepid *sheeps2, size_t nr_sheeps2) -{ - int i, idx; - - for (i = 0; i < nr_sheeps2; i++) { - idx = sheepid_find(sheeps1, nr_sheeps1, sheeps2 + i); - if (idx < 0) - panic("internal error: cannot find sheepid\n"); - - nr_sheeps1--; - memmove(sheeps1 + idx, sheeps1 + idx + 1, - sizeof(*sheeps1) * nr_sheeps1 - idx); - } -} - -static inline char *sheepid_to_str(struct sheepid *id) +static inline char *node_to_str(struct sheepdog_node_list_entry *id) { static char str[256]; char name[256]; - snprintf(str, sizeof(str), "ip: %s, pid: %" PRIu64, - addr_to_str(name, sizeof(name), id->addr, 0), id->pid); + snprintf(str, sizeof(str), "ip: %s, port: %d", + addr_to_str(name, sizeof(name), id->addr, 0), id->port); return str; } -static inline int sheepid_cmp(struct sheepid *id1, struct sheepid *id2) -{ - return memcmp(id1, id2, sizeof(*id1)); -} - #endif diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index 97aa1c4..5d84856 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -16,19 +16,30 @@ #include "cluster.h" 
#include "work.h" +struct cpg_node { + uint32_t nodeid; + uint32_t pid; + struct sheepdog_node_list_entry ent; +}; + static cpg_handle_t cpg_handle; static struct cpg_name cpg_group = { 9, "sheepdog" }; static corosync_cfg_handle_t cfg_handle; -static struct sheepid this_sheepid; +static struct cpg_node this_node; static struct work_queue *corosync_block_wq; static struct cdrv_handlers corosync_handlers; +static enum cluster_join_result (*corosync_check_join_cb)( + struct sheepdog_node_list_entry *joining, void *opaque); static LIST_HEAD(corosync_event_list); static LIST_HEAD(corosync_block_list); +static struct cpg_node cpg_nodes[SD_MAX_NODES]; +static size_t nr_cpg_nodes; + /* event types which are dispatched in corosync_dispatch() */ enum corosync_event_type { COROSYNC_EVENT_TYPE_JOIN, @@ -38,6 +49,8 @@ enum corosync_event_type { /* multicast message type */ enum corosync_message_type { + COROSYNC_MSG_TYPE_JOIN_REQUEST, + COROSYNC_MSG_TYPE_JOIN_RESPONSE, COROSYNC_MSG_TYPE_NOTIFY, COROSYNC_MSG_TYPE_BLOCK, COROSYNC_MSG_TYPE_UNBLOCK, @@ -46,19 +59,31 @@ enum corosync_message_type { struct corosync_event { enum corosync_event_type type; - struct sheepid members[SD_MAX_NODES]; - size_t nr_members; - - struct sheepid sender; + struct cpg_node sender; void *msg; size_t msg_len; + enum cluster_join_result result; + uint32_t nr_nodes; + struct cpg_node nodes[SD_MAX_NODES]; + int blocked; int callbacked; + int first_node; struct list_head list; }; +struct corosync_message { + struct cpg_node sender; + enum corosync_message_type type : 4; + enum cluster_join_result result : 4; + uint32_t msg_len; + uint32_t nr_nodes; + struct cpg_node nodes[SD_MAX_NODES]; + uint8_t msg[0]; +}; + struct corosync_block_msg { void *msg; size_t msg_len; @@ -68,6 +93,44 @@ struct corosync_block_msg { struct list_head list; }; +static int cpg_node_equal(struct cpg_node *a, struct cpg_node *b) +{ + return (a->nodeid == b->nodeid && a->pid == b->pid); +} + +static inline int find_cpg_node(struct 
cpg_node *nodes, size_t nr_nodes, + struct cpg_node *key) +{ + int i; + + for (i = 0; i < nr_nodes; i++) + if (cpg_node_equal(nodes + i, key)) + return i; + + return -1; +} + +static inline void add_cpg_node(struct cpg_node *nodes, size_t nr_nodes, + struct cpg_node *added) +{ + nodes[nr_nodes++] = *added; +} + +static inline void del_cpg_node(struct cpg_node *nodes, size_t nr_nodes, + struct cpg_node *deled) +{ + int idx; + + idx = find_cpg_node(nodes, nr_nodes, deled); + if (idx < 0) { + dprintf("cannot find node\n"); + return; + } + + nr_nodes--; + memmove(nodes + idx, nodes + idx + 1, sizeof(*nodes) * nr_nodes - idx); +} + static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr) { int ret, nr; @@ -103,24 +166,26 @@ static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr) return 0; } -static void cpg_addr_to_sheepid(const struct cpg_address *cpgs, - struct sheepid *sheeps, size_t nr) -{ - int i; - - for (i = 0; i < nr; i++) { - nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr); - sheeps[i].pid = cpgs[i].pid; - } -} - -static int send_message(uint64_t type, void *msg, size_t msg_len) +static int send_message(enum corosync_message_type type, + enum cluster_join_result result, + struct cpg_node *sender, struct cpg_node *nodes, + size_t nr_nodes, void *msg, size_t msg_len) { struct iovec iov[2]; int ret, iov_cnt = 1; + struct corosync_message cmsg = { + .type = type, + .msg_len = msg_len, + .result = result, + .sender = *sender, + .nr_nodes = nr_nodes, + }; + + if (nodes) + memcpy(cmsg.nodes, nodes, sizeof(*nodes) * nr_nodes); - iov[0].iov_base = &type; - iov[0].iov_len = sizeof(type); + iov[0].iov_base = &cmsg; + iov[0].iov_len = sizeof(cmsg); if (msg) { iov[1].iov_base = msg; iov[1].iov_len = msg_len; @@ -153,13 +218,15 @@ static void corosync_block_done(struct work *work, int idx) { struct corosync_block_msg *bm = container_of(work, typeof(*bm), work); - send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len); + send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, 
&this_node, NULL, 0, + bm->msg, bm->msg_len); free(bm->msg); free(bm); } -static struct corosync_event *find_block_event(struct sheepid *sender) +static struct corosync_event *find_block_event(enum corosync_event_type type, + struct cpg_node *sender) { struct corosync_event *cevent; @@ -167,67 +234,193 @@ static struct corosync_event *find_block_event(struct sheepid *sender) if (!cevent->blocked) continue; - if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY && - sheepid_cmp(&cevent->sender, sender) == 0) + if (cevent->type == type && + cpg_node_equal(&cevent->sender, sender)) return cevent; } return NULL; } +static int is_master(void) +{ + if (nr_cpg_nodes == 0) + /* this node should be the first cpg node */ + return 1; + + return cpg_node_equal(&cpg_nodes[0], &this_node); +} + +static void build_node_list(struct cpg_node *nodes, size_t nr_nodes, + struct sheepdog_node_list_entry *entries) +{ + int i; + + for (i = 0; i < nr_nodes; i++) + entries[i] = nodes[i].ent; +} + +/* + * Process one dispatch event + * + * Returns 1 if the event is processed + */ +static int __corosync_dispatch_one(struct corosync_event *cevent) +{ + struct corosync_block_msg *bm; + enum cluster_join_result res; + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + int idx; + + switch (cevent->type) { + case COROSYNC_EVENT_TYPE_JOIN: + if (cevent->blocked) { + if (!is_master()) + return 0; + + if (!cevent->msg) + /* we haven't receive JOIN_REQUEST yet */ + return 0; + + if (cevent->callbacked) + /* check_join() must be called only once */ + return 0; + + res = corosync_check_join_cb(&cevent->sender.ent, + cevent->msg); + if (res == CJ_RES_MASTER_TRANSFER) + nr_cpg_nodes = 0; + + send_message(COROSYNC_MSG_TYPE_JOIN_RESPONSE, res, + &cevent->sender, cpg_nodes, nr_cpg_nodes, + cevent->msg, cevent->msg_len); + + if (res == CJ_RES_MASTER_TRANSFER) { + eprintf("Restart me later when master is up, please. 
Bye.\n"); + exit(1); + } + + cevent->callbacked = 1; + return 0; + } + + switch (cevent->result) { + case CJ_RES_SUCCESS: + case CJ_RES_MASTER_TRANSFER: + add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); + nr_cpg_nodes++; + /* fall through */ + case CJ_RES_FAIL: + case CJ_RES_JOIN_LATER: + build_node_list(cpg_nodes, nr_cpg_nodes, entries); + corosync_handlers.join_handler(&cevent->sender.ent, entries, + nr_cpg_nodes, cevent->result, + cevent->msg); + break; + } + break; + case COROSYNC_EVENT_TYPE_LEAVE: + idx = find_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); + if (idx < 0) + break; + cevent->sender.ent = cpg_nodes[idx].ent; + + del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender); + nr_cpg_nodes--; + build_node_list(cpg_nodes, nr_cpg_nodes, entries); + corosync_handlers.leave_handler(&cevent->sender.ent, + entries, nr_cpg_nodes); + break; + case COROSYNC_EVENT_TYPE_NOTIFY: + if (cevent->blocked) { + if (cpg_node_equal(&cevent->sender, &this_node) && + !cevent->callbacked) { + /* call a block callback function from a worker thread */ + if (list_empty(&corosync_block_list)) + panic("cannot call block callback\n"); + + bm = list_first_entry(&corosync_block_list, + typeof(*bm), list); + list_del(&bm->list); + + bm->work.fn = corosync_block; + bm->work.done = corosync_block_done; + queue_work(corosync_block_wq, &bm->work); + + cevent->callbacked = 1; + } + + /* block the rest messages until unblock message comes */ + return 0; + } + + corosync_handlers.notify_handler(&cevent->sender.ent, cevent->msg, + cevent->msg_len); + break; + } + + return 1; +} + static void __corosync_dispatch(void) { struct corosync_event *cevent; - struct corosync_block_msg *bm; + static int join_finished; + int done; while (!list_empty(&corosync_event_list)) { cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list); - switch (cevent->type) { - case COROSYNC_EVENT_TYPE_JOIN: - corosync_handlers.join_handler(&cevent->sender, - cevent->members, - 
cevent->nr_members); - break; - case COROSYNC_EVENT_TYPE_LEAVE: - corosync_handlers.leave_handler(&cevent->sender, - cevent->members, - cevent->nr_members); - break; - case COROSYNC_EVENT_TYPE_NOTIFY: - if (cevent->blocked) { - if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 && - !cevent->callbacked) { - /* call a block callback function from a worker thread */ - if (list_empty(&corosync_block_list)) - panic("cannot call block callback\n"); - - bm = list_first_entry(&corosync_block_list, - typeof(*bm), list); - list_del(&bm->list); - - bm->work.fn = corosync_block; - bm->work.done = corosync_block_done; - queue_work(corosync_block_wq, &bm->work); - - cevent->callbacked = 1; - } - - /* block the rest messages until unblock message comes */ - goto out; + /* update join status */ + if (!join_finished && cevent->type == COROSYNC_EVENT_TYPE_JOIN) { + if (cevent->first_node) { + join_finished = 1; + nr_cpg_nodes = 0; + } + if (!cevent->blocked && cpg_node_equal(&cevent->sender, &this_node)) { + join_finished = 1; + nr_cpg_nodes = cevent->nr_nodes; + memcpy(cpg_nodes, cevent->nodes, + sizeof(*cevent->nodes) * cevent->nr_nodes); } + } + + if (join_finished) + done = __corosync_dispatch_one(cevent); + else + done = !cevent->blocked; - corosync_handlers.notify_handler(&cevent->sender, - cevent->msg, - cevent->msg_len); + if (!done) break; - } list_del(&cevent->list); free(cevent); } -out: - return; +} + +static struct corosync_event *update_block_event(enum corosync_event_type type, + struct cpg_node *sender, + void *msg, size_t msg_len) +{ + struct corosync_event *cevent; + + cevent = find_block_event(type, sender); + if (!cevent) + /* block message was casted before this node joins */ + return NULL; + + cevent->msg_len = msg_len; + if (msg_len) { + cevent->msg = realloc(cevent->msg, msg_len); + if (!cevent->msg) + panic("oom\n"); + memcpy(cevent->msg, msg, msg_len); + } else { + free(cevent->msg); + cevent->msg = NULL; + } + + return cevent; } static void 
cdrv_cpg_deliver(cpg_handle_t handle, @@ -236,57 +429,67 @@ static void cdrv_cpg_deliver(cpg_handle_t handle, void *msg, size_t msg_len) { struct corosync_event *cevent; - uint64_t type; - struct sheepid sender; - - nodeid_to_addr(nodeid, sender.addr); - sender.pid = pid; - - memcpy(&type, msg, sizeof(type)); - msg = (uint8_t *)msg + sizeof(type); - msg_len -= sizeof(type); + struct corosync_message *cmsg = msg; cevent = zalloc(sizeof(*cevent)); if (!cevent) panic("oom\n"); - switch (type) { + switch (cmsg->type) { + case COROSYNC_MSG_TYPE_JOIN_REQUEST: + free(cevent); /* we don't add a new cluster event in this case */ + + cevent = update_block_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender, + cmsg->msg, cmsg->msg_len); + if (!cevent) + break; + + cevent->sender = cmsg->sender; + cevent->msg_len = cmsg->msg_len; + break; case COROSYNC_MSG_TYPE_BLOCK: cevent->blocked = 1; /* fall through */ case COROSYNC_MSG_TYPE_NOTIFY: cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; - cevent->sender = sender; - cevent->msg_len = msg_len; - if (msg_len) { - cevent->msg = zalloc(msg_len); + + cevent->sender = cmsg->sender; + cevent->msg_len = cmsg->msg_len; + if (cmsg->msg_len) { + cevent->msg = zalloc(cmsg->msg_len); if (!cevent->msg) panic("oom\n"); - memcpy(cevent->msg, msg, msg_len); + memcpy(cevent->msg, cmsg->msg, cmsg->msg_len); } else cevent->msg = NULL; list_add_tail(&cevent->list, &corosync_event_list); break; + case COROSYNC_MSG_TYPE_JOIN_RESPONSE: + free(cevent); /* we don't add a new cluster event in this case */ + + cevent = update_block_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender, + cmsg->msg, cmsg->msg_len); + if (!cevent) + break; + + cevent->blocked = 0; + + cevent->result = cmsg->result; + cevent->nr_nodes = cmsg->nr_nodes; + memcpy(cevent->nodes, cmsg->nodes, + sizeof(*cmsg->nodes) * cmsg->nr_nodes); + + break; case COROSYNC_MSG_TYPE_UNBLOCK: free(cevent); /* we don't add a new cluster event in this case */ - cevent = find_block_event(&sender); + cevent = 
update_block_event(COROSYNC_EVENT_TYPE_NOTIFY, + &cmsg->sender, cmsg->msg, cmsg->msg_len); if (!cevent) - /* block message was casted before this node joins */ break; cevent->blocked = 0; - cevent->msg_len = msg_len; - if (msg_len) { - cevent->msg = realloc(cevent->msg, msg_len); - if (!cevent->msg) - panic("oom\n"); - memcpy(cevent->msg, msg, msg_len); - } else { - free(cevent->msg); - cevent->msg = NULL; - } break; } @@ -304,27 +507,32 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, { struct corosync_event *cevent; int i; - struct sheepid member_sheeps[SD_MAX_NODES]; - struct sheepid joined_sheeps[SD_MAX_NODES]; - struct sheepid left_sheeps[SD_MAX_NODES]; - - /* convert cpg_address to sheepid*/ - cpg_addr_to_sheepid(member_list, member_sheeps, member_list_entries); - cpg_addr_to_sheepid(left_list, left_sheeps, left_list_entries); - cpg_addr_to_sheepid(joined_list, joined_sheeps, joined_list_entries); + struct cpg_node joined_sheeps[SD_MAX_NODES]; + struct cpg_node left_sheeps[SD_MAX_NODES]; - /* calculate a start member list */ - sheepid_del(member_sheeps, member_list_entries, - joined_sheeps, joined_list_entries); - member_list_entries -= joined_list_entries; - - sheepid_add(member_sheeps, member_list_entries, - left_sheeps, left_list_entries); - member_list_entries += left_list_entries; + /* convert cpg_address to cpg_node */ + for (i = 0; i < left_list_entries; i++) { + left_sheeps[i].nodeid = left_list[i].nodeid; + left_sheeps[i].pid = left_list[i].pid; + } + for (i = 0; i < joined_list_entries; i++) { + joined_sheeps[i].nodeid = joined_list[i].nodeid; + joined_sheeps[i].pid = joined_list[i].pid; + } /* dispatch leave_handler */ for (i = 0; i < left_list_entries; i++) { - cevent = find_block_event(left_sheeps + i); + cevent = find_block_event(COROSYNC_EVENT_TYPE_JOIN, + left_sheeps + i); + if (cevent) { + /* the node left before joining */ + list_del(&cevent->list); + free(cevent); + continue; + } + + cevent = find_block_event(COROSYNC_EVENT_TYPE_NOTIFY, 
+ left_sheeps + i); if (cevent) { /* the node left before sending UNBLOCK */ list_del(&cevent->list); @@ -335,14 +543,8 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, if (!cevent) panic("oom\n"); - sheepid_del(member_sheeps, member_list_entries, - left_sheeps + i, 1); - member_list_entries--; - cevent->type = COROSYNC_EVENT_TYPE_LEAVE; cevent->sender = left_sheeps[i]; - memcpy(cevent->members, member_sheeps, sizeof(member_sheeps)); - cevent->nr_members = member_list_entries; list_add_tail(&cevent->list, &corosync_event_list); } @@ -353,14 +555,12 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, if (!cevent) panic("oom\n"); - sheepid_add(member_sheeps, member_list_entries, - joined_sheeps, 1); - member_list_entries++; - cevent->type = COROSYNC_EVENT_TYPE_JOIN; cevent->sender = joined_sheeps[i]; - memcpy(cevent->members, member_sheeps, sizeof(member_sheeps)); - cevent->nr_members = member_list_entries; + cevent->blocked = 1; /* FIXME: add explanation */ + if (member_list_entries == joined_list_entries - left_list_entries && + cpg_node_equal(&joined_sheeps[0], &this_node)) + cevent->first_node = 1; list_add_tail(&cevent->list, &corosync_event_list); } @@ -368,7 +568,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, __corosync_dispatch(); } -static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid) +static int corosync_init(struct cdrv_handlers *handlers, uint8_t *myaddr) { int ret, fd; uint32_t nodeid; @@ -398,14 +598,14 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid) return -1; } - ret = nodeid_to_addr(nodeid, myid->addr); + ret = nodeid_to_addr(nodeid, myaddr); if (ret < 0) { eprintf("failed to get local address\n"); return -1; } - myid->pid = getpid(); - this_sheepid = *myid; + this_node.nodeid = nodeid; + this_node.pid = getpid(); ret = cpg_fd_get(cpg_handle, &fd); if (ret != CPG_OK) { @@ -418,9 +618,15 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid) return fd; 
} -static int corosync_join(void) +static int corosync_join(struct sheepdog_node_list_entry *myself, + enum cluster_join_result (*check_join_cb)( + struct sheepdog_node_list_entry *joining, + void *opaque), + void *opaque, size_t opaque_len) { int ret; + + corosync_check_join_cb = check_join_cb; retry: ret = cpg_join(cpg_handle, &cpg_group); switch (ret) { @@ -438,7 +644,12 @@ retry: return -1; } - return 0; + this_node.ent = *myself; + + ret = send_message(COROSYNC_MSG_TYPE_JOIN_REQUEST, 0, &this_node, + NULL, 0, opaque, opaque_len); + + return ret; } static int corosync_leave(void) @@ -472,9 +683,11 @@ static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *)) bm->cb = block_cb; list_add_tail(&bm->list, &corosync_block_list); - ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0); + ret = send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, + NULL, 0, NULL, 0); } else - ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len); + ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node, + NULL, 0, msg, msg_len); return ret; } diff --git a/sheep/group.c b/sheep/group.c index 6a5d309..f74db62 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -25,7 +25,6 @@ #include "cluster.h" struct node { - struct sheepid sheepid; struct sheepdog_node_list_entry ent; struct list_head list; }; @@ -42,7 +41,6 @@ struct message_header { uint8_t op; uint8_t state; uint32_t msg_length; - struct sheepid sheepid; struct sheepdog_node_list_entry from; }; @@ -58,12 +56,10 @@ struct join_message { uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */ uint8_t pad[3]; struct { - struct sheepid sheepid; struct sheepdog_node_list_entry ent; } nodes[SD_MAX_NODES]; uint32_t nr_leave_nodes; struct { - struct sheepid sheepid; struct sheepdog_node_list_entry ent; } leave_nodes[SD_MAX_NODES]; }; @@ -94,17 +90,19 @@ struct work_notify { struct work_join { struct cpg_event cev; - struct sheepid *member_list; + struct sheepdog_node_list_entry *member_list; 
size_t member_list_entries; - struct sheepid joined; + struct sheepdog_node_list_entry joined; + + struct join_message jm; }; struct work_leave { struct cpg_event cev; - struct sheepid *member_list; + struct sheepdog_node_list_entry *member_list; size_t member_list_entries; - struct sheepid left; + struct sheepdog_node_list_entry left; }; #define print_node_list(node_list) \ @@ -112,11 +110,11 @@ struct work_leave { struct node *__node; \ char __name[128]; \ list_for_each_entry(__node, node_list, list) { \ - dprintf("%c pid: %ld, ip: %s\n", \ + dprintf("%c ip: %s, port: %d\n", \ is_myself(__node->ent.addr, __node->ent.port) ? 'l' : ' ', \ - __node->sheepid.pid, \ addr_to_str(__name, sizeof(__name), \ - __node->ent.addr, __node->ent.port)); \ + __node->ent.addr, __node->ent.port), \ + __node->ent.port); \ } \ }) @@ -402,12 +400,13 @@ out: exit(1); } -static struct node *find_node(struct list_head *node_list, struct sheepid *id) +static struct node *find_node(struct list_head *node_list, + struct sheepdog_node_list_entry *ent) { struct node *node; list_for_each_entry(node, node_list, list) { - if (sheepid_cmp(&node->sheepid, id) == 0) + if (node_cmp(&node->ent, ent) == 0) return node; } @@ -495,7 +494,6 @@ static int add_node_to_leave_list(struct message_header *msg) goto ret; } - n->sheepid = msg->sheepid; n->ent = msg->from; list_add_tail(&n->list, &sys->leave_list); @@ -516,7 +514,6 @@ static int add_node_to_leave_list(struct message_header *msg) continue; } - n->sheepid = jm->leave_nodes[i].sheepid; n->ent = jm->leave_nodes[i].ent; list_add_tail(&n->list, &tmp_list); @@ -661,7 +658,6 @@ out: static void join(struct join_message *msg) { - struct node *node; struct sheepdog_node_list_entry entry[SD_MAX_NODES]; int i; @@ -681,12 +677,6 @@ static void join(struct join_message *msg) msg->nr_sobjs = sys->nr_sobjs; msg->cluster_flags = sys->flags; msg->ctime = get_cluster_ctime(); - msg->nr_nodes = 0; - list_for_each_entry(node, &sys->sd_node_list, list) { - 
msg->nodes[msg->nr_nodes].sheepid = node->sheepid; - msg->nodes[msg->nr_nodes].ent = node->ent; - msg->nr_nodes++; - } } static int get_vdi_bitmap_from(struct sheepdog_node_list_entry *node) @@ -752,18 +742,16 @@ static void get_vdi_bitmap_from_sd_list(void) get_vdi_bitmap_from(&nodes[i]); } -static int move_node_to_sd_list(struct sheepid *id, - struct sheepdog_node_list_entry ent) +static int move_node_to_sd_list(struct sheepdog_node_list_entry ent) { struct node *node; - node = find_node(&sys->cpg_node_list, id); + node = zalloc(sizeof(*node)); if (!node) - return 1; + panic("failed to alloc memory for a new node\n"); node->ent = ent; - list_del(&node->list); list_add_tail(&node->list, &sys->sd_node_list); sys->nr_vnodes = 0; @@ -786,22 +774,14 @@ static int update_epoch_log(int epoch) return ret; } -static void update_cluster_info(struct join_message *msg) +static void update_cluster_info(struct join_message *msg, + struct sheepdog_node_list_entry *nodes, + size_t nr_nodes) { int i; - int ret, nr_nodes = msg->nr_nodes; + int ret; eprintf("status = %d, epoch = %d, %x, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished); - if (msg->result != SD_RES_SUCCESS) { - if (is_myself(msg->header.from.addr, msg->header.from.port)) { - eprintf("failed to join sheepdog, %x\n", msg->result); - leave_cluster(); - eprintf("Restart me later when master is up, please.Bye.\n"); - exit(1); - /* sys->status = SD_STATUS_JOIN_FAILED; */ - } - return; - } if (sys->status == SD_STATUS_JOIN_FAILED) return; @@ -814,15 +794,16 @@ static void update_cluster_info(struct join_message *msg) sys->flags = msg->cluster_status; for (i = 0; i < nr_nodes; i++) { - ret = move_node_to_sd_list(&msg->nodes[i].sheepid, - msg->nodes[i].ent); + if (node_cmp(nodes + i, &msg->header.from) == 0) + continue; + ret = move_node_to_sd_list(nodes[i]); /* * the node belonged to sheepdog when the master build * the JOIN response however it has gone. 
*/ if (ret) vprintf(SDOG_INFO, "%s has gone\n", - sheepid_to_str(&msg->nodes[i].sheepid)); + node_to_str(&nodes[i])); } if (msg->cluster_status == SD_STATUS_WAIT_FOR_JOIN) @@ -835,14 +816,14 @@ static void update_cluster_info(struct join_message *msg) update_epoch_log(sys->epoch); join_finished: - ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from); + ret = move_node_to_sd_list(msg->header.from); /* * this should not happen since __sd_deliver() checks if the * host from msg on cpg_node_list. */ if (ret) vprintf(SDOG_ERR, "%s has gone\n", - sheepid_to_str(&msg->header.sheepid)); + node_to_str(&msg->header.from)); if (msg->cluster_status == SD_STATUS_OK || msg->cluster_status == SD_STATUS_HALT) { @@ -1036,24 +1017,24 @@ static void __sd_notify(struct cpg_event *cevent) char name[128]; struct node *node; - dprintf("op: %d, state: %u, size: %d, from: %s, pid: %ld\n", + dprintf("op: %d, state: %u, size: %d, from: %s, port: %d\n", m->op, m->state, m->msg_length, addr_to_str(name, sizeof(name), m->from.addr, m->from.port), - m->sheepid.pid); + m->from.port); /* * we don't want to perform any deliver events except mastership_tx event * until we join; we wait for our JOIN message. 
*/ if (!sys->join_finished && !master_tx_message(m)) { - if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) { + if (node_cmp(&m->from, &sys->this_node) != 0) { cevent->skip = 1; return; } } if (join_message(m)) { - node = find_node(&sys->cpg_node_list, &m->sheepid); + node = find_node(&sys->cpg_node_list, &m->from); if (!node) { dprintf("the node was left before join operation is finished\n"); return; @@ -1061,71 +1042,6 @@ static void __sd_notify(struct cpg_event *cevent) node->ent = m->from; } - - if (m->state == DM_FIN) { - switch (m->op) { - case SD_MSG_JOIN: - if (((struct join_message *)m)->cluster_status == SD_STATUS_OK) - if (sys->status != SD_STATUS_OK) { - struct join_message *msg = (struct join_message *)m; - int i; - - get_vdi_bitmap_from_sd_list(); - get_vdi_bitmap_from(&m->from); - for (i = 0; i < msg->nr_nodes;i++) - get_vdi_bitmap_from(&msg->nodes[i].ent); - } - break; - } - } - -} - -static int tx_mastership(void) -{ - struct mastership_tx_message msg; - memset(&msg, 0, sizeof(msg)); - msg.header.proto_ver = SD_SHEEP_PROTO_VER; - msg.header.op = SD_MSG_MASTER_TRANSFER; - msg.header.state = DM_FIN; - msg.header.msg_length = sizeof(msg); - msg.header.from = sys->this_node; - msg.header.sheepid = sys->this_sheepid; - - return sys->cdrv->notify(&msg, msg.header.msg_length, NULL); -} - -static void send_join_response(struct work_notify *w) -{ - struct message_header *m; - struct join_message *jm; - struct node *node; - - m = w->msg; - jm = (struct join_message *)m; - join(jm); - m->state = DM_FIN; - - dprintf("%d, %d\n", jm->result, jm->cluster_status); - if (jm->result == SD_RES_SUCCESS && - jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) { - jm->nr_leave_nodes = 0; - list_for_each_entry(node, &sys->leave_list, list) { - jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid; - jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent; - jm->nr_leave_nodes++; - } - print_node_list(&sys->leave_list); - } else if (jm->result != SD_RES_SUCCESS && - 
jm->epoch > sys->epoch && - jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) { - eprintf("Transfer mastership.\n"); - tx_mastership(); - eprintf("Restart me later when master is up, please.Bye.\n"); - exit(1); - } - jm->epoch = sys->epoch; - sys->cdrv->notify(m, m->msg_length, NULL); } static void __sd_notify_done(struct cpg_event *cevent) @@ -1142,10 +1058,9 @@ static void __sd_notify_done(struct cpg_event *cevent) if (m->state == DM_FIN) { switch (m->op) { case SD_MSG_JOIN: - update_cluster_info((struct join_message *)m); break; case SD_MSG_LEAVE: - node = find_node(&sys->sd_node_list, &m->sheepid); + node = find_node(&sys->sd_node_list, &m->from); if (node) { sys->nr_vnodes = 0; @@ -1167,7 +1082,7 @@ static void __sd_notify_done(struct cpg_event *cevent) */ if (!sys->join_finished) { sys->join_finished = 1; - move_node_to_sd_list(&sys->this_sheepid, sys->this_node); + move_node_to_sd_list(sys->this_node); sys->epoch = get_latest_epoch(); } @@ -1202,7 +1117,6 @@ static void __sd_notify_done(struct cpg_event *cevent) if (m->state == DM_INIT && is_master()) { switch (m->op) { case SD_MSG_JOIN: - send_join_response(w); break; default: eprintf("unknown message %d\n", m->op); @@ -1226,17 +1140,18 @@ static void __sd_notify_done(struct cpg_event *cevent) } } -static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len) +static void sd_notify_handler(struct sheepdog_node_list_entry *sender, + void *msg, size_t msg_len) { struct cpg_event *cevent; struct work_notify *w; struct message_header *m = msg; char name[128]; - dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n", + dprintf("op: %d, state: %u, size: %d, from: %s, pid: %u\n", m->op, m->state, m->msg_length, addr_to_str(name, sizeof(name), m->from.addr, m->from.port), - sender->pid); + sender->port); w = zalloc(sizeof(*w)); if (!w) @@ -1264,7 +1179,7 @@ static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len) start_cpg_event_work(); } -static void add_node(struct 
sheepid *id) +static void add_node(struct sheepdog_node_list_entry *ent) { struct node *node; @@ -1272,16 +1187,16 @@ static void add_node(struct sheepid *id) if (!node) panic("failed to alloc memory for a new node\n"); - node->sheepid = *id; + node->ent = *ent; list_add_tail(&node->list, &sys->cpg_node_list); } -static int del_node(struct sheepid *id) +static int del_node(struct sheepdog_node_list_entry *ent) { struct node *node; - node = find_node(&sys->sd_node_list, id); + node = find_node(&sys->sd_node_list, ent); if (node) { int nr; struct sheepdog_node_list_entry e[SD_MAX_NODES]; @@ -1305,7 +1220,7 @@ static int del_node(struct sheepid *id) return 1; } - node = find_node(&sys->cpg_node_list, id); + node = find_node(&sys->cpg_node_list, ent); if (node) { list_del(&node->list); free(node); @@ -1317,7 +1232,7 @@ static int del_node(struct sheepid *id) /* * Check whether the majority of Sheepdog nodes are still alive or not */ -static int check_majority(struct sheepid *left) +static int check_majority(struct sheepdog_node_list_entry *left) { int nr_nodes = 0, nr_majority, nr_reachable = 0, fd; struct node *node; @@ -1332,7 +1247,7 @@ static int check_majority(struct sheepid *left) return 1; list_for_each_entry(node, &sys->sd_node_list, list) { - if (sheepid_cmp(&node->sheepid, left) == 0) + if (node_cmp(&node->ent, left) == 0) continue; addr_to_str(name, sizeof(name), node->ent.addr, 0); @@ -1352,6 +1267,23 @@ static int check_majority(struct sheepid *left) return 0; } +static void __sd_join(struct cpg_event *cevent) +{ + struct work_join *w = container_of(cevent, struct work_join, cev); + struct join_message *msg = &w->jm; + int i; + + if (msg->cluster_status != SD_STATUS_OK) + return; + + if (sys->status == SD_STATUS_OK) + return; + + get_vdi_bitmap_from_sd_list(); + for (i = 0; i < w->member_list_entries; i++) + get_vdi_bitmap_from(w->member_list + i); +} + static void __sd_leave(struct cpg_event *cevent) { struct work_leave *w = container_of(cevent, struct 
work_leave, cev); @@ -1362,7 +1294,73 @@ static void __sd_leave(struct cpg_event *cevent) } } -static void send_join_request(struct sheepid *id) +static enum cluster_join_result sd_check_join_cb( + struct sheepdog_node_list_entry *joining, void *opaque) +{ + struct message_header *m = opaque; + struct join_message *jm; + struct node *node; + + jm = (struct join_message *)m; + + if (node_cmp(joining, &sys->this_node) == 0) { + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + int nr_entries; + uint64_t ctime; + uint32_t epoch; + int ret; + + /* + * If I'm the first sheep joins in colosync, I + * becomes the master without sending JOIN. + */ + + vprintf(SDOG_DEBUG, "%s\n", node_to_str(&sys->this_node)); + + jm->header.from = sys->this_node; + + nr_entries = ARRAY_SIZE(entries); + ret = read_epoch(&epoch, &ctime, entries, &nr_entries); + if (ret == SD_RES_SUCCESS) { + sys->epoch = epoch; + jm->ctime = ctime; + get_cluster_status(&jm->header.from, entries, nr_entries, + ctime, epoch, &jm->cluster_status, NULL); + } else + jm->cluster_status = SD_STATUS_WAIT_FOR_FORMAT; + + return CJ_RES_SUCCESS; + } + + join(jm); + m->state = DM_FIN; + + dprintf("%d, %d\n", jm->result, jm->cluster_status); + if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) { + jm->nr_leave_nodes = 0; + list_for_each_entry(node, &sys->leave_list, list) { + jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent; + jm->nr_leave_nodes++; + } + print_node_list(&sys->leave_list); + } else if (jm->result != SD_RES_SUCCESS && + jm->epoch > sys->epoch && + jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) { + eprintf("Transfer mastership. 
%d, %d\n", jm->epoch, sys->epoch); + return CJ_RES_MASTER_TRANSFER; + } + jm->epoch = sys->epoch; + + if (jm->result == SD_RES_SUCCESS) + return CJ_RES_SUCCESS; + else if (jm->result == SD_RES_OLD_NODE_VER || + jm->result == SD_RES_NEW_NODE_VER) + return CJ_RES_JOIN_LATER; + else + return CJ_RES_FAIL; +} + +static int send_join_request(struct sheepdog_node_list_entry *ent) { struct join_message msg; struct sheepdog_node_list_entry entries[SD_MAX_NODES]; @@ -1373,8 +1371,7 @@ static void send_join_request(struct sheepid *id) msg.header.op = SD_MSG_JOIN; msg.header.state = DM_INIT; msg.header.msg_length = sizeof(msg); - msg.header.from = sys->this_node; - msg.header.sheepid = sys->this_sheepid; + msg.header.from = *ent; get_global_nr_copies(&msg.nr_sobjs); get_cluster_flags(&msg.cluster_flags); @@ -1387,23 +1384,24 @@ static void send_join_request(struct sheepid *id) msg.nodes[i].ent = entries[i]; } - sys->cdrv->notify(&msg, msg.header.msg_length, NULL); + ret = sys->cdrv->join(ent, sd_check_join_cb, &msg, msg.header.msg_length); + + vprintf(SDOG_INFO, "%s\n", node_to_str(&sys->this_node)); - vprintf(SDOG_INFO, "%s\n", sheepid_to_str(&sys->this_sheepid)); + return ret; } static void __sd_join_done(struct cpg_event *cevent) { struct work_join *w = container_of(cevent, struct work_join, cev); - int ret, i; - int first_cpg_node = 0; + struct join_message *jm = &w->jm; + struct node *node, *t; + int i; if (w->member_list_entries == 1 && - sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) { + node_cmp(&w->joined, &sys->this_node) == 0) { sys->join_finished = 1; get_global_nr_copies(&sys->nr_sobjs); - get_cluster_flags(&sys->flags); - first_cpg_node = 1; } if (list_empty(&sys->cpg_node_list)) { @@ -1412,47 +1410,23 @@ static void __sd_join_done(struct cpg_event *cevent) } else add_node(&w->joined); - if (first_cpg_node) { - struct join_message msg; - struct sheepdog_node_list_entry entries[SD_MAX_NODES]; - int nr_entries; - uint64_t ctime; - uint32_t epoch; - - /* - * If 
I'm the first sheep joins in colosync, I - * becomes the master without sending JOIN. - */ - - vprintf(SDOG_DEBUG, "%s\n", sheepid_to_str(&sys->this_sheepid)); - - memset(&msg, 0, sizeof(msg)); - - msg.header.from = sys->this_node; - msg.header.sheepid = sys->this_sheepid; - - nr_entries = ARRAY_SIZE(entries); - ret = read_epoch(&epoch, &ctime, entries, &nr_entries); - if (ret == SD_RES_SUCCESS) { - sys->epoch = epoch; - msg.ctime = ctime; - get_cluster_status(&msg.header.from, entries, nr_entries, - ctime, epoch, &msg.cluster_status, NULL); - } else - msg.cluster_status = SD_STATUS_WAIT_FOR_FORMAT; - - update_cluster_info(&msg); + print_node_list(&sys->sd_node_list); - if (sys->status == SD_STATUS_OK) /* sheepdog starts with one node */ - start_recovery(sys->epoch); + update_cluster_info(jm, w->member_list, w->member_list_entries); - return; + if (sys->status == SD_STATUS_OK || sys->status == SD_STATUS_HALT) { + list_for_each_entry_safe(node, t, &sys->leave_list, list) { + list_del(&node->list); + } + start_recovery(sys->epoch); } - print_node_list(&sys->sd_node_list); + if (sys->status == SD_STATUS_HALT) { + int nr_zones = get_zones_nr_from(&sys->sd_node_list); - if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) - send_join_request(&w->joined); + if (nr_zones >= sys->nr_sobjs) + sys->status = SD_STATUS_OK; + } } int sys_flag_nohalt() @@ -1523,6 +1497,7 @@ static void cpg_event_fn(struct work *work, int idx) switch (cevent->ctype) { case CPG_EVENT_JOIN: + __sd_join(cevent); break; case CPG_EVENT_LEAVE: __sd_leave(cevent); @@ -1815,62 +1790,147 @@ do_retry: queue_work(sys->cpg_wqueue, &cpg_event_work); } -static void sd_join_handler(struct sheepid *joined, struct sheepid *members, - size_t nr_members) +static void sd_join_handler(struct sheepdog_node_list_entry *joined, + struct sheepdog_node_list_entry *members, + size_t nr_members, enum cluster_join_result result, + void *opaque) { struct cpg_event *cevent; struct work_join *w = NULL; int i, size; + int nr, 
nr_local, nr_leave; + struct node *n; + struct join_message *jm; + int le = get_latest_epoch(); - dprintf("join %s\n", sheepid_to_str(joined)); - for (i = 0; i < nr_members; i++) - dprintf("[%x] %s\n", i, sheepid_to_str(members + i)); + if (node_cmp(joined, &sys->this_node) == 0) { + if (result == CJ_RES_FAIL) { + eprintf("failed to join sheepdog\n"); + sys->cdrv->leave(); + exit(1); + } else if (result == CJ_RES_JOIN_LATER) { + eprintf("Restart me later when master is up, please .Bye.\n"); + sys->cdrv->leave(); + exit(1); + } + } - if (sys->status == SD_STATUS_SHUTDOWN) - return; + switch (result) { + case CJ_RES_SUCCESS: + dprintf("join %s\n", node_to_str(joined)); + for (i = 0; i < nr_members; i++) + dprintf("[%x] %s\n", i, node_to_str(members + i)); - w = zalloc(sizeof(*w)); - if (!w) - goto oom; + if (sys->status == SD_STATUS_SHUTDOWN) + break; - cevent = &w->cev; - cevent->ctype = CPG_EVENT_JOIN; + w = zalloc(sizeof(*w)); + if (!w) + panic("oom"); + cevent = &w->cev; + cevent->ctype = CPG_EVENT_JOIN; - vprintf(SDOG_DEBUG, "allow new confchg, %p\n", cevent); + vprintf(SDOG_DEBUG, "allow new confchg, %p\n", cevent); - size = sizeof(struct sheepid) * nr_members; - w->member_list = zalloc(size); - if (!w->member_list) - goto oom; - memcpy(w->member_list, members, size); - w->member_list_entries = nr_members; + size = sizeof(struct sheepdog_node_list_entry) * nr_members; + w->member_list = zalloc(size); + if (!w->member_list) + panic("oom"); - w->joined = *joined; + memcpy(w->member_list, members, size); + w->member_list_entries = nr_members; - list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); - start_cpg_event_work(); + w->joined = *joined; - return; -oom: - if (w) { - if (w->member_list) - free(w->member_list); - free(w); + memcpy(&w->jm, opaque, sizeof(w->jm)); + + list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings); + start_cpg_event_work(); + break; + case CJ_RES_FAIL: + case CJ_RES_JOIN_LATER: + if (sys->status != 
SD_STATUS_WAIT_FOR_JOIN) + break; + + n = zalloc(sizeof(*n)); + if (!n) + panic("oom\n"); + + if (find_entry_list(joined, &sys->leave_list) + || !find_entry_epoch(joined, le)) { + free(n); + break; + } + + n->ent = *joined; + + list_add_tail(&n->list, &sys->leave_list); + + nr_local = get_nodes_nr_epoch(sys->epoch); + nr = nr_members; + nr_leave = get_nodes_nr_from(&sys->leave_list); + + dprintf("%d == %d + %d \n", nr_local, nr, nr_leave); + if (nr_local == nr + nr_leave) { + sys->status = SD_STATUS_OK; + update_epoch_log(sys->epoch); + update_epoch_store(sys->epoch); + } + break; + case CJ_RES_MASTER_TRANSFER: + jm = (struct join_message *)opaque; + nr = jm->nr_leave_nodes; + for (i = 0; i < nr; i++) { + n = zalloc(sizeof(*n)); + if (!n) + panic("oom\n"); + + if (find_entry_list(&jm->leave_nodes[i].ent, &sys->leave_list) + || !find_entry_epoch(&jm->leave_nodes[i].ent, le)) { + free(n); + continue; + } + + n->ent = jm->leave_nodes[i].ent; + + list_add_tail(&n->list, &sys->leave_list); + } + + /* Sheep needs this to identify itself as master. + * Now mastership transfer is done. 
+ */ + if (!sys->join_finished) { + sys->join_finished = 1; + move_node_to_sd_list(sys->this_node); + sys->epoch = get_latest_epoch(); + } + + nr_local = get_nodes_nr_epoch(sys->epoch); + nr = nr_members; + nr_leave = get_nodes_nr_from(&sys->leave_list); + + dprintf("%d == %d + %d \n", nr_local, nr, nr_leave); + if (nr_local == nr + nr_leave) { + sys->status = SD_STATUS_OK; + update_epoch_log(sys->epoch); + update_epoch_store(sys->epoch); + } + break; } - panic("failed to allocate memory for a confchg event\n"); } -static void sd_leave_handler(struct sheepid *left, struct sheepid *members, +static void sd_leave_handler(struct sheepdog_node_list_entry *left, + struct sheepdog_node_list_entry *members, size_t nr_members) { struct cpg_event *cevent; struct work_leave *w = NULL; int i, size; - dprintf("leave %s\n", sheepid_to_str(left)); + dprintf("leave %s\n", node_to_str(left)); for (i = 0; i < nr_members; i++) - dprintf("[%x] %s\n", i, sheepid_to_str(members + i)); + dprintf("[%x] %s\n", i, node_to_str(members + i)); if (sys->status == SD_STATUS_SHUTDOWN) return; @@ -1885,7 +1945,7 @@ static void sd_leave_handler(struct sheepid *left, struct sheepid *members, vprintf(SDOG_DEBUG, "allow new confchg, %p\n", cevent); - size = sizeof(struct sheepid) * nr_members; + size = sizeof(struct sheepdog_node_list_entry) * nr_members; w->member_list = zalloc(size); if (!w->member_list) goto oom; @@ -1927,21 +1987,15 @@ int create_cluster(int port, int64_t zone) } } - fd = sys->cdrv->init(&handlers, &sys->this_sheepid); + fd = sys->cdrv->init(&handlers, sys->this_node.addr); if (fd < 0) return -1; - ret = sys->cdrv->join(); - if (ret != 0) - return -1; - - memcpy(sys->this_node.addr, sys->this_sheepid.addr, - sizeof(sys->this_node.addr)); sys->this_node.port = port; sys->this_node.nr_vnodes = SD_DEFAULT_VNODES; if (zone == -1) { /* use last 4 bytes as zone id */ - uint8_t *b = sys->this_sheepid.addr + 12; + uint8_t *b = sys->this_node.addr + 12; sys->this_node.zone = b[0] | b[1] 
<< 8 | b[2] << 16 | b[3] << 24; } else sys->this_node.zone = zone; @@ -1967,6 +2021,11 @@ int create_cluster(int port, int64_t zone) eprintf("Failed to register epoll events, %d\n", ret); return 1; } + + ret = send_join_request(&sys->this_node); + if (ret != 0) + return -1; + return 0; } @@ -1981,7 +2040,6 @@ int leave_cluster(void) msg.header.state = DM_FIN; msg.header.msg_length = sizeof(msg); msg.header.from = sys->this_node; - msg.header.sheepid = sys->this_sheepid; msg.epoch = get_latest_epoch(); dprintf("%d\n", msg.epoch); diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index ff59203..798aee0 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -111,7 +111,6 @@ struct cluster_info { /* set after finishing the JOIN procedure */ int join_finished; - struct sheepid this_sheepid; struct sheepdog_node_list_entry this_node; uint32_t epoch; -- 1.7.2.5 |