[sheepdog] [PATCH v4 06/10] sheep: fetch vdi copy list after sheep joins the cluster

levin li levin108 at gmail.com
Thu Aug 9 07:27:41 CEST 2012


From: levin li <xingke.lwp at taobao.com>

The newly joined node doesn't have the vdi copy list, or has an
incomplete vdi copy list, so we need to fetch the copy list
data from the other nodes.

Signed-off-by: levin li <xingke.lwp at taobao.com>
---
 include/internal_proto.h |    1 +
 sheep/group.c            |   11 +++++--
 sheep/ops.c              |   24 +++++++++++++--
 sheep/recovery.c         |   42 +++++++++++++++++++++++++--
 sheep/sheep_priv.h       |    5 ++-
 sheep/vdi.c              |   70 ++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 142 insertions(+), 11 deletions(-)
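
For reference, a small standalone sketch (not part of the patch) of how the
SD_OP_GET_VDI_COPIES payload is laid out: the responder packs a flat array of
struct vdi_copy and the requester derives the entry count from the reply's
data_length, mirroring fill_vdi_copy_list() and fetch_vdi_copies_from() in the
diff below. The fixed-size table, pack_copy_list() and the simplified
add_vdi_copies() here are illustrative stand-ins, not the sheep's
rbtree-backed internals.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct vdi_copy {
	uint32_t vid;
	uint32_t nr_copies;
};

/* toy stand-in for the sheep's rbtree-backed vdi copy table */
static struct vdi_copy table[64];
static int table_nr;

static void add_vdi_copies(uint32_t vid, uint32_t nr_copies)
{
	table[table_nr].vid = vid;
	table[table_nr].nr_copies = nr_copies;
	table_nr++;
}

/*
 * Responder side (cf. fill_vdi_copy_list in the patch): serialize the
 * table into the reply buffer and return the number of bytes written,
 * which the sheep reports back as rsp->data_length.
 */
static int pack_copy_list(void *data)
{
	memcpy(data, table, table_nr * sizeof(struct vdi_copy));
	return table_nr * sizeof(struct vdi_copy);
}

int main(void)
{
	struct vdi_copy buf[512];
	struct vdi_copy *vc = buf;
	int data_length, count, i;

	/* pretend the remote node already knows about two vdis */
	add_vdi_copies(0xfd32fc, 3);
	add_vdi_copies(0xfd3e55, 2);
	data_length = pack_copy_list(buf);

	/*
	 * Requester side (cf. fetch_vdi_copies_from): the entry count is
	 * implied by the payload size, and each entry is fed back into
	 * the local table via add_vdi_copies().
	 */
	table_nr = 0;
	count = data_length / sizeof(*vc);
	for (i = 0; i < count; i++)
		add_vdi_copies(vc[i].vid, vc[i].nr_copies);

	for (i = 0; i < table_nr; i++)
		printf("vid %x: %u copies\n", (unsigned)table[i].vid,
		       (unsigned)table[i].nr_copies);

	return 0;
}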

diff --git a/include/internal_proto.h b/include/internal_proto.h
index 83d98f1..3d70ba9 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -63,6 +63,7 @@
 #define SD_OP_ENABLE_RECOVER 0xA8
 #define SD_OP_DISABLE_RECOVER 0xA9
 #define SD_OP_INFO_RECOVER 0xAA
+#define SD_OP_GET_VDI_COPIES 0xAB
 
 /* internal flags for hdr.flags, must be above 0x80 */
 #define SD_FLAG_CMD_RECOVERY 0x0080
diff --git a/sheep/group.c b/sheep/group.c
index 05ffb3e..fd631ec 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -853,6 +853,8 @@ static void update_cluster_info(struct join_message *msg,
 
 		if (msg->inc_epoch) {
 			if (!sys->disable_recovery) {
+				int is_newly_joined = 0;
+
 				uatomic_inc(&sys->epoch);
 				log_current_epoch();
 				clear_exceptional_node_lists();
@@ -863,8 +865,11 @@ static void update_cluster_info(struct join_message *msg,
 							nodes, nr_nodes);
 				}
 
-				start_recovery(current_vnode_info,
-					       old_vnode_info);
+				if (node_eq(joined, &sys->this_node))
+					is_newly_joined = 1;
+
+				start_recovery(current_vnode_info, old_vnode_info,
+					       is_newly_joined);
 			} else
 				prepare_recovery(joined, nodes, nr_nodes);
 		}
@@ -1148,7 +1153,7 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members,
 	case SD_STATUS_OK:
 		uatomic_inc(&sys->epoch);
 		log_current_epoch();
-		start_recovery(current_vnode_info, old_vnode_info);
+		start_recovery(current_vnode_info, old_vnode_info, 0);
 
 		if (!have_enough_zones())
 			sys->status = SD_STATUS_HALT;
diff --git a/sheep/ops.c b/sheep/ops.c
index efaf979..ce0f8a4 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -282,13 +282,16 @@ static int cluster_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
 static int cluster_enable_recover(const struct sd_req *req,
 				    struct sd_rsp *rsp, void *data)
 {
-	int i;
+	int i, is_newly_joined = 0;
 	struct vnode_info *old_vnode_info, *vnode_info;
 
 	if (nr_joining_nodes) {
 
-		for (i = 0; i < nr_joining_nodes; i++)
+		for (i = 0; i < nr_joining_nodes; i++) {
 			all_nodes[nr_all_nodes++] = joining_nodes[i];
+			if (node_eq(&joining_nodes[i], &sys->this_node))
+				is_newly_joined = 1;
+		}
 
 		old_vnode_info = get_vnode_info();
 		vnode_info = alloc_vnode_info(all_nodes, nr_all_nodes);
@@ -298,7 +301,7 @@ static int cluster_enable_recover(const struct sd_req *req,
 		log_current_epoch();
 		clear_exceptional_node_lists();
 
-		start_recovery(vnode_info, old_vnode_info);
+		start_recovery(vnode_info, old_vnode_info, is_newly_joined);
 
 		put_vnode_info(old_vnode_info);
 	}
@@ -447,6 +450,13 @@ static int local_get_obj_list(struct request *req)
 			    (struct sd_list_rsp *)&req->rp, req->data);
 }
 
+static int local_get_vdi_copies(struct request *req)
+{
+	req->rp.data_length = fill_vdi_copy_list(req->data);
+
+	return SD_RES_SUCCESS;
+}
+
 static int local_get_epoch(struct request *req)
 {
 	uint32_t epoch = req->rq.obj.tgt_epoch;
@@ -510,7 +520,7 @@ static int cluster_force_recover(const struct sd_req *req, struct sd_rsp *rsp,
 		sys->status = SD_STATUS_HALT;
 
 	vnode_info = get_vnode_info();
-	start_recovery(vnode_info, old_vnode_info);
+	start_recovery(vnode_info, old_vnode_info, 1);
 	put_vnode_info(vnode_info);
 out:
 	put_vnode_info(old_vnode_info);
@@ -989,6 +999,12 @@ static struct sd_op_template sd_ops[] = {
 		.process_work = local_get_obj_list,
 	},
 
+	[SD_OP_GET_VDI_COPIES] = {
+		.name = "GET_VDI_COPIES",
+		.type = SD_OP_TYPE_LOCAL,
+		.process_work = local_get_vdi_copies,
+	},
+
 	[SD_OP_GET_EPOCH] = {
 		.name = "GET_EPOCH",
 		.type = SD_OP_TYPE_LOCAL,
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 5164aa7..3fdcad2 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -592,7 +592,38 @@ static inline bool node_is_gateway_only(void)
 	return sys->this_node.nr_vnodes == 0 ? true : false;
 }
 
-int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
+static void prepare_vdi_copy_list(struct work *work)
+{
+	struct recovery_work *rw = container_of(work, struct recovery_work,
+						work);
+	struct sd_node *nodes = rw->cur_vinfo->nodes;
+	int i, nr_nodes = rw->cur_vinfo->nr_nodes;
+
+	for (i = 0; i < nr_nodes; i++) {
+		if (node_eq(nodes + i, &sys->this_node))
+			continue;
+
+		fetch_vdi_copies_from(nodes + i);
+	}
+}
+
+static void finish_vdi_copy_list(struct work *work)
+{
+	struct recovery_work *rw = container_of(work, struct recovery_work,
+						work);
+
+	if (next_rw) {
+		run_next_rw(rw);
+		return;
+	}
+
+	rw->work.fn = prepare_object_list;
+	rw->work.done = finish_object_list;
+	queue_work(sys->recovery_wqueue, &rw->work);
+}
+
+int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo,
+		   int is_newly_joined)
 {
 	struct recovery_work *rw;
 
@@ -613,8 +644,13 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 	rw->cur_vinfo = grab_vnode_info(cur_vinfo);
 	rw->old_vinfo = grab_vnode_info(old_vinfo);
 
-	rw->work.fn = prepare_object_list;
-	rw->work.done = finish_object_list;
+	if (is_newly_joined) {
+		rw->work.fn = prepare_vdi_copy_list;
+		rw->work.done = finish_vdi_copy_list;
+	} else {
+		rw->work.fn = prepare_object_list;
+		rw->work.done = finish_object_list;
+	}
 
 	if (sd_store->begin_recover) {
 		struct siocb iocb = { 0 };
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 3f763c4..335e337 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -207,6 +207,8 @@ int get_max_copy_number(void);
 int get_req_copy_number(struct request *req);
 int add_vdi_copies(uint32_t vid, int nr_copies);
 int load_vdi_copies(void);
+int fetch_vdi_copies_from(struct sd_node *node);
+int fill_vdi_copy_list(void *data);
 int vdi_exist(uint32_t vid);
 int add_vdi(struct vdi_iocb *iocb, uint32_t *new_vid);
 
@@ -272,7 +274,8 @@ uint64_t get_cluster_ctime(void);
 int get_obj_list(const struct sd_list_req *, struct sd_list_rsp *, void *);
 int objlist_cache_cleanup(uint32_t vid);
 
-int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo);
+int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo,
+		   int is_newly_joined);
 void resume_recovery_work(void);
 bool oid_in_recovery(uint64_t oid);
 int is_recovery_init(void);
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 0581ad6..72fbd7b 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -18,6 +18,11 @@
 #include "sheepdog_proto.h"
 #include "sheep_priv.h"
 
+struct vdi_copy {
+	uint32_t vid;
+	uint32_t nr_copies;
+};
+
 struct vdi_copy_entry {
 	uint32_t vid;
 	unsigned int nr_copies;
@@ -226,6 +231,71 @@ out:
 	return ret;
 }
 
+int fetch_vdi_copies_from(struct sd_node *node)
+{
+	char host[128];
+	struct sd_req hdr;
+	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+	struct vdi_copy *vc;
+	int fd, ret, i;
+	unsigned int wlen, rlen;
+	int count = 1 << 15;
+
+	addr_to_str(host, sizeof(host), node->nid.addr, 0);
+	dprintf("fetch vdi copy list from %s:%d\n", host, node->nid.port);
+	fd = connect_to(host, node->nid.port);
+	if (fd < 0) {
+		dprintf("fail: %m\n");
+		return SD_RES_NETWORK_ERROR;
+	}
+
+	sd_init_req(&hdr, SD_OP_GET_VDI_COPIES);
+	hdr.epoch = sys->epoch;
+	hdr.data_length = count * sizeof(*vc);
+	rlen = hdr.data_length;
+	wlen = 0;
+
+	vc = xzalloc(rlen);
+
+	ret = exec_req(fd, &hdr, (char *)vc, &wlen, &rlen);
+	close(fd);
+
+	if (ret || rsp->result != SD_RES_SUCCESS) {
+		eprintf("fail to get VDI copy list (%d, %d)\n",
+			ret, rsp->result);
+		ret = SD_RES_NETWORK_ERROR;
+		goto out;
+	}
+
+	count = rsp->data_length / sizeof(*vc);
+	dprintf("got %d vdi copy data\n", count);
+	for (i = 0; i < count; i++)
+		add_vdi_copies(vc[i].vid, vc[i].nr_copies);
+out:
+	free(vc);
+	return ret;
+}
+
+int fill_vdi_copy_list(void *data)
+{
+	int nr = 0;
+	struct rb_node *n;
+	struct vdi_copy *vc = data;
+	struct vdi_copy_entry *entry;
+
+	pthread_rwlock_rdlock(&vdi_copy_lock);
+	for (n = rb_first(&vdi_copy_root); n; n = rb_next(n)) {
+		entry = rb_entry(n, struct vdi_copy_entry, node);
+		vc->vid = entry->vid;
+		vc->nr_copies = entry->nr_copies;
+		vc++;
+		nr++;
+	}
+	pthread_rwlock_unlock(&vdi_copy_lock);
+
+	return nr * sizeof(*vc);
+}
+
 int vdi_exist(uint32_t vid)
 {
 	struct sheepdog_inode *inode;
-- 
1.7.1