From: Yunkai Zhang <qiushu.zyk at taobao.com> After delay recovery starts, all recovery operations in sd_join_handler or sd_leave_handler will be paused. The old vnode information will be kept in a new static variable named old_vnode_info in group.c, which will be used by the following recovery operation. A delay_recovery field is added to join_message so that a joining sheep can share the cluster's delay_recovery status. During a delay recovery transaction, joined and left nodes will be stored in a global array so that we can show the internal status to the user when necessary (the next patch will use it). Only one recovery operation will be executed when the user sends the "collie delay_recovery stop" command. A flag, do_delay_recovery, is added to indicate whether there is recovery work to be done. Create a new function get_old_vnode_info() so that other code can access the old_vnode_info variable. Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> --- include/internal_proto.h | 6 ++++ sheep/group.c | 72 +++++++++++++++++++++++++++++++++++------------- sheep/ops.c | 16 +++++++++++ sheep/sheep_priv.h | 4 +++ 4 files changed, 79 insertions(+), 19 deletions(-) diff --git a/include/internal_proto.h b/include/internal_proto.h index d6a7989..e38d843 100644 --- a/include/internal_proto.h +++ b/include/internal_proto.h @@ -196,6 +196,7 @@ struct join_message { uint32_t epoch; uint64_t ctime; uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */ + uint8_t delay_recovery; uint8_t store[STORE_LEN]; /* @@ -214,4 +215,9 @@ struct vdi_op_message { uint8_t data[0]; }; +struct recovery_node { + int type; /* 0:join, 1:left */ + struct sd_node node; +}; + #endif /* __INTERNAL_PROTO_H__ */ diff --git a/sheep/group.c b/sheep/group.c index f7c8ca7..c20a12b 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -38,6 +38,11 @@ struct vdi_bitmap_work { struct sd_node members[]; }; +/* Using for delay recovery */ +struct recovery_node dr_nodes[SD_MAX_NODES]; +size_t nr_dr_nodes = 0; + +static struct 
vnode_info *old_vnode_info; static struct vnode_info *current_vnode_info; static size_t get_join_message_size(struct join_message *jm) @@ -147,6 +152,17 @@ struct vnode_info *get_vnode_info(void) } /* + * Get a reference to the old vnode information structure, + * this must only be called from the main thread. + */ +struct vnode_info *get_old_vnode_info(void) +{ + assert(old_vnode_info); + + return grab_vnode_info(old_vnode_info); +} + +/* * Release a reference to the current vnode information. * * Must be called from the main thread. @@ -478,7 +494,7 @@ static void format_exceptional_node_list(struct join_message *jm) jm->nodes[jm->nr_failed_nodes + jm->nr_delayed_nodes++] = n->ent; } -static void clear_exceptional_node_lists(void) +void clear_exceptional_node_lists(void) { struct node *n, *t; @@ -780,15 +796,19 @@ static void update_cluster_info(struct join_message *msg, struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes) { - struct vnode_info *old_vnode_info; - eprintf("status = %d, epoch = %d, finished: %d\n", msg->cluster_status, msg->epoch, sys->join_finished); + sys->delay_recovery = msg->delay_recovery; + if (!sys->join_finished) finish_join(msg, joined, nodes, nr_nodes); - old_vnode_info = current_vnode_info; + if (sys->delay_recovery == SD_DELAY_RECOVERY_STOP) { + put_vnode_info(old_vnode_info); + old_vnode_info = current_vnode_info; + } + current_vnode_info = alloc_vnode_info(nodes, nr_nodes); switch (msg->cluster_status) { @@ -813,16 +833,23 @@ static void update_cluster_info(struct join_message *msg, sys->status = msg->cluster_status; if (msg->inc_epoch) { - uatomic_inc(&sys->epoch); - log_current_epoch(); - clear_exceptional_node_lists(); - if (!old_vnode_info) { old_vnode_info = alloc_old_vnode_info(joined, - nodes, nr_nodes); + nodes, nr_nodes); } - start_recovery(current_vnode_info, old_vnode_info); + if (sys->delay_recovery == SD_DELAY_RECOVERY_STOP) { + uatomic_inc(&sys->epoch); + log_current_epoch(); + clear_exceptional_node_lists(); 
+ + start_recovery(current_vnode_info, + old_vnode_info); + } else { + /* 0:joined, 1:left */ + dr_nodes[nr_dr_nodes].type = 0; + dr_nodes[nr_dr_nodes++].node = *joined; + } } if (have_enough_zones()) @@ -833,8 +860,6 @@ static void update_cluster_info(struct join_message *msg, break; } - put_vnode_info(old_vnode_info); - sockfd_cache_add(&joined->nid); } @@ -898,6 +923,7 @@ enum cluster_join_result sd_check_join_cb(struct sd_node *joining, void *opaque) vprintf(SDOG_DEBUG, "%s\n", node_to_str(&sys->this_node)); jm->cluster_status = sys->status; + jm->delay_recovery = sys->delay_recovery; epoch = get_latest_epoch(); if (!epoch) @@ -922,6 +948,7 @@ enum cluster_join_result sd_check_join_cb(struct sd_node *joining, void *opaque) } jm->cluster_status = sys->status; + jm->delay_recovery = sys->delay_recovery; jm->inc_epoch = 0; switch (sys->status) { @@ -1084,7 +1111,6 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members, void sd_leave_handler(struct sd_node *left, struct sd_node *members, size_t nr_members) { - struct vnode_info *old_vnode_info; int i; dprintf("leave %s\n", node_to_str(left)); @@ -1094,15 +1120,25 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members, if (sys->status == SD_STATUS_SHUTDOWN) return; - old_vnode_info = current_vnode_info; + if (sys->delay_recovery == SD_DELAY_RECOVERY_STOP) { + put_vnode_info(old_vnode_info); + old_vnode_info = current_vnode_info; + } + current_vnode_info = alloc_vnode_info(members, nr_members); switch (sys->status) { case SD_STATUS_HALT: case SD_STATUS_OK: - uatomic_inc(&sys->epoch); - log_current_epoch(); - start_recovery(current_vnode_info, old_vnode_info); + if (sys->delay_recovery == SD_DELAY_RECOVERY_STOP) { + uatomic_inc(&sys->epoch); + log_current_epoch(); + start_recovery(current_vnode_info, old_vnode_info); + }else { + /* 0:joined, 1:left */ + dr_nodes[nr_dr_nodes].type = 1; + dr_nodes[nr_dr_nodes++].node = *left; + } if (!have_enough_zones()) sys->status = SD_STATUS_HALT; @@ 
-1111,8 +1147,6 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members, break; } - put_vnode_info(old_vnode_info); - sockfd_cache_del(&left->nid); } diff --git a/sheep/ops.c b/sheep/ops.c index c0fa98e..45279ff 100644 --- a/sheep/ops.c +++ b/sheep/ops.c @@ -270,6 +270,22 @@ static int cluster_start_delay_recovery(const struct sd_req *req, static int cluster_stop_delay_recovery(const struct sd_req *req, struct sd_rsp *rsp, void *data) { + struct vnode_info *old_vnode_info, *current_vnode_info; + + if (nr_dr_nodes) { + + old_vnode_info = get_old_vnode_info(); + current_vnode_info = get_vnode_info(); + + uatomic_inc(&sys->epoch); + log_current_epoch(); + clear_exceptional_node_lists(); + + start_recovery(current_vnode_info, + old_vnode_info); + } + + nr_dr_nodes = 0; sys->delay_recovery = SD_DELAY_RECOVERY_STOP; return SD_RES_SUCCESS; } diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 22d9d6c..b81e20b 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -181,6 +181,8 @@ extern char *jrnl_path; extern char *epoch_path; extern mode_t def_fmode; extern mode_t def_dmode; +extern struct recovery_node dr_nodes[]; +extern size_t nr_dr_nodes; /* One should call this function to get sys->epoch outside main thread */ static inline uint32_t sys_epoch(void) @@ -213,8 +215,10 @@ int local_get_node_list(const struct sd_req *req, struct sd_rsp *rsp, void *data); bool have_enough_zones(void); +void clear_exceptional_node_lists(void); struct vnode_info *grab_vnode_info(struct vnode_info *vnode_info); struct vnode_info *get_vnode_info(void); +struct vnode_info *get_old_vnode_info(void); void put_vnode_info(struct vnode_info *vnodes); struct vnode_info *get_vnode_info_epoch(uint32_t epoch); -- 1.7.11.2 |