Any reason not to put this patch in for now? On Tue, Jun 05, 2012 at 08:07:34AM -0400, Christoph Hellwig wrote: > Let sd_block_handler handle the fine details of how to handle an incoming > blocking event. By passing the sender node structure we can easily handle > ignoring it on other nodes, and by keeping a local operation-in-progress > flag in group.c we can replace the callbacked flag in the on-the-wire > events with a way simpler mechanism. > > The only slightly complicated bit is that zk_notify_blocked in the zookeeper > backend can now go negative for a short period of time, so we explicitly > have to check for it being positive in two places. > > Signed-off-by: Christoph Hellwig <hch at lst.de> > > diff --git a/sheep/cluster.h b/sheep/cluster.h > index 07e5f7b..80865cf 100644 > --- a/sheep/cluster.h > +++ b/sheep/cluster.h > @@ -11,6 +11,7 @@ > #ifndef __CLUSTER_H__ > #define __CLUSTER_H__ > > +#include <stdbool.h> > #include <stdio.h> > #include <stdlib.h> > #include <stdint.h> > @@ -186,7 +187,7 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members, > void sd_leave_handler(struct sd_node *left, struct sd_node *members, > size_t nr_members); > void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len); > -void sd_block_handler(void); > +bool sd_block_handler(struct sd_node *sender); > enum cluster_join_result sd_check_join_cb(struct sd_node *joining, > void *opaque); > > diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c > index abb6db1..013b978 100644 > --- a/sheep/cluster/accord.c > +++ b/sheep/cluster/accord.c > @@ -47,8 +47,6 @@ struct acrd_event { > uint64_t ids[SD_MAX_NODES]; > > enum cluster_join_result join_result; > - > - int callbacked; /* set non-zero after sd_block_handler() was called */ > }; > > static struct sd_node this_node; > @@ -564,14 +562,8 @@ static void acrd_handler(int listen_fd, int events, void *data) > sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); > break; > case EVENT_BLOCK: > - if 
(node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) { > - ev.callbacked = 1; > - > - acrd_queue_push_back(ahandle, &ev); > - sd_block_handler(); > - } else { > - acrd_queue_push_back(ahandle, NULL); > - } > + acrd_queue_push_back(ahandle, NULL); > + sd_block_handler(&ev.sender); > break; > case EVENT_NOTIFY: > sd_notify_handler(&ev.sender, ev.buf, ev.buf_len); > diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c > index 77c6710..d42a4cb 100644 > --- a/sheep/cluster/corosync.c > +++ b/sheep/cluster/corosync.c > @@ -308,13 +308,9 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) > sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes); > break; > case COROSYNC_EVENT_TYPE_BLOCK: > - if (cpg_node_equal(&cevent->sender, &this_node) && > - !cevent->callbacked) { > - sd_block_handler(); > - cevent->callbacked = 1; > - } > + sd_block_handler(&cevent->sender.ent); > > - /* block the rest messages until unblock message comes */ > + /* block other messages until the unblock message comes */ > return 0; > case COROSYNC_EVENT_TYPE_NOTIFY: > sd_notify_handler(&cevent->sender.ent, cevent->msg, > diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c > index 2c3ce24..dd3936b 100644 > --- a/sheep/cluster/local.c > +++ b/sheep/cluster/local.c > @@ -52,8 +52,6 @@ struct local_event { > pid_t pids[SD_MAX_NODES]; > > enum cluster_join_result join_result; > - > - int callbacked; /* set non-zero after sd_block_handler() was called */ > }; > > > @@ -405,10 +403,7 @@ static void local_handler(int listen_fd, int events, void *data) > shm_queue_pop(); > break; > case EVENT_BLOCK: > - if (node_eq(&ev->sender, &this_node) && !ev->callbacked) { > - sd_block_handler(); > - ev->callbacked = 1; > - } > + sd_block_handler(&ev->sender); > break; > case EVENT_NOTIFY: > sd_notify_handler(&ev->sender, ev->buf, ev->buf_len); > diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c > index 3dde1e4..6bd8e3a 100644 > --- 
a/sheep/cluster/zookeeper.c > +++ b/sheep/cluster/zookeeper.c > @@ -61,8 +61,6 @@ struct zk_event { > > enum cluster_join_result join_result; > > - int callbacked; /* set non-zero after sd_block_handler() was called */ > - > size_t buf_len; > uint8_t buf[SD_MAX_EVENT_BUF_SIZE]; > }; > @@ -243,7 +241,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev) > eventfd_t value = 1; > > /* process leave event */ > - if (!uatomic_read(&zk_notify_blocked) && > + if (uatomic_read(&zk_notify_blocked) <= 0 && > uatomic_read(&nr_zk_levents)) { > nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1; > dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head); > @@ -498,7 +496,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type, > ev.type = type; > ev.sender = *znode; > ev.buf_len = buf_len; > - ev.callbacked = 0; > if (buf) > memcpy(ev.buf, buf, buf_len); > zk_queue_push(zh, &ev); > @@ -515,7 +512,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode) > ev->type = EVENT_LEAVE; > ev->sender = *znode; > ev->buf_len = 0; > - ev->callbacked = 0; > > nr_levents = uatomic_add_return(&nr_zk_levents, 1); > dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail); > @@ -714,7 +710,7 @@ static void zk_handler(int listen_fd, int events, void *data) > if (ret < 0) > return; > > - if (uatomic_read(&zk_notify_blocked)) > + if (uatomic_read(&zk_notify_blocked) > 0) > return; > > ret = zk_queue_pop(zhandle, &ev); > @@ -820,16 +816,9 @@ static void zk_handler(int listen_fd, int events, void *data) > break; > case EVENT_BLOCK: > dprintf("BLOCK\n"); > - if (node_eq(&ev.sender.node, &this_node.node) > - && !ev.callbacked) { > - uatomic_inc(&zk_notify_blocked); > - ev.callbacked = 1; > - zk_queue_push_back(zhandle, &ev); > - sd_block_handler(); > - } else { > - zk_queue_push_back(zhandle, NULL); > - } > - > + zk_queue_push_back(zhandle, NULL); > + if (sd_block_handler(&ev.sender.node)) > + uatomic_inc(&zk_notify_blocked); > break; > case 
EVENT_NOTIFY: > dprintf("NOTIFY\n"); > diff --git a/sheep/group.c b/sheep/group.c > index c2679f2..d00a121 100644 > --- a/sheep/group.c > +++ b/sheep/group.c > @@ -251,6 +251,11 @@ int get_nr_copies(struct vnode_info *vnode_info) > return min(vnode_info->nr_zones, sys->nr_copies); > } > > +/* > + * Indicator if a cluster operation is currently running. > + */ > +static bool cluster_op_running = false; > + > static struct vdi_op_message *prepare_cluster_msg(struct request *req, > size_t *sizep) > { > @@ -295,6 +300,8 @@ static void cluster_op_done(struct work *work) > struct vdi_op_message *msg; > size_t size; > > + cluster_op_running = false; > + > msg = prepare_cluster_msg(req, &size); > if (!msg) > panic(); > @@ -305,20 +312,33 @@ static void cluster_op_done(struct work *work) > } > > /* > - * Perform a blocked cluster operation. > + * Perform a blocked cluster operation if we were the node requesting it > + * and do not have any other operation pending. > * > - * Must run in the main thread as it access unlocked state like > + * If this method returns false the caller must call the method again for > + * the same event once it gets notified again. > + * > + * Must run in the main thread as it accesses unlocked state like > * sys->pending_list. > */ > -void sd_block_handler(void) > +bool sd_block_handler(struct sd_node *sender) > { > - struct request *req = list_first_entry(&sys->pending_list, > - struct request, pending_list); > + struct request *req; > + > + if (!node_eq(sender, &sys->this_node)) > + return false; > + if (cluster_op_running) > + return false; > + > + cluster_op_running = true; > > + req = list_first_entry(&sys->pending_list, > + struct request, pending_list); > req->work.fn = do_cluster_request; > req->work.done = cluster_op_done; > > queue_work(sys->block_wqueue, &req->work); > + return true; > } > > /* > -- > sheepdog mailing list > sheepdog at lists.wpkg.org > http://lists.wpkg.org/mailman/listinfo/sheepdog ---end quoted text--- |