This command recalculates the vnodes of all nodes in the cluster when the size of some nodes changes (e.g., after plug/unplug) and users want to rebalance data manually. Signed-off-by: Liu Yuan <namei.unix at gmail.com> --- collie/cluster.c | 14 ++++++++ include/internal_proto.h | 2 ++ sheep/group.c | 18 +++++++++++ sheep/ops.c | 80 ++++++++++++++++++++++++++++++++++++++++++++++ sheep/plain_store.c | 4 ++- sheep/sheep_priv.h | 2 ++ 6 files changed, 119 insertions(+), 1 deletion(-) diff --git a/collie/cluster.c b/collie/cluster.c index 97feade..e7984b6 100644 --- a/collie/cluster.c +++ b/collie/cluster.c @@ -457,6 +457,18 @@ static int cluster_snapshot(int argc, char **argv) return do_generic_subcommand(cluster_snapshot_cmd, argc, argv); } +static int cluster_reweight(int argc, char **argv) +{ + int ret; + struct sd_req hdr; + + sd_init_req(&hdr, SD_OP_REWEIGHT); + ret = send_light_req(&hdr, sdhost, sdport); + if (ret) + return EXIT_FAILURE; + return EXIT_SUCCESS; +} + static struct subcommand cluster_cmd[] = { {"info", NULL, "aprh", "show cluster information", NULL, SUBCMD_FLAG_NEED_NODELIST, cluster_info, cluster_options}, @@ -471,6 +483,8 @@ static struct subcommand cluster_cmd[] = { "See 'collie cluster recover' for more information\n", cluster_recover_cmd, SUBCMD_FLAG_NEED_ARG, cluster_recover, cluster_options}, + {"reweight", NULL, "aph", "reweight the cluster", NULL, 0, + cluster_reweight, cluster_options}, {NULL,}, }; diff --git a/include/internal_proto.h b/include/internal_proto.h index b5dc910..83271ad 100644 --- a/include/internal_proto.h +++ b/include/internal_proto.h @@ -73,6 +73,8 @@ #define SD_OP_MD_PLUG 0xB2 #define SD_OP_MD_UNPLUG 0xB3 #define SD_OP_GET_HASH 0xB4 +#define SD_OP_REWEIGHT 0xB5 +#define SD_OP_UPDATE_SIZE 0xB6 /* internal flags for hdr.flags, must be above 0x80 */ #define SD_FLAG_CMD_RECOVERY 0x0080 diff --git a/sheep/group.c b/sheep/group.c index 16d54cd..c9669da 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -1235,6 
+1235,24 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members, sockfd_cache_del(&left->nid); } +void update_node_size(struct sd_node *node) +{ + struct vnode_info *cur_vinfo = main_thread_get(current_vnode_info); + int idx = get_node_idx(cur_vinfo, node); + assert(idx != -1); + cur_vinfo->nodes[idx].space = node->space; +} + +void kick_node_recover(void) +{ + struct vnode_info *old = main_thread_get(current_vnode_info); + + main_thread_set(current_vnode_info, + alloc_vnode_info(old->nodes, old->nr_nodes)); + start_recovery(main_thread_get(current_vnode_info), old, false); + put_vnode_info(old); +} + int create_cluster(int port, int64_t zone, int nr_vnodes, bool explicit_addr) { diff --git a/sheep/ops.c b/sheep/ops.c index 07a20d9..cbcd2ca 100644 --- a/sheep/ops.c +++ b/sheep/ops.c @@ -701,6 +701,74 @@ static int cluster_recovery_completion(const struct sd_req *req, return SD_RES_SUCCESS; } +static void do_reweight(struct work *work) +{ + struct sd_req hdr; + int ret; + + sd_init_req(&hdr, SD_OP_UPDATE_SIZE); + hdr.flags = SD_FLAG_CMD_WRITE; + hdr.data_length = sizeof(sys->this_node); + + ret = exec_local_req(&hdr, &sys->this_node); + if (ret != SD_RES_SUCCESS) + sd_eprintf("failed to update node size"); +} + +static void reweight_done(struct work *work) +{ + free(work); +} + +static void reweight_node(void) +{ + struct work *rw = xzalloc(sizeof(*rw)); + + rw->fn = do_reweight; + rw->done = reweight_done; + + queue_work(sys->recovery_wqueue, rw); +} + +static bool node_size_varied(void) +{ + uint64_t new, used, old = sys->this_node.space; + double diff; + + assert(old); + new = md_get_size(&used); + diff = new > old ? 
(double)(new - old) : (double)(old - new); + sd_dprintf("new %"PRIu64 ", old %"PRIu64", ratio %f", new, old, + diff / (double)old); + if (diff / (double)old < 0.01) + return false; + + sys->this_node.space = new; + set_node_space(new); + + return true; +} + +static int cluster_reweight(const struct sd_req *req, struct sd_rsp *rsp, + void *data) +{ + if (node_size_varied()) + reweight_node(); + + return SD_RES_SUCCESS; +} + +static int cluster_update_size(const struct sd_req *req, struct sd_rsp *rsp, + void *data) +{ + struct sd_node *node = (struct sd_node *)data; + + update_node_size(node); + kick_node_recover(); + + return SD_RES_SUCCESS; +} + static int local_set_cache_size(const struct sd_req *req, struct sd_rsp *rsp, void *data) { @@ -1061,6 +1129,18 @@ static struct sd_op_template sd_ops[] = { .process_work = cluster_get_vdi_info, }, + [SD_OP_REWEIGHT] = { + .name = "REWEIGHT", + .type = SD_OP_TYPE_CLUSTER, + .process_main = cluster_reweight, + }, + + [SD_OP_UPDATE_SIZE] = { + .name = "UPDATE_SIZE", + .type = SD_OP_TYPE_CLUSTER, + .process_main = cluster_update_size, + }, + /* local operations */ [SD_OP_RELEASE_VDI] = { .name = "RELEASE_VDI", diff --git a/sheep/plain_store.c b/sheep/plain_store.c index 086e18e..678bf49 100644 --- a/sheep/plain_store.c +++ b/sheep/plain_store.c @@ -272,8 +272,10 @@ int default_read(uint64_t oid, const struct siocb *iocb) /* * If the request is againt the older epoch, try to read from * the stale directory + * + * For reweighting, iocb->epoch == sys_epoch(). 
*/ - if (ret == SD_RES_NO_OBJ && iocb->epoch < sys_epoch()) { + if (ret == SD_RES_NO_OBJ && iocb->epoch <= sys_epoch()) { get_stale_obj_path(oid, iocb->epoch, path); ret = default_read_from_path(oid, path, iocb); } diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 27b1b41..00459db 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -414,5 +414,7 @@ uint32_t md_get_info(struct sd_md_info *info); int md_plug_disks(char *disks); int md_unplug_disks(char *disks); uint64_t md_get_size(uint64_t *used); +void kick_node_recover(void); +void update_node_size(struct sd_node *node); #endif -- 1.7.9.5