[Sheepdog] [PATCH 2/3] sheep: refactor local and io request handling
Liu Yuan
namei.unix at gmail.com
Sun Nov 20 12:11:26 CET 2011
From: Liu Yuan <tailai.ly at taobao.com>
They don't share any code or logic, let's split 'em out.
- add a new function to handle local request.
other minor changes:
- rename store/cluster_queue_request into do_io/cluster_request to conform naming in ops.c
- rename forward_*_obj_req into do_cluster_*, since we don't truely forward the
io requests. We have to do local io if requested object is local stored. I think
'cluster' is more appropriate.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/group.c | 2 +-
sheep/sdnet.c | 6 ++--
sheep/sheep_priv.h | 5 ++-
sheep/store.c | 55 ++++++++++++++++++++++++++++-----------------------
4 files changed, 37 insertions(+), 31 deletions(-)
diff --git a/sheep/group.c b/sheep/group.c
index bd1356a..f126de5 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -211,7 +211,7 @@ static void do_cluster_op(void *arg)
msg->rsp.result = ret;
}
-void cluster_queue_request(struct work *work, int idx)
+void do_cluster_request(struct work *work, int idx)
{
struct request *req = container_of(work, struct request, work);
struct sd_req *hdr = (struct sd_req *)&req->rq;
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 98cafd8..58b836c 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -221,13 +221,13 @@ static void queue_request(struct request *req)
}
if (is_io_op(req->op)) {
- req->work.fn = store_queue_request;
+ req->work.fn = do_io_request;
req->work.done = io_op_done;
} else if (is_local_op(req->op)) {
- req->work.fn = store_queue_request;
+ req->work.fn = do_local_request;
req->work.done = local_op_done;
} else if (is_cluster_op(req->op)) {
- req->work.fn = cluster_queue_request;
+ req->work.fn = do_cluster_request;
req->work.done = cluster_op_done;
} else {
eprintf("unknown operation %d\n", hdr->opcode);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 5e10a2f..2cef322 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -214,7 +214,8 @@ int create_cluster(int port, int64_t zone);
int leave_cluster(void);
void start_cpg_event_work(void);
-void store_queue_request(struct work *work, int idx);
+void do_io_request(struct work *work, int idx);
+void do_local_request(struct work *work, int idx);
int write_object_local(uint64_t oid, char *data, unsigned int datalen,
uint64_t offset, uint16_t flags, int copies,
uint32_t epoch, int create);
@@ -223,7 +224,7 @@ int read_object_local(uint64_t oid, char *data, unsigned int datalen,
int read_epoch(uint32_t *epoch, uint64_t *ctime,
struct sheepdog_node_list_entry *entries, int *nr_entries);
-void cluster_queue_request(struct work *work, int idx);
+void do_cluster_request(struct work *work, int idx);
int update_epoch_store(uint32_t epoch);
int update_epoch_log(int epoch);
diff --git a/sheep/store.c b/sheep/store.c
index 335da61..1b23eab 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -252,9 +252,9 @@ static int read_copy_from_cluster(struct request *req, uint32_t epoch,
return ret;
}
-static int store_queue_request_local(struct request *req, uint32_t epoch);
+static int do_local_rw(struct request *req, uint32_t epoch);
-static int forward_read_obj_req(struct request *req, int idx)
+static int do_cluster_read(struct request *req, int idx)
{
int i, n, nr, fd, ret;
unsigned wlen, rlen;
@@ -282,7 +282,7 @@ static int forward_read_obj_req(struct request *req, int idx)
n = obj_to_sheep(e, nr, oid, i);
if (is_myself(e[n].addr, e[n].port)) {
- ret = store_queue_request_local(req, hdr.epoch);
+ ret = do_local_rw(req, hdr.epoch);
goto out;
}
}
@@ -310,7 +310,7 @@ out:
return ret;
}
-static int forward_write_obj_req(struct request *req, int idx)
+static int do_cluster_write(struct request *req, int idx)
{
int i, n, nr, fd, ret;
unsigned wlen;
@@ -374,7 +374,7 @@ static int forward_write_obj_req(struct request *req, int idx)
}
if (local) {
- ret = store_queue_request_local(req, hdr.epoch);
+ ret = do_local_rw(req, hdr.epoch);
rsp->result = ret;
if (nr_fds == 0) {
@@ -505,7 +505,7 @@ int write_object_local(uint64_t oid, char *data, unsigned int datalen,
hdr->data_length = datalen;
req->data = data;
- ret = store_queue_request_local(req, epoch);
+ ret = do_local_rw(req, epoch);
free(req);
@@ -535,7 +535,7 @@ int read_object_local(uint64_t oid, char *data, unsigned int datalen,
hdr->data_length = datalen;
req->data = data;
- ret = store_queue_request_local(req, epoch);
+ ret = do_local_rw(req, epoch);
rsp_data_length = rsp->data_length;
free(req);
@@ -678,7 +678,7 @@ out:
return ret;
}
-static int store_queue_request_local(struct request *req, uint32_t epoch)
+static int do_local_rw(struct request *req, uint32_t epoch)
{
struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
int ret = SD_RES_SUCCESS;
@@ -740,7 +740,7 @@ static int fix_object_consistency(struct request *req, int idx)
hdr->data_length = data_length;
hdr->opcode = SD_OP_READ_OBJ;
hdr->flags = 0;
- ret = forward_read_obj_req(req, idx);
+ ret = do_cluster_read(req, idx);
if (ret != SD_RES_SUCCESS) {
eprintf("failed to read object %d\n", ret);
goto out;
@@ -749,7 +749,7 @@ static int fix_object_consistency(struct request *req, int idx)
hdr->opcode = SD_OP_WRITE_OBJ;
hdr->flags = SD_FLAG_CMD_WRITE;
hdr->oid = oid;
- ret = forward_write_obj_req(req, idx);
+ ret = do_cluster_write(req, idx);
if (ret != SD_RES_SUCCESS) {
eprintf("failed to write object %d\n", ret);
goto out;
@@ -763,7 +763,21 @@ out:
return ret;
}
-void store_queue_request(struct work *work, int idx)
+void do_local_request(struct work *work, int idx)
+{
+ struct request *req = container_of(work, struct request, work);
+ struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+ int ret = SD_RES_SUCCESS;
+
+ dprintf("local queue request: %d\n", idx);
+
+ if (has_process_work(req->op))
+ ret = do_process_work(req->op, &req->rq, &req->rp, req->data);
+
+ rsp->result = ret;
+}
+
+void do_io_request(struct work *work, int idx)
{
struct request *req = container_of(work, struct request, work);
int ret = SD_RES_SUCCESS;
@@ -778,14 +792,9 @@ void store_queue_request(struct work *work, int idx)
if (hdr->flags & SD_FLAG_CMD_RECOVERY)
epoch = hdr->tgt_epoch;
- if (is_local_op(req->op)) {
- if (has_process_work(req->op))
- ret = do_process_work(req->op, &req->rq,
- &req->rp, req->data);
- goto out;
- }
-
- if (!(hdr->flags & SD_FLAG_CMD_IO_LOCAL)) {
+ if (hdr->flags & SD_FLAG_CMD_IO_LOCAL) {
+ ret = do_local_rw(req, epoch);
+ } else {
/* fix object consistency when we read the object for the first time */
if (req->check_consistency) {
ret = fix_object_consistency(req, idx);
@@ -794,18 +803,14 @@ void store_queue_request(struct work *work, int idx)
}
if (hdr->flags & SD_FLAG_CMD_WRITE)
- ret = forward_write_obj_req(req, idx);
+ ret = do_cluster_write(req, idx);
else
- ret = forward_read_obj_req(req, idx);
- goto out;
+ ret = do_cluster_read(req, idx);
}
-
- ret = store_queue_request_local(req, epoch);
out:
if (ret != SD_RES_SUCCESS)
dprintf("failed: %"PRIu32", %x, %" PRIx64" , %u, %"PRIu32"\n",
idx, opcode, oid, epoch, ret);
-
rsp->result = ret;
}
--
1.7.8.rc3
More information about the sheepdog
mailing list