[Sheepdog] [PATCH 2/2] remove access to sys->epoch in threads

FUJITA Tomonori fujita.tomonori at lab.ntt.co.jp
Thu May 6 12:45:37 CEST 2010


We can't access sys->epoch in threads. Use hdr->epoch instead.

Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
---
 collie/collie.h |    7 ++---
 collie/group.c  |    8 +++---
 collie/store.c  |   32 +++++++++++++--------------
 collie/vdi.c    |   64 +++++++++++++++++++++++++++++-------------------------
 4 files changed, 56 insertions(+), 55 deletions(-)

diff --git a/collie/collie.h b/collie/collie.h
index 20d8b5e..d792d58 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -106,13 +106,13 @@ int create_listen_port(int port, void *data);
 int is_io_request(unsigned op);
 int init_store(char *dir);
 
-int add_vdi(char *data, int data_len, uint64_t size,
+int add_vdi(uint32_t epoch, char *data, int data_len, uint64_t size,
 	    uint32_t *new_vid, uint32_t base_vid, uint32_t copies,
 	    int is_snapshot);
 
-int del_vdi(char *data, int data_len, uint32_t snapid);
+int del_vdi(uint32_t epoch, char *data, int data_len, uint32_t snapid);
 
-int lookup_vdi(char *data, int data_len, uint32_t *vid, uint32_t snapid);
+int lookup_vdi(uint32_t epoch, char *data, int data_len, uint32_t *vid, uint32_t snapid);
 
 int read_vdis(char *data, int len, unsigned int *rsp_len);
 
@@ -146,7 +146,6 @@ int set_cluster_ctime(uint64_t ctime);
 uint64_t get_cluster_ctime(void);
 
 int start_recovery(uint32_t epoch, uint32_t *failed_vdis, int nr_failed_vdis);
-int start_deletion(uint32_t vid);
 
 static inline int is_myself(struct sheepdog_node_list_entry *e)
 {
diff --git a/collie/group.c b/collie/group.c
index 617f262..377da49 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -628,7 +628,7 @@ static void vdi_op(struct vdi_op_message *msg)
 
 	switch (hdr->opcode) {
 	case SD_OP_NEW_VDI:
-		ret = add_vdi(data, hdr->data_length, hdr->vdi_size, &vid,
+		ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
 			      hdr->base_vdi_id, hdr->copies,
 			      hdr->snapid);
 		break;
@@ -637,11 +637,11 @@ static void vdi_op(struct vdi_op_message *msg)
 			ret = SD_RES_VDI_LOCKED;
 			break;
 		}
-		ret = del_vdi(data, hdr->data_length, hdr->snapid);
+		ret = del_vdi(hdr->epoch, data, hdr->data_length, hdr->snapid);
 		break;
 	case SD_OP_LOCK_VDI:
 	case SD_OP_GET_VDI_INFO:
-		ret = lookup_vdi(data, hdr->data_length, &vid, hdr->snapid);
+		ret = lookup_vdi(hdr->epoch, data, hdr->data_length, &vid, hdr->snapid);
 		if (ret != SD_RES_SUCCESS)
 			break;
 		break;
@@ -1042,7 +1042,7 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w)
 				w->failed_vdis = buf;
 			}
 
-			ret = lookup_vdi((char *)vm->ent.name,
+			ret = lookup_vdi(sys->epoch, (char *)vm->ent.name,
 					 sizeof(vm->ent.name), &vid, 0);
 			if (ret == SD_RES_SUCCESS)
 				w->failed_vdis[w->nr_failed_vdis++] = vid;
diff --git a/collie/store.c b/collie/store.c
index 2dfaf54..3e42cf9 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -281,7 +281,7 @@ out:
 
 static int ob_open(uint32_t epoch, uint64_t oid, int aflags, int *ret);
 
-static int read_from_one(uint64_t oid,
+static int read_from_one(uint32_t epoch, uint64_t oid,
 			 unsigned *ori_rlen, void *buf, uint64_t offset)
 {
 	int i, n, nr, fd, ret;
@@ -301,7 +301,7 @@ again:
 		addr_to_str(name, sizeof(name), e[n].addr, 0);
 
 		if (is_myself(&e[n])) {
-			fd = ob_open(sys->epoch, oid, 0, &ret);
+			fd = ob_open(epoch, oid, 0, &ret);
 			if (fd < 0 || ret != 0)
 				continue;
 
@@ -320,7 +320,7 @@ again:
 		memset(&hdr, 0, sizeof(hdr));
 		hdr.opcode = SD_OP_READ_OBJ;
 		hdr.oid = oid;
-		hdr.epoch = sys->epoch;
+		hdr.epoch = epoch;
 
 		rlen = *ori_rlen;
 		wlen = 0;
@@ -358,14 +358,14 @@ out:
 	return ret;
 }
 
-static int read_from_other_sheeps(uint64_t oid, char *buf, int copies)
+static int read_from_other_sheeps(uint32_t epoch, uint64_t oid, char *buf, int copies)
 {
 	int ret;
 	unsigned int rlen;
 
 	rlen = SD_DATA_OBJ_SIZE;
 
-	ret = read_from_one(oid, &rlen, buf, 0);
+	ret = read_from_one(epoch, oid, &rlen, buf, 0);
 
 	return ret;
 }
@@ -396,14 +396,13 @@ static int forward_read_obj_req(struct request *req, char *buf)
 		copies = sys->nr_sobjs;
 
 	hdr->flags |= SD_FLAG_CMD_DIRECT;
-	hdr->epoch = sys->epoch;
 
 	/* TODO: we can do better; we need to check this first */
 	for (i = 0; i < copies; i++) {
 		n = obj_to_sheep(e, nr, oid, i);
 
 		if (is_myself(&e[n])) {
-			ret = store_queue_request_local(req, buf, sys->epoch);
+			ret = store_queue_request_local(req, buf, hdr->epoch);
 			goto out;
 		}
 	}
@@ -471,7 +470,6 @@ static int forward_write_obj_req(struct request *req, char *buf)
 		pfds[i].fd = -1;
 
 	hdr->flags |= SD_FLAG_CMD_DIRECT;
-	hdr->epoch = sys->epoch;
 
 	wlen = hdr->data_length;
 	rlen = 0;
@@ -506,7 +504,7 @@ static int forward_write_obj_req(struct request *req, char *buf)
 	}
 
 	if (local) {
-		ret = store_queue_request_local(req, buf, sys->epoch);
+		ret = store_queue_request_local(req, buf, hdr->epoch);
 		rsp->result = ret;
 
 		if (nr_fds == 0) {
@@ -657,7 +655,7 @@ static int store_queue_request_local(struct request *req, char *buf, uint32_t ep
 		if (hdr->flags & SD_FLAG_CMD_COW) {
 			dprintf("%" PRIx64 "\n", hdr->cow_oid);
 
-			ret = read_from_other_sheeps(hdr->cow_oid, buf,
+			ret = read_from_other_sheeps(hdr->epoch, hdr->cow_oid, buf,
 						     hdr->copies);
 			if (ret) {
 				eprintf("failed to read old object\n");
@@ -785,11 +783,10 @@ void store_queue_request(struct work *work, int idx)
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
 	uint64_t oid = hdr->oid;
 	uint32_t opcode = hdr->opcode;
-	uint32_t epoch = sys->epoch;
-	uint32_t req_epoch = hdr->epoch;
+	uint32_t epoch = hdr->epoch;
 	struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
 
-	dprintf("%d, %x, %" PRIx64" , %u, %u\n", idx, opcode, oid, epoch, req_epoch);
+	dprintf("%d, %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
 
 	if (hdr->flags & SD_FLAG_CMD_RECOVERY)
 		epoch = hdr->tgt_epoch;
@@ -815,8 +812,8 @@ void store_queue_request(struct work *work, int idx)
 	ret = store_queue_request_local(req, buf, epoch);
 out:
 	if (ret != SD_RES_SUCCESS) {
-		dprintf("failed, %d, %x, %" PRIx64" , %u, %u, %d\n",
-			idx, opcode, oid, epoch, req_epoch, ret);
+		dprintf("failed, %d, %x, %" PRIx64" , %u, %d\n",
+			idx, opcode, oid, epoch, ret);
 		rsp->result = ret;
 	}
 }
@@ -1123,7 +1120,7 @@ next:
 	memset(&hdr, 0, sizeof(hdr));
 	hdr.opcode = SD_OP_READ_OBJ;
 	hdr.oid = oid;
-	hdr.epoch = sys->epoch;
+	hdr.epoch = epoch;
 	hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_DIRECT;
 	hdr.tgt_epoch = tgt_epoch;
 	hdr.data_length = rlen;
@@ -1340,7 +1337,8 @@ static int __fill_obj_list(struct recovery_work *rw,
 
 	memset(&hdr, 0, sizeof(hdr));
 	hdr.opcode = SD_OP_GET_OBJ_LIST;
-	hdr.epoch = sys->epoch;
+	/* we don't need to set epoch */
+	hdr.epoch = epoch;
 	hdr.start = start_hash;
 	hdr.end = end_hash;
 	hdr.tgt_epoch = epoch - 1;
diff --git a/collie/vdi.c b/collie/vdi.c
index dc2c082..a563f42 100644
--- a/collie/vdi.c
+++ b/collie/vdi.c
@@ -18,7 +18,7 @@
 
 
 /* TODO: should be performed atomically */
-static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
+static int create_vdi_obj(uint32_t epoch, char *name, uint32_t new_vid, uint64_t size,
 			  uint32_t base_vid, uint32_t cur_vid, uint32_t copies,
 			  uint32_t snapid, int is_snapshot)
 {
@@ -32,7 +32,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
 	nr_nodes = get_ordered_sd_node_list(entries);
 
 	if (base_vid) {
-		ret = read_object(entries, nr_nodes, sys->epoch,
+		ret = read_object(entries, nr_nodes, epoch,
 				  vid_to_vdi_oid(base_vid), (char *)&base,
 				  sizeof(base), 0, copies);
 		if (ret < 0)
@@ -46,7 +46,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
 			vprintf(SDOG_INFO "tree snapshot %s %" PRIx32 " %" PRIx32 "\n",
 				name, cur_vid, base_vid);
 
-			ret = read_object(entries, nr_nodes, sys->epoch,
+			ret = read_object(entries, nr_nodes, epoch,
 					  vid_to_vdi_oid(cur_vid), (char *)&cur,
 					  sizeof(cur), 0, copies);
 			if (ret < 0) {
@@ -89,7 +89,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
 	}
 
 	if (is_snapshot && cur_vid != base_vid) {
-		ret = write_object(entries, nr_nodes, sys->epoch,
+		ret = write_object(entries, nr_nodes, epoch,
 				   vid_to_vdi_oid(cur_vid), (char *)&cur,
 				   sizeof(cur), 0, copies, 0);
 		if (ret < 0) {
@@ -99,7 +99,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
 	}
 
 	if (base_vid) {
-		ret = write_object(entries, nr_nodes, sys->epoch,
+		ret = write_object(entries, nr_nodes, epoch,
 				   vid_to_vdi_oid(base_vid), (char *)&base,
 				   sizeof(base), 0, copies, 0);
 		if (ret < 0) {
@@ -108,7 +108,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
 		}
 	}
 
-	ret = write_object(entries, nr_nodes, sys->epoch,
+	ret = write_object(entries, nr_nodes, epoch,
 			   vid_to_vdi_oid(new_vid), (char *)&new, sizeof(new),
 			   0, copies, 1);
 	if (ret < 0)
@@ -117,7 +117,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
 	return ret;
 }
 
-static int find_first_vdi(unsigned long start, unsigned long end,
+static int find_first_vdi(uint32_t epoch, unsigned long start, unsigned long end,
 			  char *name, int namelen, uint32_t snapid, uint32_t *vid,
 			  unsigned long *deleted_nr, uint32_t *next_snap)
 {
@@ -134,7 +134,7 @@ static int find_first_vdi(unsigned long start, unsigned long end,
 		nr_reqs = nr_nodes;
 
 	for (i = start; i >= end; i--) {
-		ret = read_object(entries, nr_nodes, sys->epoch,
+		ret = read_object(entries, nr_nodes, epoch,
 				  vid_to_vdi_oid(i), (char *)&inode,
 				  sizeof(inode), 0, nr_reqs);
 		if (ret < 0)
@@ -158,8 +158,8 @@ static int find_first_vdi(unsigned long start, unsigned long end,
 }
 
 
-static int do_lookup_vdi(char *name, int namelen, uint32_t *vid, uint32_t snapid,
-			 uint32_t *next_snapid,
+static int do_lookup_vdi(uint32_t epoch, char *name, int namelen, uint32_t *vid,
+			 uint32_t snapid, uint32_t *next_snapid,
 			 unsigned long *right_nr,  unsigned long *deleted_nr)
 {
 	int ret;
@@ -177,7 +177,7 @@ static int do_lookup_vdi(char *name, int namelen, uint32_t *vid, uint32_t snapid
 	} else if (nr < SD_NR_VDIS) {
 	right_side:
 		/* look up on the right side of the hash point */
-		ret = find_first_vdi(nr - 1, start_nr, name, namelen, snapid, vid,
+		ret = find_first_vdi(epoch, nr - 1, start_nr, name, namelen, snapid, vid,
 				     deleted_nr, next_snapid);
 		return ret;
 	} else {
@@ -188,7 +188,7 @@ static int do_lookup_vdi(char *name, int namelen, uint32_t *vid, uint32_t snapid
 			return SD_RES_FULL_VDI;
 		else if (nr) {
 			/* look up on the left side of the hash point */
-			ret = find_first_vdi(nr - 1, 0, name, namelen, snapid, vid,
+			ret = find_first_vdi(epoch, nr - 1, 0, name, namelen, snapid, vid,
 					     deleted_nr, next_snapid);
 			if (ret == SD_RES_NO_VDI)
 				; /* we need to go to the right side */
@@ -201,7 +201,7 @@ static int do_lookup_vdi(char *name, int namelen, uint32_t *vid, uint32_t snapid
 	}
 }
 
-int lookup_vdi(char *data, int data_len, uint32_t *vid, uint32_t snapid)
+int lookup_vdi(uint32_t epoch, char *data, int data_len, uint32_t *vid, uint32_t snapid)
 {
 	char *name = data;
 	uint32_t dummy0;
@@ -210,11 +210,11 @@ int lookup_vdi(char *data, int data_len, uint32_t *vid, uint32_t snapid)
 	if (data_len != SD_MAX_VDI_LEN)
 		return SD_RES_INVALID_PARMS;
 
-	return do_lookup_vdi(name, strlen(name), vid, snapid,
+	return do_lookup_vdi(epoch, name, strlen(name), vid, snapid,
 			     &dummy0, &dummy1, &dummy2);
 }
 
-int add_vdi(char *data, int data_len, uint64_t size,
+int add_vdi(uint32_t epoch, char *data, int data_len, uint64_t size,
 	    uint32_t *new_vid, uint32_t base_vid, uint32_t copies, int is_snapshot)
 {
 	uint32_t cur_vid;
@@ -228,7 +228,7 @@ int add_vdi(char *data, int data_len, uint64_t size,
 
 	name = data;
 
-	ret = do_lookup_vdi(name, strlen(name), &cur_vid, 0, &next_snapid,
+	ret = do_lookup_vdi(epoch, name, strlen(name), &cur_vid, 0, &next_snapid,
 			    &right_nr, &deleted_nr);
 
 	if (is_snapshot) {
@@ -266,13 +266,15 @@ int add_vdi(char *data, int data_len, uint64_t size,
 		copies = sys->nr_sobjs;
 	}
 
-	ret = create_vdi_obj(name, *new_vid, size, base_vid, cur_vid, copies,
+	ret = create_vdi_obj(epoch, name, *new_vid, size, base_vid, cur_vid, copies,
 			     next_snapid, is_snapshot);
 
 	return ret;
 }
 
-int del_vdi(char *data, int data_len, uint32_t snapid)
+int start_deletion(uint32_t vid, uint32_t epoch);
+
+int del_vdi(uint32_t epoch, char *data, int data_len, uint32_t snapid)
 {
 	char *name = data;
 	uint32_t vid;
@@ -286,7 +288,7 @@ int del_vdi(char *data, int data_len, uint32_t snapid)
 	if (data_len != SD_MAX_VDI_LEN)
 		return SD_RES_INVALID_PARMS;
 
-	ret = do_lookup_vdi(name, strlen(name), &vid, snapid,
+	ret = do_lookup_vdi(epoch, name, strlen(name), &vid, snapid,
 			     &dummy0, &dummy1, &dummy2);
 	if (ret != SD_RES_SUCCESS)
 		return ret;
@@ -296,7 +298,7 @@ int del_vdi(char *data, int data_len, uint32_t snapid)
 	if (nr_reqs > nr_nodes)
 		nr_reqs = nr_nodes;
 
-	ret = read_object(entries, nr_nodes, sys->epoch,
+	ret = read_object(entries, nr_nodes, epoch,
 			  vid_to_vdi_oid(vid), (char *)&inode, sizeof(inode), 0,
 			  nr_reqs);
 	if (ret < 0)
@@ -304,13 +306,13 @@ int del_vdi(char *data, int data_len, uint32_t snapid)
 
 	memset(inode.name, 0, sizeof(inode.name));
 
-	ret = write_object(entries, nr_nodes, sys->epoch,
+	ret = write_object(entries, nr_nodes, epoch,
 			   vid_to_vdi_oid(vid), (char *)&inode, sizeof(inode), 0,
 			   nr_reqs, 0);
 	if (ret < 0)
 		return SD_RES_EIO;
 
-	ret = start_deletion(vid);
+	ret = start_deletion(vid, epoch);
 	if (ret < 0)
 		return SD_RES_NO_MEM;
 
@@ -330,6 +332,7 @@ int read_vdis(char *data, int len, unsigned int *rsp_len)
 
 struct deletion_work {
 	uint32_t done;
+	uint32_t epoch;
 
 	struct work work;
 	struct list_head dw_siblings;
@@ -356,7 +359,7 @@ static void delete_one(struct work *work, int idx)
 
 	nr_nodes = get_ordered_sd_node_list(entries);
 
-	ret = read_object(entries, nr_nodes, sys->epoch,
+	ret = read_object(entries, nr_nodes, dw->epoch,
 			  vid_to_vdi_oid(vdi_id), (void *)&inode, sizeof(inode),
 			  0, sys->nr_sobjs);
 
@@ -369,12 +372,12 @@ static void delete_one(struct work *work, int idx)
 		if (!inode.data_vdi_id[i])
 			continue;
 
-		remove_object(entries, nr_nodes, sys->epoch,
+		remove_object(entries, nr_nodes, dw->epoch,
 			      vid_to_data_oid(inode.data_vdi_id[i], i),
 			      inode.nr_copies);
 	}
 
-	if (remove_object(entries, nr_nodes, sys->epoch, vid_to_vdi_oid(vdi_id),
+	if (remove_object(entries, nr_nodes, dw->epoch, vid_to_vdi_oid(vdi_id),
 			  sys->nr_sobjs))
 		eprintf("failed to remove vdi objects %x\n", vdi_id);
 }
@@ -420,7 +423,7 @@ static int fill_vdi_list(struct deletion_work *dw,
 	((uint32_t *)dw->buf)[dw->count++] = root_vid;
 again:
 	vid = ((uint32_t *)dw->buf)[done++];
-	ret = read_object(entries, nr_entries, sys->epoch,
+	ret = read_object(entries, nr_entries, dw->epoch,
 			  vid_to_vdi_oid(vid), (void *)&inode, sizeof(inode),
 			  0, nr_entries);
 
@@ -446,13 +449,13 @@ again:
 }
 
 static uint64_t get_vdi_root(struct sheepdog_node_list_entry *entries,
-			     int nr_entries, uint32_t vid)
+			     int nr_entries, uint32_t epoch, uint32_t vid)
 {
 	int ret;
 	static struct sheepdog_inode inode;
 
 next:
-	ret = read_object(entries, nr_entries, sys->epoch,
+	ret = read_object(entries, nr_entries, epoch,
 			  vid_to_vdi_oid(vid),
 			  (void *)&inode, sizeof(inode), 0, nr_entries);
 
@@ -478,7 +481,7 @@ static void __start_deletion(struct work *work, int idx)
 
 	nr_nodes = get_ordered_sd_node_list(entries);
 
-	root_vid = get_vdi_root(entries, nr_nodes, dw->vid);
+	root_vid = get_vdi_root(entries, nr_nodes, dw->epoch, dw->vid);
 	if (!root_vid)
 		goto fail;
 
@@ -523,7 +526,7 @@ static void __start_deletion_done(struct work *work, int idx)
 	}
 }
 
-int start_deletion(uint32_t vid)
+int start_deletion(uint32_t vid, uint32_t epoch)
 {
 	struct deletion_work *dw;
 
@@ -539,6 +542,7 @@ int start_deletion(uint32_t vid)
 
 	dw->count = 0;
 	dw->vid = vid;
+	dw->epoch = epoch;
 
 	dw->work.fn = __start_deletion;
 	dw->work.done = __start_deletion_done;
-- 
1.6.5




More information about the sheepdog mailing list