On Wed, May 16, 2012 at 3:04 PM, Christoph Hellwig <hch at infradead.org> wrote: > The prime AIM of this patch is to fix the racy access to sys->pending_list > in do_cluster_op, but it actually cleans up the surrounding code massively > as well. > > It contains three tightly related changes: > > - split a new ->block operation from ->notify. It is used to tell the > cluster driver to block new events, but does not contain a message by > itself yet. > - the block_cb callback previously passed to ->notify is not passed to > ->block any more, but a new sd_block_handler callback is provided > that can be called from the cluster driver in main thread context. > sd_block_handler takes care of grabbing the first request from > sys->pending list in the main thread, and then scheduling a workqueue > to handle the cluster operation > - a new ->unblock cluster operation is added which is called from the > ->done handler of the block workqueue to tell the cluster driver > to unblock the event processing, as well as sending the message with > the results from the main processing (or simplify the cluster wide > notification if there is no work routine in the ops table) > > Signed-off-by: Christoph Hellwig <hch at lst.de> > > --- > sheep/cluster.h | 23 ++++++--- > sheep/cluster/accord.c | 45 ++++++++---------- > sheep/cluster/corosync.c | 78 ++++++------------------------- > sheep/cluster/local.c | 54 ++++++++++----------- > sheep/cluster/zookeeper.c | 99 ++++++++++++++++------------------------ > sheep/group.c | 113 ++++++++++++++++++++++++++++------------------ > sheep/sdnet.c | 2 > sheep/sheep.c | 3 - > sheep/sheep_priv.h | 1 > 9 files changed, 195 insertions(+), 223 deletions(-) > > Index: sheepdog/sheep/cluster.h > =================================================================== > --- sheepdog.orig/sheep/cluster.h 2012-05-16 08:20:02.419903970 +0200 > +++ sheepdog/sheep/cluster.h 2012-05-16 08:43:40.103890448 +0200 > @@ -76,15 +76,23 @@ struct cluster_driver { > * This 
function sends 'msg' to all the nodes. The notified messages > * can be read through sd_notify_handler(). > * > - * If 'block_cb' is specified, block_cb() is called before 'msg' is > - * notified to all the nodes. All the cluster events including this > - * notification are blocked until block_cb() returns or this blocking > - * node leaves the cluster. The sheep daemon can sleep in block_cb(), > - * so this callback must be not called from the dispatch (main) thread. > - * > * Returns zero on success, -1 on error > */ > - int (*notify)(void *msg, size_t msg_len, void (*block_cb)(void *arg)); > + int (*notify)(void *msg, size_t msg_len); > + > + /* > + * Send a message to all nodes to block further events. > + * > + * Once the cluster driver has ensured that events are blocked on all > + * nodes it needs to call sd_block_handler() on the node where ->block > + * was called. > + */ > + void (*block)(void); > + > + /* > + * Unblock events on all nodes, and send a message to all nodes. > + */ > + void (*unblock)(void *msg, size_t msg_len); > > /* > * Dispatch handlers > @@ -189,6 +197,7 @@ void sd_join_handler(struct sd_node *joi > void sd_leave_handler(struct sd_node *left, struct sd_node *members, > size_t nr_members); > void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len); > +void sd_block_handler(void); > enum cluster_join_result sd_check_join_cb(struct sd_node *joining, > void *opaque); > > Index: sheepdog/sheep/cluster/corosync.c > =================================================================== > --- sheepdog.orig/sheep/cluster/corosync.c 2012-05-16 08:20:02.419903970 +0200 > +++ sheepdog/sheep/cluster/corosync.c 2012-05-16 08:21:23.147903200 +0200 > @@ -29,10 +29,7 @@ static struct cpg_name cpg_group = { 8, > static corosync_cfg_handle_t cfg_handle; > static struct cpg_node this_node; > > -static struct work_queue *corosync_block_wq; > - > static LIST_HEAD(corosync_event_list); > -static LIST_HEAD(corosync_block_list); > > static 
struct cpg_node cpg_nodes[SD_MAX_NODES]; > static size_t nr_cpg_nodes; > @@ -205,24 +202,6 @@ retry: > return 0; > } > > -static void corosync_block(struct work *work) > -{ > - struct corosync_block_msg *bm = container_of(work, typeof(*bm), work); > - > - bm->cb(bm->msg); > -} > - > -static void corosync_block_done(struct work *work) > -{ > - struct corosync_block_msg *bm = container_of(work, typeof(*bm), work); > - > - send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0, > - bm->msg, bm->msg_len); > - > - free(bm->msg); > - free(bm); > -} > - > static struct corosync_event *find_block_event(enum corosync_event_type type, > struct cpg_node *sender) > { > @@ -276,7 +255,6 @@ static void build_node_list(struct cpg_n > */ > static int __corosync_dispatch_one(struct corosync_event *cevent) > { > - struct corosync_block_msg *bm; > enum cluster_join_result res; > struct sd_node entries[SD_MAX_NODES]; > int idx; > @@ -343,18 +321,7 @@ static int __corosync_dispatch_one(struc > if (cevent->blocked) { > if (cpg_node_equal(&cevent->sender, &this_node) && > !cevent->callbacked) { > - /* call a block callback function from a worker thread */ > - if (list_empty(&corosync_block_list)) > - panic("cannot call block callback\n"); > - > - bm = list_first_entry(&corosync_block_list, > - typeof(*bm), list); > - list_del(&bm->list); > - > - bm->work.fn = corosync_block; > - bm->work.done = corosync_block_done; > - queue_work(corosync_block_wq, &bm->work); > - > + sd_block_handler(); > cevent->callbacked = 1; > } > > @@ -653,12 +620,6 @@ static int corosync_init(const char *opt > return -1; > } > > - corosync_block_wq = init_work_queue(1); > - if (!corosync_block_wq) { > - eprintf("failed to create corosync workqueue: %m\n"); > - return -1; > - } > - > return fd; > } > > @@ -698,31 +659,22 @@ static int corosync_leave(void) > NULL, 0); > } > > -static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *)) > +static void corosync_block(void) > { > - int ret; 
> - struct corosync_block_msg *bm; > - > - if (block_cb) { > - bm = zalloc(sizeof(*bm)); > - if (!bm) > - panic("failed to allocate memory\n"); > - bm->msg = zalloc(msg_len); > - if (!bm->msg) > - panic("failed to allocate memory\n"); > + send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0, > + NULL, 0); > +} > > - memcpy(bm->msg, msg, msg_len); > - bm->msg_len = msg_len; > - bm->cb = block_cb; > - list_add_tail(&bm->list, &corosync_block_list); > - > - ret = send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, > - NULL, 0, NULL, 0); > - } else > - ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node, > - NULL, 0, msg, msg_len); > +static void corosync_unblock(void *msg, size_t msg_len) > +{ > + send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0, > + msg, msg_len); > +} > > - return ret; > +static int corosync_notify(void *msg, size_t msg_len) > +{ > + return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node, > + NULL, 0, msg, msg_len); > } > > static int corosync_dispatch(void) > @@ -743,6 +695,8 @@ struct cluster_driver cdrv_corosync = { > .join = corosync_join, > .leave = corosync_leave, > .notify = corosync_notify, > + .block = corosync_block, > + .unblock = corosync_unblock, > .dispatch = corosync_dispatch, > }; > > Index: sheepdog/sheep/group.c > =================================================================== > --- sheepdog.orig/sheep/group.c 2012-05-16 08:20:02.419903970 +0200 > +++ sheepdog/sheep/group.c 2012-05-16 08:21:23.151903200 +0200 > @@ -209,29 +209,70 @@ int get_nr_copies(struct vnode_info *vno > return min(vnode_info->nr_zones, sys->nr_copies); > } > > +static struct vdi_op_message *prepare_cluster_msg(struct request *req, > + size_t *sizep) > +{ > + struct vdi_op_message *msg; > + size_t size; > + > + if (has_process_main(req->op)) > + size = sizeof(*msg) + req->rq.data_length; > + else > + size = sizeof(*msg); > + > + msg = zalloc(size); > + if (!msg) { > + eprintf("failed to allocate memory\n"); > + return 
NULL; > + } > + > + memcpy(&msg->req, &req->rq, sizeof(struct sd_req)); > + memcpy(&msg->rsp, &req->rp, sizeof(struct sd_rsp)); > + > + if (has_process_main(req->op)) > + memcpy(msg->data, req->data, req->rq.data_length); > + > + *sizep = size; > + return msg; > +} > + > +static void do_cluster_request(struct work *work) > +{ > + struct request *req = container_of(work, struct request, work); > + int ret; > + > + ret = do_process_work(req->op, &req->rq, &req->rp, req->data); > + req->rp.result = ret; > +} > + > +static void cluster_op_done(struct work *work) > +{ > + struct request *req = container_of(work, struct request, work); > + struct vdi_op_message *msg; > + size_t size; > + > + msg = prepare_cluster_msg(req, &size); > + if (!msg) > + panic(); > + > + sys->cdrv->unblock(msg, size); > +} > + > /* > * Perform a blocked cluster operation. > * > * Must run in the main thread as it access unlocked state like > * sys->pending_list. > */ > -static void do_cluster_op(void *arg) > +void sd_block_handler(void) > { > - struct vdi_op_message *msg = arg; > - int ret; > - struct request *req; > - void *data; > - > - req = list_first_entry(&sys->pending_list, struct request, pending_list); > + struct request *req = list_first_entry(&sys->pending_list, > + struct request, pending_list); > > - if (has_process_main(req->op)) > - data = msg->data; > - else > - data = req->data; > - ret = do_process_work(req->op, (const struct sd_req *)&msg->req, > - (struct sd_rsp *)&msg->rsp, data); > + req->work.fn = do_cluster_request; > + req->work.done = cluster_op_done; > > - msg->rsp.result = ret; > + queue_work(sys->block_wqueue, &req->work); > } > > /* > @@ -241,40 +282,28 @@ static void do_cluster_op(void *arg) > * Must run in the main thread as it access unlocked state like > * sys->pending_list. 
> */ > -static void do_cluster_request(struct request *req) > +static void queue_cluster_request(struct request *req) > { > - struct sd_req *hdr = &req->rq; > - struct vdi_op_message *msg; > - size_t size; > + eprintf("%p %x\n", req, req->rq.opcode); > > - eprintf("%p %x\n", req, hdr->opcode); > + if (has_process_work(req->op)) { > + list_add_tail(&req->pending_list, &sys->pending_list); > + sys->cdrv->block(); > + } else { > + struct vdi_op_message *msg; > + size_t size; > > - if (has_process_main(req->op)) > - size = sizeof(*msg) + hdr->data_length; > - else > - size = sizeof(*msg); > - > - msg = zalloc(size); > - if (!msg) { > - eprintf("failed to allocate memory\n"); > - return; > - } > - > - msg->req = *((struct sd_vdi_req *)&req->rq); > - msg->rsp = *((struct sd_vdi_rsp *)&req->rp); > - if (has_process_main(req->op)) > - memcpy(msg->data, req->data, hdr->data_length); > + msg = prepare_cluster_msg(req, &size); > + if (!msg) > + return; > > - list_add_tail(&req->pending_list, &sys->pending_list); > + list_add_tail(&req->pending_list, &sys->pending_list); > > - if (has_process_work(req->op)) > - sys->cdrv->notify(msg, size, do_cluster_op); > - else { > msg->rsp.result = SD_RES_SUCCESS; > - sys->cdrv->notify(msg, size, NULL); > - } > + sys->cdrv->notify(msg, size); > > - free(msg); > + free(msg); > + } > } > > static void group_handler(int listen_fd, int events, void *data) > @@ -1075,7 +1104,7 @@ static void process_request_queue(void) > * directly from the main thread. It's the cluster > * drivers job to ensure we avoid blocking on I/O here. 
> */ > - do_cluster_request(req); > + queue_cluster_request(req); > } else { /* is_local_op(req->op) */ > queue_work(sys->io_wqueue, &req->work); > } > Index: sheepdog/sheep/sdnet.c > =================================================================== > --- sheepdog.orig/sheep/sdnet.c 2012-05-16 08:20:02.419903970 +0200 > +++ sheepdog/sheep/sdnet.c 2012-05-16 08:21:23.151903200 +0200 > @@ -318,7 +318,7 @@ static void queue_request(struct request > req->work.fn = do_local_request; > req->work.done = local_op_done; > } else if (is_cluster_op(req->op)) { > - /* directly executed in the main thread */; > + ; > } else { > eprintf("unknown operation %d\n", hdr->opcode); > rsp->result = SD_RES_SYSTEM_ERROR; > Index: sheepdog/sheep/sheep.c > =================================================================== > --- sheepdog.orig/sheep/sheep.c 2012-05-16 08:20:02.419903970 +0200 > +++ sheepdog/sheep/sheep.c 2012-05-16 08:21:23.151903200 +0200 > @@ -258,9 +258,10 @@ int main(int argc, char **argv) > sys->recovery_wqueue = init_work_queue(1); > sys->deletion_wqueue = init_work_queue(1); > sys->flush_wqueue = init_work_queue(1); > + sys->block_wqueue = init_work_queue(1); > if (!sys->event_wqueue || !sys->gateway_wqueue || !sys->io_wqueue || > !sys->recovery_wqueue || !sys->deletion_wqueue || > - !sys->flush_wqueue) > + !sys->flush_wqueue || !sys->block_wqueue) > exit(1); > > ret = init_signal(); > Index: sheepdog/sheep/sheep_priv.h > =================================================================== > --- sheepdog.orig/sheep/sheep_priv.h 2012-05-16 08:20:02.419903970 +0200 > +++ sheepdog/sheep/sheep_priv.h 2012-05-16 08:21:23.151903200 +0200 > @@ -149,6 +149,7 @@ struct cluster_info { > struct work_queue *deletion_wqueue; > struct work_queue *recovery_wqueue; > struct work_queue *flush_wqueue; > + struct work_queue *block_wqueue; > }; > > struct siocb { > Index: sheepdog/sheep/cluster/local.c > =================================================================== > --- 
sheepdog.orig/sheep/cluster/local.c 2012-05-16 08:20:02.419903970 +0200 > +++ sheepdog/sheep/cluster/local.c 2012-05-16 08:40:30.647892257 +0200 > @@ -53,10 +53,8 @@ struct local_event { > > enum cluster_join_result join_result; > > - void (*block_cb)(void *arg); > - > int blocked; /* set non-zero when sheep must block this event */ > - int callbacked; /* set non-zero if sheep already called block_cb() */ > + int callbacked; /* set non-zero after sd_block_handler() was called */ > }; > > > @@ -215,7 +213,7 @@ static void shm_queue_init(void) > > static void add_event(enum local_event_type type, > struct sd_node *node, void *buf, > - size_t buf_len, void (*block_cb)(void *arg)) > + size_t buf_len, int blocked) > { > int idx; > struct sd_node *n; > @@ -250,8 +248,7 @@ static void add_event(enum local_event_t > memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx)); > break; > case EVENT_NOTIFY: > - ev.blocked = !!block_cb; > - ev.block_cb = block_cb; > + ev.blocked = blocked; > break; > } > > @@ -273,7 +270,7 @@ static void check_pids(void *arg) > > for (i = 0; i < nr; i++) > if (!process_exists(pids[i])) > - add_event(EVENT_LEAVE, nodes + i, NULL, 0, NULL); > + add_event(EVENT_LEAVE, nodes + i, NULL, 0, 0); > > shm_queue_unlock(); > > @@ -329,7 +326,7 @@ static int local_join(struct sd_node *my > > shm_queue_lock(); > > - add_event(EVENT_JOIN, &this_node, opaque, opaque_len, NULL); > + add_event(EVENT_JOIN, &this_node, opaque, opaque_len, 0); > > shm_queue_unlock(); > > @@ -340,25 +337,34 @@ static int local_leave(void) > { > shm_queue_lock(); > > - add_event(EVENT_LEAVE, &this_node, NULL, 0, NULL); > + add_event(EVENT_LEAVE, &this_node, NULL, 0, 0); > > shm_queue_unlock(); > > return 0; > } > > -static int local_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg)) > +static int local_notify(void *msg, size_t msg_len) > { > shm_queue_lock(); > > - add_event(EVENT_NOTIFY, &this_node, msg, msg_len, block_cb); > + add_event(EVENT_NOTIFY, &this_node, msg, 
msg_len, 0); > > shm_queue_unlock(); > > return 0; > } > > -static void local_block(struct work *work) > +static void local_block(void) > +{ > + shm_queue_lock(); > + > + add_event(EVENT_NOTIFY, &this_node, NULL, 0, 1); > + > + shm_queue_unlock(); > +} > + > +static void local_unblock(void *msg, size_t msg_len) > { > struct local_event *ev; > > @@ -366,8 +372,10 @@ static void local_block(struct work *wor > > ev = shm_queue_peek(); > > - ev->block_cb(ev->buf); > ev->blocked = 0; > + ev->buf_len = msg_len; > + if (msg) > + memcpy(ev->buf, msg, msg_len); > msync(ev, sizeof(*ev), MS_SYNC); > > shm_queue_notify(); > @@ -375,20 +383,12 @@ static void local_block(struct work *wor > shm_queue_unlock(); > } > > -static void local_block_done(struct work *work) > -{ > -} > - > static int local_dispatch(void) > { > int ret; > struct signalfd_siginfo siginfo; > struct local_event *ev; > enum cluster_join_result res; > - static struct work work = { > - .fn = local_block, > - .done = local_block_done, > - }; > > dprintf("read siginfo\n"); > ret = read(sigfd, &siginfo, sizeof(siginfo)); > @@ -438,12 +438,10 @@ static int local_dispatch(void) > break; > case EVENT_NOTIFY: > if (ev->blocked) { > - if (node_eq(&ev->sender, &this_node)) { > - if (!ev->callbacked) { > - queue_work(local_block_wq, &work); > - > - ev->callbacked = 1; > - } > + if (node_eq(&ev->sender, &this_node) && > + !ev->callbacked) { > + sd_block_handler(); > + ev->callbacked = 1; > } > goto out; > } > @@ -466,6 +464,8 @@ struct cluster_driver cdrv_local = { > .join = local_join, > .leave = local_leave, > .notify = local_notify, > + .block = local_block, > + .unblock = local_unblock, > .dispatch = local_dispatch, > }; > > Index: sheepdog/sheep/cluster/zookeeper.c > =================================================================== > --- sheepdog.orig/sheep/cluster/zookeeper.c 2012-05-16 07:18:11.715939357 +0200 > +++ sheepdog/sheep/cluster/zookeeper.c 2012-05-16 08:40:23.707892324 +0200 > @@ -76,10 +76,8 @@ struct 
zk_event { > > enum cluster_join_result join_result; > > - void (*block_cb)(void *arg); > - > int blocked; /* set non-zero when sheep must block this event */ > - int callbacked; /* set non-zero if sheep already called block_cb() */ > + int callbacked; /* set non-zero after sd_block_handler() was called */ > > size_t buf_len; > uint8_t buf[MAX_EVENT_BUF_SIZE]; > @@ -498,53 +496,46 @@ static void zk_member_init(zhandle_t *zh > /* ZooKeeper driver APIs */ > > static zhandle_t *zhandle; > - > -static struct work_queue *zk_block_wq; > - > static struct zk_node this_node; > > static int add_event(zhandle_t *zh, enum zk_event_type type, > struct zk_node *znode, void *buf, > - size_t buf_len, void (*block_cb)(void *arg)) > + size_t buf_len, int blocked) > { > - int nr_levents; > - struct zk_event ev, *lev; > - eventfd_t value = 1; > + struct zk_event ev; > > ev.type = type; > ev.sender = *znode; > ev.buf_len = buf_len; > ev.callbacked = 0; > - ev.blocked = 0; > + ev.blocked = blocked; > if (buf) > memcpy(ev.buf, buf, buf_len); > + zk_queue_push(zh, &ev); > + return 0; > +} > > - switch (type) { > - case EVENT_JOIN: > - ev.blocked = 1; > - break; > - case EVENT_LEAVE: > - lev = &zk_levents[zk_levent_tail%SD_MAX_NODES]; > +static int leave_event(zhandle_t *zh, struct zk_node *znode) > +{ > + int nr_levents; > + struct zk_event *ev; > + const eventfd_t value = 1; > > - memcpy(lev, &ev, sizeof(ev)); > + ev = &zk_levents[zk_levent_tail % SD_MAX_NODES]; > + ev->type = EVENT_LEAVE; > + ev->sender = *znode; > + ev->buf_len = 0; > + ev->callbacked = 0; > + ev->blocked = 0; > > - nr_levents = uatomic_add_return(&nr_zk_levents, 1); > - dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail); > + nr_levents = uatomic_add_return(&nr_zk_levents, 1); > + dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail); > > - zk_levent_tail++; > + zk_levent_tail++; > > - /* manual notify */ > - dprintf("write event to efd:%d\n", efd); > - eventfd_write(efd, value); > - goto 
out; > - case EVENT_NOTIFY: > - ev.blocked = !!block_cb; > - ev.block_cb = block_cb; > - break; > - } > - > - zk_queue_push(zh, &ev); > -out: > + /* manual notify */ > + dprintf("write event to efd:%d\n", efd); > + eventfd_write(efd, value); > return 0; > } > > @@ -586,7 +577,7 @@ static void watcher(zhandle_t *zh, int t > str_to_node(p, &znode.node); > dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node)); > > - add_event(zh, EVENT_LEAVE, &znode, NULL, 0, NULL); > + leave_event(zh, &znode); > return; > } > > @@ -674,12 +665,6 @@ static int zk_init(const char *option, u > return -1; > } > > - zk_block_wq = init_work_queue(1); > - if (!zk_block_wq) { > - eprintf("failed to create zookeeper workqueue: %m\n"); > - return -1; > - } > - > return efd; > } > > @@ -704,7 +689,7 @@ static int zk_join(struct sd_node *mysel > > dprintf("clientid:%ld\n", cid->client_id); > > - rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL); > + rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1); > > return rc; > } > @@ -717,12 +702,17 @@ static int zk_leave(void) > return zk_delete(zhandle, path, -1); > } > > -static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg)) > +static int zk_notify(void *msg, size_t msg_len) > +{ > + return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0); > +} > + > +static void zk_block(void) > { > - return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb); > + add_event(zhandle, EVENT_NOTIFY, &this_node, NULL, 0, 1); > } > > -static void zk_block(struct work *work) > +static void zk_unblock(void *msg, size_t msg_len) > { > int rc; > struct zk_event ev; > @@ -731,8 +721,10 @@ static void zk_block(struct work *work) > rc = zk_queue_pop(zhandle, &ev); > assert(rc == 0); > > - ev.block_cb(ev.buf); > ev.blocked = 0; > + ev.buf_len = msg_len; > + if (msg) > + memcpy(ev.buf, msg, msg_len); > > zk_queue_push_back(zhandle, &ev); > > @@ -743,10 +735,6 @@ static void 
zk_block(struct work *work) > eventfd_write(efd, value); > } > > -static void zk_block_done(struct work *work) > -{ > -} > - > static int zk_dispatch(void) > { > int ret, rc, retry; > @@ -755,10 +743,6 @@ static int zk_dispatch(void) > struct zk_event ev; > struct zk_node *n; > enum cluster_join_result res; > - static struct work work = { > - .fn = zk_block, > - .done = zk_block_done, > - }; > > dprintf("read event\n"); > ret = eventfd_read(efd, &value); > @@ -866,13 +850,10 @@ static int zk_dispatch(void) > if (ev.blocked) { > if (node_eq(&ev.sender.node, &this_node.node) > && !ev.callbacked) { > - ev.callbacked = 1; > - > uatomic_inc(&zk_notify_blocked); Now, zk_dispatch()/zk_queue_pop()/zk_unblock() will read/write zk_notify_blocked variable, these functions are all in main thread, we can update this value directly, but use uatomic_xx is also harmless. > - > + ev.callbacked = 1; > zk_queue_push_back(zhandle, &ev); > - > - queue_work(zk_block_wq, &work); > + sd_block_handler(); > } else > zk_queue_push_back(zhandle, NULL); > > @@ -893,6 +874,8 @@ struct cluster_driver cdrv_zookeeper = { > .join = zk_join, > .leave = zk_leave, > .notify = zk_notify, > + .block = zk_block, > + .unblock = zk_unblock, > .dispatch = zk_dispatch, > }; > > Index: sheepdog/sheep/cluster/accord.c > =================================================================== > --- sheepdog.orig/sheep/cluster/accord.c 2012-05-15 09:15:08.859954176 +0200 > +++ sheepdog/sheep/cluster/accord.c 2012-05-16 08:40:37.235892186 +0200 > @@ -46,10 +46,8 @@ struct acrd_event { > > enum cluster_join_result join_result; > > - void (*block_cb)(void *arg); > - > int blocked; /* set non-zero when sheep must block this event */ > - int callbacked; /* set non-zero if sheep already called block_cb() */ > + int callbacked; /* set non-zero after sd_block_handler() was called */ > }; > > static struct sd_node this_node; > @@ -246,7 +244,7 @@ again: > > static int add_event(struct acrd_handle *ah, enum acrd_event_type 
type, > struct sd_node *node, void *buf, > - size_t buf_len, void (*block_cb)(void *arg)) > + size_t buf_len, int blocked) > { > int idx; > struct sd_node *n; > @@ -257,6 +255,7 @@ static int add_event(struct acrd_handle > > ev.type = type; > ev.sender = *node; > + ev.blocked = blocked; > ev.buf_len = buf_len; > if (buf) > memcpy(ev.buf, buf, buf_len); > @@ -265,7 +264,6 @@ static int add_event(struct acrd_handle > > switch (type) { > case EVENT_JOIN: > - ev.blocked = 1; > ev.nodes[ev.nr_nodes] = *node; > ev.ids[ev.nr_nodes] = this_id; /* must be local node */ > ev.nr_nodes++; > @@ -282,8 +280,6 @@ static int add_event(struct acrd_handle > memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx)); > break; > case EVENT_NOTIFY: > - ev.blocked = !!block_cb; > - ev.block_cb = block_cb; > break; > } > > @@ -412,8 +408,7 @@ static void __acrd_leave(struct work *wo > > for (i = 0; i < nr_nodes; i++) { > if (ids[i] == info->left_nodeid) { > - add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0, > - NULL); > + add_event(ah, EVENT_LEAVE, nodes + i, NULL, 0, 0); > break; > } > } > @@ -517,20 +512,25 @@ static int accord_join(struct sd_node *m > { > this_node = *myself; > > - return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, NULL); > + return add_event(ahandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1); > } > > static int accord_leave(void) > { > - return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, NULL); > + return add_event(ahandle, EVENT_LEAVE, &this_node, NULL, 0, 0); > } > > -static int accord_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg)) > +static int accord_notify(void *msg, size_t msg_len) > { > - return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb); > + return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0); > } > > -static void acrd_block(struct work *work) > +static void accord_block(void) > +{ > + return add_event(ahandle, EVENT_NOTIFY, &this_node, NULL, 0, 1); > +} > + > +static void 
acrd_unblock(void *msg, size_t msg_len) > { > struct acrd_event ev; > > @@ -538,28 +538,22 @@ static void acrd_block(struct work *work > > acrd_queue_pop(ahandle, &ev); > > - ev.block_cb(ev.buf); > ev.blocked = 0; > + ev.buf_len = msg_len; > + if (msg) > + memcpy(ev.buf, msg, msg_len); > > acrd_queue_push_back(ahandle, &ev); > > pthread_mutex_unlock(&queue_lock); > } > > -static void acrd_block_done(struct work *work) > -{ > -} > - > static int accord_dispatch(void) > { > int ret; > eventfd_t value; > struct acrd_event ev; > enum cluster_join_result res; > - static struct work work = { > - .fn = acrd_block, > - .done = acrd_block_done, > - }; > > dprintf("read event\n"); > ret = eventfd_read(efd, &value); > @@ -612,11 +606,10 @@ static int accord_dispatch(void) > case EVENT_NOTIFY: > if (ev.blocked) { > if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) { > - queue_work(acrd_wq, &work); > - > ev.callbacked = 1; > > acrd_queue_push_back(ahandle, &ev); > + sd_block_handler(); > } else > acrd_queue_push_back(ahandle, NULL); > > @@ -639,6 +632,8 @@ struct cluster_driver cdrv_accord = { > .join = accord_join, > .leave = accord_leave, > .notify = accord_notify, > + .block = accord_block, > + .unblock = accord_unblock, > .dispatch = accord_dispatch, > }; > > -- > sheepdog mailing list > sheepdog at lists.wpkg.org > http://lists.wpkg.org/mailman/listinfo/sheepdog -- Yunkai Zhang Work at Taobao |