[Sheepdog] [RFC PATCH] sheep: introduce sd_op_template

Liu Yuan namei.unix at gmail.com
Sat Oct 22 07:14:37 CEST 2011


On 10/21/2011 04:28 PM, MORITA Kazutaka wrote:

> When we want to add a new operation (SD_OP_xxxxx), it is not clear
> which code we should modify.  And in some cases, we need to modify
> code everywhere to implement one operation.  This is not a good
> design.
> 
> This patch abstracts out Sheepdog operations into sd_op_template, and
> moves all the request processing code to sheep/ops.c.
> 
> The definition of sd_op_template is as follows:
> 
> struct sd_op_template {
>         enum sd_op_type type;
> 
>         int available_always;
> 
>         int (*process)(const struct sd_req *req, struct sd_rsp *rsp,
>                        void *data);
>         int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp,
>                             void *data);
> };
> 
> 'type' is the type of the operation: SD_OP_TYPE_CLUSTER,
> SD_OP_TYPE_STORE, or SD_OP_TYPE_IO.
> 
> 'available_always' is set to non-zero if the operation should be
> processed even when the cluster is not working.
> 
> 'process()' and 'post_process()' are the main functions of this
> operation.  process() will be called in the worker thread, and
> post_process() will be called in the main thread.
> 
> If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only one node
> processes a cluster operation at the same time.  We can use this for
> something like distributed locking.  process() will be called on the
> local node, and post_process() will be called on every node.
> 
> If type is SD_OP_TYPE_STORE, both process() and post_process() will be
> called on the local node.
> 
> If type is SD_OP_TYPE_IO, neither process() nor post_process() is used
> because this type of operation is heavily intertwined with Sheepdog
> core code.  We are unlikely to add new operations of this type.
> 
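Just to illustrate the intended usage: with this template in place, adding a
new operation should boil down to writing its handlers and one entry in
sd_ops[].  A minimal sketch, assuming a hypothetical opcode SD_OP_EXAMPLE
(the opcode and both handler names below are made up for illustration and
are not part of this patch):

static int process_example(const struct sd_req *req, struct sd_rsp *rsp,
			   void *data)
{
	/* hypothetical handler: runs in the worker thread; for a cluster
	 * operation it runs only on the node that received the request */
	return SD_RES_SUCCESS;
}

static int post_process_example(const struct sd_req *req, struct sd_rsp *rsp,
				void *data)
{
	/* hypothetical handler: runs in the main thread; for a cluster
	 * operation it runs on every node */
	return SD_RES_SUCCESS;
}

	/* new entry in the sd_ops[] array in sheep/ops.c, indexed by opcode */
	[SD_OP_EXAMPLE] = {
		.type = SD_OP_TYPE_CLUSTER,
		.process = process_example,
		.post_process = post_process_example,
	},
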
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> ---
>  sheep/Makefile.am  |    2 +-
>  sheep/group.c      |  286 +++--------------------------------
>  sheep/ops.c        |  427 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  sheep/sdnet.c      |  111 ++------------
>  sheep/sheep_priv.h |   48 ++++++-
>  sheep/store.c      |   19 +--
>  6 files changed, 522 insertions(+), 371 deletions(-)
>  create mode 100644 sheep/ops.c
> 
> diff --git a/sheep/Makefile.am b/sheep/Makefile.am
> index 2b9d58f..3db914d 100644
> --- a/sheep/Makefile.am
> +++ b/sheep/Makefile.am
> @@ -23,7 +23,7 @@ INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
>  
>  sbin_PROGRAMS		= sheep
>  
> -sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c \
> +sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
>  			  cluster/corosync.c
>  sheep_LDADD	  	= $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
>  sheep_DEPENDENCIES	= ../lib/libsheepdog.a
> diff --git a/sheep/group.c b/sheep/group.c
> index e22dabc..8664b6f 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -101,17 +101,7 @@ static size_t get_join_message_size(struct join_message *jm)
>  	return sizeof(*jm) + jm->nr_nodes * sizeof(jm->nodes[0]);
>  }
>  
> -static int get_node_idx(struct sheepdog_node_list_entry *ent,
> -			struct sheepdog_node_list_entry *entries, int nr_nodes)
> -{
> -	ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
> -	if (!ent)
> -		return -1;
> -
> -	return ent - entries;
> -}
> -
> -static int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
> +int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
>  {
>  	int nr_zones = 0, i, j;
>  	uint32_t zones[SD_MAX_REDUNDANCY];
> @@ -146,116 +136,28 @@ void setup_ordered_sd_vnode_list(struct request *req)
>  	get_ordered_sd_vnode_list(req->entry, &req->nr_vnodes, &req->nr_zones);
>  }
>  
> -static void get_node_list(struct sd_node_req *req,
> -			  struct sd_node_rsp *rsp, void *data)
> +static void do_cluster_op(void *arg)
>  {
> -	int nr_nodes;
> -
> -	nr_nodes = sys->nr_nodes;
> -	memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
> -	rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
> -	rsp->nr_nodes = nr_nodes;
> -	rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
> -	rsp->master_idx = -1;
> -}
> +	struct vdi_op_message *msg = arg;
> +	int ret;
> +	struct request *req;
>  
> -static int get_epoch(struct sd_obj_req *req,
> -		      struct sd_obj_rsp *rsp, void *data)
> -{
> -	int epoch = req->tgt_epoch;
> -	int len, ret;
> -	dprintf("%d\n", epoch);
> -	len = epoch_log_read(epoch, (char *)data, req->data_length);
> -	if (len == -1) {
> -		ret = SD_RES_NO_TAG;
> -		rsp->data_length = 0;
> -	} else {
> -		ret = SD_RES_SUCCESS;
> -		rsp->data_length = len;
> -	}
> -	return ret;
> -}
> +	req = list_first_entry(&sys->pending_list, struct request, pending_list);
> +	ret = req->op->process((const struct sd_req *)&msg->req,
> +			       (struct sd_rsp *)&msg->rsp, req->data);
>  
> -static void vdi_op(void *arg);
> +	msg->rsp.result = ret;
> +}
>  
>  void cluster_queue_request(struct work *work, int idx)
>  {
>  	struct request *req = container_of(work, struct request, work);
>  	struct sd_req *hdr = (struct sd_req *)&req->rq;
> -	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
>  	struct vdi_op_message *msg;
> -	struct epoch_log *log;
> -	int ret = SD_RES_SUCCESS, i, max_logs, epoch;
> -	uint32_t sys_stat = sys_stat_get();
>  	size_t size;
>  
>  	eprintf("%p %x\n", req, hdr->opcode);
>  
> -	switch (hdr->opcode) {
> -	case SD_OP_GET_EPOCH:
> -		ret = get_epoch((struct sd_obj_req *)hdr,
> -			  (struct sd_obj_rsp *)rsp, req->data);
> -		break;
> -	case SD_OP_GET_NODE_LIST:
> -		get_node_list((struct sd_node_req *)hdr,
> -			      (struct sd_node_rsp *)rsp, req->data);
> -		break;
> -	case SD_OP_STAT_CLUSTER:
> -		max_logs = rsp->data_length / sizeof(*log);
> -		epoch = get_latest_epoch();
> -		rsp->data_length = 0;
> -		for (i = 0; i < max_logs; i++) {
> -			if (epoch <= 0)
> -				break;
> -
> -			log = (struct epoch_log *)req->data + i;
> -			log->epoch = epoch;
> -			log->ctime = get_cluster_ctime();
> -			log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
> -						       sizeof(log->nodes));
> -			if (log->nr_nodes == -1)
> -				log->nr_nodes = epoch_log_read_remote(epoch,
> -								      (char *)log->nodes,
> -								      sizeof(log->nodes));
> -
> -			rsp->data_length += sizeof(*log);
> -			log->nr_nodes /= sizeof(log->nodes[0]);
> -			epoch--;
> -		}
> -
> -		switch (sys_stat) {
> -		case SD_STATUS_OK:
> -			ret = SD_RES_SUCCESS;
> -			break;
> -		case SD_STATUS_WAIT_FOR_FORMAT:
> -			ret = SD_RES_WAIT_FOR_FORMAT;
> -			break;
> -		case SD_STATUS_WAIT_FOR_JOIN:
> -			ret = SD_RES_WAIT_FOR_JOIN;
> -			break;
> -		case SD_STATUS_SHUTDOWN:
> -			ret = SD_RES_SHUTDOWN;
> -			break;
> -		case SD_STATUS_JOIN_FAILED:
> -			ret = SD_RES_JOIN_FAILED;
> -			break;
> -		case SD_STATUS_HALT:
> -			ret = SD_RES_HALT;
> -			break;
> -		default:
> -			ret = SD_RES_SYSTEM_ERROR;
> -			break;
> -		}
> -		break;
> -	default:
> -		/* forward request to group */
> -		goto forward;
> -	}
> -
> -	rsp->result = ret;
> -	return;
> -
> -forward:
>  	if (hdr->flags & SD_FLAG_CMD_WRITE)
>  		size = sizeof(*msg);
>  	else
> @@ -272,7 +174,12 @@ forward:
>  
>  	list_add(&req->pending_list, &sys->pending_list);
>  
> -	sys->cdrv->notify(msg, size, vdi_op);
> +	if (req->op->process)
> +		sys->cdrv->notify(msg, size, do_cluster_op);
> +	else {
> +		msg->rsp.result = SD_RES_SUCCESS;
> +		sys->cdrv->notify(msg, size, NULL);
> +	}
>  
>  	free(msg);
>  }
> @@ -628,85 +535,6 @@ join_finished:
>  	return;
>  }
>  
> -static void vdi_op(void *arg)
> -{
> -	struct vdi_op_message *msg = arg;
> -	const struct sd_vdi_req *hdr = &msg->req;
> -	struct sd_vdi_rsp *rsp = &msg->rsp;
> -	void *data, *tag;
> -	int ret = SD_RES_SUCCESS;
> -	struct sheepdog_vdi_attr *vattr;
> -	uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
> -	uint64_t ctime = 0;
> -	struct request *req;
> -
> -	req = list_first_entry(&sys->pending_list, struct request, pending_list);
> -	data = req->data;
> -
> -	switch (hdr->opcode) {
> -	case SD_OP_NEW_VDI:
> -		ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
> -			      hdr->base_vdi_id, hdr->copies,
> -			      hdr->snapid, &nr_copies);
> -		break;
> -	case SD_OP_DEL_VDI:
> -		ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
> -			      hdr->snapid, &nr_copies);
> -		break;
> -	case SD_OP_LOCK_VDI:
> -	case SD_OP_GET_VDI_INFO:
> -		if (hdr->proto_ver != SD_PROTO_VER) {
> -			ret = SD_RES_VER_MISMATCH;
> -			break;
> -		}
> -		if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
> -			tag = (char *)data + SD_MAX_VDI_LEN;
> -		else if (hdr->data_length == SD_MAX_VDI_LEN)
> -			tag = NULL;
> -		else {
> -			ret = SD_RES_INVALID_PARMS;
> -			break;
> -		}
> -		ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
> -				 &nr_copies, NULL);
> -		if (ret != SD_RES_SUCCESS)
> -			break;
> -		break;
> -	case SD_OP_GET_VDI_ATTR:
> -		vattr = data;
> -		ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
> -				 &vid, hdr->snapid, &nr_copies, &ctime);
> -		if (ret != SD_RES_SUCCESS)
> -			break;
> -		/* the curernt vdi id can change if we take the snapshot,
> -		   so we use the hash value of the vdi name as the vdi id */
> -		vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
> -		vid &= SD_NR_VDIS - 1;
> -		ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
> -				   &attrid, nr_copies, ctime,
> -				   hdr->flags & SD_FLAG_CMD_CREAT,
> -				   hdr->flags & SD_FLAG_CMD_EXCL,
> -				   hdr->flags & SD_FLAG_CMD_DEL);
> -		break;
> -	case SD_OP_RELEASE_VDI:
> -		break;
> -	case SD_OP_MAKE_FS:
> -		ret = SD_RES_SUCCESS;
> -		break;
> -	case SD_OP_SHUTDOWN:
> -		break;
> -	default:
> -		ret = SD_RES_SYSTEM_ERROR;
> -		eprintf("opcode %d is not implemented\n", hdr->opcode);
> -		break;
> -	}
> -
> -	rsp->vdi_id = vid;
> -	rsp->attr_id = attrid;
> -	rsp->copies = nr_copies;
> -	rsp->result = ret;
> -}
> -
>  static void __sd_notify(struct cpg_event *cevent)
>  {
>  }
> @@ -715,86 +543,22 @@ static void __sd_notify_done(struct cpg_event *cevent)
>  {
>  	struct work_notify *w = container_of(cevent, struct work_notify, cev);
>  	struct vdi_op_message *msg = (struct vdi_op_message *)w->msg;
> -	const struct sd_vdi_req *hdr = &msg->req;
> -	struct sd_vdi_rsp *rsp = &msg->rsp;
> -	void *data = msg->data;
>  	struct request *req;
>  	int ret = msg->rsp.result;
> -	int i, latest_epoch;
> -	uint64_t ctime;
> -
> -	if (ret != SD_RES_SUCCESS)
> -		goto out;
> -
> -	switch (hdr->opcode) {
> -	case SD_OP_NEW_VDI:
> -	{
> -		unsigned long nr = rsp->vdi_id;
> -		vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
> -		set_bit(nr, sys->vdi_inuse);
> -		break;
> -	}
> -	case SD_OP_DEL_VDI:
> -		break;
> -	case SD_OP_LOCK_VDI:
> -	case SD_OP_RELEASE_VDI:
> -	case SD_OP_GET_VDI_INFO:
> -	case SD_OP_GET_VDI_ATTR:
> -		break;
> -	case SD_OP_MAKE_FS:
> -		sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies;
> -		sys->flags = ((struct sd_so_req *)hdr)->flags;
> -		if (!sys->nr_sobjs)
> -			sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
> -
> -		ctime = ((struct sd_so_req *)hdr)->ctime;
> -		set_cluster_ctime(ctime);
> -
> -		latest_epoch = get_latest_epoch();
> -		for (i = 1; i <= latest_epoch; i++)
> -			remove_epoch(i);
> -		memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
> -
> -		sys->epoch = 1;
> -		sys->recovered_epoch = 1;
> -
> -		dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
> -		ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
> -				      sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
> -		if (ret < 0)
> -			eprintf("can't write epoch %u\n", sys->epoch);
> -		update_epoch_store(sys->epoch);
> +	struct sd_op_template *op = get_sd_op(msg->req.opcode);
>  
> -		set_cluster_copies(sys->nr_sobjs);
> -		set_cluster_flags(sys->flags);
> +	if (ret == SD_RES_SUCCESS && op->post_process)
> +		ret = op->post_process((const struct sd_req *)&msg->req,
> +				       (struct sd_rsp *)&msg->rsp, msg->data);
>  
> -		if (sys_flag_nohalt())
> -			sys_stat_set(SD_STATUS_OK);
> -		else {
> -			int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
> -
> -			if (nr_zones >= sys->nr_sobjs)
> -				sys_stat_set(SD_STATUS_OK);
> -			else
> -				sys_stat_set(SD_STATUS_HALT);
> -		}
> -		break;
> -	case SD_OP_SHUTDOWN:
> -		sys_stat_set(SD_STATUS_SHUTDOWN);
> -		break;
> -	default:
> -		eprintf("unknown operation %d\n", hdr->opcode);
> -		ret = SD_RES_UNKNOWN;
> -	}
> -out:
>  	if (!is_myself(w->sender.addr, w->sender.port))
>  		return;
>  
>  	req = list_first_entry(&sys->pending_list, struct request, pending_list);
>  
> -	rsp->result = ret;
> -	memcpy(req->data, data, rsp->data_length);
> -	memcpy(&req->rp, rsp, sizeof(req->rp));
> +	msg->rsp.result = ret;
> +	memcpy(req->data, msg->data, msg->rsp.data_length);
> +	memcpy(&req->rp, &msg->rsp, sizeof(req->rp));
>  	list_del(&req->pending_list);
>  	req->done(req);
>  }
> @@ -1226,7 +990,7 @@ do_retry:
>  
>  		list_del(&cevent->cpg_event_list);
>  
> -		if (is_io_request(req->rq.opcode)) {
> +		if (is_io_op(req->op)) {
>  			int copies = sys->nr_sobjs;
>  
>  			if (copies > req->nr_zones)
> @@ -1282,7 +1046,7 @@ do_retry:
>  			}
>  		}
>  
> -		if (is_cluster_request(req->rq.opcode))
> +		if (is_cluster_op(req->op))
>  			queue_work(sys->cpg_wqueue, &req->work);
>  		else if (req->rq.flags & SD_FLAG_CMD_IO_LOCAL)
>  			queue_work(sys->io_wqueue, &req->work);
> diff --git a/sheep/ops.c b/sheep/ops.c
> new file mode 100644
> index 0000000..0d38e7b
> --- /dev/null
> +++ b/sheep/ops.c
> @@ -0,0 +1,427 @@
> +/*
> + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +#include <stdio.h>
> +#include <stdlib.h>
> +
> +#include "sheep_priv.h"
> +
> +static int process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> +			   void *data)
> +{
> +	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> +	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> +	int ret;
> +
> +	ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
> +		      hdr->base_vdi_id, hdr->copies,
> +		      hdr->snapid, &nr_copies);
> +
> +	vdi_rsp->vdi_id = vid;
> +	vdi_rsp->copies = nr_copies;
> +
> +	return ret;
> +}
> +
> +static int post_process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> +				void *data)
> +{
> +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> +	unsigned long nr = vdi_rsp->vdi_id;
> +	int ret = vdi_rsp->result;
> +
> +	vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
> +	set_bit(nr, sys->vdi_inuse);
> +
> +	return SD_RES_SUCCESS;
> +}
> +
> +static int process_del_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> +			   void *data)
> +{
> +	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> +	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> +	int ret;
> +
> +	ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
> +		      hdr->snapid, &nr_copies);
> +
> +	vdi_rsp->vdi_id = vid;
> +	vdi_rsp->copies = nr_copies;
> +
> +	return ret;
> +}
> +
> +static int process_get_vdi_info(const struct sd_req *req, struct sd_rsp *rsp,
> +				void *data)
> +{
> +	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> +	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> +	void *tag;
> +	int ret;
> +
> +	if (hdr->proto_ver != SD_PROTO_VER)
> +		return SD_RES_VER_MISMATCH;
> +
> +	if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
> +		tag = (char *)data + SD_MAX_VDI_LEN;
> +	else if (hdr->data_length == SD_MAX_VDI_LEN)
> +		tag = NULL;
> +	else
> +		return SD_RES_INVALID_PARMS;
> +
> +	ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
> +			 &nr_copies, NULL);
> +	if (ret != SD_RES_SUCCESS)
> +		return ret;
> +
> +	vdi_rsp->vdi_id = vid;
> +	vdi_rsp->copies = nr_copies;
> +
> +	return ret;
> +}
> +
> +static int post_process_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
> +				void *data)
> +{
> +	const struct sd_so_req *hdr = (const struct sd_so_req *)req;
> +	int i, latest_epoch, ret;
> +	uint64_t ctime;
> +
> +	sys->nr_sobjs = hdr->copies;
> +	sys->flags = hdr->flags;
> +	if (!sys->nr_sobjs)
> +		sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
> +
> +	ctime = hdr->ctime;
> +	set_cluster_ctime(ctime);
> +
> +	latest_epoch = get_latest_epoch();
> +	for (i = 1; i <= latest_epoch; i++)
> +		remove_epoch(i);
> +	memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
> +
> +	sys->epoch = 1;
> +	sys->recovered_epoch = 1;
> +
> +	dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
> +	ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
> +			      sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
> +	if (ret < 0) {
> +		eprintf("can't write epoch %u\n", sys->epoch);
> +		return SD_RES_EIO;
> +	}
> +	update_epoch_store(sys->epoch);
> +
> +	set_cluster_copies(sys->nr_sobjs);
> +	set_cluster_flags(sys->flags);
> +
> +	if (sys_flag_nohalt())
> +		sys_stat_set(SD_STATUS_OK);
> +	else {
> +		int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
> +
> +		if (nr_zones >= sys->nr_sobjs)
> +			sys_stat_set(SD_STATUS_OK);
> +		else
> +			sys_stat_set(SD_STATUS_HALT);
> +	}
> +
> +	return SD_RES_SUCCESS;
> +}
> +
> +static int post_process_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
> +				 void *data)
> +{
> +	sys_stat_set(SD_STATUS_SHUTDOWN);
> +
> +	return SD_RES_SUCCESS;
> +}
> +
> +static int process_get_vdi_attr(const struct sd_req *req, struct sd_rsp *rsp,
> +				void *data)
> +{
> +	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> +	uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
> +	uint64_t ctime = 0;
> +	int ret;
> +	struct sheepdog_vdi_attr *vattr;
> +
> +	vattr = data;
> +	ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
> +			 &vid, hdr->snapid, &nr_copies, &ctime);
> +	if (ret != SD_RES_SUCCESS)
> +		return ret;
> +
> +	/* the current vdi id can change if we take a snapshot,
> +	   so we use the hash value of the vdi name as the vdi id */
> +	vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
> +	vid &= SD_NR_VDIS - 1;
> +	ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
> +			   &attrid, nr_copies, ctime,
> +			   hdr->flags & SD_FLAG_CMD_CREAT,
> +			   hdr->flags & SD_FLAG_CMD_EXCL,
> +			   hdr->flags & SD_FLAG_CMD_DEL);
> +
> +	vdi_rsp->vdi_id = vid;
> +	vdi_rsp->attr_id = attrid;
> +	vdi_rsp->copies = nr_copies;
> +
> +	return ret;
> +}
> +
> +static int post_process_read_vdis(const struct sd_req *req, struct sd_rsp *rsp,
> +				  void *data)
> +{
> +	return read_vdis(data, req->data_length, &rsp->data_length);
> +}
> +
> +static int get_node_idx(struct sheepdog_node_list_entry *ent,
> +			struct sheepdog_node_list_entry *entries, int nr_nodes)
> +{
> +	ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
> +	if (!ent)
> +		return -1;
> +
> +	return ent - entries;
> +}
> +
> +static int post_process_get_node_list(const struct sd_req *req, struct sd_rsp *rsp,
> +				      void *data)
> +{
> +	struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
> +	int nr_nodes;
> +
> +	nr_nodes = sys->nr_nodes;
> +	memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
> +	node_rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
> +	node_rsp->nr_nodes = nr_nodes;
> +	node_rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
> +	node_rsp->master_idx = -1;
> +
> +	return SD_RES_SUCCESS;
> +}
> +
> +static int process_stat_sheep(const struct sd_req *req, struct sd_rsp *rsp,
> +			      void *data)
> +{
> +	struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
> +	uint32_t epoch = req->epoch;
> +
> +	return stat_sheep(&node_rsp->store_size, &node_rsp->store_free, epoch);
> +}
> +
> +static int process_stat_cluster(const struct sd_req *req, struct sd_rsp *rsp,
> +				void *data)
> +{
> +	struct epoch_log *log;
> +	int i, max_logs, epoch;
> +	uint32_t sys_stat = sys_stat_get();
> +
> +	max_logs = rsp->data_length / sizeof(*log);
> +	epoch = get_latest_epoch();
> +	rsp->data_length = 0;
> +	for (i = 0; i < max_logs; i++) {
> +		if (epoch <= 0)
> +			break;
> +
> +		log = (struct epoch_log *)data + i;
> +		log->epoch = epoch;
> +		log->ctime = get_cluster_ctime();
> +		log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
> +					       sizeof(log->nodes));
> +		if (log->nr_nodes == -1)
> +			log->nr_nodes = epoch_log_read_remote(epoch,
> +							      (char *)log->nodes,
> +							      sizeof(log->nodes));
> +
> +		rsp->data_length += sizeof(*log);
> +		log->nr_nodes /= sizeof(log->nodes[0]);
> +		epoch--;
> +	}
> +
> +	switch (sys_stat) {
> +	case SD_STATUS_OK:
> +		return SD_RES_SUCCESS;
> +	case SD_STATUS_WAIT_FOR_FORMAT:
> +		return SD_RES_WAIT_FOR_FORMAT;
> +	case SD_STATUS_WAIT_FOR_JOIN:
> +		return SD_RES_WAIT_FOR_JOIN;
> +	case SD_STATUS_SHUTDOWN:
> +		return SD_RES_SHUTDOWN;
> +	case SD_STATUS_JOIN_FAILED:
> +		return SD_RES_JOIN_FAILED;
> +	case SD_STATUS_HALT:
> +		return SD_RES_HALT;
> +	default:
> +		return SD_RES_SYSTEM_ERROR;
> +	}
> +}
> +
> +static int process_kill_node(const struct sd_req *req, struct sd_rsp *rsp,
> +			     void *data)
> +{
> +	exit(1);
> +}
> +
> +static int process_get_obj_list(const struct sd_req *req, struct sd_rsp *rsp,
> +				void *data)
> +{
> +	return get_obj_list((const struct sd_list_req *)req,
> +			    (struct sd_list_rsp *)rsp, data);
> +}
> +
> +static int process_get_epoch(const struct sd_req *req, struct sd_rsp *rsp,
> +			     void *data)
> +{
> +	const struct sd_obj_req *obj_req = (const struct sd_obj_req *)req;
> +	struct sd_obj_rsp *obj_rsp = (struct sd_obj_rsp *)rsp;
> +	int epoch = obj_req->tgt_epoch;
> +	int len, ret;
> +	dprintf("%d\n", epoch);
> +	len = epoch_log_read(epoch, (char *)data, obj_req->data_length);
> +	if (len == -1) {
> +		ret = SD_RES_NO_TAG;
> +		obj_rsp->data_length = 0;
> +	} else {
> +		ret = SD_RES_SUCCESS;
> +		obj_rsp->data_length = len;
> +	}
> +	return ret;
> +}
> +
> +static struct sd_op_template sd_ops[] = {
> +
> +	/* cluster operations */
> +	[SD_OP_NEW_VDI] = {
> +		.type = SD_OP_TYPE_CLUSTER,
> +		.process = process_new_vdi,
> +		.post_process = post_process_new_vdi,
> +	},
> +
> +	[SD_OP_DEL_VDI] = {
> +		.type = SD_OP_TYPE_CLUSTER,
> +		.process = process_del_vdi,
> +	},
> +
> +	[SD_OP_GET_VDI_INFO] = {
> +		.type = SD_OP_TYPE_CLUSTER,
> +		.process = process_get_vdi_info,
> +	},
> +
> +	[SD_OP_LOCK_VDI] = {
> +		.type = SD_OP_TYPE_CLUSTER,
> +		.process = process_get_vdi_info,
> +	},
> +
> +	[SD_OP_MAKE_FS] = {
> +		.type = SD_OP_TYPE_CLUSTER,
> +		.available_always = 1,
> +		.post_process = post_process_make_fs,
> +	},
> +
> +	[SD_OP_SHUTDOWN] = {
> +		.type = SD_OP_TYPE_CLUSTER,
> +		.post_process = post_process_shutdown,
> +	},
> +
> +	[SD_OP_GET_VDI_ATTR] = {
> +		.type = SD_OP_TYPE_CLUSTER,
> +		.process = process_get_vdi_attr,
> +	},
> +
> +	[SD_OP_RELEASE_VDI] = {
> +		.type = SD_OP_TYPE_CLUSTER,
> +	},
> +
> +	/* store operations */
> +	[SD_OP_READ_VDIS] = {
> +		.type = SD_OP_TYPE_STORE,
> +		.available_always = 1,
> +		.post_process = post_process_read_vdis,
> +	},
> +
> +	[SD_OP_GET_NODE_LIST] = {
> +		.type = SD_OP_TYPE_STORE,
> +		.available_always = 1,
> +		.post_process = post_process_get_node_list,
> +	},
> +
> +	[SD_OP_STAT_SHEEP] = {
> +		.type = SD_OP_TYPE_STORE,
> +		.process = process_stat_sheep,
> +	},
> +
> +	[SD_OP_STAT_CLUSTER] = {
> +		.type = SD_OP_TYPE_STORE,
> +		.available_always = 1,
> +		.process = process_stat_cluster,
> +	},
> +
> +	[SD_OP_KILL_NODE] = {
> +		.type = SD_OP_TYPE_STORE,
> +		.available_always = 1,
> +		.process = process_kill_node,
> +	},
> +
> +	[SD_OP_GET_OBJ_LIST] = {
> +		.type = SD_OP_TYPE_STORE,
> +		.process = process_get_obj_list,
> +	},
> +
> +	[SD_OP_GET_EPOCH] = {
> +		.type = SD_OP_TYPE_STORE,
> +		.process = process_get_epoch,
> +	},
> +
> +	/* I/O operations */
> +	[SD_OP_CREATE_AND_WRITE_OBJ] = {
> +		.type = SD_OP_TYPE_IO,
> +	},
> +
> +	[SD_OP_READ_OBJ] = {
> +		.type = SD_OP_TYPE_IO,
> +	},
> +
> +	[SD_OP_WRITE_OBJ] = {
> +		.type = SD_OP_TYPE_IO,
> +	},
> +
> +	[SD_OP_REMOVE_OBJ] = {
> +		.type = SD_OP_TYPE_IO,
> +	},
> +};
> +
> +struct sd_op_template *get_sd_op(uint8_t opcode)
> +{
> +	if (sd_ops[opcode].type == 0)
> +		return NULL;
> +
> +	return sd_ops + opcode;
> +}
> +
> +int is_cluster_op(struct sd_op_template *op)
> +{
> +	return op->type == SD_OP_TYPE_CLUSTER;
> +}
> +
> +int is_store_op(struct sd_op_template *op)
> +{
> +	return op->type == SD_OP_TYPE_STORE;
> +}
> +
> +int is_io_op(struct sd_op_template *op)
> +{
> +	return op->type == SD_OP_TYPE_IO;
> +}
> diff --git a/sheep/sdnet.c b/sheep/sdnet.c
> index 474b1be..7d7e142 100644
> --- a/sheep/sdnet.c
> +++ b/sheep/sdnet.c
> @@ -19,46 +19,6 @@
>  
>  #include "sheep_priv.h"
>  
> -int is_io_request(unsigned op)
> -{
> -	int ret = 0;
> -
> -	switch (op) {
> -	case SD_OP_CREATE_AND_WRITE_OBJ:
> -	case SD_OP_REMOVE_OBJ:
> -	case SD_OP_READ_OBJ:
> -	case SD_OP_WRITE_OBJ:
> -		ret = 1;
> -		break;
> -	default:
> -		break;
> -	}
> -
> -	return ret;
> -}
> -
> -int is_cluster_request(unsigned op)
> -{
> -	int ret = 0;
> -
> -	switch (op) {
> -	case SD_OP_NEW_VDI:
> -	case SD_OP_DEL_VDI:
> -	case SD_OP_LOCK_VDI:
> -	case SD_OP_RELEASE_VDI:
> -	case SD_OP_GET_VDI_INFO:
> -	case SD_OP_MAKE_FS:
> -	case SD_OP_SHUTDOWN:
> -	case SD_OP_GET_VDI_ATTR:
> -		ret = 1;
> -		break;
> -	default:
> -		break;
> -	}
> -
> -	return ret;
> -}
> -
>  void resume_pending_requests(void)
>  {
>  	struct request *next, *tmp;
> @@ -119,18 +79,21 @@ static void setup_access_to_local_objects(struct request *req)
>  static void __done(struct work *work, int idx)
>  {
>  	struct request *req = container_of(work, struct request, work);
> -	struct sd_req *hdr = (struct sd_req *)&req->rq;
>  	int again = 0;
>  	int copies = sys->nr_sobjs;
>  
>  	if (copies > req->nr_zones)
>  		copies = req->nr_zones;
>  
> -	if (is_cluster_request(hdr->opcode))
> +	if (is_cluster_op(req->op))
>  		/* request is forwarded to cpg group */
>  		return;
>  
> -	if (is_io_request(hdr->opcode)) {
> +	if (is_store_op(req->op) && req->op->post_process)
> +		req->rp.result = req->op->post_process(&req->rq, &req->rp,
> +						       req->data);
> +
> +	if (is_io_op(req->op)) {
>  		struct cpg_event *cevent = &req->cev;
>  
>  		list_del(&req->r_wlist);
> @@ -225,39 +188,15 @@ static void queue_request(struct request *req)
>  
>  	dprintf("%x\n", hdr->opcode);
>  
> -	if (hdr->opcode == SD_OP_KILL_NODE) {
> -		log_close();
> -		exit(1);
> -	}
> -
>  	if (sys->status == SD_STATUS_SHUTDOWN) {
>  		rsp->result = SD_RES_SHUTDOWN;
>  		req->done(req);
>  		return;
>  	}
>  
> -	/*
> -	 * we can know why this node failed to join with
> -	 * SD_OP_STAT_CLUSTER, so the request should be handled even
> -	 * when the cluster status is SD_STATUS_JOIN_FAILED
> -	 */
> -	if (sys->status == SD_STATUS_JOIN_FAILED &&
> -	    hdr->opcode != SD_OP_STAT_CLUSTER) {
> -		rsp->result = SD_RES_JOIN_FAILED;
> -		req->done(req);
> -		return;
> -	}
> -
>  	if (sys->status == SD_STATUS_WAIT_FOR_FORMAT ||
>  	    sys->status == SD_STATUS_WAIT_FOR_JOIN) {
> -		/* TODO: cleanup */
> -		switch (hdr->opcode) {
> -		case SD_OP_STAT_CLUSTER:
> -		case SD_OP_MAKE_FS:
> -		case SD_OP_GET_NODE_LIST:
> -		case SD_OP_READ_VDIS:
> -			break;
> -		default:
> +		if (!req->op->available_always) {
>  			if (sys->status == SD_STATUS_WAIT_FOR_FORMAT)
>  				rsp->result = SD_RES_WAIT_FOR_FORMAT;
>  			else
> @@ -267,33 +206,11 @@ static void queue_request(struct request *req)
>  		}
>  	}
>  
> -	switch (hdr->opcode) {
> -	case SD_OP_CREATE_AND_WRITE_OBJ:
> -	case SD_OP_REMOVE_OBJ:
> -	case SD_OP_READ_OBJ:
> -	case SD_OP_WRITE_OBJ:
> -	case SD_OP_STAT_SHEEP:
> -	case SD_OP_GET_OBJ_LIST:
> +	if (is_io_op(req->op) || is_store_op(req->op))
>  		req->work.fn = store_queue_request;
> -		break;
> -	case SD_OP_GET_NODE_LIST:
> -	case SD_OP_NEW_VDI:
> -	case SD_OP_DEL_VDI:
> -	case SD_OP_LOCK_VDI:
> -	case SD_OP_RELEASE_VDI:
> -	case SD_OP_GET_VDI_INFO:
> -	case SD_OP_MAKE_FS:
> -	case SD_OP_SHUTDOWN:
> -	case SD_OP_STAT_CLUSTER:
> -	case SD_OP_GET_VDI_ATTR:
> -	case SD_OP_GET_EPOCH:
> +	else if (is_cluster_op(req->op))
>  		req->work.fn = cluster_queue_request;
> -		break;
> -	case SD_OP_READ_VDIS:
> -		rsp->result = read_vdis(req->data, hdr->data_length, &rsp->data_length);
> -		req->done(req);
> -		return;
> -	default:
> +	else {
>  		eprintf("unknown operation %d\n", hdr->opcode);
>  		rsp->result = SD_RES_SYSTEM_ERROR;
>  		req->done(req);
> @@ -314,7 +231,7 @@ static void queue_request(struct request *req)
>  		hdr->epoch = sys->epoch;
>  
>  	setup_ordered_sd_vnode_list(req);
> -	if (is_io_request(hdr->opcode))
> +	if (is_io_op(req->op))
>  		setup_access_to_local_objects(req);
>  
>  	cevent->ctype = CPG_EVENT_REQUEST;
> @@ -410,6 +327,12 @@ static void client_rx_handler(struct client_info *ci)
>  
>  		/* use le_to_cpu */
>  		memcpy(&req->rq, hdr, sizeof(req->rq));
> +		req->op = get_sd_op(hdr->opcode);
> +		if (!req->op) {
> +			eprintf("invalid opcode, %d\n", hdr->opcode);
> +			conn->c_rx_state = C_IO_CLOSED;
> +			break;
> +		}
>  
>  		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
>  			conn->c_rx_state = C_IO_DATA;
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index 65bc3aa..1fe2d4d 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -72,6 +72,8 @@ struct request {
>  	struct sd_req rq;
>  	struct sd_rsp rp;
>  
> +	struct sd_op_template *op;
> +
>  	void *data;
>  
>  	struct client_info *ci;
> @@ -152,8 +154,6 @@ extern struct cluster_info *sys;
>  
>  int create_listen_port(int port, void *data);
>  
> -int is_io_request(unsigned op);
> -int is_cluster_request(unsigned op);
>  int init_store(const char *dir);
>  int init_base_path(const char *dir);
>  
> @@ -173,6 +173,7 @@ int get_vdi_attr(uint32_t epoch, struct sheepdog_vdi_attr *vattr, int data_len,
>  		 uint32_t vid, uint32_t *attrid, int copies, uint64_t ctime,
>  		 int write, int excl, int delete);
>  
> +int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes);
>  void setup_ordered_sd_vnode_list(struct request *req);
>  void get_ordered_sd_vnode_list(struct sheepdog_vnode_list_entry *entries,
>  			       int *nr_vnodes, int *nr_zones);
> @@ -215,6 +216,8 @@ int get_latest_epoch(void);
>  int remove_epoch(int epoch);
>  int set_cluster_ctime(uint64_t ctime);
>  uint64_t get_cluster_ctime(void);
> +int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch);
> +int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data);
>  
>  int start_recovery(uint32_t epoch);
>  void resume_recovery_work(void);
> @@ -235,6 +238,47 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
>  int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
>  		 uint32_t epoch, int worker_idx);
>  
> +/* Operations */
> +
> +enum sd_op_type {
> +	SD_OP_TYPE_CLUSTER = 1, /* cluster operations */
> +	SD_OP_TYPE_STORE,       /* store operations */
> +	SD_OP_TYPE_IO,          /* io operations */
> +};
> +
> +struct sd_op_template {
> +	enum sd_op_type type;
> +
> +	/* process request even when cluster is not working */
> +	int available_always;
> +
> +	/*
> +	 * process() will be called in the worker thread, and
> +	 * post_process() will be called in the main thread.
> +	 *
> +	 * If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only
> +	 * one node processes a cluster operation at the same time.
> +	 * We can use this for something like distributed locking.
> +	 * process() will be called on the local node, and
> +	 * post_process() will be called on every node.
> +	 *
> +	 * If type is SD_OP_TYPE_STORE, both process() and
> +	 * post_process() will be called on the local node.
> +	 *
> +	 * If type is SD_OP_TYPE_IO, neither process() nor
> +	 * post_process() is used because this type of operation is
> +	 * heavily intertwined with Sheepdog core codes.  We will be
> +	 * unlikely to add new operations of this type.
> +	 */
> +	int (*process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
> +	int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
> +};
> +
> +struct sd_op_template *get_sd_op(uint8_t opcode);
> +int is_cluster_op(struct sd_op_template *op);
> +int is_store_op(struct sd_op_template *op);
> +int is_io_op(struct sd_op_template *op);
> +
>  /* Journal */
>  int jrnl_perform(int fd, void *buf, size_t count, off_t offset,
>  		 const char *path, const char *jrnl_dir);
> diff --git a/sheep/store.c b/sheep/store.c
> index ee8996e..83644db 100644
> --- a/sheep/store.c
> +++ b/sheep/store.c
> @@ -52,7 +52,7 @@ static int obj_cmp(const void *oid1, const void *oid2)
>  	return 0;
>  }
>  
> -static int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
> +int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
>  {
>  	struct statvfs vs;
>  	int ret;
> @@ -96,16 +96,14 @@ static int merge_objlist(struct sheepdog_vnode_list_entry *entries, int nr_entri
>  			 uint64_t *list1, int nr_list1,
>  			 uint64_t *list2, int nr_list2, int nr_objs);
>  
> -static int get_obj_list(struct request *req)
> +int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data)
>  {
>  	DIR *dir;
>  	struct dirent *d;
> -	struct sd_list_req *hdr = (struct sd_list_req *)&req->rq;
> -	struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp;
>  	uint64_t oid;
>  	uint32_t epoch;
>  	char path[1024];
> -	uint64_t *p = (uint64_t *)req->data;
> +	uint64_t *p = (uint64_t *)data;
>  	int nr = 0;
>  	uint64_t *objlist = NULL;
>  	int obj_nr, i;
> @@ -778,20 +776,15 @@ void store_queue_request(struct work *work, int idx)
>  	uint64_t oid = hdr->oid;
>  	uint32_t opcode = hdr->opcode;
>  	uint32_t epoch = hdr->epoch;
> -	struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
>  
>  	dprintf("%"PRIu32", %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
>  
>  	if (hdr->flags & SD_FLAG_CMD_RECOVERY)
>  		epoch = hdr->tgt_epoch;
>  
> -	if (opcode == SD_OP_STAT_SHEEP) {
> -		ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch);
> -		goto out;
> -	}
> -
> -	if (opcode == SD_OP_GET_OBJ_LIST) {
> -		ret = get_obj_list(req);
> +	if (is_store_op(req->op)) {
> +		if (req->op->process)
> +			ret = req->op->process(&req->rq, &req->rp, req->data);
>  		goto out;
>  	}
>  


for i in 0 1; do ./sheep/sheep /store/$i -z $i -p 700$i;sleep 1;done
collie/collie cluster format
for i in 0 1; do ./collie/collie cluster info -p 700$i; done

Cluster status: The sheepdog is stopped doing IO, short of living nodes

Creation time        Epoch Nodes
Cluster status: The sheepdog is stopped doing IO, short of living nodes

Creation time        Epoch Nodes


It seems that we lose the epoch history if the cluster status is not OK.
I have checked the code but found nothing wrong, which is weird.

Thanks,
Yuan


