[sheepdog] [PATCH] sheep: simplify the cluster driver interface for blocking events

Christoph Hellwig hch at infradead.org
Fri Jun 15 17:13:43 CEST 2012


Any reason not to apply this patch for now?

On Tue, Jun 05, 2012 at 08:07:34AM -0400, Christoph Hellwig wrote:
> Let sd_block_handler handle the fine details of how to handle an incoming
> blocking event.  By passing the sender node structure we can easily handle
> ignoring it on other nodes, and by keeping a local operation in progress
> flag in group.c we can replace the callbacked flag in the on-the-wire
> events with a much simpler mechanism.
> 
> The only slightly complicated bit is that zk_notify_blocked in the zookeeper
> backend can now go negative for a short period of time, so we explicitly
> have to check for it being positive in two places.
> 
> Signed-off-by: Christoph Hellwig <hch at lst.de>
> 
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 07e5f7b..80865cf 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -11,6 +11,7 @@
>  #ifndef __CLUSTER_H__
>  #define __CLUSTER_H__
>  
> +#include <stdbool.h>
>  #include <stdio.h>
>  #include <stdlib.h>
>  #include <stdint.h>
> @@ -186,7 +187,7 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members,
>  void sd_leave_handler(struct sd_node *left, struct sd_node *members,
>  		size_t nr_members);
>  void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
> -void sd_block_handler(void);
> +bool sd_block_handler(struct sd_node *sender);
>  enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
>  		void *opaque);
>  
> diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
> index abb6db1..013b978 100644
> --- a/sheep/cluster/accord.c
> +++ b/sheep/cluster/accord.c
> @@ -47,8 +47,6 @@ struct acrd_event {
>  	uint64_t ids[SD_MAX_NODES];
>  
>  	enum cluster_join_result join_result;
> -
> -	int callbacked; /* set non-zero after sd_block_handler() was called */
>  };
>  
>  static struct sd_node this_node;
> @@ -564,14 +562,8 @@ static void acrd_handler(int listen_fd, int events, void *data)
>  		sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
>  		break;
>  	case EVENT_BLOCK:
> -		if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
> -			ev.callbacked = 1;
> -
> -			acrd_queue_push_back(ahandle, &ev);
> -			sd_block_handler();
> -		} else {
> -			acrd_queue_push_back(ahandle, NULL);
> -		}
> +		acrd_queue_push_back(ahandle, NULL);
> +		sd_block_handler(&ev.sender);
>  		break;
>  	case EVENT_NOTIFY:
>  		sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index 77c6710..d42a4cb 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -308,13 +308,9 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
>  		sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
>  		break;
>  	case COROSYNC_EVENT_TYPE_BLOCK:
> -		if (cpg_node_equal(&cevent->sender, &this_node) &&
> -		    !cevent->callbacked) {
> -			sd_block_handler();
> -			cevent->callbacked = 1;
> -		}
> +		sd_block_handler(&cevent->sender.ent);
>  
> -		/* block the rest messages until unblock message comes */
> +		/* block other messages until the unblock message comes */
>  		return 0;
>  	case COROSYNC_EVENT_TYPE_NOTIFY:
>  		sd_notify_handler(&cevent->sender.ent, cevent->msg,
> diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
> index 2c3ce24..dd3936b 100644
> --- a/sheep/cluster/local.c
> +++ b/sheep/cluster/local.c
> @@ -52,8 +52,6 @@ struct local_event {
>  	pid_t pids[SD_MAX_NODES];
>  
>  	enum cluster_join_result join_result;
> -
> -	int callbacked; /* set non-zero after sd_block_handler() was called */
>  };
>  
>  
> @@ -405,10 +403,7 @@ static void local_handler(int listen_fd, int events, void *data)
>  		shm_queue_pop();
>  		break;
>  	case EVENT_BLOCK:
> -		if (node_eq(&ev->sender, &this_node) && !ev->callbacked) {
> -			sd_block_handler();
> -			ev->callbacked = 1;
> -		}
> +		sd_block_handler(&ev->sender);
>  		break;
>  	case EVENT_NOTIFY:
>  		sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> index 3dde1e4..6bd8e3a 100644
> --- a/sheep/cluster/zookeeper.c
> +++ b/sheep/cluster/zookeeper.c
> @@ -61,8 +61,6 @@ struct zk_event {
>  
>  	enum cluster_join_result join_result;
>  
> -	int callbacked; /* set non-zero after sd_block_handler() was called */
> -
>  	size_t buf_len;
>  	uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
>  };
> @@ -243,7 +241,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
>  	eventfd_t value = 1;
>  
>  	/* process leave event */
> -	if (!uatomic_read(&zk_notify_blocked) &&
> +	if (uatomic_read(&zk_notify_blocked) <= 0 &&
>  	     uatomic_read(&nr_zk_levents)) {
>  		nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1;
>  		dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
> @@ -498,7 +496,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
>  	ev.type = type;
>  	ev.sender = *znode;
>  	ev.buf_len = buf_len;
> -	ev.callbacked = 0;
>  	if (buf)
>  		memcpy(ev.buf, buf, buf_len);
>  	zk_queue_push(zh, &ev);
> @@ -515,7 +512,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
>  	ev->type = EVENT_LEAVE;
>  	ev->sender = *znode;
>  	ev->buf_len = 0;
> -	ev->callbacked = 0;
>  
>  	nr_levents = uatomic_add_return(&nr_zk_levents, 1);
>  	dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
> @@ -714,7 +710,7 @@ static void zk_handler(int listen_fd, int events, void *data)
>  	if (ret < 0)
>  		return;
>  
> -	if (uatomic_read(&zk_notify_blocked))
> +	if (uatomic_read(&zk_notify_blocked) > 0)
>  		return;
>  
>  	ret = zk_queue_pop(zhandle, &ev);
> @@ -820,16 +816,9 @@ static void zk_handler(int listen_fd, int events, void *data)
>  		break;
>  	case EVENT_BLOCK:
>  		dprintf("BLOCK\n");
> -		if (node_eq(&ev.sender.node, &this_node.node)
> -				&& !ev.callbacked) {
> -			uatomic_inc(&zk_notify_blocked);
> -			ev.callbacked = 1;
> -			zk_queue_push_back(zhandle, &ev);
> -			sd_block_handler();
> -		} else {
> -			zk_queue_push_back(zhandle, NULL);
> -		}
> -
> +		zk_queue_push_back(zhandle, NULL);
> +		if (sd_block_handler(&ev.sender.node))
> + 			uatomic_inc(&zk_notify_blocked);
>  		break;
>  	case EVENT_NOTIFY:
>  		dprintf("NOTIFY\n");
> diff --git a/sheep/group.c b/sheep/group.c
> index c2679f2..d00a121 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -251,6 +251,11 @@ int get_nr_copies(struct vnode_info *vnode_info)
>  	return min(vnode_info->nr_zones, sys->nr_copies);
>  }
>  
> +/*
> + * Indicator if a cluster operation is currently running.
> + */
> +static bool cluster_op_running = false;
> +
>  static struct vdi_op_message *prepare_cluster_msg(struct request *req,
>  		size_t *sizep)
>  {
> @@ -295,6 +300,8 @@ static void cluster_op_done(struct work *work)
>  	struct vdi_op_message *msg;
>  	size_t size;
>  
> +	cluster_op_running = false;
> +
>  	msg = prepare_cluster_msg(req, &size);
>  	if (!msg)
>  		panic();
> @@ -305,20 +312,33 @@ static void cluster_op_done(struct work *work)
>  }
>  
>  /*
> - * Perform a blocked cluster operation.
> + * Perform a blocked cluster operation if we were the node requesting it
> + * and do not have any other operation pending.
>   *
> - * Must run in the main thread as it access unlocked state like
> + * If this method returns false the caller must call the method again for
> + * the same event once it gets notified again.
> + *
> + * Must run in the main thread as it accesses unlocked state like
>   * sys->pending_list.
>   */
> -void sd_block_handler(void)
> +bool sd_block_handler(struct sd_node *sender)
>  {
> -	struct request *req = list_first_entry(&sys->pending_list,
> -						struct request, pending_list);
> +	struct request *req;
> +
> +	if (!node_eq(sender, &sys->this_node))
> +		return false;
> +	if (cluster_op_running)
> +		return false;
> +
> +	cluster_op_running = true;
>  
> +	req = list_first_entry(&sys->pending_list,
> +				struct request, pending_list);
>  	req->work.fn = do_cluster_request;
>  	req->work.done = cluster_op_done;
>  
>  	queue_work(sys->block_wqueue, &req->work);
> +	return true;
>  }
>  
>  /*
> -- 
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog
---end quoted text---



More information about the sheepdog mailing list