[sheepdog] [PATCH, RFC] sheep: rewrite blocked notifications
Yunkai Zhang
yunkai.me at gmail.com
Thu May 17 09:11:20 CEST 2012
On Wed, May 16, 2012 at 3:04 PM, Christoph Hellwig <hch at infradead.org> wrote:
> The prime AIM of this patch is to fix the racy access to sys->pending_list
> in do_cluster_op, but it actually cleans up the surrounding code massively
> as well.
>
> It contains three tightly related changes:
>
> - split a new ->block operation from ->notify. It is used to tell the
> cluster driver to block new events, but does not contain a message by
> itself yet.
> - the block_cb callback previously passed to ->notify is not passed to
> ->block any more, but a new sd_block_handler callback is provided
> that can be called from the cluster driver in main thread context.
> sd_block_handler takes care of grabbing the first request from
> sys->pending list in the main thread, and then scheduling a workqueue
> to handle the cluster operation
> - a new ->unblock cluster operation is added which is called from the
> ->done handler of the block workqueue to tell the cluster driver
> to unblock the event processing, as well as sending the message with
> the results from the main processing (or simplify the cluster wide
> notification if there is no work routine in the ops table)
>
> Signed-off-by: Christoph Hellwig <hch at lst.de>
>
> ---
> sheep/cluster.h | 23 ++++++---
> sheep/cluster/accord.c | 45 ++++++++----------
> sheep/cluster/corosync.c | 78 ++++++-------------------------
> sheep/cluster/local.c | 54 ++++++++++-----------
> sheep/cluster/zookeeper.c | 99 ++++++++++++++++------------------------
> sheep/group.c | 113 ++++++++++++++++++++++++++++------------------
> sheep/sdnet.c | 2
> sheep/sheep.c | 3 -
> sheep/sheep_priv.h | 1
> 9 files changed, 195 insertions(+), 223 deletions(-)
>
> Index: sheepdog/sheep/cluster.h
> ===================================================================
> --- sheepdog.orig/sheep/cluster.h 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/cluster.h 2012-05-16 08:43:40.103890448 +0200
> @@ -76,15 +76,23 @@ struct cluster_driver {
> * This function sends 'msg' to all the nodes. The notified messages
> * can be read through sd_notify_handler().
> *
> - * If 'block_cb' is specified, block_cb() is called before 'msg' is
> - * notified to all the nodes. All the cluster events including this
> - * notification are blocked until block_cb() returns or this blocking
> - * node leaves the cluster. The sheep daemon can sleep in block_cb(),
> - * so this callback must be not called from the dispatch (main) thread.
> - *
> * Returns zero on success, -1 on error
> */
> - int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg));
> + int (*notify)(void *msg, size_t msg_len);
> +
> + /*
> + * Send a message to all nodes to block further events.
> + *
> + * Once the cluster driver has ensured that events are blocked on all
> + * nodes it needs to call sd_block_handler() on the node where ->block
> + * was called.
> + */
> + void (*block)(void);
> +
> + /*
> + * Unblock events on all nodes, and send a message to all nodes.
> + */
> + void (*unblock)(void *msg, size_t msg_len);
>
> /*
> * Dispatch handlers
> @@ -189,6 +197,7 @@ void sd_join_handler(struct sd_node *joi
> void sd_leave_handler(struct sd_node *left, struct sd_node *members,
> size_t nr_members);
> void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
> +void sd_block_handler(void);
> enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
> void *opaque);
>
> Index: sheepdog/sheep/cluster/corosync.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/corosync.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/cluster/corosync.c 2012-05-16 08:21:23.147903200 +0200
> @@ -29,10 +29,7 @@ static struct cpg_name cpg_group = { 8,
> static corosync_cfg_handle_t cfg_handle;
> static struct cpg_node this_node;
>
> -static struct work_queue *corosync_block_wq;
> -
> static LIST_HEAD(corosync_event_list);
> -static LIST_HEAD(corosync_block_list);
>
> static struct cpg_node cpg_nodes[SD_MAX_NODES];
> static size_t nr_cpg_nodes;
> @@ -205,24 +202,6 @@ retry:
> return 0;
> }
>
> -static void corosync_block(struct work *work)
> -{
> - struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> -
> - bm->cb(bm->msg);
> -}
> -
> -static void corosync_block_done(struct work *work)
> -{
> - struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
> -
> - send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
> - bm->msg, bm->msg_len);
> -
> - free(bm->msg);
> - free(bm);
> -}
> -
> static struct corosync_event *find_block_event(enum corosync_event_type type,
> struct cpg_node *sender)
> {
> @@ -276,7 +255,6 @@ static void build_node_list(struct cpg_n
> */
> static int __corosync_dispatch_one(struct corosync_event *cevent)
> {
> - struct corosync_block_msg *bm;
> enum cluster_join_result res;
> struct sd_node entries[SD_MAX_NODES];
> int idx;
> @@ -343,18 +321,7 @@ static int __corosync_dispatch_one(struc
> if (cevent->blocked) {
> if (cpg_node_equal(&cevent->sender, &this_node) &&
> !cevent->callbacked) {
> - /* call a block callback function from a worker thread */
> - if (list_empty(&corosync_block_list))
> - panic("cannot call block callback\n");
> -
> - bm = list_first_entry(&corosync_block_list,
> - typeof(*bm), list);
> - list_del(&bm->list);
> -
> - bm->work.fn = corosync_block;
> - bm->work.done = corosync_block_done;
> - queue_work(corosync_block_wq, &bm->work);
> -
> + sd_block_handler();
> cevent->callbacked = 1;
> }
>
> @@ -653,12 +620,6 @@ static int corosync_init(const char *opt
> return -1;
> }
>
> - corosync_block_wq = init_work_queue(1);
> - if (!corosync_block_wq) {
> - eprintf("failed to create corosync workqueue: %m\n");
> - return -1;
> - }
> -
> return fd;
> }
>
> @@ -698,31 +659,22 @@ static int corosync_leave(void)
> NULL, 0);
> }
>
> -static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
> +static void corosync_block(void)
> {
> - int ret;
> - struct corosync_block_msg *bm;
> -
> - if (block_cb) {
> - bm = zalloc(sizeof(*bm));
> - if (!bm)
> - panic("failed to allocate memory\n");
> - bm->msg = zalloc(msg_len);
> - if (!bm->msg)
> - panic("failed to allocate memory\n");
> + send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0,
> + NULL, 0);
> +}
>
> - memcpy(bm->msg, msg, msg_len);
> - bm->msg_len = msg_len;
> - bm->cb = block_cb;
> - list_add_tail(&bm->list, &corosync_block_list);
> -
> - ret = send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node,
> - NULL, 0, NULL, 0);
> - } else
> - ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
> - NULL, 0, msg, msg_len);
> +static void corosync_unblock(void *msg, size_t msg_len)
> +{
> + send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
> + msg, msg_len);
> +}
>
> - return ret;
> +static int corosync_notify(void *msg, size_t msg_len)
> +{
> + return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
> + NULL, 0, msg, msg_len);
> }
>
> static int corosync_dispatch(void)
> @@ -743,6 +695,8 @@ struct cluster_driver cdrv_corosync = {
> .join = corosync_join,
> .leave = corosync_leave,
> .notify = corosync_notify,
> + .block = corosync_block,
> + .unblock = corosync_unblock,
> .dispatch = corosync_dispatch,
> };
>
> Index: sheepdog/sheep/group.c
> ===================================================================
> --- sheepdog.orig/sheep/group.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/group.c 2012-05-16 08:21:23.151903200 +0200
> @@ -209,29 +209,70 @@ int get_nr_copies(struct vnode_info *vno
> return min(vnode_info->nr_zones, sys->nr_copies);
> }
>
> +static struct vdi_op_message *prepare_cluster_msg(struct request *req,
> + size_t *sizep)
> +{
> + struct vdi_op_message *msg;
> + size_t size;
> +
> + if (has_process_main(req->op))
> + size = sizeof(*msg) + req->rq.data_length;
> + else
> + size = sizeof(*msg);
> +
> + msg = zalloc(size);
> + if (!msg) {
> + eprintf("failed to allocate memory\n");
> + return NULL;
> + }
> +
> + memcpy(&msg->req, &req->rq, sizeof(struct sd_req));
> + memcpy(&msg->rsp, &req->rp, sizeof(struct sd_rsp));
> +
> + if (has_process_main(req->op))
> + memcpy(msg->data, req->data, req->rq.data_length);
> +
> + *sizep = size;
> + return msg;
> +}
> +
> +static void do_cluster_request(struct work *work)
> +{
> + struct request *req = container_of(work, struct request, work);
> + int ret;
> +
> + ret = do_process_work(req->op, &req->rq, &req->rp, req->data);
> + req->rp.result = ret;
> +}
> +
> +static void cluster_op_done(struct work *work)
> +{
> + struct request *req = container_of(work, struct request, work);
> + struct vdi_op_message *msg;
> + size_t size;
> +
> + msg = prepare_cluster_msg(req, &size);
> + if (!msg)
> + panic();
> +
> + sys->cdrv->unblock(msg, size);
> +}
> +
> /*
> * Perform a blocked cluster operation.
> *
> * Must run in the main thread as it access unlocked state like
> * sys->pending_list.
> */
> -static void do_cluster_op(void *arg)
> +void sd_block_handler(void)
> {
> - struct vdi_op_message *msg = arg;
> - int ret;
> - struct request *req;
> - void *data;
> -
> - req = list_first_entry(&sys->pending_list, struct request, pending_list);
> + struct request *req = list_first_entry(&sys->pending_list,
> + struct request, pending_list);
>
> - if (has_process_main(req->op))
> - data = msg->data;
> - else
> - data = req->data;
> - ret = do_process_work(req->op, (const struct sd_req *)&msg->req,
> - (struct sd_rsp *)&msg->rsp, data);
> + req->work.fn = do_cluster_request;
> + req->work.done = cluster_op_done;
>
> - msg->rsp.result = ret;
> + queue_work(sys->block_wqueue, &req->work);
> }
>
> /*
> @@ -241,40 +282,28 @@ static void do_cluster_op(void *arg)
> * Must run in the main thread as it access unlocked state like
> * sys->pending_list.
> */
> -static void do_cluster_request(struct request *req)
> +static void queue_cluster_request(struct request *req)
> {
> - struct sd_req *hdr = &req->rq;
> - struct vdi_op_message *msg;
> - size_t size;
> + eprintf("%p %x\n", req, req->rq.opcode);
>
> - eprintf("%p %x\n", req, hdr->opcode);
> + if (has_process_work(req->op)) {
> + list_add_tail(&req->pending_list, &sys->pending_list);
> + sys->cdrv->block();
> + } else {
> + struct vdi_op_message *msg;
> + size_t size;
>
> - if (has_process_main(req->op))
> - size = sizeof(*msg) + hdr->data_length;
> - else
> - size = sizeof(*msg);
> -
> - msg = zalloc(size);
> - if (!msg) {
> - eprintf("failed to allocate memory\n");
> - return;
> - }
> -
> - msg->req = *((struct sd_vdi_req *)&req->rq);
> - msg->rsp = *((struct sd_vdi_rsp *)&req->rp);
> - if (has_process_main(req->op))
> - memcpy(msg->data, req->data, hdr->data_length);
> + msg = prepare_cluster_msg(req, &size);
> + if (!msg)
> + return;
>
> - list_add_tail(&req->pending_list, &sys->pending_list);
> + list_add_tail(&req->pending_list, &sys->pending_list);
>
> - if (has_process_work(req->op))
> - sys->cdrv->notify(msg, size, do_cluster_op);
> - else {
> msg->rsp.result = SD_RES_SUCCESS;
> - sys->cdrv->notify(msg, size, NULL);
> - }
> + sys->cdrv->notify(msg, size);
>
> - free(msg);
> + free(msg);
> + }
> }
>
> static void group_handler(int listen_fd, int events, void *data)
> @@ -1075,7 +1104,7 @@ static void process_request_queue(void)
> * directly from the main thread. It's the cluster
> * drivers job to ensure we avoid blocking on I/O here.
> */
> - do_cluster_request(req);
> + queue_cluster_request(req);
> } else { /* is_local_op(req->op) */
> queue_work(sys->io_wqueue, &req->work);
> }
> Index: sheepdog/sheep/sdnet.c
> ===================================================================
> --- sheepdog.orig/sheep/sdnet.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/sdnet.c 2012-05-16 08:21:23.151903200 +0200
> @@ -318,7 +318,7 @@ static void queue_request(struct request
> req->work.fn = do_local_request;
> req->work.done = local_op_done;
> } else if (is_cluster_op(req->op)) {
> - /* directly executed in the main thread */;
> + ;
> } else {
> eprintf("unknown operation %d\n", hdr->opcode);
> rsp->result = SD_RES_SYSTEM_ERROR;
> Index: sheepdog/sheep/sheep.c
> ===================================================================
> --- sheepdog.orig/sheep/sheep.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/sheep.c 2012-05-16 08:21:23.151903200 +0200
> @@ -258,9 +258,10 @@ int main(int argc, char **argv)
> sys->recovery_wqueue = init_work_queue(1);
> sys->deletion_wqueue = init_work_queue(1);
> sys->flush_wqueue = init_work_queue(1);
> + sys->block_wqueue = init_work_queue(1);
> if (!sys->event_wqueue || !sys->gateway_wqueue || !sys->io_wqueue ||
> !sys->recovery_wqueue || !sys->deletion_wqueue ||
> - !sys->flush_wqueue)
> + !sys->flush_wqueue || !sys->block_wqueue)
> exit(1);
>
> ret = init_signal();
> Index: sheepdog/sheep/sheep_priv.h
> ===================================================================
> --- sheepdog.orig/sheep/sheep_priv.h 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/sheep_priv.h 2012-05-16 08:21:23.151903200 +0200
> @@ -149,6 +149,7 @@ struct cluster_info {
> struct work_queue *deletion_wqueue;
> struct work_queue *recovery_wqueue;
> struct work_queue *flush_wqueue;
> + struct work_queue *block_wqueue;
> };
>
> struct siocb {
> Index: sheepdog/sheep/cluster/local.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/local.c 2012-05-16 08:20:02.419903970 +0200
> +++ sheepdog/sheep/cluster/local.c 2012-05-16 08:40:30.647892257 +0200
> @@ -53,10 +53,8 @@ struct local_event {
>
> enum cluster_join_result join_result;
>
> - void (*block_cb)(void *arg);
> -
> int blocked; /* set non-zero when sheep must block this event */
> - int callbacked; /* set non-zero if sheep already called block_cb() */
> + int callbacked; /* set non-zero after sd_block_handler() was called */
> };
>
>
> @@ -215,7 +213,7 @@ static void shm_queue_init(void)
>
> static void add_event(enum local_event_type type,
> struct sd_node *node, void *buf,
> - size_t buf_len, void (*block_cb)(void *arg))
> + size_t buf_len, int blocked)
> {
> int idx;
> struct sd_node *n;
> @@ -250,8 +248,7 @@ static void add_event(enum local_event_t
> memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
> break;
> case EVENT_NOTIFY:
> - ev.blocked = !!block_cb;
> - ev.block_cb = block_cb;
> + ev.blocked = blocked;
> break;
> }
>
> @@ -273,7 +270,7 @@ static void check_pids(void *arg)
>
> for (i = 0; i < nr; i++)
> if (!process_exists(pids[i]))
> - add_event(EVENT_LEAVE, nodes + i, NULL, 0, NULL);
> + add_event(EVENT_LEAVE, nodes + i, NULL, 0, 0);
>
> shm_queue_unlock();
>
> @@ -329,7 +326,7 @@ static int local_join(struct sd_node *my
>
> shm_queue_lock();
>
> - add_event(EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
> + add_event(EVENT_JOIN, &this_node, opaque, opaque_len, 0);
>
> shm_queue_unlock();
>
> @@ -340,25 +337,34 @@ static int local_leave(void)
> {
> shm_queue_lock();
>
> - add_event(EVENT_LEAVE, &this_node, NULL, 0, NULL);
> + add_event(EVENT_LEAVE, &this_node, NULL, 0, 0);
>
> shm_queue_unlock();
>
> return 0;
> }
>
> -static int local_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
> +static int local_notify(void *msg, size_t msg_len)
> {
> shm_queue_lock();
>
> - add_event(EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
> + add_event(EVENT_NOTIFY, &this_node, msg, msg_len, 0);
>
> shm_queue_unlock();
>
> return 0;
> }
>
> -static void local_block(struct work *work)
> +static void local_block(void)
> +{
> + shm_queue_lock();
> +
> + add_event(EVENT_NOTIFY, &this_node, NULL, 0, 1);
> +
> + shm_queue_unlock();
> +}
> +
> +static void local_unblock(void *msg, size_t msg_len)
> {
> struct local_event *ev;
>
> @@ -366,8 +372,10 @@ static void local_block(struct work *wor
>
> ev = shm_queue_peek();
>
> - ev->block_cb(ev->buf);
> ev->blocked = 0;
> + ev->buf_len = msg_len;
> + if (msg)
> + memcpy(ev->buf, msg, msg_len);
> msync(ev, sizeof(*ev), MS_SYNC);
>
> shm_queue_notify();
> @@ -375,20 +383,12 @@ static void local_block(struct work *wor
> shm_queue_unlock();
> }
>
> -static void local_block_done(struct work *work)
> -{
> -}
> -
> static int local_dispatch(void)
> {
> int ret;
> struct signalfd_siginfo siginfo;
> struct local_event *ev;
> enum cluster_join_result res;
> - static struct work work = {
> - .fn = local_block,
> - .done = local_block_done,
> - };
>
> dprintf("read siginfo\n");
> ret = read(sigfd, &siginfo, sizeof(siginfo));
> @@ -438,12 +438,10 @@ static int local_dispatch(void)
> break;
> case EVENT_NOTIFY:
> if (ev->blocked) {
> - if (node_eq(&ev->sender, &this_node)) {
> - if (!ev->callbacked) {
> - queue_work(local_block_wq, &work);
> -
> - ev->callbacked = 1;
> - }
> + if (node_eq(&ev->sender, &this_node) &&
> + !ev->callbacked) {
> + sd_block_handler();
> + ev->callbacked = 1;
> }
> goto out;
> }
> @@ -466,6 +464,8 @@ struct cluster_driver cdrv_local = {
> .join = local_join,
> .leave = local_leave,
> .notify = local_notify,
> + .block = local_block,
> + .unblock = local_unblock,
> .dispatch = local_dispatch,
> };
>
> Index: sheepdog/sheep/cluster/zookeeper.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/zookeeper.c 2012-05-16 07:18:11.715939357 +0200
> +++ sheepdog/sheep/cluster/zookeeper.c 2012-05-16 08:40:23.707892324 +0200
> @@ -76,10 +76,8 @@ struct zk_event {
>
> enum cluster_join_result join_result;
>
> - void (*block_cb)(void *arg);
> -
> int blocked; /* set non-zero when sheep must block this event */
> - int callbacked; /* set non-zero if sheep already called block_cb() */
> + int callbacked; /* set non-zero after sd_block_handler() was called */
>
> size_t buf_len;
> uint8_t buf[MAX_EVENT_BUF_SIZE];
> @@ -498,53 +496,46 @@ static void zk_member_init(zhandle_t *zh
> /* ZooKeeper driver APIs */
>
> static zhandle_t *zhandle;
> -
> -static struct work_queue *zk_block_wq;
> -
> static struct zk_node this_node;
>
> static int add_event(zhandle_t *zh, enum zk_event_type type,
> struct zk_node *znode, void *buf,
> - size_t buf_len, void (*block_cb)(void *arg))
> + size_t buf_len, int blocked)
> {
> - int nr_levents;
> - struct zk_event ev, *lev;
> - eventfd_t value = 1;
> + struct zk_event ev;
>
> ev.type = type;
> ev.sender = *znode;
> ev.buf_len = buf_len;
> ev.callbacked = 0;
> - ev.blocked = 0;
> + ev.blocked = blocked;
> if (buf)
> memcpy(ev.buf, buf, buf_len);
> + zk_queue_push(zh, &ev);
> + return 0;
> +}
>
> - switch (type) {
> - case EVENT_JOIN:
> - ev.blocked = 1;
> - break;
> - case EVENT_LEAVE:
> - lev = &zk_levents[zk_levent_tail%SD_MAX_NODES];
> +static int leave_event(zhandle_t *zh, struct zk_node *znode)
> +{
> + int nr_levents;
> + struct zk_event *ev;
> + const eventfd_t value = 1;
>
> - memcpy(lev, &ev, sizeof(ev));
> + ev = &zk_levents[zk_levent_tail % SD_MAX_NODES];
> + ev->type = EVENT_LEAVE;
> + ev->sender = *znode;
> + ev->buf_len = 0;
> + ev->callbacked = 0;
> + ev->blocked = 0;
>
> - nr_levents = uatomic_add_return(&nr_zk_levents, 1);
> - dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
> + nr_levents = uatomic_add_return(&nr_zk_levents, 1);
> + dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
>
> - zk_levent_tail++;
> + zk_levent_tail++;
>
> - /* manual notify */
> - dprintf("write event to efd:%d\n", efd);
> - eventfd_write(efd, value);
> - goto out;
> - case EVENT_NOTIFY:
> - ev.blocked = !!block_cb;
> - ev.block_cb = block_cb;
> - break;
> - }
> -
> - zk_queue_push(zh, &ev);
> -out:
> + /* manual notify */
> + dprintf("write event to efd:%d\n", efd);
> + eventfd_write(efd, value);
> return 0;
> }
>
> @@ -586,7 +577,7 @@ static void watcher(zhandle_t *zh, int t
> str_to_node(p, &znode.node);
> dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node));
>
> - add_event(zh, EVENT_LEAVE, &znode, NULL, 0, NULL);
> + leave_event(zh, &znode);
> return;
> }
>
> @@ -674,12 +665,6 @@ static int zk_init(const char *option, u
> return -1;
> }
>
> - zk_block_wq = init_work_queue(1);
> - if (!zk_block_wq) {
> - eprintf("failed to create zookeeper workqueue: %m\n");
> - return -1;
> - }
> -
> return efd;
> }
>
> @@ -704,7 +689,7 @@ static int zk_join(struct sd_node *mysel
>
> dprintf("clientid:%ld\n", cid->client_id);
>
> - rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
> + rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
>
> return rc;
> }
> @@ -717,12 +702,17 @@ static int zk_leave(void)
> return zk_delete(zhandle, path, -1);
> }
>
> -static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
> +static int zk_notify(void *msg, size_t msg_len)
> +{
> + return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
> +}
> +
> +static void zk_block(void)
> {
> - return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
> + add_event(zhandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
> }
>
> -static void zk_block(struct work *work)
> +static void zk_unblock(void *msg, size_t msg_len)
> {
> int rc;
> struct zk_event ev;
> @@ -731,8 +721,10 @@ static void zk_block(struct work *work)
> rc = zk_queue_pop(zhandle, &ev);
> assert(rc == 0);
>
> - ev.block_cb(ev.buf);
> ev.blocked = 0;
> + ev.buf_len = msg_len;
> + if (msg)
> + memcpy(ev.buf, msg, msg_len);
>
> zk_queue_push_back(zhandle, &ev);
>
> @@ -743,10 +735,6 @@ static void zk_block(struct work *work)
> eventfd_write(efd, value);
> }
>
> -static void zk_block_done(struct work *work)
> -{
> -}
> -
> static int zk_dispatch(void)
> {
> int ret, rc, retry;
> @@ -755,10 +743,6 @@ static int zk_dispatch(void)
> struct zk_event ev;
> struct zk_node *n;
> enum cluster_join_result res;
> - static struct work work = {
> - .fn = zk_block,
> - .done = zk_block_done,
> - };
>
> dprintf("read event\n");
> ret = eventfd_read(efd, &value);
> @@ -866,13 +850,10 @@ static int zk_dispatch(void)
> if (ev.blocked) {
> if (node_eq(&ev.sender.node, &this_node.node)
> && !ev.callbacked) {
> - ev.callbacked = 1;
> -
> uatomic_inc(&zk_notify_blocked);
Now, zk_dispatch()/zk_queue_pop()/zk_unblock() will read and write the
zk_notify_blocked variable. Since these functions all run in the main
thread, we could update this value directly, but using uatomic_xx is
also harmless.
> -
> + ev.callbacked = 1;
> zk_queue_push_back(zhandle, &ev);
> -
> - queue_work(zk_block_wq, &work);
> + sd_block_handler();
> } else
> zk_queue_push_back(zhandle, NULL);
>
> @@ -893,6 +874,8 @@ struct cluster_driver cdrv_zookeeper = {
> .join = zk_join,
> .leave = zk_leave,
> .notify = zk_notify,
> + .block = zk_block,
> + .unblock = zk_unblock,
> .dispatch = zk_dispatch,
> };
>
> Index: sheepdog/sheep/cluster/accord.c
> ===================================================================
> --- sheepdog.orig/sheep/cluster/accord.c 2012-05-15 09:15:08.859954176 +0200
> +++ sheepdog/sheep/cluster/accord.c 2012-05-16 08:40:37.235892186 +0200
> @@ -46,10 +46,8 @@ struct acrd_event {
>
> enum cluster_join_result join_result;
>
> - void (*block_cb)(void *arg);
> -
> int blocked; /* set non-zero when sheep must block this event */
> - int callbacked; /* set non-zero if sheep already called block_cb() */
> + int callbacked; /* set non-zero after sd_block_handler() was called */
> };
>
> static struct sd_node this_node;
> @@ -246,7 +244,7 @@ again:
>
> static int add_event(struct acrd_handle *ah, enum acrd_event_type type,
> struct sd_node *node, void *buf,
> - size_t buf_len, void (*block_cb)(void *arg))
> + size_t buf_len, int blocked)
> {
> int idx;
> struct sd_node *n;
> @@ -257,6 +255,7 @@ static int add_event(struct acrd_handle
>
> ev.type = type;
> ev.sender = *node;
> + ev.blocked = blocked;
> ev.buf_len = buf_len;
> if (buf)
> memcpy(ev.buf, buf, buf_len);
> @@ -265,7 +264,6 @@ static int add_event(struct acrd_handle
>
> switch (type) {
> case EVENT_JOIN:
> - ev.blocked = 1;
> ev.nodes[ev.nr_nodes] = *node;
> ev.ids[ev.nr_nodes] = this_id; /* must be local node */
> ev.nr_nodes++;
> @@ -282,8 +280,6 @@ static int add_event(struct acrd_handle
> memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx));
> break;
> case EVENT_NOTIFY:
> - ev.blocked = !!block_cb;
> - ev.block_cb = block_cb;
> break;
> }
>
> @@ -412,8 +408,7 @@ static void __acrd_leave(struct work *wo
>
> for (i = 0; i < nr_nodes; i++) {
> if (ids[i] == info->left_nodeid) {
> - add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0,
> - NULL);
> + add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0, 0);
> break;
> }
> }
> @@ -517,20 +512,25 @@ static int accord_join(struct sd_node *m
> {
> this_node = *myself;
>
> - return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
> + return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
> }
>
> static int accord_leave(void)
> {
> - return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
> + return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, 0);
> }
>
> -static int accord_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
> +static int accord_notify(void *msg, size_t msg_len)
> {
> - return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
> + return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
> }
>
> -static void acrd_block(struct work *work)
> +static void accord_block(void)
> +{
> + return add_event(ahandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
> +}
> +
> +static void acrd_unblock(void *msg, size_t msg_len)
> {
> struct acrd_event ev;
>
> @@ -538,28 +538,22 @@ static void acrd_block(struct work *work
>
> acrd_queue_pop(ahandle, &ev);
>
> - ev.block_cb(ev.buf);
> ev.blocked = 0;
> + ev.buf_len = msg_len;
> + if (msg)
> + memcpy(ev.buf, msg, msg_len);
>
> acrd_queue_push_back(ahandle, &ev);
>
> pthread_mutex_unlock(&queue_lock);
> }
>
> -static void acrd_block_done(struct work *work)
> -{
> -}
> -
> static int accord_dispatch(void)
> {
> int ret;
> eventfd_t value;
> struct acrd_event ev;
> enum cluster_join_result res;
> - static struct work work = {
> - .fn = acrd_block,
> - .done = acrd_block_done,
> - };
>
> dprintf("read event\n");
> ret = eventfd_read(efd, &value);
> @@ -612,11 +606,10 @@ static int accord_dispatch(void)
> case EVENT_NOTIFY:
> if (ev.blocked) {
> if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
> - queue_work(acrd_wq, &work);
> -
> ev.callbacked = 1;
>
> acrd_queue_push_back(ahandle, &ev);
> + sd_block_handler();
> } else
> acrd_queue_push_back(ahandle, NULL);
>
> @@ -639,6 +632,8 @@ struct cluster_driver cdrv_accord = {
> .join = accord_join,
> .leave = accord_leave,
> .notify = accord_notify,
> + .block = accord_block,
> + .unblock = accord_unblock,
> .dispatch = accord_dispatch,
> };
>
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog
--
Yunkai Zhang
Work at Taobao
More information about the sheepdog
mailing list