[sheepdog] [PATCH 8/9] sheep: snapshot and collect vdi state during joining to cluster

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Fri Jun 27 08:13:55 CEST 2014


Newly joining sheep must know which vdi is locked or unlocked. For
propagating the state, new sheep must copy it from existing nodes
during join process.

This patch implements a solution for this. Below is the brief
description:
1. When a cluster accepts newly joining node, every existing node take
   a snapshot of vdi state at the epoch.
2. Newly joining node copy it via a newly defined request.
3. After applying the copied state to its internal state, the new
   sheep issues discard request to existing nodes.

With this scheme, new sheep can know which vdi is locked or unlocked.

Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
 include/internal_proto.h |    2 +
 include/sheepdog_proto.h |    5 ++
 sheep/group.c            |  154 ++++++++++++++++++++++++++++++++++++++++++++--
 sheep/ops.c              |   33 ++++++++++
 sheep/sheep_priv.h       |    7 ++-
 sheep/vdi.c              |  120 ++++++++++++++++++++++++++++++++++++
 6 files changed, 315 insertions(+), 6 deletions(-)

diff --git a/include/internal_proto.h b/include/internal_proto.h
index 7ec2872..ba3a942 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -106,6 +106,7 @@
 #define SD_OP_DECREF_PEER    0xC2
 #define SD_OP_PREVENT_INODE_UPDATE    0xC3
 #define SD_OP_ALLOW_INODE_UPDATE      0xC4
+#define SD_OP_VDI_STATE_SNAPSHOT_CTL  0xC5
 
 /* internal flags for hdr.flags, must be above 0x80 */
 #define SD_FLAG_CMD_RECOVERY 0x0080
@@ -144,6 +145,7 @@ enum sd_status {
 	SD_STATUS_WAIT,
 	SD_STATUS_SHUTDOWN,
 	SD_STATUS_KILLED,
+	SD_STATUS_COLLECTING_CINFO,
 };
 
 struct node_id {
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 76fad51..40d516d 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -80,6 +80,7 @@
 #define SD_RES_HALT          0x19 /* Sheepdog is stopped doing IO */
 #define SD_RES_READONLY      0x1A /* Object is read-only */
 #define SD_RES_INCOMPLETE    0x1B /* Object (in kv) is incomplete uploading */
+#define SD_RES_COLLECTING_CINFO 0x1C /* sheep is collecting cluster wide status, not ready for operation */
 
 /* errors above 0x80 are sheepdog-internal */
 
@@ -180,6 +181,10 @@ struct sd_req {
 			uint8_t		addr[16];
 			uint16_t	port;
 		} node_addr;
+		struct {
+			uint32_t	get; /* 0 means free, 1 means get */
+			uint32_t	tgt_epoch;
+		} vdi_state_snapshot;
 
 		uint32_t		__pad[8];
 	};
diff --git a/sheep/group.c b/sheep/group.c
index 91fce50..2a02d11 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -537,6 +537,8 @@ static void do_get_vdis(struct work *work)
 	}
 }
 
+static void collect_cinfo(void);
+
 static void get_vdis_done(struct work *work)
 {
 	struct get_vdis_work *w =
@@ -549,6 +551,13 @@ static void get_vdis_done(struct work *work)
 
 	rb_destroy(&w->nroot, struct sd_node, rb);
 	free(w);
+
+	if (refcount_read(&nr_get_vdis_works) == 0)
+		/*
+		 * Now this sheep process could construct its vdi state.
+		 * It can collect other state e.g. vdi locking.
+		 */
+		collect_cinfo();
 }
 
 int inc_and_log_epoch(void)
@@ -638,6 +647,122 @@ static void get_vdis(const struct rb_root *nroot, const struct sd_node *joined)
 	queue_work(sys->block_wqueue, &w->work);
 }
 
+struct cinfo_collection_work {
+	struct work work;
+
+	enum sd_status next_status;
+	int epoch;
+	struct vnode_info *members;
+
+	int nr_vdi_states;
+	struct vdi_state *result;
+};
+
+static struct cinfo_collection_work *collect_work;
+
+static void cinfo_collection_work(struct work *work)
+{
+	struct sd_req hdr;
+	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+	struct vdi_state *vs = NULL;
+	unsigned int rlen;
+	struct cinfo_collection_work *w =
+		container_of(work, struct cinfo_collection_work, work);
+	struct sd_node *n;
+	int ret;
+
+	sd_debug("start collection of cinfo...");
+
+	assert(w == collect_work);
+
+	rlen = SD_DATA_OBJ_SIZE; /* FIXME */
+	vs = xzalloc(rlen);
+
+	rb_for_each_entry(n, &w->members->nroot, rb) {
+		if (node_is_local(n))
+			continue;
+
+		sd_init_req(&hdr, SD_OP_VDI_STATE_SNAPSHOT_CTL);
+		hdr.vdi_state_snapshot.get = 1;
+		hdr.vdi_state_snapshot.tgt_epoch = w->epoch;
+		hdr.data_length = rlen;
+
+		ret = sheep_exec_req(&n->nid, &hdr, (char *)vs);
+		if (ret == SD_RES_SUCCESS)
+			goto get_succeed;
+	}
+
+	panic("getting a snapshot of vdi state at epoch %d failed", w->epoch);
+
+get_succeed:
+
+	w->nr_vdi_states = rsp->data_length / sizeof(*vs);
+	w->result = vs;
+
+	sd_debug("collecting cinfo done, freeing from remote nodes");
+
+	rb_for_each_entry(n, &w->members->nroot, rb) {
+		if (node_is_local(n))
+			continue;
+
+		sd_init_req(&hdr, SD_OP_VDI_STATE_SNAPSHOT_CTL);
+		hdr.vdi_state_snapshot.get = 0;
+		hdr.vdi_state_snapshot.tgt_epoch = w->epoch;
+
+		ret = sheep_exec_req(&n->nid, &hdr, (char *)vs);
+		if (ret != SD_RES_SUCCESS)
+			sd_err("error at freeing a snapshot of vdi state"
+			       " at epoch %d", w->epoch);
+	}
+
+	sd_debug("collection done");
+}
+
+static void cinfo_collection_done(struct work *work)
+{
+	struct cinfo_collection_work *w =
+		container_of(work, struct cinfo_collection_work, work);
+	enum sd_status next_status;
+
+	assert(w == collect_work);
+
+	for (int i = 0; i < w->nr_vdi_states; i++) {
+		struct vdi_state *vs = &w->result[i];
+
+		sd_debug("VID: %"PRIx32, vs->vid);
+		sd_debug("nr_copies: %d", vs->nr_copies);
+		sd_debug("snapshot: %d", vs->snapshot);
+		sd_debug("copy_policy: %d", vs->copy_policy);
+		sd_debug("lock_state: %x", vs->lock_state);
+		sd_debug("owner: %s",
+			 addr_to_str(vs->lock_owner.addr, vs->lock_owner.port));
+
+		apply_vdi_lock_state(vs);
+	}
+
+	next_status = w->next_status;
+	put_vnode_info(w->members);
+	free(w->result);
+	free(w);
+	collect_work = NULL;
+
+	sd_debug("cluster info collection finished");
+	sys->cinfo.status = next_status;
+}
+
+static void collect_cinfo(void)
+{
+	if (!collect_work)
+		return;
+
+	sd_debug("start cluster info collection for epoch %d",
+		 collect_work->epoch);
+
+	collect_work->work.fn = cinfo_collection_work;
+	collect_work->work.done = cinfo_collection_done;
+	queue_work(sys->block_wqueue, &collect_work->work);
+}
+
 void wait_get_vdis_done(void)
 {
 	sd_debug("waiting for vdi list");
@@ -688,10 +813,6 @@ static void update_cluster_info(const struct cluster_info *cinfo,
 	if (!sys->gateway_only)
 		setup_backend_store(cinfo);
 
-	if (node_is_local(joined))
-		sockfd_cache_add_group(nroot);
-	sockfd_cache_add(&joined->nid);
-
 	/*
 	 * We need use main_thread_get() to obtain current_vnode_info. The
 	 * reference count of old_vnode_info is decremented at the last of this
@@ -701,6 +822,22 @@ static void update_cluster_info(const struct cluster_info *cinfo,
 	old_vnode_info = main_thread_get(current_vnode_info);
 	main_thread_set(current_vnode_info, alloc_vnode_info(nroot));
 
+	if (node_is_local(joined)) {
+		sockfd_cache_add_group(nroot);
+
+		if (0 < cinfo->epoch && cinfo->status == SD_STATUS_OK) {
+			collect_work = xzalloc(sizeof(*collect_work));
+			collect_work->epoch = cinfo->epoch - 1;
+			collect_work->members = grab_vnode_info(
+				main_thread_get(current_vnode_info));
+		}
+	} else {
+		if (0 < cinfo->epoch && cinfo->status == SD_STATUS_OK)
+			take_vdi_state_snapshot(cinfo->epoch - 1);
+	}
+
+	sockfd_cache_add(&joined->nid);
+
 	get_vdis(nroot, joined);
 
 	if (cinfo->status == SD_STATUS_OK) {
@@ -956,9 +1093,16 @@ main_fn void sd_accept_handler(const struct sd_node *joined,
 
 	update_cluster_info(cinfo, joined, nroot, nr_nodes);
 
-	if (node_is_local(joined))
+	if (node_is_local(joined)) {
 		/* this output is used for testing */
 		sd_debug("join Sheepdog cluster");
+
+		if (collect_work) {
+			sd_debug("status is SD_STATUS_COLLECTING_CINFO");
+			collect_work->next_status = sys->cinfo.status;
+			sys->cinfo.status = SD_STATUS_COLLECTING_CINFO;
+		}
+	}
 }
 
 main_fn void sd_leave_handler(const struct sd_node *left,
diff --git a/sheep/ops.c b/sheep/ops.c
index 3469c61..ac219cb 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -1312,6 +1312,33 @@ static int cluster_release_vdi_main(const struct sd_req *req,
 	return SD_RES_SUCCESS;
 }
 
+static int local_vdi_state_snapshot_ctl(const struct sd_req *req,
+					struct sd_rsp *rsp, void *data,
+					const struct sd_node *sender)
+{
+	bool get = !!req->vdi_state_snapshot.get;
+	int epoch = req->vdi_state_snapshot.tgt_epoch;
+	int ret;
+
+	sd_info("%s vdi state snapshot at epoch %d",
+		get ? "getting" : "freeing", epoch);
+
+	if (get) {
+		/*
+		 * FIXME: assuming request has enough space for storing
+		 * the snapshot
+		 */
+		ret = get_vdi_state_snapshot(epoch, data);
+		if (0 <= ret)
+			rsp->data_length = ret;
+		else
+			return SD_RES_AGAIN;
+	} else
+		free_vdi_state_snapshot(epoch);
+
+	return SD_RES_SUCCESS;
+}
+
 static struct sd_op_template sd_ops[] = {
 
 	/* cluster operations */
@@ -1665,6 +1692,12 @@ static struct sd_op_template sd_ops[] = {
 		.process_main = local_allow_inode_update,
 	},
 
+	[SD_OP_VDI_STATE_SNAPSHOT_CTL] = {
+		.name = "VDI_STATE_SNAPSHOT_CTL",
+		.type = SD_OP_TYPE_LOCAL,
+		.process_main = local_vdi_state_snapshot_ctl,
+	},
+
 	/* gateway I/O operations */
 	[SD_OP_CREATE_AND_WRITE_OBJ] = {
 		.name = "CREATE_AND_WRITE_OBJ",
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index fc174d4..ab6180f 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -238,7 +238,8 @@ struct vdi_state {
 	uint8_t nr_copies;
 	uint8_t snapshot;
 	uint8_t copy_policy;
-	uint8_t _pad;
+	uint8_t lock_state;
+	struct node_id lock_owner;
 };
 
 struct store_driver {
@@ -353,7 +354,11 @@ int sd_create_hyper_volume(const char *name, uint32_t *vdi_id);
 bool lock_vdi(uint32_t vid, const struct node_id *owner);
 bool unlock_vdi(uint32_t vid, const struct node_id *owner);
 void unlock_all_vdis(const struct node_id *owner);
+void apply_vdi_lock_state(struct vdi_state *vs);
 void notify_release_vdi(uint32_t vid);
+void take_vdi_state_snapshot(int epoch);
+int get_vdi_state_snapshot(int epoch, void *data);
+void free_vdi_state_snapshot(int epoch);
 
 extern int ec_max_data_strip;
 
diff --git a/sheep/vdi.c b/sheep/vdi.c
index c36692f..13f0f5d 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -200,6 +200,8 @@ int fill_vdi_state_list(void *data)
 		vs->nr_copies = entry->nr_copies;
 		vs->snapshot = entry->snapshot;
 		vs->copy_policy = entry->copy_policy;
+		vs->lock_state = entry->lock_state.state;
+		vs->lock_owner = entry->lock_state.owner;
 		vs++;
 		nr++;
 	}
@@ -208,6 +210,36 @@ int fill_vdi_state_list(void *data)
 	return nr * sizeof(*vs);
 }
 
+static struct vdi_state *fill_vdi_state_list_with_alloc(int *result_nr)
+{
+	struct vdi_state *vs;
+	struct vdi_state_entry *entry;
+	int i = 0, nr = 0;
+
+	sd_read_lock(&vdi_state_lock);
+	rb_for_each_entry(entry, &vdi_state_root, node) {
+		nr++;
+	}
+
+	vs = xcalloc(nr, sizeof(*vs));
+	rb_for_each_entry(entry, &vdi_state_root, node) {
+		vs[i].vid = entry->vid;
+		vs[i].nr_copies = entry->nr_copies;
+		vs[i].snapshot = entry->snapshot;
+		vs[i].copy_policy = entry->copy_policy;
+		vs[i].lock_state = entry->lock_state.state;
+		vs[i].lock_owner = entry->lock_state.owner;
+
+		i++;
+		assert(i < nr);
+	}
+
+	sd_rw_unlock(&vdi_state_lock);
+
+	*result_nr = nr;
+	return vs;
+}
+
 static inline bool vdi_is_deleted(struct sd_inode *inode)
 {
 	return *inode->name == '\0';
@@ -327,6 +359,25 @@ void unlock_all_vdis(const struct node_id *owner)
 	}
 }
 
+void apply_vdi_lock_state(struct vdi_state *vs)
+{
+	struct vdi_state_entry *entry;
+
+	sd_write_lock(&vdi_state_lock);
+	entry = vdi_state_search(&vdi_state_root, vs->vid);
+	if (!entry) {
+		sd_err("no vdi state entry of %"PRIx32" found", vs->vid);
+		goto out;
+	}
+
+	entry->lock_state.state = vs->lock_state;
+	memcpy(&entry->lock_state.owner, &vs->lock_owner,
+	       sizeof(vs->lock_owner));
+
+out:
+	sd_rw_unlock(&vdi_state_lock);
+}
+
 static struct sd_inode *alloc_inode(const struct vdi_iocb *iocb,
 				    uint32_t new_snapid, uint32_t new_vid,
 				    uint32_t *data_vdi_id,
@@ -1247,3 +1298,72 @@ void notify_release_vdi(uint32_t vid)
 	w->vid = vid;
 	queue_work(sys->gateway_wqueue, &w->work);
 }
+
+struct vdi_state_snapshot {
+	int epoch, nr_vs;
+	struct vdi_state *vs;
+
+	struct list_node list;
+};
+
+static LIST_HEAD(vdi_state_snapshot_list);
+
+main_fn void take_vdi_state_snapshot(int epoch)
+{
+	/*
+	 * take a snapshot of current vdi state and associate it with
+	 * the given epoch
+	 */
+	struct vdi_state_snapshot *snapshot;
+
+	list_for_each_entry(snapshot, &vdi_state_snapshot_list, list) {
+		if (snapshot->epoch == epoch) {
+			sd_debug("duplicate snapshot of epoch %d", epoch);
+			return;
+		}
+
+	}
+
+	snapshot = xzalloc(sizeof(*snapshot));
+	snapshot->epoch = epoch;
+	snapshot->vs = fill_vdi_state_list_with_alloc(&snapshot->nr_vs);
+	INIT_LIST_NODE(&snapshot->list);
+	list_add_tail(&snapshot->list, &vdi_state_snapshot_list);
+
+	sd_debug("taking a snapshot of vdi state at epoch %d succeed", epoch);
+	sd_debug("a number of vdi state: %d", snapshot->nr_vs);
+}
+
+main_fn int get_vdi_state_snapshot(int epoch, void *data)
+{
+	struct vdi_state_snapshot *snapshot;
+
+	list_for_each_entry(snapshot, &vdi_state_snapshot_list, list) {
+		if (snapshot->epoch == epoch) {
+			memcpy(data, snapshot->vs,
+			       sizeof(*snapshot->vs) * snapshot->nr_vs);
+			return sizeof(*snapshot->vs) * snapshot->nr_vs;
+		}
+	}
+
+	sd_info("get request for not prepared vdi state snapshot, epoch: %d",
+		epoch);
+	return -1;
+}
+
+main_fn void free_vdi_state_snapshot(int epoch)
+{
+	struct vdi_state_snapshot *snapshot;
+
+	list_for_each_entry(snapshot, &vdi_state_snapshot_list, list) {
+		if (snapshot->epoch == epoch) {
+			list_del(&snapshot->list);
+			free(snapshot->vs);
+			free(snapshot);
+
+			return;
+		}
+	}
+
+	panic("invalid free request for vdi state snapshot, epoch: %d", epoch);
+}
-- 
1.7.1




More information about the sheepdog mailing list