From: HaiTing Yao <wujue.yht at taobao.com> On the consistent hash ring, member change should only influence its neighbors. Now every member change leads to whole cluster recovery. 1, Some node need not to start recovery. 2, One node does recovery, it need not to connect to all of other nodes to get its object list. Signed-off-by: HaiTing Yao <wujue.yht at taobao.com> --- include/sheep.h | 7 +++ sheep/group.c | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ sheep/recovery.c | 9 ++++ 3 files changed, 158 insertions(+), 0 deletions(-) diff --git a/include/sheep.h b/include/sheep.h index ac9179c..cc630de 100644 --- a/include/sheep.h +++ b/include/sheep.h @@ -147,11 +147,18 @@ struct sd_node_rsp { uint64_t store_free; }; +enum recovery_type { + SD_FULL_RECOVERY, + SD_PART_RECOVERY, + SD_NO_RECOVERY +}; + struct sd_node { uint8_t addr[16]; uint16_t port; uint16_t nr_vnodes; uint32_t zone; + enum recovery_type recovery; }; struct sd_vnode { diff --git a/sheep/group.c b/sheep/group.c index ac23661..0f34a3d 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -207,6 +207,139 @@ void oid_to_vnodes(struct vnode_info *vnode_info, uint64_t oid, int nr_copies, } } +static int get_hash_changed_nodes(struct vnode_info *new_vnodes, + uint32_t vnode_idx, uint32_t *node_idx) +{ + int i, j, t; + int sum = 0; + + node_idx[sum++] = new_vnodes->entries[vnode_idx].node_idx; + + /* left changed nodes */ + for (i = vnode_idx - 1; ; i--) { + if (i < 0) + i = new_vnodes->nr_vnodes - 1; + if (i == vnode_idx) + break; + + t = new_vnodes->entries[i].node_idx; + + for (j = 0; j < sum; j++) + if (t == node_idx[j]) + break; + + if (j == sum) + node_idx[sum++] = t; + if (sum == sys->nr_copies) + break; + } + + /* right changed nodes */ + for (i = vnode_idx + 1; ; i++) { + if (i == new_vnodes->nr_vnodes) + i = 0; + if (i == vnode_idx) + break; + + t = new_vnodes->entries[i].node_idx; + + for (j = 0; j < sum; j++) + if (t == node_idx[j]) + break; + + if (j == sum) + node_idx[sum++] = t; + if (sum == (2 * sys->nr_copies - 1)) + break; + } + + return sum; +} + +static void new_comer_check_partition(struct vnode_info *new_vnodes) +{ + int i, t; + uint32_t changed_nodes_idx[SD_MAX_REDUNDANCY * 2]; + int changed_sum = 0; + + for (i = 0; i < sys->nr_nodes; i++) + sys->nodes[i].recovery = SD_NO_RECOVERY; + + sys->this_node.recovery = SD_PART_RECOVERY; + + if (new_vnodes->nr_vnodes == 1) + return; + + for (i = 0; i < new_vnodes->nr_vnodes; i++) { + if (is_myself(new_vnodes->entries[i].addr, + new_vnodes->entries[i].port)) { + sys->nodes[new_vnodes->entries[i].node_idx].recovery = + SD_PART_RECOVERY; + + memset(changed_nodes_idx, 0, sizeof(changed_nodes_idx)); + changed_sum = get_hash_changed_nodes(new_vnodes, i, + changed_nodes_idx); + + for (t = 0; t < changed_sum; t++) + sys->nodes[changed_nodes_idx[t]].recovery = + SD_PART_RECOVERY; + } + } + + return; +} + +static void joined_nodes_check_partition(struct vnode_info *new_vnodes, + struct vnode_info *old_vnodes) +{ + int i = 0, j = 0, t = 0, idx = 0; + int ret = 0; + uint32_t changed_nodes_idx[SD_MAX_REDUNDANCY * 2]; + int changed_sum = 0; + + for (i = 0; i < sys->nr_nodes; i++) { + if (node_eq(&sys->this_node, sys->nodes + i)) + idx = i; + sys->nodes[i].recovery = SD_NO_RECOVERY; + } + + for (i = 0; i < new_vnodes->nr_vnodes; i++) { + ret = vnode_cmp(new_vnodes->entries + i, + old_vnodes->entries + j); + + if (!ret) { + j++; + continue; + } else { + memset(changed_nodes_idx, 0, sizeof(changed_nodes_idx)); + changed_sum = get_hash_changed_nodes(new_vnodes, i, + changed_nodes_idx); + + for (t = 0; t < changed_sum; t++) { + if (idx == changed_nodes_idx[t]) + break; + } + + /* self be changed */ + if (t != changed_sum) { + sys->this_node.recovery = SD_PART_RECOVERY; + for (t = 0; t < changed_sum; t++) + sys->nodes[changed_nodes_idx[t]]. + recovery = SD_PART_RECOVERY; + } + + if (ret > 0) { + if (j < old_vnodes->nr_vnodes - 1) { + j++; + i--; + } + } + } + } + + return; +} + static int update_vnode_info(void) { struct vnode_info *vnode_info; @@ -222,6 +355,15 @@ static int update_vnode_info(void) vnode_info->nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes); uatomic_set(&vnode_info->refcnt, 1); + sys->this_node.recovery = SD_NO_RECOVERY; + + if (!current_vnode_info) + new_comer_check_partition(vnode_info); + else + joined_nodes_check_partition(vnode_info, current_vnode_info); + + dprintf("the recovery status %u\n", sys->this_node.recovery); + put_vnode_info(current_vnode_info); current_vnode_info = vnode_info; return 0; diff --git a/sheep/recovery.c b/sheep/recovery.c index 72a9797..c35b74b 100644 --- a/sheep/recovery.c +++ b/sheep/recovery.c @@ -595,6 +595,10 @@ again: /* new node doesn't have a list file */ continue; + if ((sys->this_node.recovery == SD_PART_RECOVERY) && + (sys->nodes[i].recovery == SD_NO_RECOVERY)) + continue; + retry_cnt = 0; retry: buf_nr = request_obj_list(node, rw->epoch, buf, buf_size); @@ -707,7 +711,12 @@ int start_recovery(uint32_t epoch) free(next_rw); } next_rw = rw; + sys->this_node.recovery = SD_FULL_RECOVERY; } else { + if (sys->this_node.recovery == SD_NO_RECOVERY) { + free(next_rw); + return 0; + } recovering_work = rw; queue_work(sys->recovery_wqueue, &rw->work); } -- 1.7.1 |