[sheepdog] [PATCH, RFC] sheep: rewrite blocked notifications

Yunkai Zhang yunkai.me at gmail.com
Thu May 17 09:11:20 CEST 2012


On Wed, May 16, 2012 at 3:04 PM, Christoph Hellwig <hch at infradead.org> wrote:
> The prime AIM of this patch is to fix the racy access to sys->pending_list
> in do_cluster_op, but it actually cleans up the surrounding code massively
> as well.
>
> It contains three tightly related changes:
>
>  - split a new ->block operation from ->notify.  It is used to tell the
>   cluster driver to block new events, but does not contain a message by
>   itself yet.
>  - the block_cb callback previously passed to ->notify is not passed to
>   ->block any more, but a new sd_block_handler callback is provided
>   that can be called from the cluster driver in main thread context.
>   sd_block_handler takes care of grabbing the first request from
>   sys->pending list in the main thread, and then scheduling a workqueue
>   to handle the cluster operation
>  - a new ->unblock cluster operation is added which is called from the
>   ->done handler of the block workqueue to tell the cluster driver
>   to unblock the event processing, as well as sending the message with
>   the results from the main processing (or simply the cluster wide
>   notification if there is no work routine in the ops table)
>
> Signed-off-by: Christoph Hellwig <hch at lst.de>
>
> ---
>  sheep/cluster.h           |   23 ++++++---
>  sheep/cluster/accord.c    |   45 ++++++++----------
>  sheep/cluster/corosync.c  |   78 ++++++-------------------------
>  sheep/cluster/local.c     |   54 ++++++++++-----------
>  sheep/cluster/zookeeper.c |   99 ++++++++++++++++------------------------
>  sheep/group.c             |  113 ++++++++++++++++++++++++++++------------------
>  sheep/sdnet.c             |    2
>  sheep/sheep.c             |    3 -
>  sheep/sheep_priv.h        |    1
>  9 files changed, 195 insertions(+), 223 deletions(-)
>
> Index: sheepdog/sheep/cluster.h
> ===================================================================
> --- sheepdog.orig/sheep/cluster.h       2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/cluster.h    2012-05-16 08:43:40.103890448 +0200
> @@ -76,15 +76,23 @@ struct cluster_driver {
>         * This function sends 'msg' to all the nodes.  The notified messages
>         * can be read through sd_notify_handler().
>         *
> -        * If 'block_cb' is specified, block_cb() is called before 'msg' is
> -        * notified to all the nodes.  All the cluster events including this
> -        * notification are blocked until block_cb() returns or this blocking
> -        * node leaves the cluster.  The sheep daemon can sleep in block_cb(),
> -        * so this callback must be not called from the dispatch (main) thread.
> -        *
>         * Returns zero on success, -1 on error
>         */
> -       int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
> +       int (*notify)(void *msg, size_t msg_len);
> +
> +       /*
> +        * Send a message to all nodes to block further events.
> +        *
> +        * Once the cluster driver has ensured that events are blocked on all
> +        * nodes it needs to call sd_block_handler() on the node where ->block
> +        * was called.
> +        */
> +       void (*block)(void);
> +
> +       /*
> +        * Unblock events on all nodes, and send a message to all nodes.
> +        */
> +       void (*unblock)(void *msg, size_t msg_len);
>
>        /*
>         * Dispatch handlers
> @@ -189,6 +197,7 @@ void sd_join_handler(struct sd_node *joi
>  void sd_leave_handler(struct sd_node *left, struct sd_node *members,
>                size_t nr_members);
>  void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
> +void sd_block_handler(void);
>  enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
>                void *opaque);
>
> Index: sheepdog/sheep/cluster/corosync.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/corosync.c      2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/cluster/corosync.c   2012-05-16 08:21:23.147903200 +0200
> @@ -29,10 +29,7 @@ static struct cpg_name cpg_group = { 8,
>  static corosync_cfg_handle_t cfg_handle;
>  static struct cpg_node this_node;
>
> -static struct work_queue *corosync_block_wq;
> -
>  static LIST_HEAD(corosync_event_list);
> -static LIST_HEAD(corosync_block_list);
>
>  static struct cpg_node cpg_nodes[SD_MAX_NODES];
>  static size_t nr_cpg_nodes;
> @@ -205,24 +202,6 @@ retry:
>        return 0;
>  }
>
> -static void corosync_block(struct work *work)
> -{
> -       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> -
> -       bm->cb(bm->msg);
> -}
> -
> -static void corosync_block_done(struct work *work)
> -{
> -       struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> -
> -       send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
> -                    bm->msg, bm->msg_len);
> -
> -       free(bm->msg);
> -       free(bm);
> -}
> -
>  static struct corosync_event *find_block_event(enum corosync_event_type type,
>                                               struct cpg_node *sender)
>  {
> @@ -276,7 +255,6 @@ static void build_node_list(struct cpg_n
>  */
>  static int __corosync_dispatch_one(struct corosync_event *cevent)
>  {
> -       struct corosync_block_msg *bm;
>        enum cluster_join_result res;
>        struct sd_node entries[SD_MAX_NODES];
>        int idx;
> @@ -343,18 +321,7 @@ static int __corosync_dispatch_one(struc
>                if (cevent->blocked) {
>                        if (cpg_node_equal(&cevent->sender, &this_node) &&
>                            !cevent->callbacked) {
> -                               /* call a block callback function from a worker thread */
> -                               if (list_empty(&corosync_block_list))
> -                                       panic("cannot call block callback\n");
> -
> -                               bm = list_first_entry(&corosync_block_list,
> -                                                     typeof(*bm), list);
> -                               list_del(&bm->list);
> -
> -                               bm->work.fn = corosync_block;
> -                               bm->work.done = corosync_block_done;
> -                               queue_work(corosync_block_wq, &bm->work);
> -
> +                               sd_block_handler();
>                                cevent->callbacked = 1;
>                        }
>
> @@ -653,12 +620,6 @@ static int corosync_init(const char *opt
>                return -1;
>        }
>
> -       corosync_block_wq = init_work_queue(1);
> -       if (!corosync_block_wq) {
> -               eprintf("failed to create corosync workqueue: %m\n");
> -               return -1;
> -       }
> -
>        return fd;
>  }
>
> @@ -698,31 +659,22 @@ static int corosync_leave(void)
>                            NULL, 0);
>  }
>
> -static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
> +static void corosync_block(void)
>  {
> -       int ret;
> -       struct corosync_block_msg *bm;
> -
> -       if (block_cb) {
> -               bm = zalloc(sizeof(*bm));
> -               if (!bm)
> -                       panic("failed to allocate memory\n");
> -               bm->msg = zalloc(msg_len);
> -               if (!bm->msg)
> -                       panic("failed to allocate memory\n");
> +       send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0,
> +                           NULL, 0);
> +}
>
> -               memcpy(bm->msg, msg, msg_len);
> -               bm->msg_len = msg_len;
> -               bm->cb = block_cb;
> -               list_add_tail(&bm->list, &corosync_block_list);
> -
> -               ret = send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node,
> -                                  NULL, 0, NULL, 0);
> -       } else
> -               ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
> -                                  NULL, 0, msg, msg_len);
> +static void corosync_unblock(void *msg, size_t msg_len)
> +{
> +       send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
> +                    msg, msg_len);
> +}
>
> -       return ret;
> +static int corosync_notify(void *msg, size_t msg_len)
> +{
> +       return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
> +                          NULL, 0, msg, msg_len);
>  }
>
>  static int corosync_dispatch(void)
> @@ -743,6 +695,8 @@ struct cluster_driver cdrv_corosync = {
>        .join       = corosync_join,
>        .leave      = corosync_leave,
>        .notify     = corosync_notify,
> +       .block      = corosync_block,
> +       .unblock    = corosync_unblock,
>        .dispatch   = corosync_dispatch,
>  };
>
> Index: sheepdog/sheep/group.c
> ===================================================================
> --- sheepdog.orig/sheep/group.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/group.c      2012-05-16 08:21:23.151903200 +0200
> @@ -209,29 +209,70 @@ int get_nr_copies(struct vnode_info *vno
>        return min(vnode_info->nr_zones, sys->nr_copies);
>  }
>
> +static struct vdi_op_message *prepare_cluster_msg(struct request *req,
> +               size_t *sizep)
> +{
> +       struct vdi_op_message *msg;
> +       size_t size;
> +
> +       if (has_process_main(req->op))
> +               size = sizeof(*msg) + req->rq.data_length;
> +       else
> +               size = sizeof(*msg);
> +
> +       msg = zalloc(size);
> +       if (!msg) {
> +               eprintf("failed to allocate memory\n");
> +               return NULL;
> +       }
> +
> +       memcpy(&msg->req, &req->rq, sizeof(struct sd_req));
> +       memcpy(&msg->rsp, &req->rp, sizeof(struct sd_rsp));
> +
> +       if (has_process_main(req->op))
> +               memcpy(msg->data, req->data, req->rq.data_length);
> +
> +       *sizep = size;
> +       return msg;
> +}
> +
> +static void do_cluster_request(struct work *work)
> +{
> +       struct request *req = container_of(work, struct request, work);
> +       int ret;
> +
> +       ret = do_process_work(req->op, &req->rq, &req->rp, req->data);
> +       req->rp.result = ret;
> +}
> +
> +static void cluster_op_done(struct work *work)
> +{
> +       struct request *req = container_of(work, struct request, work);
> +       struct vdi_op_message *msg;
> +       size_t size;
> +
> +       msg = prepare_cluster_msg(req, &size);
> +       if (!msg)
> +               panic();
> +
> +       sys->cdrv->unblock(msg, size);
> +}
> +
>  /*
>  * Perform a blocked cluster operation.
>  *
>  * Must run in the main thread as it access unlocked state like
>  * sys->pending_list.
>  */
> -static void do_cluster_op(void *arg)
> +void sd_block_handler(void)
>  {
> -       struct vdi_op_message *msg = arg;
> -       int ret;
> -       struct request *req;
> -       void *data;
> -
> -       req = list_first_entry(&sys->pending_list, struct request, pending_list);
> +       struct request *req = list_first_entry(&sys->pending_list,
> +                                               struct request, pending_list);
>
> -       if (has_process_main(req->op))
> -               data = msg->data;
> -       else
> -               data = req->data;
> -       ret = do_process_work(req->op, (const struct sd_req *)&msg->req,
> -                             (struct sd_rsp *)&msg->rsp, data);
> +       req->work.fn = do_cluster_request;
> +       req->work.done = cluster_op_done;
>
> -       msg->rsp.result = ret;
> +       queue_work(sys->block_wqueue, &req->work);
>  }
>
>  /*
> @@ -241,40 +282,28 @@ static void do_cluster_op(void *arg)
>  * Must run in the main thread as it access unlocked state like
>  * sys->pending_list.
>  */
> -static void do_cluster_request(struct request *req)
> +static void queue_cluster_request(struct request *req)
>  {
> -       struct sd_req *hdr = &req->rq;
> -       struct vdi_op_message *msg;
> -       size_t size;
> +       eprintf("%p %x\n", req, req->rq.opcode);
>
> -       eprintf("%p %x\n", req, hdr->opcode);
> +       if (has_process_work(req->op)) {
> +               list_add_tail(&req->pending_list, &sys->pending_list);
> +               sys->cdrv->block();
> +       } else {
> +               struct vdi_op_message *msg;
> +               size_t size;
>
> -       if (has_process_main(req->op))
> -               size = sizeof(*msg) + hdr->data_length;
> -       else
> -               size = sizeof(*msg);
> -
> -       msg = zalloc(size);
> -       if (!msg) {
> -               eprintf("failed to allocate memory\n");
> -               return;
> -       }
> -
> -       msg->req = *((struct sd_vdi_req *)&req->rq);
> -       msg->rsp = *((struct sd_vdi_rsp *)&req->rp);
> -       if (has_process_main(req->op))
> -               memcpy(msg->data, req->data, hdr->data_length);
> +               msg = prepare_cluster_msg(req, &size);
> +               if (!msg)
> +                       return;
>
> -       list_add_tail(&req->pending_list, &sys->pending_list);
> +               list_add_tail(&req->pending_list, &sys->pending_list);
>
> -       if (has_process_work(req->op))
> -               sys->cdrv->notify(msg, size, do_cluster_op);
> -       else {
>                msg->rsp.result = SD_RES_SUCCESS;
> -               sys->cdrv->notify(msg, size, NULL);
> -       }
> +               sys->cdrv->notify(msg, size);
>
> -       free(msg);
> +               free(msg);
> +       }
>  }
>
>  static void group_handler(int listen_fd, int events, void *data)
> @@ -1075,7 +1104,7 @@ static void process_request_queue(void)
>                         * directly from the main thread.  It's the cluster
>                         * drivers job to ensure we avoid blocking on I/O here.
>                         */
> -                       do_cluster_request(req);
> +                       queue_cluster_request(req);
>                } else { /* is_local_op(req->op) */
>                        queue_work(sys->io_wqueue, &req->work);
>                }
> Index: sheepdog/sheep/sdnet.c
> ===================================================================
> --- sheepdog.orig/sheep/sdnet.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/sdnet.c      2012-05-16 08:21:23.151903200 +0200
> @@ -318,7 +318,7 @@ static void queue_request(struct request
>                req->work.fn = do_local_request;
>                req->work.done = local_op_done;
>        } else if (is_cluster_op(req->op)) {
> -               /* directly executed in the main thread */;
> +               ;
>        } else {
>                eprintf("unknown operation %d\n", hdr->opcode);
>                rsp->result = SD_RES_SYSTEM_ERROR;
> Index: sheepdog/sheep/sheep.c
> ===================================================================
> --- sheepdog.orig/sheep/sheep.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/sheep.c      2012-05-16 08:21:23.151903200 +0200
> @@ -258,9 +258,10 @@ int main(int argc, char **argv)
>        sys->recovery_wqueue = init_work_queue(1);
>        sys->deletion_wqueue = init_work_queue(1);
>        sys->flush_wqueue = init_work_queue(1);
> +       sys->block_wqueue = init_work_queue(1);
>        if (!sys->event_wqueue || !sys->gateway_wqueue || !sys->io_wqueue ||
>            !sys->recovery_wqueue || !sys->deletion_wqueue ||
> -           !sys->flush_wqueue)
> +           !sys->flush_wqueue || !sys->block_wqueue)
>                exit(1);
>
>        ret = init_signal();
> Index: sheepdog/sheep/sheep_priv.h
> ===================================================================
> --- sheepdog.orig/sheep/sheep_priv.h    2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/sheep_priv.h 2012-05-16 08:21:23.151903200 +0200
> @@ -149,6 +149,7 @@ struct cluster_info {
>        struct work_queue *deletion_wqueue;
>        struct work_queue *recovery_wqueue;
>        struct work_queue *flush_wqueue;
> +       struct work_queue *block_wqueue;
>  };
>
>  struct siocb {
> Index: sheepdog/sheep/cluster/local.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/local.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/cluster/local.c      2012-05-16 08:40:30.647892257 +0200
> @@ -53,10 +53,8 @@ struct local_event {
>
>        enum cluster_join_result join_result;
>
> -       void (*block_cb)(void *arg);
> -
>        int blocked; /* set non-zero when sheep must block this event */
> -       int callbacked; /* set non-zero if sheep already called block_cb() */
> +       int callbacked; /* set non-zero after sd_block_handler() was called */
>  };
>
>
> @@ -215,7 +213,7 @@ static void shm_queue_init(void)
>
>  static void add_event(enum local_event_type type,
>                      struct sd_node *node, void *buf,
> -                     size_t buf_len, void (*block_cb)(void *arg))
> +                     size_t buf_len, int blocked)
>  {
>        int idx;
>        struct sd_node *n;
> @@ -250,8 +248,7 @@ static void add_event(enum local_event_t
>                memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
>                break;
>        case EVENT_NOTIFY:
> -               ev.blocked = !!block_cb;
> -               ev.block_cb = block_cb;
> +               ev.blocked = blocked;
>                break;
>        }
>
> @@ -273,7 +270,7 @@ static void check_pids(void *arg)
>
>        for (i = 0; i < nr; i++)
>                if (!process_exists(pids[i]))
> -                       add_event(EVENT_LEAVE, nodes + i, NULL, 0, NULL);
> +                       add_event(EVENT_LEAVE, nodes + i, NULL, 0, 0);
>
>        shm_queue_unlock();
>
> @@ -329,7 +326,7 @@ static int local_join(struct sd_node *my
>
>        shm_queue_lock();
>
> -       add_event(EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
> +       add_event(EVENT_JOIN, &this_node, opaque, opaque_len, 0);
>
>        shm_queue_unlock();
>
> @@ -340,25 +337,34 @@ static int local_leave(void)
>  {
>        shm_queue_lock();
>
> -       add_event(EVENT_LEAVE, &this_node, NULL, 0, NULL);
> +       add_event(EVENT_LEAVE, &this_node, NULL, 0, 0);
>
>        shm_queue_unlock();
>
>        return 0;
>  }
>
> -static int local_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
> +static int local_notify(void *msg, size_t msg_len)
>  {
>        shm_queue_lock();
>
> -       add_event(EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
> +       add_event(EVENT_NOTIFY, &this_node, msg, msg_len, 0);
>
>        shm_queue_unlock();
>
>        return 0;
>  }
>
> -static void local_block(struct work *work)
> +static void local_block(void)
> +{
> +       shm_queue_lock();
> +
> +       add_event(EVENT_NOTIFY, &this_node, NULL, 0, 1);
> +
> +       shm_queue_unlock();
> +}
> +
> +static void local_unblock(void *msg, size_t msg_len)
>  {
>        struct local_event *ev;
>
> @@ -366,8 +372,10 @@ static void local_block(struct work *wor
>
>        ev = shm_queue_peek();
>
> -       ev->block_cb(ev->buf);
>        ev->blocked = 0;
> +       ev->buf_len = msg_len;
> +       if (msg)
> +               memcpy(ev->buf, msg, msg_len);
>        msync(ev, sizeof(*ev), MS_SYNC);
>
>        shm_queue_notify();
> @@ -375,20 +383,12 @@ static void local_block(struct work *wor
>        shm_queue_unlock();
>  }
>
> -static void local_block_done(struct work *work)
> -{
> -}
> -
>  static int local_dispatch(void)
>  {
>        int ret;
>        struct signalfd_siginfo siginfo;
>        struct local_event *ev;
>        enum cluster_join_result res;
> -       static struct work work = {
> -               .fn = local_block,
> -               .done = local_block_done,
> -       };
>
>        dprintf("read siginfo\n");
>        ret = read(sigfd, &siginfo, sizeof(siginfo));
> @@ -438,12 +438,10 @@ static int local_dispatch(void)
>                break;
>        case EVENT_NOTIFY:
>                if (ev->blocked) {
> -                       if (node_eq(&ev->sender, &this_node)) {
> -                               if (!ev->callbacked) {
> -                                       queue_work(local_block_wq, &work);
> -
> -                                       ev->callbacked = 1;
> -                               }
> +                       if (node_eq(&ev->sender, &this_node) &&
> +                           !ev->callbacked) {
> +                               sd_block_handler();
> +                               ev->callbacked = 1;
>                        }
>                        goto out;
>                }
> @@ -466,6 +464,8 @@ struct cluster_driver cdrv_local = {
>        .join       = local_join,
>        .leave      = local_leave,
>        .notify     = local_notify,
> +       .block      = local_block,
> +       .unblock    = local_unblock,
>        .dispatch   = local_dispatch,
>  };
>
> Index: sheepdog/sheep/cluster/zookeeper.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/zookeeper.c     2012-05-16 07:18:11.715939357 +0200
> +++ sheepdog/sheep/cluster/zookeeper.c  2012-05-16 08:40:23.707892324 +0200
> @@ -76,10 +76,8 @@ struct zk_event {
>
>        enum cluster_join_result join_result;
>
> -       void (*block_cb)(void *arg);
> -
>        int blocked; /* set non-zero when sheep must block this event */
> -       int callbacked; /* set non-zero if sheep already called block_cb() */
> +       int callbacked; /* set non-zero after sd_block_handler() was called */
>
>        size_t buf_len;
>        uint8_t buf[MAX_EVENT_BUF_SIZE];
> @@ -498,53 +496,46 @@ static void zk_member_init(zhandle_t *zh
>  /* ZooKeeper driver APIs */
>
>  static zhandle_t *zhandle;
> -
> -static struct work_queue *zk_block_wq;
> -
>  static struct zk_node this_node;
>
>  static int add_event(zhandle_t *zh, enum zk_event_type type,
>                     struct zk_node *znode, void *buf,
> -                    size_t buf_len, void (*block_cb)(void *arg))
> +                    size_t buf_len, int blocked)
>  {
> -       int nr_levents;
> -       struct zk_event ev, *lev;
> -       eventfd_t value = 1;
> +       struct zk_event ev;
>
>        ev.type = type;
>        ev.sender = *znode;
>        ev.buf_len = buf_len;
>        ev.callbacked = 0;
> -       ev.blocked = 0;
> +       ev.blocked = blocked;
>        if (buf)
>                memcpy(ev.buf, buf, buf_len);
> +       zk_queue_push(zh, &ev);
> +       return 0;
> +}
>
> -       switch (type) {
> -       case EVENT_JOIN:
> -               ev.blocked = 1;
> -               break;
> -       case EVENT_LEAVE:
> -               lev = &zk_levents[zk_levent_tail%SD_MAX_NODES];
> +static int leave_event(zhandle_t *zh, struct zk_node *znode)
> +{
> +       int nr_levents;
> +       struct zk_event *ev;
> +       const eventfd_t value = 1;
>
> -               memcpy(lev, &ev, sizeof(ev));
> +       ev = &zk_levents[zk_levent_tail % SD_MAX_NODES];
> +       ev->type = EVENT_LEAVE;
> +       ev->sender = *znode;
> +       ev->buf_len = 0;
> +       ev->callbacked = 0;
> +       ev->blocked = 0;
>
> -               nr_levents = uatomic_add_return(&nr_zk_levents, 1);
> -               dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
> +       nr_levents = uatomic_add_return(&nr_zk_levents, 1);
> +       dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
>
> -               zk_levent_tail++;
> +       zk_levent_tail++;
>
> -               /* manual notify */
> -               dprintf("write event to efd:%d\n", efd);
> -               eventfd_write(efd, value);
> -               goto out;
> -       case EVENT_NOTIFY:
> -               ev.blocked = !!block_cb;
> -               ev.block_cb = block_cb;
> -               break;
> -       }
> -
> -       zk_queue_push(zh, &ev);
> -out:
> +       /* manual notify */
> +       dprintf("write event to efd:%d\n", efd);
> +       eventfd_write(efd, value);
>        return 0;
>  }
>
> @@ -586,7 +577,7 @@ static void watcher(zhandle_t *zh, int t
>                str_to_node(p, &znode.node);
>                dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node));
>
> -               add_event(zh, EVENT_LEAVE, &znode, NULL, 0, NULL);
> +               leave_event(zh, &znode);
>                return;
>        }
>
> @@ -674,12 +665,6 @@ static int zk_init(const char *option, u
>                return -1;
>        }
>
> -       zk_block_wq = init_work_queue(1);
> -       if (!zk_block_wq) {
> -               eprintf("failed to create zookeeper workqueue: %m\n");
> -               return -1;
> -       }
> -
>        return efd;
>  }
>
> @@ -704,7 +689,7 @@ static int zk_join(struct sd_node *mysel
>
>        dprintf("clientid:%ld\n", cid->client_id);
>
> -       rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
> +       rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
>
>        return rc;
>  }
> @@ -717,12 +702,17 @@ static int zk_leave(void)
>        return zk_delete(zhandle, path, -1);
>  }
>
> -static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
> +static int zk_notify(void *msg, size_t msg_len)
> +{
> +       return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
> +}
> +
> +static void zk_block(void)
>  {
> -       return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
> +       add_event(zhandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
>  }
>
> -static void zk_block(struct work *work)
> +static void zk_unblock(void *msg, size_t msg_len)
>  {
>        int rc;
>        struct zk_event ev;
> @@ -731,8 +721,10 @@ static void zk_block(struct work *work)
>        rc = zk_queue_pop(zhandle, &ev);
>        assert(rc == 0);
>
> -       ev.block_cb(ev.buf);
>        ev.blocked = 0;
> +       ev.buf_len = msg_len;
> +       if (msg)
> +               memcpy(ev.buf, msg, msg_len);
>
>        zk_queue_push_back(zhandle, &ev);
>
> @@ -743,10 +735,6 @@ static void zk_block(struct work *work)
>        eventfd_write(efd, value);
>  }
>
> -static void zk_block_done(struct work *work)
> -{
> -}
> -
>  static int zk_dispatch(void)
>  {
>        int ret, rc, retry;
> @@ -755,10 +743,6 @@ static int zk_dispatch(void)
>        struct zk_event ev;
>        struct zk_node *n;
>        enum cluster_join_result res;
> -       static struct work work = {
> -               .fn = zk_block,
> -               .done = zk_block_done,
> -       };
>
>        dprintf("read event\n");
>        ret = eventfd_read(efd, &value);
> @@ -866,13 +850,10 @@ static int zk_dispatch(void)
>                if (ev.blocked) {
>                        if (node_eq(&ev.sender.node, &this_node.node)
>                                        && !ev.callbacked) {
> -                               ev.callbacked = 1;
> -
>                                uatomic_inc(&zk_notify_blocked);

Now zk_dispatch()/zk_queue_pop()/zk_unblock() all read/write the
zk_notify_blocked variable. Since these functions all run in the main
thread, we could update this value directly, but using uatomic_xx is
also harmless.


> -
> +                               ev.callbacked = 1;
>                                zk_queue_push_back(zhandle, &ev);
> -
> -                               queue_work(zk_block_wq, &work);
> +                               sd_block_handler();
>                        } else
>                                zk_queue_push_back(zhandle, NULL);
>
> @@ -893,6 +874,8 @@ struct cluster_driver cdrv_zookeeper = {
>        .join       = zk_join,
>        .leave      = zk_leave,
>        .notify     = zk_notify,
> +       .block      = zk_block,
> +       .unblock    = zk_unblock,
>        .dispatch   = zk_dispatch,
>  };
>
> Index: sheepdog/sheep/cluster/accord.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/accord.c        2012-05-15 09:15:08.859954176 +0200
> +++ sheepdog/sheep/cluster/accord.c     2012-05-16 08:40:37.235892186 +0200
> @@ -46,10 +46,8 @@ struct acrd_event {
>
>        enum cluster_join_result join_result;
>
> -       void (*block_cb)(void *arg);
> -
>        int blocked; /* set non-zero when sheep must block this event */
> -       int callbacked; /* set non-zero if sheep already called block_cb() */
> +       int callbacked; /* set non-zero after sd_block_handler() was called */
>  };
>
>  static struct sd_node this_node;
> @@ -246,7 +244,7 @@ again:
>
>  static int add_event(struct acrd_handle *ah, enum acrd_event_type type,
>                     struct sd_node *node, void *buf,
> -                    size_t buf_len, void (*block_cb)(void *arg))
> +                    size_t buf_len, int blocked)
>  {
>        int idx;
>        struct sd_node *n;
> @@ -257,6 +255,7 @@ static int add_event(struct acrd_handle
>
>        ev.type = type;
>        ev.sender = *node;
> +       ev.blocked = blocked;
>        ev.buf_len = buf_len;
>        if (buf)
>                memcpy(ev.buf, buf, buf_len);
> @@ -265,7 +264,6 @@ static int add_event(struct acrd_handle
>
>        switch (type) {
>        case EVENT_JOIN:
> -               ev.blocked = 1;
>                ev.nodes[ev.nr_nodes] = *node;
>                ev.ids[ev.nr_nodes] = this_id; /* must be local node */
>                ev.nr_nodes++;
> @@ -282,8 +280,6 @@ static int add_event(struct acrd_handle
>                memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx));
>                break;
>        case EVENT_NOTIFY:
> -               ev.blocked = !!block_cb;
> -               ev.block_cb = block_cb;
>                break;
>        }
>
> @@ -412,8 +408,7 @@ static void __acrd_leave(struct work *wo
>
>        for (i = 0; i < nr_nodes; i++) {
>                if (ids[i] == info->left_nodeid) {
> -                       add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0,
> -                                 NULL);
> +                       add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0, 0);
>                        break;
>                }
>        }
> @@ -517,20 +512,25 @@ static int accord_join(struct sd_node *m
>  {
>        this_node = *myself;
>
> -       return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
> +       return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
>  }
>
>  static int accord_leave(void)
>  {
> -       return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
> +       return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, 0);
>  }
>
> -static int accord_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
> +static int accord_notify(void *msg, size_t msg_len)
>  {
> -       return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
> +       return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
>  }
>
> -static void acrd_block(struct work *work)
> +static void accord_block(void)
> +{
> +       return add_event(ahandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
> +}
> +
> +static void acrd_unblock(void *msg, size_t msg_len)
>  {
>        struct acrd_event ev;
>
> @@ -538,28 +538,22 @@ static void acrd_block(struct work *work
>
>        acrd_queue_pop(ahandle, &ev);
>
> -       ev.block_cb(ev.buf);
>        ev.blocked = 0;
> +       ev.buf_len = msg_len;
> +       if (msg)
> +               memcpy(ev.buf, msg, msg_len);
>
>        acrd_queue_push_back(ahandle, &ev);
>
>        pthread_mutex_unlock(&queue_lock);
>  }
>
> -static void acrd_block_done(struct work *work)
> -{
> -}
> -
>  static int accord_dispatch(void)
>  {
>        int ret;
>        eventfd_t value;
>        struct acrd_event ev;
>        enum cluster_join_result res;
> -       static struct work work = {
> -               .fn = acrd_block,
> -               .done = acrd_block_done,
> -       };
>
>        dprintf("read event\n");
>        ret = eventfd_read(efd, &value);
> @@ -612,11 +606,10 @@ static int accord_dispatch(void)
>        case EVENT_NOTIFY:
>                if (ev.blocked) {
>                        if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
> -                               queue_work(acrd_wq, &work);
> -
>                                ev.callbacked = 1;
>
>                                acrd_queue_push_back(ahandle, &ev);
> +                               sd_block_handler();
>                        } else
>                                acrd_queue_push_back(ahandle, NULL);
>
> @@ -639,6 +632,8 @@ struct cluster_driver cdrv_accord = {
>        .join       = accord_join,
>        .leave      = accord_leave,
>        .notify     = accord_notify,
> +       .block      = accord_block,
> +       .unblock    = accord_unblock,
>        .dispatch   = accord_dispatch,
>  };
>
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog



-- 
Yunkai Zhang
Work at Taobao



More information about the sheepdog mailing list