[sheepdog] [PATCH 08/10] sheep: add 'cluster reweight' support
Liu Yuan
namei.unix at gmail.com
Sun May 26 15:45:07 CEST 2013
This command is used to recalculate the vnodes of all the nodes of the cluster
when size of some nodes changes (for e.g, after plug/unplug) and users expect
to rebalance data manually.
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
collie/cluster.c | 14 ++++++++
include/internal_proto.h | 2 ++
sheep/group.c | 18 +++++++++++
sheep/ops.c | 80 ++++++++++++++++++++++++++++++++++++++++++++++
sheep/plain_store.c | 4 ++-
sheep/sheep_priv.h | 2 ++
6 files changed, 119 insertions(+), 1 deletion(-)
diff --git a/collie/cluster.c b/collie/cluster.c
index 97feade..e7984b6 100644
--- a/collie/cluster.c
+++ b/collie/cluster.c
@@ -457,6 +457,18 @@ static int cluster_snapshot(int argc, char **argv)
return do_generic_subcommand(cluster_snapshot_cmd, argc, argv);
}
+static int cluster_reweight(int argc, char **argv)
+{
+ int ret;
+ struct sd_req hdr;
+
+ sd_init_req(&hdr, SD_OP_REWEIGHT);
+ ret = send_light_req(&hdr, sdhost, sdport);
+ if (ret)
+ return EXIT_FAILURE;
+ return EXIT_SUCCESS;
+}
+
static struct subcommand cluster_cmd[] = {
{"info", NULL, "aprh", "show cluster information",
NULL, SUBCMD_FLAG_NEED_NODELIST, cluster_info, cluster_options},
@@ -471,6 +483,8 @@ static struct subcommand cluster_cmd[] = {
"See 'collie cluster recover' for more information\n",
cluster_recover_cmd, SUBCMD_FLAG_NEED_ARG,
cluster_recover, cluster_options},
+ {"reweight", NULL, "aph", "reweight the cluster", NULL, 0,
+ cluster_reweight, cluster_options},
{NULL,},
};
diff --git a/include/internal_proto.h b/include/internal_proto.h
index b5dc910..83271ad 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -73,6 +73,8 @@
#define SD_OP_MD_PLUG 0xB2
#define SD_OP_MD_UNPLUG 0xB3
#define SD_OP_GET_HASH 0xB4
+#define SD_OP_REWEIGHT 0xB5
+#define SD_OP_UPDATE_SIZE 0xB6
/* internal flags for hdr.flags, must be above 0x80 */
#define SD_FLAG_CMD_RECOVERY 0x0080
diff --git a/sheep/group.c b/sheep/group.c
index 16d54cd..c9669da 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1235,6 +1235,24 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
sockfd_cache_del(&left->nid);
}
+void update_node_size(struct sd_node *node)
+{
+ struct vnode_info *cur_vinfo = main_thread_get(current_vnode_info);
+ int idx = get_node_idx(cur_vinfo, node);
+ assert(idx != -1);
+ cur_vinfo->nodes[idx].space = node->space;
+}
+
+void kick_node_recover(void)
+{
+ struct vnode_info *old = main_thread_get(current_vnode_info);
+
+ main_thread_set(current_vnode_info,
+ alloc_vnode_info(old->nodes, old->nr_nodes));
+ start_recovery(main_thread_get(current_vnode_info), old, false);
+ put_vnode_info(old);
+}
+
int create_cluster(int port, int64_t zone, int nr_vnodes,
bool explicit_addr)
{
diff --git a/sheep/ops.c b/sheep/ops.c
index 07a20d9..cbcd2ca 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -701,6 +701,74 @@ static int cluster_recovery_completion(const struct sd_req *req,
return SD_RES_SUCCESS;
}
+static void do_reweight(struct work *work)
+{
+ struct sd_req hdr;
+ int ret;
+
+ sd_init_req(&hdr, SD_OP_UPDATE_SIZE);
+ hdr.flags = SD_FLAG_CMD_WRITE;
+ hdr.data_length = sizeof(sys->this_node);
+
+ ret = exec_local_req(&hdr, &sys->this_node);
+ if (ret != SD_RES_SUCCESS)
+ sd_eprintf("failed to update node size");
+}
+
+static void reweight_done(struct work *work)
+{
+ free(work);
+}
+
+static void reweight_node(void)
+{
+ struct work *rw = xzalloc(sizeof(*rw));
+
+ rw->fn = do_reweight;
+ rw->done = reweight_done;
+
+ queue_work(sys->recovery_wqueue, rw);
+}
+
+static bool node_size_varied(void)
+{
+ uint64_t new, used, old = sys->this_node.space;
+ double diff;
+
+ assert(old);
+ new = md_get_size(&used);
+ diff = new > old ? (double)(new - old) : (double)(old - new);
+ sd_dprintf("new %"PRIu64 ", old %"PRIu64", ratio %f", new, old,
+ diff / (double)old);
+ if (diff / (double)old < 0.01)
+ return false;
+
+ sys->this_node.space = new;
+ set_node_space(new);
+
+ return true;
+}
+
+static int cluster_reweight(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ if (node_size_varied())
+ reweight_node();
+
+ return SD_RES_SUCCESS;
+}
+
+static int cluster_update_size(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ struct sd_node *node = (struct sd_node *)data;
+
+ update_node_size(node);
+ kick_node_recover();
+
+ return SD_RES_SUCCESS;
+}
+
static int local_set_cache_size(const struct sd_req *req, struct sd_rsp *rsp,
void *data)
{
@@ -1061,6 +1129,18 @@ static struct sd_op_template sd_ops[] = {
.process_work = cluster_get_vdi_info,
},
+ [SD_OP_REWEIGHT] = {
+ .name = "REWEIGHT",
+ .type = SD_OP_TYPE_CLUSTER,
+ .process_main = cluster_reweight,
+ },
+
+ [SD_OP_UPDATE_SIZE] = {
+ .name = "UPDATE_SIZE",
+ .type = SD_OP_TYPE_CLUSTER,
+ .process_main = cluster_update_size,
+ },
+
/* local operations */
[SD_OP_RELEASE_VDI] = {
.name = "RELEASE_VDI",
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 086e18e..678bf49 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -272,8 +272,10 @@ int default_read(uint64_t oid, const struct siocb *iocb)
/*
* If the request is againt the older epoch, try to read from
* the stale directory
+ *
+ * For reweighting, iocb->epoch == sys_epoch().
*/
- if (ret == SD_RES_NO_OBJ && iocb->epoch < sys_epoch()) {
+ if (ret == SD_RES_NO_OBJ && iocb->epoch <= sys_epoch()) {
get_stale_obj_path(oid, iocb->epoch, path);
ret = default_read_from_path(oid, path, iocb);
}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 27b1b41..00459db 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -414,5 +414,7 @@ uint32_t md_get_info(struct sd_md_info *info);
int md_plug_disks(char *disks);
int md_unplug_disks(char *disks);
uint64_t md_get_size(uint64_t *used);
+void kick_node_recover(void);
+void update_node_size(struct sd_node *node);
#endif
--
1.7.9.5
More information about the sheepdog
mailing list