[sheepdog] [PATCH V4 1/2] sheep: refactor gateway_forward_request
Yunkai Zhang
yunkai.me at gmail.com
Mon Sep 3 10:48:02 CEST 2012
On Sun, Sep 2, 2012 at 8:59 PM, Yunkai Zhang <yunkai.me at gmail.com> wrote:
> From: Yunkai Zhang <qiushu.zyk at taobao.com>
>
> - No longer treat local requests specially, so that we can forward local/remote
> requests concurrently, and make the code simpler.
> - Extract a common low-level function which can forward requests concurrently and
> collect response data from target nodes. This low-level function is useful for
> callers that need to read the response data.
>
> Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
> ---
> sheep/gateway.c | 120 +++++++++++++++++++++++++++++++++++---------------------
> 1 file changed, 75 insertions(+), 45 deletions(-)
>
> diff --git a/sheep/gateway.c b/sheep/gateway.c
> index 41d712b..94ec6f9 100644
> --- a/sheep/gateway.c
> +++ b/sheep/gateway.c
> @@ -99,6 +99,7 @@ int gateway_read_obj(struct request *req)
> }
>
> struct write_info_entry {
> + int idx;
> struct pollfd pfd;
> struct node_id *nid;
> struct sockfd *sfd;
> @@ -150,10 +151,13 @@ static inline void pfd_info_init(struct write_info *wi, struct pfd_info *pi)
> *
> * Return error code if any one request fails.
> */
> -static int wait_forward_request(struct write_info *wi, struct sd_rsp *rsp)
> +static int wait_forward_request(struct write_info *wi,
> + struct sd_rsp target_rsps[],
> + void *target_data[])
> {
> int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i;
> - struct pfd_info pi;;
> + struct pfd_info pi;
> + struct sd_rsp *rsp;
> again:
> pfd_info_init(wi, &pi);
> pollret = poll(pi.pfds, pi.nr, -1);
> @@ -170,14 +174,18 @@ again:
> break;
> if (i < nr_sent) {
> int re = pi.pfds[i].revents;
> + int idx = wi->ent[i].idx;
> dprintf("%d, revents %x\n", i, re);
> if (re & (POLLERR | POLLHUP | POLLNVAL)) {
> err_ret = SD_RES_NETWORK_ERROR;
> + target_rsps[idx].result = err_ret;
> finish_one_write_err(wi, i);
> } else if (re & POLLIN) {
> + rsp = &target_rsps[idx];
> if (do_read(pi.pfds[i].fd, rsp, sizeof(*rsp))) {
> eprintf("remote node might have gone away\n");
> err_ret = SD_RES_NETWORK_ERROR;
> + rsp->result = err_ret;
> finish_one_write_err(wi, i);
> goto finish_write;
> }
> @@ -187,6 +195,20 @@ again:
> eprintf("fail %"PRIx32"\n", ret);
> err_ret = ret;
> }
> +
> + if (rsp->data_length) {
> + assert(target_data);
> + ret = do_read(pi.pfds[i].fd, target_data[idx],
> + rsp->data_length);
> + if (ret) {
> + eprintf("failed to read the response data\n");
> + err_ret = SD_RES_NETWORK_ERROR;
> + rsp->result = err_ret;
> + finish_one_write_err(wi, i);
> + goto finish_write;
> + }
> + }
> +
> finish_one_write(wi, i);
> } else {
> eprintf("unhandled poll event\n");
> @@ -208,9 +230,10 @@ static inline void write_info_init(struct write_info *wi)
> }
>
> static inline void
> -write_info_advance(struct write_info *wi, struct sd_vnode *v,
> +write_info_advance(struct write_info *wi, struct sd_vnode *v, int idx,
> struct sockfd *sfd)
> {
> + wi->ent[wi->nr_sent].idx = idx;
> wi->ent[wi->nr_sent].nid = &v->nid;
> wi->ent[wi->nr_sent].pfd.fd = sfd->fd;
> wi->ent[wi->nr_sent].pfd.events = POLLIN;
> @@ -225,70 +248,49 @@ static inline void gateway_init_fwd_hdr(struct sd_req *fwd, struct sd_req *hdr)
> fwd->proto_ver = SD_SHEEP_PROTO_VER;
> }
>
> -static int gateway_forward_request(struct request *req)
> +/*
> + * Forward requests concurrently, collect response data sent back
> + * from target nodes and fill them into target_rsps[] and target_data[],
> + * target_data parameter can be set to NULL if no data return.
> + */
> +static int forward_request_concurrently(struct sd_req *hdr,
> + void *data, unsigned int *wlen,
> + struct sd_vnode *target_vnodes[],
> + struct sd_rsp target_rsps[],
> + void *target_data[],
> + int nr_targets)
> {
> - int i, err_ret = SD_RES_SUCCESS, ret, local = -1;
> - unsigned wlen;
> - struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
> - struct sd_vnode *v;
> - struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
> - uint64_t oid = req->rq.obj.oid;
> - int nr_copies;
> struct write_info wi;
> - struct sd_op_template *op;
> - struct sd_req hdr;
> -
> - dprintf("%"PRIx64"\n", oid);
> -
> - gateway_init_fwd_hdr(&hdr, &req->rq);
> - op = get_sd_op(hdr.opcode);
> + struct sd_vnode *v;
> + struct sockfd *sfd;
> + int i, ret, err_ret = SD_RES_SUCCESS;
>
> write_info_init(&wi);
> - wlen = hdr.data_length;
> - nr_copies = get_req_copy_number(req);
> - oid_to_vnodes(req->vinfo->vnodes, req->vinfo->nr_vnodes, oid,
> - nr_copies, obj_vnodes);
> -
> - for (i = 0; i < nr_copies; i++) {
> - struct sockfd *sfd;
>
> - v = obj_vnodes[i];
> - if (vnode_is_local(v)) {
> - local = i;
> - continue;
> - }
> + for (i = 0; i < nr_targets; i++) {
> + v = target_vnodes[i];
>
> sfd = sheep_get_sockfd(&v->nid);
> if (!sfd) {
> err_ret = SD_RES_NETWORK_ERROR;
> + target_rsps[i].result = err_ret;
> break;
> }
>
> - ret = send_req(sfd->fd, &hdr, req->data, &wlen);
> + ret = send_req(sfd->fd, hdr, data, wlen);
> if (ret) {
> sheep_del_sockfd(&v->nid, sfd);
> err_ret = SD_RES_NETWORK_ERROR;
> + target_rsps[i].result = err_ret;
> dprintf("fail %d\n", ret);
> break;
> }
> - write_info_advance(&wi, v, sfd);
> - }
> -
> - if (local != -1 && err_ret == SD_RES_SUCCESS) {
> - v = obj_vnodes[local];
> -
> - assert(op);
> - ret = sheep_do_op_work(op, req);
> -
> - if (ret != SD_RES_SUCCESS) {
> - eprintf("fail to write local %"PRIx32"\n", ret);
> - err_ret = ret;
> - }
> + write_info_advance(&wi, v, i, sfd);
> }
>
> dprintf("nr_sent %d, err %x\n", wi.nr_sent, err_ret);
> if (wi.nr_sent > 0) {
> - ret = wait_forward_request(&wi, rsp);
> + ret = wait_forward_request(&wi, target_rsps, target_data);
> if (ret != SD_RES_SUCCESS)
> err_ret = ret;
> }
> @@ -296,6 +298,34 @@ static int gateway_forward_request(struct request *req)
> return err_ret;
> }
>
> +/*
> + * Do forward request, not care about response data
> + * sent back from target nodes.
> + */
> +static int gateway_forward_request(struct request *req)
> +{
> + int nr_copies;
> + unsigned wlen;
> + struct sd_vnode *target_vnodes[SD_MAX_COPIES];
> + struct sd_rsp target_rsps[SD_MAX_COPIES];
> + uint64_t oid = req->rq.obj.oid;
> + struct sd_req hdr;
> +
> + dprintf("%"PRIx64"\n", oid);
> +
> + gateway_init_fwd_hdr(&hdr, &req->rq);
> +
> + wlen = hdr.data_length;
> +
> + nr_copies = get_req_copy_number(req);
> + oid_to_vnodes(req->vinfo->vnodes, req->vinfo->nr_vnodes, oid,
> + nr_copies, target_vnodes);
> +
> + return forward_request_concurrently(&hdr, req->data, &wlen,
> + target_vnodes, target_rsps, NULL,
> + nr_copies);
> +}
> +
> int gateway_write_obj(struct request *req)
> {
> if (sys->enable_write_cache && !req->local && !bypass_object_cache(req))
> --
> 1.7.11.4
>
This patch seems to have a bug that may lead to a crash when writing data and
killing nodes concurrently; let me dig more.
--
Yunkai Zhang
Work at Taobao
More information about the sheepdog
mailing list