[sheepdog] [PATCH v5 5/8] sheep: fetch vdi copy list after sheep joins the cluster

levin li levin108 at gmail.com
Wed Aug 22 10:55:16 CEST 2012


From: levin li <xingke.lwp at taobao.com>

A newly joined node either has no vdi copy list or has an
incomplete one, so it needs to fetch the copy list data from
the other nodes.

Signed-off-by: levin li <xingke.lwp at taobao.com>
---
 collie/common.c    |   15 +++++--
 sheep/farm/farm.c  |   31 +++++++++++++++-
 sheep/group.c      |  106 +++++++++++++++++++++++++++++++++++++++++----------
 sheep/recovery.c   |    2 +
 sheep/sheep.c      |    2 +
 sheep/sheep_priv.h |   10 +++++
 sheep/vdi.c        |   28 ++++++++++++-
 7 files changed, 165 insertions(+), 29 deletions(-)

diff --git a/collie/common.c b/collie/common.c
index f885c8c..364e36c 100644
--- a/collie/common.c
+++ b/collie/common.c
@@ -131,7 +131,14 @@ int parse_vdi(vdi_parser_func_t func, size_t size, void *data)
 	static struct sheepdog_inode i;
 	struct sd_req req;
 	static DECLARE_BITMAP(vdi_inuse, SD_NR_VDIS);
-	unsigned int rlen, wlen = 0;
+	unsigned int wlen = 0, rlen = sizeof(vdi_inuse) * 2;
+	char *buf;
+
+	buf = zalloc(rlen);
+	if (!buf) {
+		fprintf(stderr, "Failed to allocate memory\n");
+		return -1;
+	}
 
 	fd = connect_to(sdhost, sdport);
 	if (fd < 0) {
@@ -140,11 +147,10 @@ int parse_vdi(vdi_parser_func_t func, size_t size, void *data)
 	}
 
 	sd_init_req(&req, SD_OP_READ_VDIS);
-	req.data_length = sizeof(vdi_inuse);
+	req.data_length = rlen;
 	req.epoch = sd_epoch;
 
-	rlen = sizeof(vdi_inuse);
-	ret = exec_req(fd, &req, vdi_inuse, &wlen, &rlen);
+	ret = exec_req(fd, &req, buf, &wlen, &rlen);
 	if (ret < 0) {
 		fprintf(stderr, "Failed to read VDIs from %s:%d\n",
 			sdhost, sdport);
@@ -153,6 +159,7 @@ int parse_vdi(vdi_parser_func_t func, size_t size, void *data)
 	}
 	close(fd);
 
+	memcpy(&vdi_inuse, buf, sizeof(vdi_inuse));
 	for (nr = 0; nr < SD_NR_VDIS; nr++) {
 		uint64_t oid;
 
diff --git a/sheep/farm/farm.c b/sheep/farm/farm.c
index 7eeae9a..221a3a7 100644
--- a/sheep/farm/farm.c
+++ b/sheep/farm/farm.c
@@ -294,10 +294,36 @@ out:
 	return ret;
 }
 
+static int read_vdi_copies(uint64_t oid)
+{
+	char path[PATH_MAX];
+	int fd, flags = def_open_flags, ret;
+	struct sheepdog_inode inode;
+
+	snprintf(path, sizeof(path), "%s%016" PRIx64, obj_path, oid);
+
+	fd = open(path, flags);
+	if (fd < 0) {
+		eprintf("%m\n");
+		return SD_RES_EIO;
+	}
+
+	ret = xpread(fd, (void *)&inode, SD_INODE_HEADER_SIZE, 0);
+	if (ret != SD_INODE_HEADER_SIZE) {
+		eprintf("%m\n");
+		return SD_RES_EIO;
+	}
+
+	add_vdi_copies(oid_to_vid(oid), inode.nr_copies);
+
+	return SD_RES_SUCCESS;
+}
+
 static int init_sys_vdi_bitmap(char *path)
 {
 	DIR *dir;
 	struct dirent *dent;
+	int ret = SD_RES_SUCCESS;
 
 	dir = opendir(path);
 	if (!dir) {
@@ -322,10 +348,13 @@ static int init_sys_vdi_bitmap(char *path)
 		vprintf(SDOG_DEBUG, "found the VDI object %" PRIx64 "\n", oid);
 
 		set_bit(oid_to_vid(oid), sys->vdi_inuse);
+		ret = read_vdi_copies(oid);
+		if (ret != SD_RES_SUCCESS)
+			break;
 	}
 	closedir(dir);
 
-	return 0;
+	return ret;
 }
 
 static bool is_xattr_enabled(char *path)
diff --git a/sheep/group.c b/sheep/group.c
index 3f5a8fb..e935218 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -16,6 +16,8 @@
 #include <arpa/inet.h>
 #include <sys/time.h>
 #include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <pthread.h>
 #include <urcu/uatomic.h>
 #include <math.h>
 
@@ -32,13 +34,17 @@ struct node {
 	struct list_head list;
 };
 
-struct vdi_bitmap_work {
+struct get_vdis_work {
 	struct work work;
 	DECLARE_BITMAP(vdi_inuse, SD_NR_VDIS);
 	size_t nr_members;
 	struct sd_node members[];
 };
 
+#define SD_GET_VDIS_NONE    0
+#define SD_GET_VDIS_BEGIN   1
+#define SD_GET_VDIS_WAITING 2
+
 struct sd_node joining_nodes[SD_MAX_NODES];
 size_t nr_joining_nodes;
 struct sd_node all_nodes[SD_MAX_NODES];
@@ -622,14 +628,24 @@ static int cluster_running_check(struct join_message *jm)
 	return CJ_RES_SUCCESS;
 }
 
-static int get_vdi_bitmap_from(struct sd_node *node)
+static int get_vdis_from(struct sd_node *node)
 {
 	struct sd_req hdr;
 	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+	struct vdi_copy *vc;
 	static DECLARE_BITMAP(tmp_vdi_inuse, SD_NR_VDIS);
 	int fd, i, ret = SD_RES_SUCCESS;
-	unsigned int rlen, wlen;
+	unsigned int rlen = SD_NR_VDIS * 3, wlen;
 	char host[128];
+	char *buf = NULL;
+	int count;
+
+	buf = zalloc(rlen);
+	if (!buf) {
+		vprintf(SDOG_ERR, "unable to allocate memory\n");
+		ret = SD_RES_NO_MEM;
+		goto out;
+	}
 
 	if (is_myself(node->nid.addr, node->nid.port))
 		goto out;
@@ -647,13 +663,10 @@ static int get_vdi_bitmap_from(struct sd_node *node)
 
 	sd_init_req(&hdr, SD_OP_READ_VDIS);
 	hdr.epoch = sys->epoch;
-	hdr.data_length = sizeof(tmp_vdi_inuse);
-	rlen = hdr.data_length;
+	hdr.data_length = rlen;
 	wlen = 0;
 
-	ret = exec_req(fd, &hdr, (char *)tmp_vdi_inuse,
-			&wlen, &rlen);
-
+	ret = exec_req(fd, &hdr, buf, &wlen, &rlen);
 	close(fd);
 
 	if (ret || rsp->result != SD_RES_SUCCESS) {
@@ -662,24 +675,31 @@ static int get_vdi_bitmap_from(struct sd_node *node)
 		goto out;
 	}
 
+	memcpy(tmp_vdi_inuse, buf, sizeof(tmp_vdi_inuse));
 	for (i = 0; i < ARRAY_SIZE(sys->vdi_inuse); i++)
 		sys->vdi_inuse[i] |= tmp_vdi_inuse[i];
+
+	count = (rsp->data_length - sizeof(tmp_vdi_inuse)) / sizeof(*vc);
+	vc = (struct vdi_copy *)(buf + sizeof(tmp_vdi_inuse));
+	for (i = 0; i < count; i++, vc++)
+		add_vdi_copies(vc->vid, vc->nr_copies);
 out:
+	free(buf);
 	return ret;
 }
 
-static void do_get_vdi_bitmap(struct work *work)
+static void do_get_vdis(struct work *work)
 {
-	struct vdi_bitmap_work *w =
-		container_of(work, struct vdi_bitmap_work, work);
+	struct get_vdis_work *w =
+		container_of(work, struct get_vdis_work, work);
 	int i;
 
 	for (i = 0; i < w->nr_members; i++) {
-		/* We should not fetch vdi_bitmap from myself */
+		/* We should not fetch vdi_bitmap and copy list from myself */
 		if (node_eq(&w->members[i], &sys->this_node))
 			continue;
 
-		get_vdi_bitmap_from(&w->members[i]);
+		get_vdis_from(&w->members[i]);
 
 		/*
 		 * If a new comer try to join the running cluster, it only
@@ -690,10 +710,21 @@ static void do_get_vdi_bitmap(struct work *work)
 	}
 }
 
-static void get_vdi_bitmap_done(struct work *work)
+static void get_vdis_done(struct work *work)
 {
-	struct vdi_bitmap_work *w =
-		container_of(work, struct vdi_bitmap_work, work);
+	struct get_vdis_work *w =
+		container_of(work, struct get_vdis_work, work);
+
+	pthread_mutex_lock(&sys->wait_vdis_lock);
+	if (sys->wait_vdis_state == SD_GET_VDIS_WAITING) {
+		sys->wait_vdis_state = SD_GET_VDIS_BEGIN;
+		pthread_cond_signal(&sys->wait_vdis_cond);
+		dprintf("wake up the recovery thread\n");
+	} else {
+		sys->wait_vdis_state = SD_GET_VDIS_NONE;
+		dprintf("no thread waiting for vdis\n");
+	}
+	pthread_mutex_unlock(&sys->wait_vdis_lock);
 
 	free(w);
 }
@@ -754,20 +785,50 @@ static void finish_join(struct join_message *msg, struct sd_node *joined,
 	sockfd_cache_add_group(nodes, nr_nodes);
 }
 
-static void get_vdi_bitmap(struct sd_node *nodes, size_t nr_nodes)
+static void get_vdis(struct sd_node *nodes, size_t nr_nodes)
 {
 	int array_len = nr_nodes * sizeof(struct sd_node);
-	struct vdi_bitmap_work *w;
+	struct get_vdis_work *w;
 
 	w = xmalloc(sizeof(*w) + array_len);
 	w->nr_members = nr_nodes;
 	memcpy(w->members, nodes, array_len);
 
-	w->work.fn = do_get_vdi_bitmap;
-	w->work.done = get_vdi_bitmap_done;
+	pthread_mutex_lock(&sys->wait_vdis_lock);
+	/* If there's an existing thread waiting for the get-vdis
+	 * event to be done, wake it up before setting up a
+	 * new one */
+	if (sys->wait_vdis_state == SD_GET_VDIS_WAITING)
+		pthread_cond_broadcast(&sys->wait_vdis_cond);
+
+	sys->wait_vdis_state = SD_GET_VDIS_BEGIN;
+	pthread_mutex_unlock(&sys->wait_vdis_lock);
+
+	w->work.fn = do_get_vdis;
+	w->work.done = get_vdis_done;
 	queue_work(sys->block_wqueue, &w->work);
 }
 
+void wait_get_vdis_done(void)
+{
+	dprintf("waiting for vdi list\n");
+	pthread_mutex_lock(&sys->wait_vdis_lock);
+	if (!sys->wait_vdis_state) {
+		dprintf("vdi list already finished\n");
+		goto out;
+	}
+
+	sys->wait_vdis_state = SD_GET_VDIS_WAITING;
+
+	while (sys->wait_vdis_state == SD_GET_VDIS_WAITING)
+		pthread_cond_wait(&sys->wait_vdis_cond, &sys->wait_vdis_lock);
+
+	sys->wait_vdis_state = SD_GET_VDIS_NONE;
+out:
+	pthread_mutex_unlock(&sys->wait_vdis_lock);
+	dprintf("vdi list ready\n");
+}
+
 static void prepare_recovery(struct sd_node *joined,
 				    struct sd_node *nodes, size_t nr_nodes)
 {
@@ -844,7 +905,7 @@ static void update_cluster_info(struct join_message *msg,
 			set_cluster_ctime(msg->ctime);
 			/*FALLTHROUGH*/
 		case SD_STATUS_WAIT_FOR_JOIN:
-			get_vdi_bitmap(nodes, nr_nodes);
+			get_vdis(nodes, nr_nodes);
 			break;
 		default:
 			break;
@@ -1215,6 +1276,9 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
 		sys->status = SD_STATUS_WAIT_FOR_FORMAT;
 	}
 
+	pthread_mutex_init(&sys->wait_vdis_lock, NULL);
+	pthread_cond_init(&sys->wait_vdis_cond, NULL);
+
 	INIT_LIST_HEAD(&sys->pending_list);
 	INIT_LIST_HEAD(&sys->failed_nodes);
 	INIT_LIST_HEAD(&sys->delayed_nodes);
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 5164aa7..369a534 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -555,6 +555,8 @@ static void prepare_object_list(struct work *work)
 
 	dprintf("%u\n", rw->epoch);
 
+	wait_get_vdis_done();
+
 	buf = xmalloc(buf_size);
 again:
 	/* We need to start at random node for better load balance */
diff --git a/sheep/sheep.c b/sheep/sheep.c
index b10197e..31af42c 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -22,6 +22,8 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/signalfd.h>
+#include <pthread.h>
+#include <sys/eventfd.h>
 #include <fcntl.h>
 #include <errno.h>
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 88c0b08..7d2869d 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -98,6 +98,10 @@ struct cluster_info {
 	uint8_t nr_copies;
 	int req_efd;
 
+	pthread_mutex_t wait_vdis_lock;
+	pthread_cond_t wait_vdis_cond;
+	int wait_vdis_state;
+
 	pthread_mutex_t wait_req_lock;
 	struct list_head wait_req_queue;
 	struct list_head wait_rw_queue;
@@ -137,6 +141,11 @@ struct vdi_iocb {
 	int nr_copies;
 };
 
+struct vdi_copy {
+	uint32_t vid;
+	uint32_t nr_copies;
+};
+
 struct store_driver {
 	struct list_head list;
 	const char *name;
@@ -231,6 +240,7 @@ void update_vnode_info(struct vnode_info *vnode_info);
 struct vnode_info *alloc_vnode_info(struct sd_node *nodes, size_t nr_nodes);
 void put_vnode_info(struct vnode_info *vinfo);
 struct vnode_info *get_vnode_info_epoch(uint32_t epoch);
+void wait_get_vdis_done(void);
 
 int get_nr_copies(struct vnode_info *vnode_info);
 
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 52576a5..2d58ce7 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -139,6 +139,26 @@ int add_vdi_copies(uint32_t vid, int nr_copies)
 	return SD_RES_SUCCESS;
 }
 
+static int fill_vdi_copy_list(void *data)
+{
+	int nr = 0;
+	struct rb_node *n;
+	struct vdi_copy *vc = data;
+	struct vdi_copy_entry *entry;
+
+	pthread_rwlock_rdlock(&vdi_copy_lock);
+	for (n = rb_first(&vdi_copy_root); n; n = rb_next(n)) {
+		entry = rb_entry(n, struct vdi_copy_entry, node);
+		vc->vid = entry->vid;
+		vc->nr_copies = entry->nr_copies;
+		vc++;
+		nr++;
+	}
+	pthread_rwlock_unlock(&vdi_copy_lock);
+
+	return nr * sizeof(*vc);
+}
+
 int vdi_exist(uint32_t vid)
 {
 	struct sheepdog_inode *inode;
@@ -488,11 +508,13 @@ out:
 
 int read_vdis(char *data, int len, unsigned int *rsp_len)
 {
-	if (len != sizeof(sys->vdi_inuse))
-		return SD_RES_INVALID_PARMS;
+	int length;
 
 	memcpy(data, sys->vdi_inuse, sizeof(sys->vdi_inuse));
-	*rsp_len = sizeof(sys->vdi_inuse);
+	/* put vdi copy list at the end of vdi bitmap */
+	length = fill_vdi_copy_list(data + sizeof(sys->vdi_inuse));
+
+	*rsp_len = sizeof(sys->vdi_inuse) + length;
 
 	return SD_RES_SUCCESS;
 }
-- 
1.7.1




More information about the sheepdog mailing list