[sheepdog] [PATCH] sheep: simplify the cluster driver interface for blocking events
Christoph Hellwig
hch at infradead.org
Fri Jun 15 17:13:43 CEST 2012
Any reason not to put this patch in for now?
On Tue, Jun 05, 2012 at 08:07:34AM -0400, Christoph Hellwig wrote:
> Let sd_block_handler handle the fine details of how to handle an incoming
> blocking event. By passing the sender node structure we can easily handle
> ignoring it on other nodes, and by keeping a local operation in progress
> flag in group.c we can replace the callbacked flag in the on-the-wire
> events with a way simpler mechanism.
>
> The only slightly complicated bit is that zk_notify_blocked in the zookeeper
> backend can now go negative for a short period of time, so we explicitly
> have to check for it being positive in two places.
>
> Signed-off-by: Christoph Hellwig <hch at lst.de>
>
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 07e5f7b..80865cf 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -11,6 +11,7 @@
> #ifndef __CLUSTER_H__
> #define __CLUSTER_H__
>
> +#include <stdbool.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <stdint.h>
> @@ -186,7 +187,7 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members,
> void sd_leave_handler(struct sd_node *left, struct sd_node *members,
> size_t nr_members);
> void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
> -void sd_block_handler(void);
> +bool sd_block_handler(struct sd_node *sender);
> enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
> void *opaque);
>
> diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
> index abb6db1..013b978 100644
> --- a/sheep/cluster/accord.c
> +++ b/sheep/cluster/accord.c
> @@ -47,8 +47,6 @@ struct acrd_event {
> uint64_t ids[SD_MAX_NODES];
>
> enum cluster_join_result join_result;
> -
> - int callbacked; /* set non-zero after sd_block_handler() was called */
> };
>
> static struct sd_node this_node;
> @@ -564,14 +562,8 @@ static void acrd_handler(int listen_fd, int events, void *data)
> sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
> break;
> case EVENT_BLOCK:
> - if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
> - ev.callbacked = 1;
> -
> - acrd_queue_push_back(ahandle, &ev);
> - sd_block_handler();
> - } else {
> - acrd_queue_push_back(ahandle, NULL);
> - }
> + acrd_queue_push_back(ahandle, NULL);
> + sd_block_handler(&ev.sender);
> break;
> case EVENT_NOTIFY:
> sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index 77c6710..d42a4cb 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -308,13 +308,9 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
> sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
> break;
> case COROSYNC_EVENT_TYPE_BLOCK:
> - if (cpg_node_equal(&cevent->sender, &this_node) &&
> - !cevent->callbacked) {
> - sd_block_handler();
> - cevent->callbacked = 1;
> - }
> + sd_block_handler(&cevent->sender.ent);
>
> - /* block the rest messages until unblock message comes */
> + /* block other messages until the unblock message comes */
> return 0;
> case COROSYNC_EVENT_TYPE_NOTIFY:
> sd_notify_handler(&cevent->sender.ent, cevent->msg,
> diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
> index 2c3ce24..dd3936b 100644
> --- a/sheep/cluster/local.c
> +++ b/sheep/cluster/local.c
> @@ -52,8 +52,6 @@ struct local_event {
> pid_t pids[SD_MAX_NODES];
>
> enum cluster_join_result join_result;
> -
> - int callbacked; /* set non-zero after sd_block_handler() was called */
> };
>
>
> @@ -405,10 +403,7 @@ static void local_handler(int listen_fd, int events, void *data)
> shm_queue_pop();
> break;
> case EVENT_BLOCK:
> - if (node_eq(&ev->sender, &this_node) && !ev->callbacked) {
> - sd_block_handler();
> - ev->callbacked = 1;
> - }
> + sd_block_handler(&ev->sender);
> break;
> case EVENT_NOTIFY:
> sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> index 3dde1e4..6bd8e3a 100644
> --- a/sheep/cluster/zookeeper.c
> +++ b/sheep/cluster/zookeeper.c
> @@ -61,8 +61,6 @@ struct zk_event {
>
> enum cluster_join_result join_result;
>
> - int callbacked; /* set non-zero after sd_block_handler() was called */
> -
> size_t buf_len;
> uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
> };
> @@ -243,7 +241,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
> eventfd_t value = 1;
>
> /* process leave event */
> - if (!uatomic_read(&zk_notify_blocked) &&
> + if (uatomic_read(&zk_notify_blocked) <= 0 &&
> uatomic_read(&nr_zk_levents)) {
> nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1;
> dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
> @@ -498,7 +496,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
> ev.type = type;
> ev.sender = *znode;
> ev.buf_len = buf_len;
> - ev.callbacked = 0;
> if (buf)
> memcpy(ev.buf, buf, buf_len);
> zk_queue_push(zh, &ev);
> @@ -515,7 +512,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
> ev->type = EVENT_LEAVE;
> ev->sender = *znode;
> ev->buf_len = 0;
> - ev->callbacked = 0;
>
> nr_levents = uatomic_add_return(&nr_zk_levents, 1);
> dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
> @@ -714,7 +710,7 @@ static void zk_handler(int listen_fd, int events, void *data)
> if (ret < 0)
> return;
>
> - if (uatomic_read(&zk_notify_blocked))
> + if (uatomic_read(&zk_notify_blocked) > 0)
> return;
>
> ret = zk_queue_pop(zhandle, &ev);
> @@ -820,16 +816,9 @@ static void zk_handler(int listen_fd, int events, void *data)
> break;
> case EVENT_BLOCK:
> dprintf("BLOCK\n");
> - if (node_eq(&ev.sender.node, &this_node.node)
> - && !ev.callbacked) {
> - uatomic_inc(&zk_notify_blocked);
> - ev.callbacked = 1;
> - zk_queue_push_back(zhandle, &ev);
> - sd_block_handler();
> - } else {
> - zk_queue_push_back(zhandle, NULL);
> - }
> -
> + zk_queue_push_back(zhandle, NULL);
> + if (sd_block_handler(&ev.sender.node))
> + uatomic_inc(&zk_notify_blocked);
> break;
> case EVENT_NOTIFY:
> dprintf("NOTIFY\n");
> diff --git a/sheep/group.c b/sheep/group.c
> index c2679f2..d00a121 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -251,6 +251,11 @@ int get_nr_copies(struct vnode_info *vnode_info)
> return min(vnode_info->nr_zones, sys->nr_copies);
> }
>
> +/*
> + * Indicator if a cluster operation is currently running.
> + */
> +static bool cluster_op_running = false;
> +
> static struct vdi_op_message *prepare_cluster_msg(struct request *req,
> size_t *sizep)
> {
> @@ -295,6 +300,8 @@ static void cluster_op_done(struct work *work)
> struct vdi_op_message *msg;
> size_t size;
>
> + cluster_op_running = false;
> +
> msg = prepare_cluster_msg(req, &size);
> if (!msg)
> panic();
> @@ -305,20 +312,33 @@ static void cluster_op_done(struct work *work)
> }
>
> /*
> - * Perform a blocked cluster operation.
> + * Perform a blocked cluster operation if we were the node requesting it
> + * and do not have any other operation pending.
> *
> - * Must run in the main thread as it access unlocked state like
> + * If this method returns false the caller must call the method again for
> + * the same event once it gets notified again.
> + *
> + * Must run in the main thread as it accesses unlocked state like
> * sys->pending_list.
> */
> -void sd_block_handler(void)
> +bool sd_block_handler(struct sd_node *sender)
> {
> - struct request *req = list_first_entry(&sys->pending_list,
> - struct request, pending_list);
> + struct request *req;
> +
> + if (!node_eq(sender, &sys->this_node))
> + return false;
> + if (cluster_op_running)
> + return false;
> +
> + cluster_op_running = true;
>
> + req = list_first_entry(&sys->pending_list,
> + struct request, pending_list);
> req->work.fn = do_cluster_request;
> req->work.done = cluster_op_done;
>
> queue_work(sys->block_wqueue, &req->work);
> + return true;
> }
>
> /*
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog
---end quoted text---
More information about the sheepdog
mailing list