[sheepdog] [PATCH V4 1/2] sheep: refactor gateway_forward_request

Yunkai Zhang yunkai.me at gmail.com
Mon Sep 3 10:48:02 CEST 2012


On Sun, Sep 2, 2012 at 8:59 PM, Yunkai Zhang <yunkai.me at gmail.com> wrote:
> From: Yunkai Zhang <qiushu.zyk at taobao.com>
>
> - No longer treat local requests specially, so that local and remote
>   requests can be forwarded concurrently; this also makes the code simpler.
> - Extract a common low-level function that forwards requests concurrently and
>   collects the response data from the target nodes. This low-level function is
>   useful for callers that need to read the response data.
>
> Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
> ---
>  sheep/gateway.c | 120 +++++++++++++++++++++++++++++++++++---------------------
>  1 file changed, 75 insertions(+), 45 deletions(-)
>
> diff --git a/sheep/gateway.c b/sheep/gateway.c
> index 41d712b..94ec6f9 100644
> --- a/sheep/gateway.c
> +++ b/sheep/gateway.c
> @@ -99,6 +99,7 @@ int gateway_read_obj(struct request *req)
>  }
>
>  struct write_info_entry {
> +       int idx;
>         struct pollfd pfd;
>         struct node_id *nid;
>         struct sockfd *sfd;
> @@ -150,10 +151,13 @@ static inline void pfd_info_init(struct write_info *wi, struct pfd_info *pi)
>   *
>   * Return error code if any one request fails.
>   */
> -static int wait_forward_request(struct write_info *wi, struct sd_rsp *rsp)
> +static int wait_forward_request(struct write_info *wi,
> +                               struct sd_rsp target_rsps[],
> +                               void *target_data[])
>  {
>         int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i;
> -       struct pfd_info pi;;
> +       struct pfd_info pi;
> +       struct sd_rsp *rsp;
>  again:
>         pfd_info_init(wi, &pi);
>         pollret = poll(pi.pfds, pi.nr, -1);
> @@ -170,14 +174,18 @@ again:
>                         break;
>         if (i < nr_sent) {
>                 int re = pi.pfds[i].revents;
> +               int idx = wi->ent[i].idx;
>                 dprintf("%d, revents %x\n", i, re);
>                 if (re & (POLLERR | POLLHUP | POLLNVAL)) {
>                         err_ret = SD_RES_NETWORK_ERROR;
> +                       target_rsps[idx].result = err_ret;
>                         finish_one_write_err(wi, i);
>                 } else if (re & POLLIN) {
> +                       rsp = &target_rsps[idx];
>                         if (do_read(pi.pfds[i].fd, rsp, sizeof(*rsp))) {
>                                 eprintf("remote node might have gone away\n");
>                                 err_ret = SD_RES_NETWORK_ERROR;
> +                               rsp->result = err_ret;
>                                 finish_one_write_err(wi, i);
>                                 goto finish_write;
>                         }
> @@ -187,6 +195,20 @@ again:
>                                 eprintf("fail %"PRIx32"\n", ret);
>                                 err_ret = ret;
>                         }
> +
> +                       if (rsp->data_length) {
> +                               assert(target_data);
> +                               ret = do_read(pi.pfds[i].fd, target_data[idx],
> +                                             rsp->data_length);
> +                               if (ret) {
> +                                       eprintf("failed to read the response data\n");
> +                                       err_ret = SD_RES_NETWORK_ERROR;
> +                                       rsp->result = err_ret;
> +                                       finish_one_write_err(wi, i);
> +                                       goto finish_write;
> +                               }
> +                       }
> +
>                         finish_one_write(wi, i);
>                 } else {
>                         eprintf("unhandled poll event\n");
> @@ -208,9 +230,10 @@ static inline void write_info_init(struct write_info *wi)
>  }
>
>  static inline void
> -write_info_advance(struct write_info *wi, struct sd_vnode *v,
> +write_info_advance(struct write_info *wi, struct sd_vnode *v, int idx,
>                    struct sockfd *sfd)
>  {
> +       wi->ent[wi->nr_sent].idx = idx;
>         wi->ent[wi->nr_sent].nid = &v->nid;
>         wi->ent[wi->nr_sent].pfd.fd = sfd->fd;
>         wi->ent[wi->nr_sent].pfd.events = POLLIN;
> @@ -225,70 +248,49 @@ static inline void gateway_init_fwd_hdr(struct sd_req *fwd, struct sd_req *hdr)
>         fwd->proto_ver = SD_SHEEP_PROTO_VER;
>  }
>
> -static int gateway_forward_request(struct request *req)
> +/*
> + * Forward requests concurrently, collect response data sent back
> + * from target nodes and fill them into target_rsps[] and target_data[],
> + * target_data parameter can be set to NULL if no data return.
> + */
> +static int forward_request_concurrently(struct sd_req *hdr,
> +                                       void *data, unsigned int *wlen,
> +                                       struct sd_vnode *target_vnodes[],
> +                                       struct sd_rsp target_rsps[],
> +                                       void *target_data[],
> +                                       int nr_targets)
>  {
> -       int i, err_ret = SD_RES_SUCCESS, ret, local = -1;
> -       unsigned wlen;
> -       struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
> -       struct sd_vnode *v;
> -       struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
> -       uint64_t oid = req->rq.obj.oid;
> -       int nr_copies;
>         struct write_info wi;
> -       struct sd_op_template *op;
> -       struct sd_req hdr;
> -
> -       dprintf("%"PRIx64"\n", oid);
> -
> -       gateway_init_fwd_hdr(&hdr, &req->rq);
> -       op = get_sd_op(hdr.opcode);
> +       struct sd_vnode *v;
> +       struct sockfd *sfd;
> +       int i, ret, err_ret = SD_RES_SUCCESS;
>
>         write_info_init(&wi);
> -       wlen = hdr.data_length;
> -       nr_copies = get_req_copy_number(req);
> -       oid_to_vnodes(req->vinfo->vnodes, req->vinfo->nr_vnodes, oid,
> -                     nr_copies, obj_vnodes);
> -
> -       for (i = 0; i < nr_copies; i++) {
> -               struct sockfd *sfd;
>
> -               v = obj_vnodes[i];
> -               if (vnode_is_local(v)) {
> -                       local = i;
> -                       continue;
> -               }
> +       for (i = 0; i < nr_targets; i++) {
> +               v = target_vnodes[i];
>
>                 sfd = sheep_get_sockfd(&v->nid);
>                 if (!sfd) {
>                         err_ret = SD_RES_NETWORK_ERROR;
> +                       target_rsps[i].result = err_ret;
>                         break;
>                 }
>
> -               ret = send_req(sfd->fd, &hdr, req->data, &wlen);
> +               ret = send_req(sfd->fd, hdr, data, wlen);
>                 if (ret) {
>                         sheep_del_sockfd(&v->nid, sfd);
>                         err_ret = SD_RES_NETWORK_ERROR;
> +                       target_rsps[i].result = err_ret;
>                         dprintf("fail %d\n", ret);
>                         break;
>                 }
> -               write_info_advance(&wi, v, sfd);
> -       }
> -
> -       if (local != -1 && err_ret == SD_RES_SUCCESS) {
> -               v = obj_vnodes[local];
> -
> -               assert(op);
> -               ret = sheep_do_op_work(op, req);
> -
> -               if (ret != SD_RES_SUCCESS) {
> -                       eprintf("fail to write local %"PRIx32"\n", ret);
> -                       err_ret = ret;
> -               }
> +               write_info_advance(&wi, v, i, sfd);
>         }
>
>         dprintf("nr_sent %d, err %x\n", wi.nr_sent, err_ret);
>         if (wi.nr_sent > 0) {
> -               ret = wait_forward_request(&wi, rsp);
> +               ret = wait_forward_request(&wi, target_rsps, target_data);
>                 if (ret != SD_RES_SUCCESS)
>                         err_ret = ret;
>         }
> @@ -296,6 +298,34 @@ static int gateway_forward_request(struct request *req)
>         return err_ret;
>  }
>
> +/*
> + * Do forward request, not care about response data
> + * sent back from target nodes.
> + */
> +static int gateway_forward_request(struct request *req)
> +{
> +       int nr_copies;
> +       unsigned wlen;
> +       struct sd_vnode *target_vnodes[SD_MAX_COPIES];
> +       struct sd_rsp target_rsps[SD_MAX_COPIES];
> +       uint64_t oid = req->rq.obj.oid;
> +       struct sd_req hdr;
> +
> +       dprintf("%"PRIx64"\n", oid);
> +
> +       gateway_init_fwd_hdr(&hdr, &req->rq);
> +
> +       wlen = hdr.data_length;
> +
> +       nr_copies = get_req_copy_number(req);
> +       oid_to_vnodes(req->vinfo->vnodes, req->vinfo->nr_vnodes, oid,
> +                     nr_copies, target_vnodes);
> +
> +       return forward_request_concurrently(&hdr, req->data, &wlen,
> +                                           target_vnodes, target_rsps, NULL,
> +                                           nr_copies);
> +}
> +
>  int gateway_write_obj(struct request *req)
>  {
>         if (sys->enable_write_cache && !req->local && !bypass_object_cache(req))
> --
> 1.7.11.4
>

This patch seems to have a bug that may lead to a crash when writing data
and killing nodes concurrently; let me dig into it further.



-- 
Yunkai Zhang
Work at Taobao



More information about the sheepdog mailing list