[sheepdog] [PATCH 2/3] sheep: add a helper to send req on top of sockfd cache
Liu Yuan
namei.unix at gmail.com
Wed Sep 12 10:07:13 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
This will greatly reduce the duplicated and lengthy code.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/gateway.c | 37 ++++++---------------
sheep/group.c | 33 ++++---------------
sheep/recovery.c | 87 +++++++++++---------------------------------------
sheep/sheep_priv.h | 2 ++
sheep/sockfd_cache.c | 25 +++++++++++++++
sheep/store.c | 23 ++-----------
6 files changed, 64 insertions(+), 143 deletions(-)
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 2231804..3fd49a5 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -56,44 +56,27 @@ int gateway_read_obj(struct request *req)
*/
j = random();
for (i = 0; i < nr_copies; i++) {
- struct sockfd *sfd;
int idx = (i + j) % nr_copies;
- memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
- fwd_hdr.opcode = SD_OP_READ_PEER;
- fwd_hdr.proto_ver = SD_SHEEP_PROTO_VER;
-
v = obj_vnodes[idx];
if (vnode_is_local(v))
continue;
- sfd = sheep_get_sockfd(&v->nid);
- if (!sfd) {
- ret = SD_RES_NETWORK_ERROR;
- continue;
- }
-
+ memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
+ fwd_hdr.opcode = SD_OP_READ_PEER;
+ fwd_hdr.proto_ver = SD_SHEEP_PROTO_VER;
wlen = 0;
rlen = fwd_hdr.data_length;
- ret = exec_req(sfd->fd, &fwd_hdr, req->data, &wlen, &rlen);
+ ret = sheep_exec_req(&v->nid, &fwd_hdr, req->data, &wlen,
+ &rlen);
- if (!ret && rsp->result == SD_RES_SUCCESS) {
- memcpy(&req->rp, rsp, sizeof(*rsp));
- ret = rsp->result;
- sheep_put_sockfd(&v->nid, sfd);
- break; /* Read success */
- }
+ if (ret != SD_RES_SUCCESS)
+ continue;
- if (ret) {
- dprintf("remote node might have gone away\n");
- sheep_del_sockfd(&v->nid, sfd);
- ret = SD_RES_NETWORK_ERROR;
- } else {
- ret = rsp->result;
- eprintf("remote read fail %x\n", ret);
- sheep_put_sockfd(&v->nid, sfd);
- }
+ /* Read success */
+ memcpy(&req->rp, rsp, sizeof(*rsp));
+ break;
}
return ret;
}
diff --git a/sheep/group.c b/sheep/group.c
index 56dee6a..ae04a6d 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -623,7 +623,6 @@ static int get_vdis_from(struct sd_node *node)
struct sd_req hdr;
struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
struct vdi_copy *vc = NULL;
- struct sockfd *sfd;
int i, ret = SD_RES_SUCCESS;
unsigned int rlen, wlen;
int count;
@@ -631,18 +630,7 @@ static int get_vdis_from(struct sd_node *node)
if (node_is_local(node))
goto out;
- sfd = sheep_get_sockfd(&node->nid);
- if (!sfd) {
- ret = SD_RES_NETWORK_ERROR;
- goto out;
- }
-
rlen = SD_DATA_OBJ_SIZE; /* FIXME */
- sd_init_req(&hdr, SD_OP_GET_VDI_COPIES);
- hdr.epoch = sys->epoch;
- hdr.data_length = rlen;
- wlen = 0;
-
vc = zalloc(rlen);
if (!vc) {
vprintf(SDOG_ERR, "unable to allocate memory\n");
@@ -650,28 +638,19 @@ static int get_vdis_from(struct sd_node *node)
goto out;
}
- ret = exec_req(sfd->fd, &hdr, (char *)vc, &wlen, &rlen);
-
- if (ret) {
- dprintf("remote node might have gone away\n");
- sheep_del_sockfd(&node->nid, sfd);
- ret = SD_RES_NETWORK_ERROR;
- goto out;
- }
- if (rsp->result != SD_RES_SUCCESS) {
- ret = rsp->result;
- eprintf("failed %x\n", ret);
- sheep_put_sockfd(&node->nid, sfd);
+ sd_init_req(&hdr, SD_OP_GET_VDI_COPIES);
+ hdr.epoch = sys->epoch;
+ hdr.data_length = rlen;
+ wlen = 0;
+ ret = sheep_exec_req(&node->nid, &hdr, (char *)vc, &wlen, &rlen);
+ if (ret != SD_RES_SUCCESS)
goto out;
- }
- sheep_put_sockfd(&node->nid, sfd);
count = rsp->data_length / sizeof(*vc);
for (i = 0; i < count; i++) {
set_bit(vc[i].vid, sys->vdi_inuse);
add_vdi_copy_number(vc[i].vid, vc[i].nr_copies);
}
- ret = SD_RES_SUCCESS;
out:
free(vc);
return ret;
diff --git a/sheep/recovery.c b/sheep/recovery.c
index ae36768..aaa144b 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -45,8 +45,10 @@ static struct recovery_work *recovering_work;
static int obj_cmp(const void *oid1, const void *oid2)
{
- const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), FNV1A_64_INIT);
- const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t), FNV1A_64_INIT);
+ const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t),
+ FNV1A_64_INIT);
+ const uint64_t hval2 = fnv_64a_buf((void *)oid2, sizeof(uint64_t),
+ FNV1A_64_INIT);
if (hval1 < hval2)
return -1;
@@ -59,15 +61,12 @@ static int recover_object_from_replica(uint64_t oid, struct sd_vnode *vnode,
uint32_t epoch, uint32_t tgt_epoch)
{
struct sd_req hdr;
- struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
unsigned wlen = 0, rlen;
- int ret = -1;
+ int ret = SD_RES_NO_MEM;
void *buf;
struct siocb iocb = { 0 };
- struct sockfd *sfd;
rlen = get_objsize(oid);
-
buf = valloc(rlen);
if (!buf) {
eprintf("%m\n");
@@ -78,19 +77,6 @@ static int recover_object_from_replica(uint64_t oid, struct sd_vnode *vnode,
iocb.epoch = epoch;
iocb.length = rlen;
ret = sd_store->link(oid, &iocb, tgt_epoch);
- if (ret == SD_RES_SUCCESS) {
- ret = 0;
- goto done;
- } else {
- ret = -1;
- goto out;
- }
- }
-
-
- sfd = sheep_get_sockfd(&vnode->nid);
- if (!sfd) {
- ret = -1;
goto out;
}
@@ -98,41 +84,22 @@ static int recover_object_from_replica(uint64_t oid, struct sd_vnode *vnode,
hdr.epoch = epoch;
hdr.flags = SD_FLAG_CMD_RECOVERY;
hdr.data_length = rlen;
-
hdr.obj.oid = oid;
hdr.obj.tgt_epoch = tgt_epoch;
- ret = exec_req(sfd->fd, &hdr, buf, &wlen, &rlen);
-
- if (ret != 0) {
- eprintf("res: %"PRIx32"\n", rsp->result);
- ret = -1;
- sheep_del_sockfd(&vnode->nid, sfd);
- goto out;
- }
-
- if (rsp->result == SD_RES_SUCCESS) {
- sheep_put_sockfd(&vnode->nid, sfd);
-
- iocb.epoch = epoch;
- iocb.length = rlen;
- iocb.buf = buf;
- ret = sd_store->atomic_put(oid, &iocb);
- if (ret != SD_RES_SUCCESS) {
- ret = -1;
- goto out;
- }
- } else {
- eprintf("failed, res: %"PRIx32"\n", rsp->result);
- ret = rsp->result;
- sheep_del_sockfd(&vnode->nid, sfd);
+ ret = sheep_exec_req(&vnode->nid, &hdr, buf, &wlen, &rlen);
+ if (ret != SD_RES_SUCCESS)
goto out;
- }
-done:
- dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, tgt_epoch, epoch);
+ iocb.epoch = epoch;
+ iocb.length = rlen;
+ iocb.buf = buf;
+ ret = sd_store->atomic_put(oid, &iocb);
out:
- if (ret == SD_RES_SUCCESS)
+ if (ret == SD_RES_SUCCESS) {
+ dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid,
+ tgt_epoch, epoch);
objlist_cache_insert(oid);
+ }
free(buf);
return ret;
}
@@ -180,7 +147,7 @@ again:
continue;
ret = recover_object_from_replica(oid, tgt_vnode,
epoch, tgt_epoch);
- if (ret == 0) {
+ if (ret == SD_RES_SUCCESS) {
/* Succeed */
break;
} else if (SD_RES_OLD_NODE_VER == ret) {
@@ -499,19 +466,12 @@ static int fetch_object_list(struct sd_node *e, uint32_t epoch,
char name[128];
struct sd_list_req hdr;
struct sd_list_rsp *rsp = (struct sd_list_rsp *)&hdr;
- struct sockfd *sfd;
int ret;
addr_to_str(name, sizeof(name), e->nid.addr, 0);
dprintf("%s %"PRIu32"\n", name, e->nid.port);
- sfd = sheep_get_sockfd(&e->nid);
- if (!sfd) {
- eprintf("%s %"PRIu32"\n", name, e->nid.port);
- return -1;
- }
-
wlen = 0;
rlen = buf_size;
@@ -520,21 +480,10 @@ static int fetch_object_list(struct sd_node *e, uint32_t epoch,
hdr.flags = 0;
hdr.data_length = rlen;
- ret = exec_req(sfd->fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
+ ret = sheep_exec_req(&e->nid, (struct sd_req *)&hdr, buf, &wlen, &rlen);
- if (ret) {
- dprintf("remote node might have gone away\n");
- sheep_del_sockfd(&e->nid, sfd);
- return -1;
- }
- if (rsp->result != SD_RES_SUCCESS) {
- ret = rsp->result;
- eprintf("failed %x\n", ret);
- sheep_put_sockfd(&e->nid, sfd);
+ if (ret != SD_RES_SUCCESS)
return -1;
- }
-
- sheep_put_sockfd(&e->nid, sfd);
dprintf("%zu\n", rsp->data_length / sizeof(uint64_t));
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 6eb7e2c..9c7c88f 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -403,6 +403,8 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr);
struct sockfd *sheep_get_sockfd(struct node_id *);
void sheep_put_sockfd(struct node_id *, struct sockfd *);
void sheep_del_sockfd(struct node_id *, struct sockfd *);
+int sheep_exec_req(struct node_id *nid, struct sd_req *hdr, void *data,
+ unsigned int *wlen, unsigned int *rlen);
static inline bool is_object_cache_enabled(void)
{
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index 1e957eb..ac1c9ce 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -462,3 +462,28 @@ void sheep_del_sockfd(struct node_id *nid, struct sockfd *sfd)
sockfd_cache_del(nid);
free(sfd);
}
+
+int sheep_exec_req(struct node_id *nid, struct sd_req *hdr, void *buf,
+ unsigned int *wlen, unsigned int *rlen)
+{
+ struct sd_rsp *rsp = (struct sd_rsp *)hdr;
+ struct sockfd *sfd;
+ int ret;
+
+ sfd = sheep_get_sockfd(nid);
+ if (!sfd)
+ return SD_RES_NETWORK_ERROR;
+
+ ret = exec_req(sfd->fd, hdr, buf, wlen, rlen);
+ if (ret) {
+ dprintf("remote node might have gone away\n");
+ sheep_del_sockfd(nid, sfd);
+ return SD_RES_NETWORK_ERROR;
+ }
+ ret = rsp->result;
+ if (ret != SD_RES_SUCCESS)
+ eprintf("failed %x\n", ret);
+
+ sheep_put_sockfd(nid, sfd);
+ return ret;
+}
diff --git a/sheep/store.c b/sheep/store.c
index d89e875..7f9ea29 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -102,37 +102,20 @@ int epoch_log_read_remote(uint32_t epoch, struct sd_node *nodes, int len)
for (i = 0; i < nr; i++) {
struct sd_req hdr;
struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
- struct sockfd *sfd;
unsigned int rlen, wlen;
if (node_is_local(&local_nodes[i]))
continue;
- sfd = sheep_get_sockfd(&local_nodes[i].nid);
- if (!sfd) {
- continue;
- }
-
sd_init_req(&hdr, SD_OP_GET_EPOCH);
hdr.data_length = rlen = len;
hdr.obj.tgt_epoch = epoch;
-
wlen = 0;
-
- ret = exec_req(sfd->fd, &hdr, nodes, &wlen, &rlen);
- if (ret) {
- dprintf("remote node might have gone away\n");
- sheep_del_sockfd(&local_nodes[i].nid, sfd);
- continue;
- }
- if (rsp->result != SD_RES_SUCCESS) {
- ret = rsp->result;
- eprintf("failed %x\n", ret);
- sheep_put_sockfd(&local_nodes[i].nid, sfd);
+ ret = sheep_exec_req(&local_nodes[i].nid, &hdr, nodes, &wlen,
+ &rlen);
+ if (ret != SD_RES_SUCCESS)
continue;
- }
- sheep_put_sockfd(&local_nodes[i].nid, sfd);
return rsp->data_length / sizeof(*nodes);
}
--
1.7.10.2
More information about the sheepdog
mailing list