[sheepdog] [PATCH RFC 03/11] sheep: delay processing recovery caused by a LEAVE event, just like a JOIN event
Yunkai Zhang
yunkai.me at gmail.com
Wed Aug 8 23:14:15 CEST 2012
From: Yunkai Zhang <qiushu.zyk at taobao.com>
Add an internal array that holds all leaving nodes; it will be shared with
newly joining sheep through the join message. We need the leaving nodes to
recalculate all_nodes when generating current_vnode_info.
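To make the wire format concrete, here is a minimal standalone sketch
(simplified stand-in structures, not the actual sheep headers) of how a newly
joining sheep could locate the leaving-nodes section that this patch appends
to the join message:

#include <stdint.h>
#include <string.h>

struct sketch_node {
	uint32_t zone;
	uint8_t  left;		/* new flag introduced by this patch */
};

struct sketch_join_message {
	uint16_t nr_failed_nodes;
	uint16_t nr_delayed_nodes;
	uint16_t nr_joining_nodes;
	uint16_t nr_leaving_nodes;	/* new counter introduced by this patch */
	/* layout: [ failed ][ delayed ][ joining ][ leaving ] */
	struct sketch_node nodes[];
};

/* copy the leaving-nodes section out of the shared join message */
static size_t sketch_copy_leaving_nodes(const struct sketch_join_message *jm,
					struct sketch_node *dst)
{
	size_t off = jm->nr_failed_nodes + jm->nr_delayed_nodes +
		     jm->nr_joining_nodes;

	memcpy(dst, &jm->nodes[off],
	       jm->nr_leaving_nodes * sizeof(*dst));
	return jm->nr_leaving_nodes;
}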
Add a new flag named *left* to the sd_node structure. This flag indicates
whether the node has left while recovery was disabled, even if the node joined
again later. Any node whose flag was set may have missed updates to its data
objects; we will restore those objects according to this flag in follow-up
patches.
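The following is a minimal standalone sketch of the *left* bookkeeping
described above (stand-in types, not the sheep code): a node that leaves while
recovery is disabled is remembered in a leaving list with *left* set, and the
flag is cleared again if the same node joins back before recovery is
re-enabled, so its entry is kept for the later object restore:

#include <stddef.h>

#define MAX_NODES 64

struct tracked_node {
	int id;			/* stand-in for the real node identity */
	unsigned char left;	/* set while the node is gone */
};

static struct tracked_node leaving[MAX_NODES];
static size_t nr_leaving;

static struct tracked_node *lookup(const struct tracked_node *target)
{
	size_t i;

	for (i = 0; i < nr_leaving; i++)
		if (leaving[i].id == target->id)
			return &leaving[i];
	return NULL;
}

/* a node left while recovery was disabled: remember it and mark it */
static void on_leave(const struct tracked_node *n)
{
	struct tracked_node *p = lookup(n);

	if (p) {
		p->left = 1;
		return;
	}
	if (nr_leaving < MAX_NODES) {
		leaving[nr_leaving] = *n;
		leaving[nr_leaving++].left = 1;
	}
}

/*
 * the same node joined back before recovery was re-enabled: clear the
 * flag but keep the entry, so its objects can still be restored later
 */
static void on_rejoin(const struct tracked_node *n)
{
	struct tracked_node *p = lookup(n);

	if (p)
		p->left = 0;
}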
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
include/internal_proto.h | 3 ++
sheep/group.c | 135 +++++++++++++++++++++++++++++++++++++++++++----
sheep/ops.c | 13 ++++-
sheep/sheep_priv.h | 32 +++++++++++
4 files changed, 171 insertions(+), 12 deletions(-)
diff --git a/include/internal_proto.h b/include/internal_proto.h
index 717fb79..e0ea5cd 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -177,6 +177,7 @@ struct sd_node {
uint16_t nr_vnodes;
uint32_t zone;
uint64_t space;
+ uint8_t left;
};
struct epoch_log {
@@ -195,6 +196,7 @@ struct join_message {
uint16_t nr_failed_nodes;
uint16_t nr_delayed_nodes;
uint16_t nr_joining_nodes;
+ uint16_t nr_leaving_nodes;
uint16_t cluster_flags;
uint32_t cluster_status;
uint32_t epoch;
@@ -210,6 +212,7 @@ struct join_message {
* [ failed nodes ]: size = nr_failed_nodes
* [ delayed nodes ]: size = nr_delayed_nodes
* [ joining nodes ]: size = nr_joining_nodes
+ * [ leaving nodes ]: size = nr_leaving_nodes
*/
struct sd_node nodes[];
};
diff --git a/sheep/group.c b/sheep/group.c
index ad3447d..30730df 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -40,11 +40,24 @@ struct vdi_bitmap_work {
struct sd_node joining_nodes[SD_MAX_NODES];
size_t nr_joining_nodes;
+struct sd_node leaving_nodes[SD_MAX_NODES];
+size_t nr_leaving_nodes;
struct sd_node all_nodes[SD_MAX_NODES];
size_t nr_all_nodes;
static struct vnode_info *current_vnode_info;
+static void node_list_show(struct sd_node *nodes, size_t nr_nodes,
+ const char *tag) {
+ int i;
+
+ dprintf("%s: %zu\n", tag, nr_nodes);
+
+ for (i = 0; i < nr_nodes; i++) {
+ dprintf("%s: %s\n", tag, node_to_str(nodes + i));
+ }
+}
+
static size_t get_join_message_size(struct join_message *jm)
{
/*
@@ -767,16 +780,34 @@ static void get_vdi_bitmap(struct sd_node *nodes, size_t nr_nodes)
queue_work(sys->block_wqueue, &w->work);
}
-static void prepare_recovery(struct join_message *jm,
+static void prepare_join_recovery(struct join_message *jm,
struct sd_node *joined,
struct sd_node *nodes, size_t nr_nodes)
{
int i, j, n, found;
+ struct sd_node *node;
+
+ if ((node = find_node(joined, leaving_nodes, nr_leaving_nodes))) {
+ node->left = 0;
+ return;
+ }
joining_nodes[nr_joining_nodes++] = *joined;
if (!nr_all_nodes) {
+ /* initialize leaving_nodes */
+ n = jm->nr_failed_nodes + jm->nr_delayed_nodes
+ + jm->nr_joining_nodes;
+ memcpy(leaving_nodes, &jm->nodes[n],
+ jm->nr_leaving_nodes * sizeof(*leaving_nodes));
+ nr_leaving_nodes = jm->nr_leaving_nodes;
+
/* initialize joining_nodes */
+ if ((node = find_node(joined, leaving_nodes,
+ nr_leaving_nodes))) {
+ nr_joining_nodes = 0;
+ node->left = 0;
+ }
n = jm->nr_failed_nodes + jm->nr_delayed_nodes;
memcpy(&joining_nodes[nr_joining_nodes], &jm->nodes[n],
jm->nr_joining_nodes * sizeof(*joining_nodes));
@@ -795,10 +826,79 @@ static void prepare_recovery(struct join_message *jm,
if (!found)
all_nodes[nr_all_nodes++] = nodes[i];
}
+
+ for (i = 0; i < nr_leaving_nodes; i++) {
+ /* include leaving nodes which did not join back */
+ if (leaving_nodes[i].left) {
+ all_nodes[nr_all_nodes] = leaving_nodes[i];
+ all_nodes[nr_all_nodes++].left = 0;
+ }
+ }
}
+ node_list_show(joining_nodes, nr_joining_nodes, "joining_nodes");
+ node_list_show(leaving_nodes, nr_leaving_nodes, "leaving_nodes");
+ node_list_show(all_nodes, nr_all_nodes, "all_nodes");
- if (!current_vnode_info)
+ if (!current_vnode_info) {
+ struct vnode_info *vi;
current_vnode_info = alloc_vnode_info(all_nodes, nr_all_nodes);
+
+ /*
+ * update the node's left state in current_vnode_info
+ * according to leaving_nodes
+ */
+ vi = current_vnode_info;
+ for (i = 0; i < vi->nr_nodes; i++) {
+ for (j = 0; j < nr_leaving_nodes; j++) {
+ if (node_eq(vi->nodes + i, leaving_nodes + j)) {
+ vi->nodes[i].left = 1;
+ break;
+ }
+ }
+ }
+ }
+}
+
+static void prepare_leave_recovery(struct sd_node *left,
+ struct sd_node *nodes, size_t nr_nodes)
+{
+ int i, n;
+ struct sd_node *node;
+ struct vnode_info *vi;
+
+ if (!nr_all_nodes) {
+ memcpy(all_nodes, nodes, nr_nodes * sizeof(*nodes));
+ nr_all_nodes = nr_nodes;
+
+ /* include the newly left one */
+ all_nodes[nr_all_nodes++] = *left;
+ }
+
+ n = delete_node(left, joining_nodes, nr_joining_nodes);
+ if (n < nr_joining_nodes) {
+ nr_joining_nodes = n;
+ return;
+ }
+
+ if ((node = find_node(left, leaving_nodes, nr_leaving_nodes))) {
+ node->left = 1;
+ return;
+ }
+
+ leaving_nodes[nr_leaving_nodes] = *left;
+ leaving_nodes[nr_leaving_nodes++].left = 1;
+
+ /*
+ * update the node's left state in current_vnode_info
+ * according to the left node
+ */
+ vi = current_vnode_info;
+ for (i = 0; i < vi->nr_nodes; i++) {
+ if (node_eq(vi->nodes + i, left)) {
+ vi->nodes[i].left = 1;
+ break;
+ }
+ }
}
void recalculate_vnodes(struct sd_node *nodes, int nr_nodes)
@@ -882,7 +982,8 @@ static void update_cluster_info(struct join_message *msg,
start_recovery(current_vnode_info,
old_vnode_info);
} else
- prepare_recovery(msg, joined, nodes, nr_nodes);
+ prepare_join_recovery(msg, joined, nodes,
+ nr_nodes);
}
if (have_enough_zones())
@@ -1034,10 +1135,18 @@ enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
memcpy(&jm->nodes[n], joining_nodes,
nr_joining_nodes * sizeof(*joining_nodes));
jm->nr_joining_nodes = nr_joining_nodes;
+
+ n += jm->nr_joining_nodes;
+ memcpy(&jm->nodes[n], leaving_nodes,
+ nr_leaving_nodes * sizeof(*leaving_nodes));
+ jm->nr_leaving_nodes = nr_leaving_nodes;
+
}
- n = jm->nr_failed_nodes + jm->nr_delayed_nodes + jm->nr_joining_nodes;
+ n = jm->nr_failed_nodes + jm->nr_delayed_nodes
+ + jm->nr_joining_nodes + jm->nr_leaving_nodes;
*opaque_len = sizeof(*jm) + n * sizeof(jm->nodes[0]);
+
return ret;
}
@@ -1158,7 +1267,7 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members,
void sd_leave_handler(struct sd_node *left, struct sd_node *members,
size_t nr_members)
{
- struct vnode_info *old_vnode_info;
+ struct vnode_info *old_vnode_info = NULL;
int i;
dprintf("leave %s\n", node_to_str(left));
@@ -1168,15 +1277,20 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members,
if (sys->status == SD_STATUS_SHUTDOWN)
return;
- old_vnode_info = current_vnode_info;
- current_vnode_info = alloc_vnode_info(members, nr_members);
+ if (!sys->disable_recovery) {
+ old_vnode_info = current_vnode_info;
+ current_vnode_info = alloc_vnode_info(members, nr_members);
+ }
switch (sys->status) {
case SD_STATUS_HALT:
case SD_STATUS_OK:
- uatomic_inc(&sys->epoch);
- log_current_epoch();
- start_recovery(current_vnode_info, old_vnode_info);
+ if (!sys->disable_recovery) {
+ uatomic_inc(&sys->epoch);
+ log_current_epoch();
+ start_recovery(current_vnode_info, old_vnode_info);
+ } else
+ prepare_leave_recovery(left, members, nr_members);
if (!have_enough_zones())
sys->status = SD_STATUS_HALT;
@@ -1230,6 +1344,7 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
dprintf("zone id = %u\n", sys->this_node.zone);
sys->this_node.space = sys->disk_space;
+ sys->this_node.left = 0;
if (get_latest_epoch() > 0) {
sys->status = SD_STATUS_WAIT_FOR_JOIN;
diff --git a/sheep/ops.c b/sheep/ops.c
index b2e9c69..38634f5 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -271,14 +271,22 @@ static int cluster_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
static int cluster_enable_recover(const struct sd_req *req,
struct sd_rsp *rsp, void *data)
{
- int i;
+ int i, n;
struct vnode_info *old_vnode_info, *vnode_info;
- if (nr_joining_nodes) {
+ if (nr_joining_nodes + nr_leaving_nodes) {
for (i = 0; i < nr_joining_nodes; i++)
all_nodes[nr_all_nodes++] = joining_nodes[i];
+ for (i = 0; i < nr_leaving_nodes; i++) {
+ if (leaving_nodes[i].left) {
+ n = delete_node(&leaving_nodes[i],
+ all_nodes, nr_all_nodes);
+ nr_all_nodes = n;
+ }
+ }
+
old_vnode_info = get_vnode_info();
vnode_info = alloc_vnode_info(all_nodes, nr_all_nodes);
update_vnode_info(vnode_info);
@@ -294,6 +302,7 @@ static int cluster_enable_recover(const struct sd_req *req,
nr_all_nodes = 0;
nr_joining_nodes = 0;
+ nr_leaving_nodes = 0;
sys->disable_recovery = 0;
return SD_RES_SUCCESS;
}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 857cf87..1bf52da 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -177,6 +177,8 @@ extern mode_t def_fmode;
extern mode_t def_dmode;
extern struct sd_node joining_nodes[];
extern size_t nr_joining_nodes;
+extern struct sd_node leaving_nodes[];
+extern size_t nr_leaving_nodes;
extern struct sd_node all_nodes[];
extern size_t nr_all_nodes;
@@ -302,6 +304,36 @@ struct jrnl_descriptor *jrnl_begin(const void *buf, size_t count, off_t offset,
int jrnl_end(struct jrnl_descriptor * jd);
int jrnl_recover(const char *jrnl_dir);
+static inline struct sd_node *find_node(struct sd_node *target,
+ struct sd_node *nodes, size_t nr_nodes)
+{
+ int i;
+
+ for (i = 0; i < nr_nodes; i++) {
+ if (node_eq(nodes + i, target)) {
+ return nodes + i;
+ }
+ }
+
+ return NULL;
+}
+
+static inline size_t delete_node(struct sd_node *target,
+ struct sd_node *nodes, size_t nr_nodes)
+{
+ int i;
+
+ for (i = 0; i < nr_nodes; i++) {
+ if (node_eq(nodes + i, target)) {
+ memmove(nodes + i, nodes + i + 1,
+ (--nr_nodes - i) * sizeof(*nodes));
+ break;
+ }
+ }
+
+ return nr_nodes;
+}
+
static inline int is_myself(uint8_t *addr, uint16_t port)
{
return (memcmp(addr, sys->this_node.nid.addr,
--
1.7.11.2