[Sheepdog] [PATCH 2/2] remove access to sys->epoch in threads
FUJITA Tomonori
fujita.tomonori at lab.ntt.co.jp
Thu May 6 12:45:37 CEST 2010
We can't access sys->epoch in threads. Use hdr->epoch instead.
Signed-off-by: FUJITA Tomonori <fujita.tomonori at lab.ntt.co.jp>
---
collie/collie.h | 7 ++---
collie/group.c | 8 +++---
collie/store.c | 32 +++++++++++++--------------
collie/vdi.c | 64 +++++++++++++++++++++++++++++-------------------------
4 files changed, 56 insertions(+), 55 deletions(-)
diff --git a/collie/collie.h b/collie/collie.h
index 20d8b5e..d792d58 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -106,13 +106,13 @@ int create_listen_port(int port, void *data);
int is_io_request(unsigned op);
int init_store(char *dir);
-int add_vdi(char *data, int data_len, uint64_t size,
+int add_vdi(uint32_t epoch, char *data, int data_len, uint64_t size,
uint32_t *new_vid, uint32_t base_vid, uint32_t copies,
int is_snapshot);
-int del_vdi(char *data, int data_len, uint32_t snapid);
+int del_vdi(uint32_t epoch, char *data, int data_len, uint32_t snapid);
-int lookup_vdi(char *data, int data_len, uint32_t *vid, uint32_t snapid);
+int lookup_vdi(uint32_t epoch, char *data, int data_len, uint32_t *vid, uint32_t snapid);
int read_vdis(char *data, int len, unsigned int *rsp_len);
@@ -146,7 +146,6 @@ int set_cluster_ctime(uint64_t ctime);
uint64_t get_cluster_ctime(void);
int start_recovery(uint32_t epoch, uint32_t *failed_vdis, int nr_failed_vdis);
-int start_deletion(uint32_t vid);
static inline int is_myself(struct sheepdog_node_list_entry *e)
{
diff --git a/collie/group.c b/collie/group.c
index 617f262..377da49 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -628,7 +628,7 @@ static void vdi_op(struct vdi_op_message *msg)
switch (hdr->opcode) {
case SD_OP_NEW_VDI:
- ret = add_vdi(data, hdr->data_length, hdr->vdi_size, &vid,
+ ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
hdr->base_vdi_id, hdr->copies,
hdr->snapid);
break;
@@ -637,11 +637,11 @@ static void vdi_op(struct vdi_op_message *msg)
ret = SD_RES_VDI_LOCKED;
break;
}
- ret = del_vdi(data, hdr->data_length, hdr->snapid);
+ ret = del_vdi(hdr->epoch, data, hdr->data_length, hdr->snapid);
break;
case SD_OP_LOCK_VDI:
case SD_OP_GET_VDI_INFO:
- ret = lookup_vdi(data, hdr->data_length, &vid, hdr->snapid);
+ ret = lookup_vdi(hdr->epoch, data, hdr->data_length, &vid, hdr->snapid);
if (ret != SD_RES_SUCCESS)
break;
break;
@@ -1042,7 +1042,7 @@ static void del_node(struct cpg_address *addr, struct work_confchg *w)
w->failed_vdis = buf;
}
- ret = lookup_vdi((char *)vm->ent.name,
+ ret = lookup_vdi(sys->epoch, (char *)vm->ent.name,
sizeof(vm->ent.name), &vid, 0);
if (ret == SD_RES_SUCCESS)
w->failed_vdis[w->nr_failed_vdis++] = vid;
diff --git a/collie/store.c b/collie/store.c
index 2dfaf54..3e42cf9 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -281,7 +281,7 @@ out:
static int ob_open(uint32_t epoch, uint64_t oid, int aflags, int *ret);
-static int read_from_one(uint64_t oid,
+static int read_from_one(uint32_t epoch, uint64_t oid,
unsigned *ori_rlen, void *buf, uint64_t offset)
{
int i, n, nr, fd, ret;
@@ -301,7 +301,7 @@ again:
addr_to_str(name, sizeof(name), e[n].addr, 0);
if (is_myself(&e[n])) {
- fd = ob_open(sys->epoch, oid, 0, &ret);
+ fd = ob_open(epoch, oid, 0, &ret);
if (fd < 0 || ret != 0)
continue;
@@ -320,7 +320,7 @@ again:
memset(&hdr, 0, sizeof(hdr));
hdr.opcode = SD_OP_READ_OBJ;
hdr.oid = oid;
- hdr.epoch = sys->epoch;
+ hdr.epoch = epoch;
rlen = *ori_rlen;
wlen = 0;
@@ -358,14 +358,14 @@ out:
return ret;
}
-static int read_from_other_sheeps(uint64_t oid, char *buf, int copies)
+static int read_from_other_sheeps(uint32_t epoch, uint64_t oid, char *buf, int copies)
{
int ret;
unsigned int rlen;
rlen = SD_DATA_OBJ_SIZE;
- ret = read_from_one(oid, &rlen, buf, 0);
+ ret = read_from_one(epoch, oid, &rlen, buf, 0);
return ret;
}
@@ -396,14 +396,13 @@ static int forward_read_obj_req(struct request *req, char *buf)
copies = sys->nr_sobjs;
hdr->flags |= SD_FLAG_CMD_DIRECT;
- hdr->epoch = sys->epoch;
/* TODO: we can do better; we need to check this first */
for (i = 0; i < copies; i++) {
n = obj_to_sheep(e, nr, oid, i);
if (is_myself(&e[n])) {
- ret = store_queue_request_local(req, buf, sys->epoch);
+ ret = store_queue_request_local(req, buf, hdr->epoch);
goto out;
}
}
@@ -471,7 +470,6 @@ static int forward_write_obj_req(struct request *req, char *buf)
pfds[i].fd = -1;
hdr->flags |= SD_FLAG_CMD_DIRECT;
- hdr->epoch = sys->epoch;
wlen = hdr->data_length;
rlen = 0;
@@ -506,7 +504,7 @@ static int forward_write_obj_req(struct request *req, char *buf)
}
if (local) {
- ret = store_queue_request_local(req, buf, sys->epoch);
+ ret = store_queue_request_local(req, buf, hdr->epoch);
rsp->result = ret;
if (nr_fds == 0) {
@@ -657,7 +655,7 @@ static int store_queue_request_local(struct request *req, char *buf, uint32_t ep
if (hdr->flags & SD_FLAG_CMD_COW) {
dprintf("%" PRIx64 "\n", hdr->cow_oid);
- ret = read_from_other_sheeps(hdr->cow_oid, buf,
+ ret = read_from_other_sheeps(hdr->epoch, hdr->cow_oid, buf,
hdr->copies);
if (ret) {
eprintf("failed to read old object\n");
@@ -785,11 +783,10 @@ void store_queue_request(struct work *work, int idx)
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
uint64_t oid = hdr->oid;
uint32_t opcode = hdr->opcode;
- uint32_t epoch = sys->epoch;
- uint32_t req_epoch = hdr->epoch;
+ uint32_t epoch = hdr->epoch;
struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
- dprintf("%d, %x, %" PRIx64" , %u, %u\n", idx, opcode, oid, epoch, req_epoch);
+ dprintf("%d, %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
if (hdr->flags & SD_FLAG_CMD_RECOVERY)
epoch = hdr->tgt_epoch;
@@ -815,8 +812,8 @@ void store_queue_request(struct work *work, int idx)
ret = store_queue_request_local(req, buf, epoch);
out:
if (ret != SD_RES_SUCCESS) {
- dprintf("failed, %d, %x, %" PRIx64" , %u, %u, %d\n",
- idx, opcode, oid, epoch, req_epoch, ret);
+ dprintf("failed, %d, %x, %" PRIx64" , %u, %d\n",
+ idx, opcode, oid, epoch, ret);
rsp->result = ret;
}
}
@@ -1123,7 +1120,7 @@ next:
memset(&hdr, 0, sizeof(hdr));
hdr.opcode = SD_OP_READ_OBJ;
hdr.oid = oid;
- hdr.epoch = sys->epoch;
+ hdr.epoch = epoch;
hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_DIRECT;
hdr.tgt_epoch = tgt_epoch;
hdr.data_length = rlen;
@@ -1340,7 +1337,8 @@ static int __fill_obj_list(struct recovery_work *rw,
memset(&hdr, 0, sizeof(hdr));
hdr.opcode = SD_OP_GET_OBJ_LIST;
- hdr.epoch = sys->epoch;
+ /* we don't need to set epoch */
+ hdr.epoch = epoch;
hdr.start = start_hash;
hdr.end = end_hash;
hdr.tgt_epoch = epoch - 1;
diff --git a/collie/vdi.c b/collie/vdi.c
index dc2c082..a563f42 100644
--- a/collie/vdi.c
+++ b/collie/vdi.c
@@ -18,7 +18,7 @@
/* TODO: should be performed atomically */
-static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
+static int create_vdi_obj(uint32_t epoch, char *name, uint32_t new_vid, uint64_t size,
uint32_t base_vid, uint32_t cur_vid, uint32_t copies,
uint32_t snapid, int is_snapshot)
{
@@ -32,7 +32,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
nr_nodes = get_ordered_sd_node_list(entries);
if (base_vid) {
- ret = read_object(entries, nr_nodes, sys->epoch,
+ ret = read_object(entries, nr_nodes, epoch,
vid_to_vdi_oid(base_vid), (char *)&base,
sizeof(base), 0, copies);
if (ret < 0)
@@ -46,7 +46,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
vprintf(SDOG_INFO "tree snapshot %s %" PRIx32 " %" PRIx32 "\n",
name, cur_vid, base_vid);
- ret = read_object(entries, nr_nodes, sys->epoch,
+ ret = read_object(entries, nr_nodes, epoch,
vid_to_vdi_oid(cur_vid), (char *)&cur,
sizeof(cur), 0, copies);
if (ret < 0) {
@@ -89,7 +89,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
}
if (is_snapshot && cur_vid != base_vid) {
- ret = write_object(entries, nr_nodes, sys->epoch,
+ ret = write_object(entries, nr_nodes, epoch,
vid_to_vdi_oid(cur_vid), (char *)&cur,
sizeof(cur), 0, copies, 0);
if (ret < 0) {
@@ -99,7 +99,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
}
if (base_vid) {
- ret = write_object(entries, nr_nodes, sys->epoch,
+ ret = write_object(entries, nr_nodes, epoch,
vid_to_vdi_oid(base_vid), (char *)&base,
sizeof(base), 0, copies, 0);
if (ret < 0) {
@@ -108,7 +108,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
}
}
- ret = write_object(entries, nr_nodes, sys->epoch,
+ ret = write_object(entries, nr_nodes, epoch,
vid_to_vdi_oid(new_vid), (char *)&new, sizeof(new),
0, copies, 1);
if (ret < 0)
@@ -117,7 +117,7 @@ static int create_vdi_obj(char *name, uint32_t new_vid, uint64_t size,
return ret;
}
-static int find_first_vdi(unsigned long start, unsigned long end,
+static int find_first_vdi(uint32_t epoch, unsigned long start, unsigned long end,
char *name, int namelen, uint32_t snapid, uint32_t *vid,
unsigned long *deleted_nr, uint32_t *next_snap)
{
@@ -134,7 +134,7 @@ static int find_first_vdi(unsigned long start, unsigned long end,
nr_reqs = nr_nodes;
for (i = start; i >= end; i--) {
- ret = read_object(entries, nr_nodes, sys->epoch,
+ ret = read_object(entries, nr_nodes, epoch,
vid_to_vdi_oid(i), (char *)&inode,
sizeof(inode), 0, nr_reqs);
if (ret < 0)
@@ -158,8 +158,8 @@ static int find_first_vdi(unsigned long start, unsigned long end,
}
-static int do_lookup_vdi(char *name, int namelen, uint32_t *vid, uint32_t snapid,
- uint32_t *next_snapid,
+static int do_lookup_vdi(uint32_t epoch, char *name, int namelen, uint32_t *vid,
+ uint32_t snapid, uint32_t *next_snapid,
unsigned long *right_nr, unsigned long *deleted_nr)
{
int ret;
@@ -177,7 +177,7 @@ static int do_lookup_vdi(char *name, int namelen, uint32_t *vid, uint32_t snapid
} else if (nr < SD_NR_VDIS) {
right_side:
/* look up on the right side of the hash point */
- ret = find_first_vdi(nr - 1, start_nr, name, namelen, snapid, vid,
+ ret = find_first_vdi(epoch, nr - 1, start_nr, name, namelen, snapid, vid,
deleted_nr, next_snapid);
return ret;
} else {
@@ -188,7 +188,7 @@ static int do_lookup_vdi(char *name, int namelen, uint32_t *vid, uint32_t snapid
return SD_RES_FULL_VDI;
else if (nr) {
/* look up on the left side of the hash point */
- ret = find_first_vdi(nr - 1, 0, name, namelen, snapid, vid,
+ ret = find_first_vdi(epoch, nr - 1, 0, name, namelen, snapid, vid,
deleted_nr, next_snapid);
if (ret == SD_RES_NO_VDI)
; /* we need to go to the right side */
@@ -201,7 +201,7 @@ static int do_lookup_vdi(char *name, int namelen, uint32_t *vid, uint32_t snapid
}
}
-int lookup_vdi(char *data, int data_len, uint32_t *vid, uint32_t snapid)
+int lookup_vdi(uint32_t epoch, char *data, int data_len, uint32_t *vid, uint32_t snapid)
{
char *name = data;
uint32_t dummy0;
@@ -210,11 +210,11 @@ int lookup_vdi(char *data, int data_len, uint32_t *vid, uint32_t snapid)
if (data_len != SD_MAX_VDI_LEN)
return SD_RES_INVALID_PARMS;
- return do_lookup_vdi(name, strlen(name), vid, snapid,
+ return do_lookup_vdi(epoch, name, strlen(name), vid, snapid,
&dummy0, &dummy1, &dummy2);
}
-int add_vdi(char *data, int data_len, uint64_t size,
+int add_vdi(uint32_t epoch, char *data, int data_len, uint64_t size,
uint32_t *new_vid, uint32_t base_vid, uint32_t copies, int is_snapshot)
{
uint32_t cur_vid;
@@ -228,7 +228,7 @@ int add_vdi(char *data, int data_len, uint64_t size,
name = data;
- ret = do_lookup_vdi(name, strlen(name), &cur_vid, 0, &next_snapid,
+ ret = do_lookup_vdi(epoch, name, strlen(name), &cur_vid, 0, &next_snapid,
&right_nr, &deleted_nr);
if (is_snapshot) {
@@ -266,13 +266,15 @@ int add_vdi(char *data, int data_len, uint64_t size,
copies = sys->nr_sobjs;
}
- ret = create_vdi_obj(name, *new_vid, size, base_vid, cur_vid, copies,
+ ret = create_vdi_obj(epoch, name, *new_vid, size, base_vid, cur_vid, copies,
next_snapid, is_snapshot);
return ret;
}
-int del_vdi(char *data, int data_len, uint32_t snapid)
+int start_deletion(uint32_t vid, uint32_t epoch);
+
+int del_vdi(uint32_t epoch, char *data, int data_len, uint32_t snapid)
{
char *name = data;
uint32_t vid;
@@ -286,7 +288,7 @@ int del_vdi(char *data, int data_len, uint32_t snapid)
if (data_len != SD_MAX_VDI_LEN)
return SD_RES_INVALID_PARMS;
- ret = do_lookup_vdi(name, strlen(name), &vid, snapid,
+ ret = do_lookup_vdi(epoch, name, strlen(name), &vid, snapid,
&dummy0, &dummy1, &dummy2);
if (ret != SD_RES_SUCCESS)
return ret;
@@ -296,7 +298,7 @@ int del_vdi(char *data, int data_len, uint32_t snapid)
if (nr_reqs > nr_nodes)
nr_reqs = nr_nodes;
- ret = read_object(entries, nr_nodes, sys->epoch,
+ ret = read_object(entries, nr_nodes, epoch,
vid_to_vdi_oid(vid), (char *)&inode, sizeof(inode), 0,
nr_reqs);
if (ret < 0)
@@ -304,13 +306,13 @@ int del_vdi(char *data, int data_len, uint32_t snapid)
memset(inode.name, 0, sizeof(inode.name));
- ret = write_object(entries, nr_nodes, sys->epoch,
+ ret = write_object(entries, nr_nodes, epoch,
vid_to_vdi_oid(vid), (char *)&inode, sizeof(inode), 0,
nr_reqs, 0);
if (ret < 0)
return SD_RES_EIO;
- ret = start_deletion(vid);
+ ret = start_deletion(vid, epoch);
if (ret < 0)
return SD_RES_NO_MEM;
@@ -330,6 +332,7 @@ int read_vdis(char *data, int len, unsigned int *rsp_len)
struct deletion_work {
uint32_t done;
+ uint32_t epoch;
struct work work;
struct list_head dw_siblings;
@@ -356,7 +359,7 @@ static void delete_one(struct work *work, int idx)
nr_nodes = get_ordered_sd_node_list(entries);
- ret = read_object(entries, nr_nodes, sys->epoch,
+ ret = read_object(entries, nr_nodes, dw->epoch,
vid_to_vdi_oid(vdi_id), (void *)&inode, sizeof(inode),
0, sys->nr_sobjs);
@@ -369,12 +372,12 @@ static void delete_one(struct work *work, int idx)
if (!inode.data_vdi_id[i])
continue;
- remove_object(entries, nr_nodes, sys->epoch,
+ remove_object(entries, nr_nodes, dw->epoch,
vid_to_data_oid(inode.data_vdi_id[i], i),
inode.nr_copies);
}
- if (remove_object(entries, nr_nodes, sys->epoch, vid_to_vdi_oid(vdi_id),
+ if (remove_object(entries, nr_nodes, dw->epoch, vid_to_vdi_oid(vdi_id),
sys->nr_sobjs))
eprintf("failed to remove vdi objects %x\n", vdi_id);
}
@@ -420,7 +423,7 @@ static int fill_vdi_list(struct deletion_work *dw,
((uint32_t *)dw->buf)[dw->count++] = root_vid;
again:
vid = ((uint32_t *)dw->buf)[done++];
- ret = read_object(entries, nr_entries, sys->epoch,
+ ret = read_object(entries, nr_entries, dw->epoch,
vid_to_vdi_oid(vid), (void *)&inode, sizeof(inode),
0, nr_entries);
@@ -446,13 +449,13 @@ again:
}
static uint64_t get_vdi_root(struct sheepdog_node_list_entry *entries,
- int nr_entries, uint32_t vid)
+ int nr_entries, uint32_t epoch, uint32_t vid)
{
int ret;
static struct sheepdog_inode inode;
next:
- ret = read_object(entries, nr_entries, sys->epoch,
+ ret = read_object(entries, nr_entries, epoch,
vid_to_vdi_oid(vid),
(void *)&inode, sizeof(inode), 0, nr_entries);
@@ -478,7 +481,7 @@ static void __start_deletion(struct work *work, int idx)
nr_nodes = get_ordered_sd_node_list(entries);
- root_vid = get_vdi_root(entries, nr_nodes, dw->vid);
+ root_vid = get_vdi_root(entries, nr_nodes, dw->epoch, dw->vid);
if (!root_vid)
goto fail;
@@ -523,7 +526,7 @@ static void __start_deletion_done(struct work *work, int idx)
}
}
-int start_deletion(uint32_t vid)
+int start_deletion(uint32_t vid, uint32_t epoch)
{
struct deletion_work *dw;
@@ -539,6 +542,7 @@ int start_deletion(uint32_t vid)
dw->count = 0;
dw->vid = vid;
+ dw->epoch = epoch;
dw->work.fn = __start_deletion;
dw->work.done = __start_deletion_done;
--
1.6.5
More information about the sheepdog
mailing list