[sheepdog] [PATCH v2 4/7] sheep: snapshot and collect vdi state during joining to cluster

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Wed Jul 30 09:24:39 CEST 2014


A newly joining sheep must know which VDIs are locked and which are
unlocked. To propagate this state, the new sheep must copy it from the
existing nodes during the join process.

This patch implements a solution for this. Below is a brief
description:
1. When a cluster accepts a newly joining node, every existing node
   takes a snapshot of its vdi state at the epoch.
2. The newly joining node copies it via a newly defined request.
3. After applying the copied state to its internal state, the new
   sheep issues a discard request to the existing nodes.
4. The new sheep replays its operation log to resolve the differences
   between the copied snapshot and the latest state (produced by
   requests issued after the snapshot was taken).

With this scheme, new sheep can know which vdi is locked or unlocked.

Cc: Fabian Zimmermann <dev.faz at gmail.com>
Cc: Valerio Pachera <sirio81 at gmail.com>
Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
 include/internal_proto.h |   7 ++
 include/sheepdog_proto.h |   6 ++
 sheep/group.c            | 160 +++++++++++++++++++++++++++++++++++++++++--
 sheep/ops.c              |  67 ++++++++++++++++--
 sheep/sheep.c            |   2 +
 sheep/sheep_priv.h       |  11 ++-
 sheep/vdi.c              | 174 +++++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 415 insertions(+), 12 deletions(-)

diff --git a/include/internal_proto.h b/include/internal_proto.h
index 37afb46..b1afc60 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -108,6 +108,7 @@
 #define SD_OP_ALLOW_INODE_UPDATE      0xC4
 #define SD_OP_REPAIR_REPLICA	0xC5
 #define SD_OP_OIDS_EXIST	0xC6
+#define SD_OP_VDI_STATE_SNAPSHOT_CTL  0xC7
 
 /* internal flags for hdr.flags, must be above 0x80 */
 #define SD_FLAG_CMD_RECOVERY 0x0080
@@ -148,6 +149,12 @@ enum sd_status {
 	SD_STATUS_KILLED,
 };
 
+enum sd_node_status {
+	SD_NODE_STATUS_INITIALIZATION = 1, /* set at the start of main() */
+	SD_NODE_STATUS_COLLECTING_CINFO, /* copying vdi state after join */
+	SD_NODE_STATUS_OK, /* ready: nothing left to collect */
+};
+
 struct node_id {
 	uint8_t addr[16];
 	uint16_t port;
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 7cfdccb..96b1727 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -83,6 +83,7 @@
 #define SD_RES_HALT          0x19 /* Sheepdog is stopped doing IO */
 #define SD_RES_READONLY      0x1A /* Object is read-only */
 #define SD_RES_INCOMPLETE    0x1B /* Object (in kv) is incomplete uploading */
+#define SD_RES_COLLECTING_CINFO 0x1C /* sheep is collecting cluster wide status, not ready for operation */
 
 /* errors above 0x80 are sheepdog-internal */
 
@@ -184,6 +185,11 @@ struct sd_req {
 			uint8_t		addr[16];
 			uint16_t	port;
 		} forw;
+		struct {
+			uint32_t        get; /* 0 means free, 1 means get */
+			uint32_t        tgt_epoch;
+		} vdi_state_snapshot;
+
 
 		uint32_t		__pad[8];
 	};
diff --git a/sheep/group.c b/sheep/group.c
index f162026..2a610c5 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -550,6 +550,8 @@ static void do_get_vdis(struct work *work)
 	}
 }
 
+static void collect_cinfo(void);
+
 static void get_vdis_done(struct work *work)
 {
 	struct get_vdis_work *w =
@@ -562,6 +564,14 @@ static void get_vdis_done(struct work *work)
 
 	rb_destroy(&w->nroot, struct sd_node, rb);
 	free(w);
+
+	if (refcount_read(&nr_get_vdis_works) == 0)
+		/*
+		 * Now this sheep process could construct its vdi state.
+		 * It can collect other state e.g. vdi locking.
+		 */
+		collect_cinfo();
+
 }
 
 int inc_and_log_epoch(void)
@@ -651,6 +661,121 @@ static void get_vdis(const struct rb_root *nroot, const struct sd_node *joined)
 	queue_work(sys->block_wqueue, &w->work);
 }
 
+struct cinfo_collection_work {
+	struct work work;
+
+	int epoch; /* epoch of the remote snapshot to fetch and free */
+	struct vnode_info *members; /* nodes to query, ref held until done */
+
+	int nr_vdi_states; /* number of entries in result */
+	struct vdi_state *result; /* snapshot copied from a remote node */
+};
+
+static struct cinfo_collection_work *collect_work;
+
+static void cinfo_collection_work(struct work *work)
+{
+	struct sd_req hdr;
+	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+	struct vdi_state *vs = NULL;
+	unsigned int rlen;
+	struct cinfo_collection_work *w =
+		container_of(work, struct cinfo_collection_work, work);
+	struct sd_node *n;
+	int ret;
+
+	sd_debug("start collection of cinfo...");
+
+	assert(w == collect_work);
+
+	rlen = SD_DATA_OBJ_SIZE; /* FIXME */
+	vs = xzalloc(rlen);
+
+	rb_for_each_entry(n, &w->members->nroot, rb) {
+		if (node_is_local(n))
+			continue;
+
+		sd_init_req(&hdr, SD_OP_VDI_STATE_SNAPSHOT_CTL);
+		hdr.vdi_state_snapshot.get = 1;
+		hdr.vdi_state_snapshot.tgt_epoch = w->epoch;
+		hdr.data_length = rlen;
+
+		ret = sheep_exec_req(&n->nid, &hdr, (char *)vs);
+		if (ret == SD_RES_SUCCESS)
+			goto get_succeed;
+	}
+
+	panic("getting a snapshot of vdi state at epoch %d failed", w->epoch);
+
+get_succeed:
+
+	w->nr_vdi_states = rsp->data_length / sizeof(*vs);
+	w->result = vs;
+
+	sd_debug("collecting cinfo done, freeing from remote nodes");
+
+	rb_for_each_entry(n, &w->members->nroot, rb) {
+		if (node_is_local(n))
+			continue;
+
+		sd_init_req(&hdr, SD_OP_VDI_STATE_SNAPSHOT_CTL);
+		hdr.vdi_state_snapshot.get = 0;
+		hdr.vdi_state_snapshot.tgt_epoch = w->epoch;
+
+		ret = sheep_exec_req(&n->nid, &hdr, (char *)vs);
+		if (ret != SD_RES_SUCCESS)
+			sd_err("error at freeing a snapshot of vdi state"
+			       " at epoch %d", w->epoch);
+	}
+
+	sd_debug("collection done");
+}
+
+static void cinfo_collection_done(struct work *work)
+{
+	struct cinfo_collection_work *w =
+		container_of(work, struct cinfo_collection_work, work);
+
+	assert(w == collect_work);
+
+	for (int i = 0; i < w->nr_vdi_states; i++) {
+		struct vdi_state *vs = &w->result[i];
+
+		sd_debug("VID: %"PRIx32, vs->vid);
+		sd_debug("nr_copies: %d", vs->nr_copies);
+		sd_debug("snapshot: %d", vs->snapshot);
+		sd_debug("copy_policy: %d", vs->copy_policy);
+		sd_debug("lock_state: %x", vs->lock_state);
+		sd_debug("owner: %s",
+			 addr_to_str(vs->lock_owner.addr, vs->lock_owner.port));
+
+		apply_vdi_lock_state(vs);
+	}
+
+	put_vnode_info(w->members);
+	free(w->result);
+	free(w);
+	collect_work = NULL;
+
+	play_logged_vdi_ops();
+
+	sd_debug("cluster info collection finished");
+	sys->node_status = SD_NODE_STATUS_OK;
+}
+
+static void collect_cinfo(void)
+{
+	if (!collect_work)
+		return;
+
+	sd_debug("start cluster info collection for epoch %d",
+		 collect_work->epoch);
+
+	collect_work->work.fn = cinfo_collection_work;
+	collect_work->work.done = cinfo_collection_done;
+	queue_work(sys->block_wqueue, &collect_work->work);
+}
+
 void wait_get_vdis_done(void)
 {
 	sd_debug("waiting for vdi list");
@@ -701,10 +826,6 @@ static void update_cluster_info(const struct cluster_info *cinfo,
 	if (!sys->gateway_only)
 		setup_backend_store(cinfo);
 
-	if (node_is_local(joined))
-		sockfd_cache_add_group(nroot);
-	sockfd_cache_add(&joined->nid);
-
 	/*
 	 * We need use main_thread_get() to obtain current_vnode_info. The
 	 * reference count of old_vnode_info is decremented at the last of this
@@ -714,6 +835,22 @@ static void update_cluster_info(const struct cluster_info *cinfo,
 	old_vnode_info = main_thread_get(current_vnode_info);
 	main_thread_set(current_vnode_info, alloc_vnode_info(nroot));
 
+	if (node_is_local(joined)) {
+		sockfd_cache_add_group(nroot);
+
+		if (0 < cinfo->epoch && cinfo->status == SD_STATUS_OK) {
+			collect_work = xzalloc(sizeof(*collect_work));
+			collect_work->epoch = cinfo->epoch - 1;
+			collect_work->members = grab_vnode_info(
+				main_thread_get(current_vnode_info));
+		}
+	} else {
+		if (0 < cinfo->epoch && cinfo->status == SD_STATUS_OK)
+			take_vdi_state_snapshot(cinfo->epoch - 1);
+	}
+
+	sockfd_cache_add(&joined->nid);
+
 	get_vdis(nroot, joined);
 
 	if (cinfo->status == SD_STATUS_OK) {
@@ -969,9 +1106,18 @@ main_fn void sd_accept_handler(const struct sd_node *joined,
 
 	update_cluster_info(cinfo, joined, nroot, nr_nodes);
 
-	if (node_is_local(joined))
-		/* this output is used for testing */
-		sd_debug("join Sheepdog cluster");
+	if (node_is_local(joined)) {
+		/* this output is used for testing */
+		sd_debug("join Sheepdog cluster");
+
+		if (collect_work) {
+			sd_debug("status is SD_NODE_STATUS_COLLECTING_CINFO");
+			sys->node_status = SD_NODE_STATUS_COLLECTING_CINFO;
+		} else {
+			sd_debug("status is SD_NODE_STATUS_OK");
+			sys->node_status = SD_NODE_STATUS_OK;
+		}
+	}
 }
 
 main_fn void sd_leave_handler(const struct sd_node *left,
diff --git a/sheep/ops.c b/sheep/ops.c
index 7ca06d6..cc74986 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -1306,11 +1306,31 @@ static int local_repair_replica(struct request *req)
 	return ret;
 }
 
-static int cluster_lock_vdi(const struct sd_req *req, struct sd_rsp *rsp,
-			    void *data, const struct sd_node *sender)
+static int cluster_lock_vdi_work(struct request *req)
+{
+	if (sys->node_status == SD_NODE_STATUS_COLLECTING_CINFO) {
+		/*
+		 * this node is collecting vdi locking status, not ready for
+		 * allowing lock by itself
+		 */
+		sd_err("This node is not ready for vdi locking, try later");
+		return SD_RES_COLLECTING_CINFO;
+	}
+
+	return cluster_get_vdi_info(req);
+}
+
+static int cluster_lock_vdi_main(const struct sd_req *req, struct sd_rsp *rsp,
+				 void *data, const struct sd_node *sender)
 {
 	uint32_t vid = rsp->vdi.vdi_id;
 
+	if (sys->node_status == SD_NODE_STATUS_COLLECTING_CINFO) {
+		sd_debug("logging vdi lock information for later replay");
+		log_vdi_op_lock(vid, &sender->nid);
+		return SD_RES_SUCCESS;
+	}
+
 	sd_info("node: %s is locking VDI: %"PRIx32, node_to_str(sender), vid);
 
 	if (!lock_vdi(vid, &sender->nid)) {
@@ -1327,6 +1347,12 @@ static int cluster_release_vdi_main(const struct sd_req *req,
 {
 	uint32_t vid = req->vdi.base_vdi_id;
 
+	if (sys->node_status == SD_NODE_STATUS_COLLECTING_CINFO) {
+		sd_debug("logging vdi unlock information for later replay");
+		log_vdi_op_unlock(vid, &sender->nid);
+		return SD_RES_SUCCESS;
+	}
+
 	sd_info("node: %s is unlocking VDI: %"PRIx32, node_to_str(sender), vid);
 
 	unlock_vdi(vid, &sender->nid);
@@ -1334,6 +1360,33 @@ static int cluster_release_vdi_main(const struct sd_req *req,
 	return SD_RES_SUCCESS;
 }
 
+static int local_vdi_state_snapshot_ctl(const struct sd_req *req,
+					struct sd_rsp *rsp, void *data,
+					const struct sd_node *sender)
+{
+	bool get = !!req->vdi_state_snapshot.get;
+	int epoch = req->vdi_state_snapshot.tgt_epoch;
+	int ret;
+
+	sd_info("%s vdi state snapshot at epoch %d",
+		get ? "getting" : "freeing", epoch);
+
+	if (get) {
+		/*
+		 * FIXME: assuming request has enough space for storing the
+		 * snapshot; get_vdi_state_snapshot() copies it unchecked
+		 */
+		ret = get_vdi_state_snapshot(epoch, data);
+		if (0 <= ret)
+			rsp->data_length = ret;
+		else
+			return SD_RES_AGAIN;
+	} else
+		free_vdi_state_snapshot(epoch);
+
+	return SD_RES_SUCCESS;
+}
+
 static struct sd_op_template sd_ops[] = {
 
 	/* cluster operations */
@@ -1427,8 +1480,8 @@ static struct sd_op_template sd_ops[] = {
 	[SD_OP_LOCK_VDI] = {
 		.name = "LOCK_VDI",
 		.type = SD_OP_TYPE_CLUSTER,
-		.process_work = cluster_get_vdi_info,
-		.process_main = cluster_lock_vdi,
+		.process_work = cluster_lock_vdi_work,
+		.process_main = cluster_lock_vdi_main,
 	},
 
 	[SD_OP_RELEASE_VDI] = {
@@ -1701,6 +1754,12 @@ static struct sd_op_template sd_ops[] = {
 		.process_work = local_repair_replica,
 	},
 
+	[SD_OP_VDI_STATE_SNAPSHOT_CTL] = {
+		.name = "VDI_STATE_SNAPSHOT_CTL",
+		.type = SD_OP_TYPE_LOCAL,
+		.process_main = local_vdi_state_snapshot_ctl,
+	},
+
 	/* gateway I/O operations */
 	[SD_OP_CREATE_AND_WRITE_OBJ] = {
 		.name = "CREATE_AND_WRITE_OBJ",
diff --git a/sheep/sheep.c b/sheep/sheep.c
index c5d97ea..e6ac8c5 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -624,6 +624,8 @@ int main(int argc, char **argv)
 	struct stat logdir_st;
 	enum log_dst_type log_dst_type;
 
+	sys->node_status = SD_NODE_STATUS_INITIALIZATION;
+
 	install_crash_handler(crash_handler);
 	signal(SIGPIPE, SIG_IGN);
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 74aa08e..c21aa4d 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -126,6 +126,7 @@ struct system_info {
 	struct sd_node this_node;
 
 	struct cluster_info cinfo;
+	enum sd_node_status node_status;
 
 	uint64_t disk_space;
 
@@ -233,7 +234,8 @@ struct vdi_state {
 	uint8_t nr_copies;
 	uint8_t snapshot;
 	uint8_t copy_policy;
-	uint8_t _pad;
+	uint8_t lock_state;
+	struct node_id lock_owner;
 };
 
 struct store_driver {
@@ -347,6 +349,13 @@ int sd_create_hyper_volume(const char *name, uint32_t *vdi_id);
 
 bool lock_vdi(uint32_t vid, const struct node_id *owner);
 bool unlock_vdi(uint32_t vid, const struct node_id *owner);
+void apply_vdi_lock_state(struct vdi_state *vs);
+void take_vdi_state_snapshot(int epoch);
+int get_vdi_state_snapshot(int epoch, void *data);
+void free_vdi_state_snapshot(int epoch);
+void log_vdi_op_lock(uint32_t vid, const struct node_id *owner);
+void log_vdi_op_unlock(uint32_t vid, const struct node_id *owner);
+void play_logged_vdi_ops(void);
 
 extern int ec_max_data_strip;
 
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 8bcc7d4..a2709db 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -206,6 +206,8 @@ int fill_vdi_state_list(const struct sd_req *hdr,
 		vs[last].nr_copies = entry->nr_copies;
 		vs[last].snapshot = entry->snapshot;
 		vs[last].copy_policy = entry->copy_policy;
+		vs[last].lock_state = entry->lock_state.state;
+		vs[last].lock_owner = entry->lock_state.owner;
 		last++;
 	}
 	sd_rw_unlock(&vdi_state_lock);
@@ -221,6 +223,36 @@ int fill_vdi_state_list(const struct sd_req *hdr,
 	return SD_RES_SUCCESS;
 }
 
+/* Copy every vdi state entry into a newly allocated array (caller frees) */
+static struct vdi_state *fill_vdi_state_list_with_alloc(int *result_nr)
+{
+	struct vdi_state *vs;
+	struct vdi_state_entry *entry;
+	int i = 0, nr = 0;
+
+	sd_read_lock(&vdi_state_lock);
+	rb_for_each_entry(entry, &vdi_state_root, node) {
+		nr++;
+	}
+
+	vs = xcalloc(nr, sizeof(*vs));
+	rb_for_each_entry(entry, &vdi_state_root, node) {
+		/* check before writing: i equals nr after the last entry */
+		assert(i < nr);
+		vs[i].vid = entry->vid;
+		vs[i].nr_copies = entry->nr_copies;
+		vs[i].snapshot = entry->snapshot;
+		vs[i].copy_policy = entry->copy_policy;
+		vs[i].lock_state = entry->lock_state.state;
+		vs[i].lock_owner = entry->lock_state.owner;
+		i++;
+	}
+	sd_rw_unlock(&vdi_state_lock);
+
+	*result_nr = nr;
+	return vs;
+}
+
 static inline bool vdi_is_deleted(struct sd_inode *inode)
 {
 	return *inode->name == '\0';
@@ -321,6 +353,79 @@ out:
 	return ret;
 }
 
+void apply_vdi_lock_state(struct vdi_state *vs)
+{
+	struct vdi_state_entry *entry;
+
+	sd_write_lock(&vdi_state_lock);
+	entry = vdi_state_search(&vdi_state_root, vs->vid);
+	if (!entry) {
+		sd_err("no vdi state entry of %"PRIx32" found", vs->vid);
+		goto out;
+	}
+
+	entry->lock_state.state = vs->lock_state;
+	memcpy(&entry->lock_state.owner, &vs->lock_owner,
+	       sizeof(vs->lock_owner));
+
+out:
+	sd_rw_unlock(&vdi_state_lock);
+}
+
+static LIST_HEAD(logged_vdi_ops);
+
+struct vdi_op_log {
+	bool lock; /* true: lock request, false: unlock request */
+	uint32_t vid;
+	struct node_id owner; /* node id of the request sender */
+
+	struct list_node list; /* linked into logged_vdi_ops */
+};
+
+void log_vdi_op_lock(uint32_t vid, const struct node_id *owner)
+{
+	struct vdi_op_log *op;
+
+	op = xzalloc(sizeof(*op));
+	op->lock = true;
+	op->vid = vid;
+	memcpy(&op->owner, owner, sizeof(*owner));
+	INIT_LIST_NODE(&op->list);
+	list_add_tail(&op->list, &logged_vdi_ops);
+}
+
+void log_vdi_op_unlock(uint32_t vid, const struct node_id *owner)
+{
+	struct vdi_op_log *op;
+
+	op = xzalloc(sizeof(*op));
+	op->lock = false;
+	op->vid = vid;
+	memcpy(&op->owner, owner, sizeof(*owner));
+	INIT_LIST_NODE(&op->list);
+	list_add_tail(&op->list, &logged_vdi_ops);
+}
+
+void play_logged_vdi_ops(void)
+{
+	struct vdi_op_log *op;
+
+	list_for_each_entry(op, &logged_vdi_ops, list) {
+		struct vdi_state entry;
+
+		memset(&entry, 0, sizeof(entry));
+		entry.vid = op->vid;
+		memcpy(&entry.lock_owner, &op->owner,
+		       sizeof(op->owner));
+		if (op->lock)
+			entry.lock_state = LOCK_STATE_LOCKED;
+		else
+			entry.lock_state = LOCK_STATE_UNLOCKED;
+
+		apply_vdi_lock_state(&entry);
+	}
+}
+
 static struct sd_inode *alloc_inode(const struct vdi_iocb *iocb,
 				    uint32_t new_snapid, uint32_t new_vid,
 				    uint32_t *data_vdi_id,
@@ -1198,3 +1303,72 @@ int sd_create_hyper_volume(const char *name, uint32_t *vdi_id)
 out:
 	return ret;
 }
+
+struct vdi_state_snapshot {
+	int epoch, nr_vs; /* epoch of the snapshot, number of entries in vs */
+	struct vdi_state *vs; /* copied vdi states, freed with the snapshot */
+
+	struct list_node list; /* linked into vdi_state_snapshot_list */
+};
+
+static LIST_HEAD(vdi_state_snapshot_list);
+
+main_fn void take_vdi_state_snapshot(int epoch)
+{
+	/*
+	 * Take a snapshot of the current vdi state and associate it with the
+	 * given epoch. A repeated request for the same epoch is ignored.
+	 */
+	struct vdi_state_snapshot *snapshot;
+
+	list_for_each_entry(snapshot, &vdi_state_snapshot_list, list) {
+		if (snapshot->epoch == epoch) {
+			sd_debug("duplicate snapshot of epoch %d", epoch);
+			return;
+		}
+
+	}
+
+	snapshot = xzalloc(sizeof(*snapshot));
+	snapshot->epoch = epoch;
+	snapshot->vs = fill_vdi_state_list_with_alloc(&snapshot->nr_vs);
+	INIT_LIST_NODE(&snapshot->list);
+	list_add_tail(&snapshot->list, &vdi_state_snapshot_list);
+
+	sd_debug("taking a snapshot of vdi state at epoch %d succeed", epoch);
+	sd_debug("a number of vdi state: %d", snapshot->nr_vs);
+}
+
+main_fn int get_vdi_state_snapshot(int epoch, void *data)
+{
+	struct vdi_state_snapshot *snapshot;
+
+	list_for_each_entry(snapshot, &vdi_state_snapshot_list, list) {
+		if (snapshot->epoch == epoch) {
+			memcpy(data, snapshot->vs,
+			       sizeof(*snapshot->vs) * snapshot->nr_vs);
+			return sizeof(*snapshot->vs) * snapshot->nr_vs;
+		}
+	}
+
+	sd_info("get request for not prepared vdi state snapshot, epoch: %d",
+		epoch);
+	return -1;
+}
+
+main_fn void free_vdi_state_snapshot(int epoch)
+{
+	struct vdi_state_snapshot *snapshot;
+
+	list_for_each_entry(snapshot, &vdi_state_snapshot_list, list) {
+		if (snapshot->epoch == epoch) {
+			list_del(&snapshot->list);
+			free(snapshot->vs);
+			free(snapshot);
+
+			return;
+		}
+	}
+
+	panic("invalid free request for vdi state snapshot, epoch: %d", epoch);
+}
-- 
1.8.3.2




More information about the sheepdog mailing list