[Sheepdog] [RFC PATCH] sheep: introduce sd_op_template

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Mon Oct 24 06:17:07 CEST 2011


At Sat, 22 Oct 2011 13:14:37 +0800,
Liu Yuan wrote:
> 
> On 10/21/2011 04:28 PM, MORITA Kazutaka wrote:
> 
> > When we want to add a new operation (SD_OP_xxxxx), it is not clear
> > which code we should modify.  And in some cases, we need to modify
> > code everywhere to implement one operation.  This is not a good
> > design.
> > 
> > This patch abstracts out Sheepdog operations into sd_op_template, and
> > moves all the request processing code to sheep/ops.c.
> > 
> > The definition of sd_op_template is as follows:
> > 
> > struct sd_op_template {
> >         enum sd_op_type type;
> > 
> >         int available_always;
> > 
> >         int (*process)(const struct sd_req *req, struct sd_rsp *rsp,
> >                        void *data);
> >         int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp,
> >                             void *data);
> > };
> > 
> > 'type' is the type of the operation; SD_OP_TYPE_CLUSTER,
> > SD_OP_TYPE_STORE, or SD_OP_TYPE_IO.
> > 
> > 'available_always' is set to non-zero if the operation should be
> > processed even when the cluster is not working.
> > 
> > 'process()' and 'post_process()' are the main functions of this
> > operation.  process() will be called in the worker thread, and
> > post_process() will be called in the main thread.
> > 
> > If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only one node
> > processes a cluster operation at a time.  We can use this for
> > something like distributed locking.  process() will be called on the
> > local node, and post_process() will be called on every node.
> > 
> > If type is SD_OP_TYPE_STORE, both process() and post_process() will be
> > called on the local node.
> > 
> > If type is SD_OP_TYPE_IO, neither process() nor post_process() is used
> > because this type of operation is heavily intertwined with Sheepdog
> > core code.  We are unlikely to add new operations of this type.
> > 
> > Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> > ---
> >  sheep/Makefile.am  |    2 +-
> >  sheep/group.c      |  286 +++--------------------------------
> >  sheep/ops.c        |  427 ++++++++++++++++++++++++++++++++++++++++++++++++++++
> >  sheep/sdnet.c      |  111 ++------------
> >  sheep/sheep_priv.h |   48 ++++++-
> >  sheep/store.c      |   19 +--
> >  6 files changed, 522 insertions(+), 371 deletions(-)
> >  create mode 100644 sheep/ops.c
> > 
> > diff --git a/sheep/Makefile.am b/sheep/Makefile.am
> > index 2b9d58f..3db914d 100644
> > --- a/sheep/Makefile.am
> > +++ b/sheep/Makefile.am
> > @@ -23,7 +23,7 @@ INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
> >  
> >  sbin_PROGRAMS		= sheep
> >  
> > -sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c \
> > +sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
> >  			  cluster/corosync.c
> >  sheep_LDADD	  	= $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
> >  sheep_DEPENDENCIES	= ../lib/libsheepdog.a
> > diff --git a/sheep/group.c b/sheep/group.c
> > index e22dabc..8664b6f 100644
> > --- a/sheep/group.c
> > +++ b/sheep/group.c
> > @@ -101,17 +101,7 @@ static size_t get_join_message_size(struct join_message *jm)
> >  	return sizeof(*jm) + jm->nr_nodes * sizeof(jm->nodes[0]);
> >  }
> >  
> > -static int get_node_idx(struct sheepdog_node_list_entry *ent,
> > -			struct sheepdog_node_list_entry *entries, int nr_nodes)
> > -{
> > -	ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
> > -	if (!ent)
> > -		return -1;
> > -
> > -	return ent - entries;
> > -}
> > -
> > -static int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
> > +int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
> >  {
> >  	int nr_zones = 0, i, j;
> >  	uint32_t zones[SD_MAX_REDUNDANCY];
> > @@ -146,116 +136,28 @@ void setup_ordered_sd_vnode_list(struct request *req)
> >  	get_ordered_sd_vnode_list(req->entry, &req->nr_vnodes, &req->nr_zones);
> >  }
> >  
> > -static void get_node_list(struct sd_node_req *req,
> > -			  struct sd_node_rsp *rsp, void *data)
> > +static void do_cluster_op(void *arg)
> >  {
> > -	int nr_nodes;
> > -
> > -	nr_nodes = sys->nr_nodes;
> > -	memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
> > -	rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
> > -	rsp->nr_nodes = nr_nodes;
> > -	rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
> > -	rsp->master_idx = -1;
> > -}
> > +	struct vdi_op_message *msg = arg;
> > +	int ret;
> > +	struct request *req;
> >  
> > -static int get_epoch(struct sd_obj_req *req,
> > -		      struct sd_obj_rsp *rsp, void *data)
> > -{
> > -	int epoch = req->tgt_epoch;
> > -	int len, ret;
> > -	dprintf("%d\n", epoch);
> > -	len = epoch_log_read(epoch, (char *)data, req->data_length);
> > -	if (len == -1) {
> > -		ret = SD_RES_NO_TAG;
> > -		rsp->data_length = 0;
> > -	} else {
> > -		ret = SD_RES_SUCCESS;
> > -		rsp->data_length = len;
> > -	}
> > -	return ret;
> > -}
> > +	req = list_first_entry(&sys->pending_list, struct request, pending_list);
> > +	ret = req->op->process((const struct sd_req *)&msg->req,
> > +			       (struct sd_rsp *)&msg->rsp, req->data);
> >  
> > -static void vdi_op(void *arg);
> > +	msg->rsp.result = ret;
> > +}
> >  
> >  void cluster_queue_request(struct work *work, int idx)
> >  {
> >  	struct request *req = container_of(work, struct request, work);
> >  	struct sd_req *hdr = (struct sd_req *)&req->rq;
> > -	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
> >  	struct vdi_op_message *msg;
> > -	struct epoch_log *log;
> > -	int ret = SD_RES_SUCCESS, i, max_logs, epoch;
> > -	uint32_t sys_stat = sys_stat_get();
> >  	size_t size;
> >  
> >  	eprintf("%p %x\n", req, hdr->opcode);
> >  
> > -	switch (hdr->opcode) {
> > -	case SD_OP_GET_EPOCH:
> > -		ret = get_epoch((struct sd_obj_req *)hdr,
> > -			  (struct sd_obj_rsp *)rsp, req->data);
> > -		break;
> > -	case SD_OP_GET_NODE_LIST:
> > -		get_node_list((struct sd_node_req *)hdr,
> > -			      (struct sd_node_rsp *)rsp, req->data);
> > -		break;
> > -	case SD_OP_STAT_CLUSTER:
> > -		max_logs = rsp->data_length / sizeof(*log);
> > -		epoch = get_latest_epoch();
> > -		rsp->data_length = 0;
> > -		for (i = 0; i < max_logs; i++) {
> > -			if (epoch <= 0)
> > -				break;
> > -
> > -			log = (struct epoch_log *)req->data + i;
> > -			log->epoch = epoch;
> > -			log->ctime = get_cluster_ctime();
> > -			log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
> > -						       sizeof(log->nodes));
> > -			if (log->nr_nodes == -1)
> > -				log->nr_nodes = epoch_log_read_remote(epoch,
> > -								      (char *)log->nodes,
> > -								      sizeof(log->nodes));
> > -
> > -			rsp->data_length += sizeof(*log);
> > -			log->nr_nodes /= sizeof(log->nodes[0]);
> > -			epoch--;
> > -		}
> > -
> > -		switch (sys_stat) {
> > -		case SD_STATUS_OK:
> > -			ret = SD_RES_SUCCESS;
> > -			break;
> > -		case SD_STATUS_WAIT_FOR_FORMAT:
> > -			ret = SD_RES_WAIT_FOR_FORMAT;
> > -			break;
> > -		case SD_STATUS_WAIT_FOR_JOIN:
> > -			ret = SD_RES_WAIT_FOR_JOIN;
> > -			break;
> > -		case SD_STATUS_SHUTDOWN:
> > -			ret = SD_RES_SHUTDOWN;
> > -			break;
> > -		case SD_STATUS_JOIN_FAILED:
> > -			ret = SD_RES_JOIN_FAILED;
> > -			break;
> > -		case SD_STATUS_HALT:
> > -			ret = SD_RES_HALT;
> > -			break;
> > -		default:
> > -			ret = SD_RES_SYSTEM_ERROR;
> > -			break;
> > -		}
> > -		break;
> > -	default:
> > -		/* forward request to group */
> > -		goto forward;
> > -	}
> > -
> > -	rsp->result = ret;
> > -	return;
> > -
> > -forward:
> >  	if (hdr->flags & SD_FLAG_CMD_WRITE)
> >  		size = sizeof(*msg);
> >  	else
> > @@ -272,7 +174,12 @@ forward:
> >  
> >  	list_add(&req->pending_list, &sys->pending_list);
> >  
> > -	sys->cdrv->notify(msg, size, vdi_op);
> > +	if (req->op->process)
> > +		sys->cdrv->notify(msg, size, do_cluster_op);
> > +	else {
> > +		msg->rsp.result = SD_RES_SUCCESS;
> > +		sys->cdrv->notify(msg, size, NULL);
> > +	}
> >  
> >  	free(msg);
> >  }
> > @@ -628,85 +535,6 @@ join_finished:
> >  	return;
> >  }
> >  
> > -static void vdi_op(void *arg)
> > -{
> > -	struct vdi_op_message *msg = arg;
> > -	const struct sd_vdi_req *hdr = &msg->req;
> > -	struct sd_vdi_rsp *rsp = &msg->rsp;
> > -	void *data, *tag;
> > -	int ret = SD_RES_SUCCESS;
> > -	struct sheepdog_vdi_attr *vattr;
> > -	uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
> > -	uint64_t ctime = 0;
> > -	struct request *req;
> > -
> > -	req = list_first_entry(&sys->pending_list, struct request, pending_list);
> > -	data = req->data;
> > -
> > -	switch (hdr->opcode) {
> > -	case SD_OP_NEW_VDI:
> > -		ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
> > -			      hdr->base_vdi_id, hdr->copies,
> > -			      hdr->snapid, &nr_copies);
> > -		break;
> > -	case SD_OP_DEL_VDI:
> > -		ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
> > -			      hdr->snapid, &nr_copies);
> > -		break;
> > -	case SD_OP_LOCK_VDI:
> > -	case SD_OP_GET_VDI_INFO:
> > -		if (hdr->proto_ver != SD_PROTO_VER) {
> > -			ret = SD_RES_VER_MISMATCH;
> > -			break;
> > -		}
> > -		if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
> > -			tag = (char *)data + SD_MAX_VDI_LEN;
> > -		else if (hdr->data_length == SD_MAX_VDI_LEN)
> > -			tag = NULL;
> > -		else {
> > -			ret = SD_RES_INVALID_PARMS;
> > -			break;
> > -		}
> > -		ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
> > -				 &nr_copies, NULL);
> > -		if (ret != SD_RES_SUCCESS)
> > -			break;
> > -		break;
> > -	case SD_OP_GET_VDI_ATTR:
> > -		vattr = data;
> > -		ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
> > -				 &vid, hdr->snapid, &nr_copies, &ctime);
> > -		if (ret != SD_RES_SUCCESS)
> > -			break;
> > -		/* the curernt vdi id can change if we take the snapshot,
> > -		   so we use the hash value of the vdi name as the vdi id */
> > -		vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
> > -		vid &= SD_NR_VDIS - 1;
> > -		ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
> > -				   &attrid, nr_copies, ctime,
> > -				   hdr->flags & SD_FLAG_CMD_CREAT,
> > -				   hdr->flags & SD_FLAG_CMD_EXCL,
> > -				   hdr->flags & SD_FLAG_CMD_DEL);
> > -		break;
> > -	case SD_OP_RELEASE_VDI:
> > -		break;
> > -	case SD_OP_MAKE_FS:
> > -		ret = SD_RES_SUCCESS;
> > -		break;
> > -	case SD_OP_SHUTDOWN:
> > -		break;
> > -	default:
> > -		ret = SD_RES_SYSTEM_ERROR;
> > -		eprintf("opcode %d is not implemented\n", hdr->opcode);
> > -		break;
> > -	}
> > -
> > -	rsp->vdi_id = vid;
> > -	rsp->attr_id = attrid;
> > -	rsp->copies = nr_copies;
> > -	rsp->result = ret;
> > -}
> > -
> >  static void __sd_notify(struct cpg_event *cevent)
> >  {
> >  }
> > @@ -715,86 +543,22 @@ static void __sd_notify_done(struct cpg_event *cevent)
> >  {
> >  	struct work_notify *w = container_of(cevent, struct work_notify, cev);
> >  	struct vdi_op_message *msg = (struct vdi_op_message *)w->msg;
> > -	const struct sd_vdi_req *hdr = &msg->req;
> > -	struct sd_vdi_rsp *rsp = &msg->rsp;
> > -	void *data = msg->data;
> >  	struct request *req;
> >  	int ret = msg->rsp.result;
> > -	int i, latest_epoch;
> > -	uint64_t ctime;
> > -
> > -	if (ret != SD_RES_SUCCESS)
> > -		goto out;
> > -
> > -	switch (hdr->opcode) {
> > -	case SD_OP_NEW_VDI:
> > -	{
> > -		unsigned long nr = rsp->vdi_id;
> > -		vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
> > -		set_bit(nr, sys->vdi_inuse);
> > -		break;
> > -	}
> > -	case SD_OP_DEL_VDI:
> > -		break;
> > -	case SD_OP_LOCK_VDI:
> > -	case SD_OP_RELEASE_VDI:
> > -	case SD_OP_GET_VDI_INFO:
> > -	case SD_OP_GET_VDI_ATTR:
> > -		break;
> > -	case SD_OP_MAKE_FS:
> > -		sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies;
> > -		sys->flags = ((struct sd_so_req *)hdr)->flags;
> > -		if (!sys->nr_sobjs)
> > -			sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
> > -
> > -		ctime = ((struct sd_so_req *)hdr)->ctime;
> > -		set_cluster_ctime(ctime);
> > -
> > -		latest_epoch = get_latest_epoch();
> > -		for (i = 1; i <= latest_epoch; i++)
> > -			remove_epoch(i);
> > -		memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
> > -
> > -		sys->epoch = 1;
> > -		sys->recovered_epoch = 1;
> > -
> > -		dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
> > -		ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
> > -				      sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
> > -		if (ret < 0)
> > -			eprintf("can't write epoch %u\n", sys->epoch);
> > -		update_epoch_store(sys->epoch);
> > +	struct sd_op_template *op = get_sd_op(msg->req.opcode);
> >  
> > -		set_cluster_copies(sys->nr_sobjs);
> > -		set_cluster_flags(sys->flags);
> > +	if (ret == SD_RES_SUCCESS && op->post_process)
> > +		ret = op->post_process((const struct sd_req *)&msg->req,
> > +				       (struct sd_rsp *)&msg->rsp, msg->data);
> >  
> > -		if (sys_flag_nohalt())
> > -			sys_stat_set(SD_STATUS_OK);
> > -		else {
> > -			int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
> > -
> > -			if (nr_zones >= sys->nr_sobjs)
> > -				sys_stat_set(SD_STATUS_OK);
> > -			else
> > -				sys_stat_set(SD_STATUS_HALT);
> > -		}
> > -		break;
> > -	case SD_OP_SHUTDOWN:
> > -		sys_stat_set(SD_STATUS_SHUTDOWN);
> > -		break;
> > -	default:
> > -		eprintf("unknown operation %d\n", hdr->opcode);
> > -		ret = SD_RES_UNKNOWN;
> > -	}
> > -out:
> >  	if (!is_myself(w->sender.addr, w->sender.port))
> >  		return;
> >  
> >  	req = list_first_entry(&sys->pending_list, struct request, pending_list);
> >  
> > -	rsp->result = ret;
> > -	memcpy(req->data, data, rsp->data_length);
> > -	memcpy(&req->rp, rsp, sizeof(req->rp));
> > +	msg->rsp.result = ret;
> > +	memcpy(req->data, msg->data, msg->rsp.data_length);
> > +	memcpy(&req->rp, &msg->rsp, sizeof(req->rp));
> >  	list_del(&req->pending_list);
> >  	req->done(req);
> >  }
> > @@ -1226,7 +990,7 @@ do_retry:
> >  
> >  		list_del(&cevent->cpg_event_list);
> >  
> > -		if (is_io_request(req->rq.opcode)) {
> > +		if (is_io_op(req->op)) {
> >  			int copies = sys->nr_sobjs;
> >  
> >  			if (copies > req->nr_zones)
> > @@ -1282,7 +1046,7 @@ do_retry:
> >  			}
> >  		}
> >  
> > -		if (is_cluster_request(req->rq.opcode))
> > +		if (is_cluster_op(req->op))
> >  			queue_work(sys->cpg_wqueue, &req->work);
> >  		else if (req->rq.flags & SD_FLAG_CMD_IO_LOCAL)
> >  			queue_work(sys->io_wqueue, &req->work);
> > diff --git a/sheep/ops.c b/sheep/ops.c
> > new file mode 100644
> > index 0000000..0d38e7b
> > --- /dev/null
> > +++ b/sheep/ops.c
> > @@ -0,0 +1,427 @@
> > +/*
> > + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
> > + *
> > + * This program is free software; you can redistribute it and/or
> > + * modify it under the terms of the GNU General Public License version
> > + * 2 as published by the Free Software Foundation.
> > + *
> > + * You should have received a copy of the GNU General Public License
> > + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> > + */
> > +#include <stdio.h>
> > +#include <stdlib.h>
> > +
> > +#include "sheep_priv.h"
> > +
> > +static int process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> > +			   void *data)
> > +{
> > +	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> > +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > +	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> > +	int ret;
> > +
> > +	ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
> > +		      hdr->base_vdi_id, hdr->copies,
> > +		      hdr->snapid, &nr_copies);
> > +
> > +	vdi_rsp->vdi_id = vid;
> > +	vdi_rsp->copies = nr_copies;
> > +
> > +	return ret;
> > +}
> > +
> > +static int post_process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> > +				void *data)
> > +{
> > +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > +	unsigned long nr = vdi_rsp->vdi_id;
> > +	int ret = vdi_rsp->result;
> > +
> > +	vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
> > +	set_bit(nr, sys->vdi_inuse);
> > +
> > +	return SD_RES_SUCCESS;
> > +}
> > +
> > +static int process_del_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> > +			   void *data)
> > +{
> > +	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> > +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > +	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> > +	int ret;
> > +
> > +	ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
> > +		      hdr->snapid, &nr_copies);
> > +
> > +	vdi_rsp->vdi_id = vid;
> > +	vdi_rsp->copies = nr_copies;
> > +
> > +	return ret;
> > +}
> > +
> > +static int process_get_vdi_info(const struct sd_req *req, struct sd_rsp *rsp,
> > +				void *data)
> > +{
> > +	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> > +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > +	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> > +	void *tag;
> > +	int ret;
> > +
> > +	if (hdr->proto_ver != SD_PROTO_VER)
> > +		return SD_RES_VER_MISMATCH;
> > +
> > +	if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
> > +		tag = (char *)data + SD_MAX_VDI_LEN;
> > +	else if (hdr->data_length == SD_MAX_VDI_LEN)
> > +		tag = NULL;
> > +	else
> > +		return SD_RES_INVALID_PARMS;
> > +
> > +	ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
> > +			 &nr_copies, NULL);
> > +	if (ret != SD_RES_SUCCESS)
> > +		return ret;
> > +
> > +	vdi_rsp->vdi_id = vid;
> > +	vdi_rsp->copies = nr_copies;
> > +
> > +	return ret;
> > +}
> > +
> > +static int post_process_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
> > +				void *data)
> > +{
> > +	const struct sd_so_req *hdr = (const struct sd_so_req *)req;
> > +	int i, latest_epoch, ret;
> > +	uint64_t ctime;
> > +
> > +	sys->nr_sobjs = hdr->copies;
> > +	sys->flags = hdr->flags;
> > +	if (!sys->nr_sobjs)
> > +		sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
> > +
> > +	ctime = hdr->ctime;
> > +	set_cluster_ctime(ctime);
> > +
> > +	latest_epoch = get_latest_epoch();
> > +	for (i = 1; i <= latest_epoch; i++)
> > +		remove_epoch(i);
> > +	memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
> > +
> > +	sys->epoch = 1;
> > +	sys->recovered_epoch = 1;
> > +
> > +	dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
> > +	ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
> > +			      sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
> > +	if (ret < 0) {
> > +		eprintf("can't write epoch %u\n", sys->epoch);
> > +		return SD_RES_EIO;
> > +	}
> > +	update_epoch_store(sys->epoch);
> > +
> > +	set_cluster_copies(sys->nr_sobjs);
> > +	set_cluster_flags(sys->flags);
> > +
> > +	if (sys_flag_nohalt())
> > +		sys_stat_set(SD_STATUS_OK);
> > +	else {
> > +		int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
> > +
> > +		if (nr_zones >= sys->nr_sobjs)
> > +			sys_stat_set(SD_STATUS_OK);
> > +		else
> > +			sys_stat_set(SD_STATUS_HALT);
> > +	}
> > +
> > +	return SD_RES_SUCCESS;
> > +}
> > +
> > +static int post_process_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
> > +				 void *data)
> > +{
> > +	sys_stat_set(SD_STATUS_SHUTDOWN);
> > +
> > +	return SD_RES_SUCCESS;
> > +}
> > +
> > +static int process_get_vdi_attr(const struct sd_req *req, struct sd_rsp *rsp,
> > +				void *data)
> > +{
> > +	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> > +	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > +	uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
> > +	uint64_t ctime = 0;
> > +	int ret;
> > +	struct sheepdog_vdi_attr *vattr;
> > +
> > +	vattr = data;
> > +	ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
> > +			 &vid, hdr->snapid, &nr_copies, &ctime);
> > +	if (ret != SD_RES_SUCCESS)
> > +		return ret;
> > +
> > +	/* the curernt vdi id can change if we take the snapshot,
> > +	   so we use the hash value of the vdi name as the vdi id */
> > +	vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
> > +	vid &= SD_NR_VDIS - 1;
> > +	ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
> > +			   &attrid, nr_copies, ctime,
> > +			   hdr->flags & SD_FLAG_CMD_CREAT,
> > +			   hdr->flags & SD_FLAG_CMD_EXCL,
> > +			   hdr->flags & SD_FLAG_CMD_DEL);
> > +
> > +	vdi_rsp->vdi_id = vid;
> > +	vdi_rsp->attr_id = attrid;
> > +	vdi_rsp->copies = nr_copies;
> > +
> > +	return ret;
> > +}
> > +
> > +static int post_process_read_vdis(const struct sd_req *req, struct sd_rsp *rsp,
> > +				  void *data)
> > +{
> > +	return read_vdis(data, req->data_length, &rsp->data_length);
> > +}
> > +
> > +static int get_node_idx(struct sheepdog_node_list_entry *ent,
> > +			struct sheepdog_node_list_entry *entries, int nr_nodes)
> > +{
> > +	ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
> > +	if (!ent)
> > +		return -1;
> > +
> > +	return ent - entries;
> > +}
> > +
> > +static int post_process_get_node_list(const struct sd_req *req, struct sd_rsp *rsp,
> > +				      void *data)
> > +{
> > +	struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
> > +	int nr_nodes;
> > +
> > +	nr_nodes = sys->nr_nodes;
> > +	memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
> > +	node_rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
> > +	node_rsp->nr_nodes = nr_nodes;
> > +	node_rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
> > +	node_rsp->master_idx = -1;
> > +
> > +	return SD_RES_SUCCESS;
> > +}
> > +
> > +static int process_stat_sheep(const struct sd_req *req, struct sd_rsp *rsp,
> > +			      void *data)
> > +{
> > +	struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
> > +	uint32_t epoch = req->epoch;
> > +
> > +	return stat_sheep(&node_rsp->store_size, &node_rsp->store_free, epoch);
> > +}
> > +
> > +static int process_stat_cluster(const struct sd_req *req, struct sd_rsp *rsp,
> > +				void *data)
> > +{
> > +	struct epoch_log *log;
> > +	int i, max_logs, epoch;
> > +	uint32_t sys_stat = sys_stat_get();
> > +
> > +	max_logs = rsp->data_length / sizeof(*log);
> > +	epoch = get_latest_epoch();
> > +	rsp->data_length = 0;
> > +	for (i = 0; i < max_logs; i++) {
> > +		if (epoch <= 0)
> > +			break;
> > +
> > +		log = (struct epoch_log *)data + i;
> > +		log->epoch = epoch;
> > +		log->ctime = get_cluster_ctime();
> > +		log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
> > +					       sizeof(log->nodes));
> > +		if (log->nr_nodes == -1)
> > +			log->nr_nodes = epoch_log_read_remote(epoch,
> > +							      (char *)log->nodes,
> > +							      sizeof(log->nodes));
> > +
> > +		rsp->data_length += sizeof(*log);
> > +		log->nr_nodes /= sizeof(log->nodes[0]);
> > +		epoch--;
> > +	}
> > +
> > +	switch (sys_stat) {
> > +	case SD_STATUS_OK:
> > +		return SD_RES_SUCCESS;
> > +	case SD_STATUS_WAIT_FOR_FORMAT:
> > +		return SD_RES_WAIT_FOR_FORMAT;
> > +	case SD_STATUS_WAIT_FOR_JOIN:
> > +		return SD_RES_WAIT_FOR_JOIN;
> > +	case SD_STATUS_SHUTDOWN:
> > +		return SD_RES_SHUTDOWN;
> > +	case SD_STATUS_JOIN_FAILED:
> > +		return SD_RES_JOIN_FAILED;
> > +	case SD_STATUS_HALT:
> > +		return SD_RES_HALT;
> > +	default:
> > +		return SD_RES_SYSTEM_ERROR;
> > +	}
> > +}
> > +
> > +static int process_kill_node(const struct sd_req *req, struct sd_rsp *rsp,
> > +			     void *data)
> > +{
> > +	exit(1);
> > +}
> > +
> > +static int process_get_obj_list(const struct sd_req *req, struct sd_rsp *rsp,
> > +				void *data)
> > +{
> > +	return get_obj_list((const struct sd_list_req *)req,
> > +			    (struct sd_list_rsp *)rsp, data);
> > +}
> > +
> > +static int process_get_epoch(const struct sd_req *req, struct sd_rsp *rsp,
> > +			     void *data)
> > +{
> > +	const struct sd_obj_req *obj_req = (const struct sd_obj_req *)req;
> > +	struct sd_obj_rsp *obj_rsp = (struct sd_obj_rsp *)rsp;
> > +	int epoch = obj_req->tgt_epoch;
> > +	int len, ret;
> > +	dprintf("%d\n", epoch);
> > +	len = epoch_log_read(epoch, (char *)data, obj_req->data_length);
> > +	if (len == -1) {
> > +		ret = SD_RES_NO_TAG;
> > +		obj_rsp->data_length = 0;
> > +	} else {
> > +		ret = SD_RES_SUCCESS;
> > +		obj_rsp->data_length = len;
> > +	}
> > +	return ret;
> > +}
> > +
> > +static struct sd_op_template sd_ops[] = {
> > +
> > +	/* cluster operations */
> > +	[SD_OP_NEW_VDI] = {
> > +		.type = SD_OP_TYPE_CLUSTER,
> > +		.process = process_new_vdi,
> > +		.post_process = post_process_new_vdi,
> > +	},
> > +
> > +	[SD_OP_DEL_VDI] = {
> > +		.type = SD_OP_TYPE_CLUSTER,
> > +		.process = process_del_vdi,
> > +	},
> > +
> > +	[SD_OP_GET_VDI_INFO] = {
> > +		.type = SD_OP_TYPE_CLUSTER,
> > +		.process = process_get_vdi_info,
> > +	},
> > +
> > +	[SD_OP_LOCK_VDI] = {
> > +		.type = SD_OP_TYPE_CLUSTER,
> > +		.process = process_get_vdi_info,
> > +	},
> > +
> > +	[SD_OP_MAKE_FS] = {
> > +		.type = SD_OP_TYPE_CLUSTER,
> > +		.available_always = 1,
> > +		.post_process = post_process_make_fs,
> > +	},
> > +
> > +	[SD_OP_SHUTDOWN] = {
> > +		.type = SD_OP_TYPE_CLUSTER,
> > +		.post_process = post_process_shutdown,
> > +	},
> > +
> > +	[SD_OP_GET_VDI_ATTR] = {
> > +		.type = SD_OP_TYPE_CLUSTER,
> > +		.process = process_get_vdi_attr,
> > +	},
> > +
> > +	[SD_OP_RELEASE_VDI] = {
> > +		.type = SD_OP_TYPE_CLUSTER,
> > +	},
> > +
> > +	/* store operations */
> > +	[SD_OP_READ_VDIS] = {
> > +		.type = SD_OP_TYPE_STORE,
> > +		.available_always = 1,
> > +		.post_process = post_process_read_vdis,
> > +	},
> > +
> > +	[SD_OP_GET_NODE_LIST] = {
> > +		.type = SD_OP_TYPE_STORE,
> > +		.available_always = 1,
> > +		.post_process = post_process_get_node_list,
> > +	},
> > +
> > +	[SD_OP_STAT_SHEEP] = {
> > +		.type = SD_OP_TYPE_STORE,
> > +		.process = process_stat_sheep,
> > +	},
> > +
> > +	[SD_OP_STAT_CLUSTER] = {
> > +		.type = SD_OP_TYPE_STORE,
> > +		.available_always = 1,
> > +		.process = process_stat_cluster,
> > +	},
> > +
> > +	[SD_OP_KILL_NODE] = {
> > +		.type = SD_OP_TYPE_STORE,
> > +		.available_always = 1,
> > +		.process = process_kill_node,
> > +	},
> > +
> > +	[SD_OP_GET_OBJ_LIST] = {
> > +		.type = SD_OP_TYPE_STORE,
> > +		.process = process_get_obj_list,
> > +	},
> > +
> > +	[SD_OP_GET_EPOCH] = {
> > +		.type = SD_OP_TYPE_STORE,
> > +		.process = process_get_epoch,
> > +	},
> > +
> > +	/* I/O operations */
> > +	[SD_OP_CREATE_AND_WRITE_OBJ] = {
> > +		.type = SD_OP_TYPE_IO,
> > +	},
> > +
> > +	[SD_OP_READ_OBJ] = {
> > +		.type = SD_OP_TYPE_IO,
> > +	},
> > +
> > +	[SD_OP_WRITE_OBJ] = {
> > +		.type = SD_OP_TYPE_IO,
> > +	},
> > +
> > +	[SD_OP_REMOVE_OBJ] = {
> > +		.type = SD_OP_TYPE_IO,
> > +	},
> > +};
> > +
> > +struct sd_op_template *get_sd_op(uint8_t opcode)
> > +{
> > +	if (sd_ops[opcode].type == 0)
> > +		return NULL;
> > +
> > +	return sd_ops + opcode;
> > +}
> > +
> > +int is_cluster_op(struct sd_op_template *op)
> > +{
> > +	return op->type == SD_OP_TYPE_CLUSTER;
> > +}
> > +
> > +int is_store_op(struct sd_op_template *op)
> > +{
> > +	return op->type == SD_OP_TYPE_STORE;
> > +}
> > +
> > +int is_io_op(struct sd_op_template *op)
> > +{
> > +	return op->type == SD_OP_TYPE_IO;
> > +}
> > diff --git a/sheep/sdnet.c b/sheep/sdnet.c
> > index 474b1be..7d7e142 100644
> > --- a/sheep/sdnet.c
> > +++ b/sheep/sdnet.c
> > @@ -19,46 +19,6 @@
> >  
> >  #include "sheep_priv.h"
> >  
> > -int is_io_request(unsigned op)
> > -{
> > -	int ret = 0;
> > -
> > -	switch (op) {
> > -	case SD_OP_CREATE_AND_WRITE_OBJ:
> > -	case SD_OP_REMOVE_OBJ:
> > -	case SD_OP_READ_OBJ:
> > -	case SD_OP_WRITE_OBJ:
> > -		ret = 1;
> > -		break;
> > -	default:
> > -		break;
> > -	}
> > -
> > -	return ret;
> > -}
> > -
> > -int is_cluster_request(unsigned op)
> > -{
> > -	int ret = 0;
> > -
> > -	switch (op) {
> > -	case SD_OP_NEW_VDI:
> > -	case SD_OP_DEL_VDI:
> > -	case SD_OP_LOCK_VDI:
> > -	case SD_OP_RELEASE_VDI:
> > -	case SD_OP_GET_VDI_INFO:
> > -	case SD_OP_MAKE_FS:
> > -	case SD_OP_SHUTDOWN:
> > -	case SD_OP_GET_VDI_ATTR:
> > -		ret = 1;
> > -		break;
> > -	default:
> > -		break;
> > -	}
> > -
> > -	return ret;
> > -}
> > -
> >  void resume_pending_requests(void)
> >  {
> >  	struct request *next, *tmp;
> > @@ -119,18 +79,21 @@ static void setup_access_to_local_objects(struct request *req)
> >  static void __done(struct work *work, int idx)
> >  {
> >  	struct request *req = container_of(work, struct request, work);
> > -	struct sd_req *hdr = (struct sd_req *)&req->rq;
> >  	int again = 0;
> >  	int copies = sys->nr_sobjs;
> >  
> >  	if (copies > req->nr_zones)
> >  		copies = req->nr_zones;
> >  
> > -	if (is_cluster_request(hdr->opcode))
> > +	if (is_cluster_op(req->op))
> >  		/* request is forwarded to cpg group */
> >  		return;
> >  
> > -	if (is_io_request(hdr->opcode)) {
> > +	if (is_store_op(req->op) && req->op->post_process)
> > +		req->rp.result = req->op->post_process(&req->rq, &req->rp,
> > +						       req->data);
> > +
> > +	if (is_io_op(req->op)) {
> >  		struct cpg_event *cevent = &req->cev;
> >  
> >  		list_del(&req->r_wlist);
> > @@ -225,39 +188,15 @@ static void queue_request(struct request *req)
> >  
> >  	dprintf("%x\n", hdr->opcode);
> >  
> > -	if (hdr->opcode == SD_OP_KILL_NODE) {
> > -		log_close();
> > -		exit(1);
> > -	}
> > -
> >  	if (sys->status == SD_STATUS_SHUTDOWN) {
> >  		rsp->result = SD_RES_SHUTDOWN;
> >  		req->done(req);
> >  		return;
> >  	}
> >  
> > -	/*
> > -	 * we can know why this node failed to join with
> > -	 * SD_OP_STAT_CLUSTER, so the request should be handled even
> > -	 * when the cluster status is SD_STATUS_JOIN_FAILED
> > -	 */
> > -	if (sys->status == SD_STATUS_JOIN_FAILED &&
> > -	    hdr->opcode != SD_OP_STAT_CLUSTER) {
> > -		rsp->result = SD_RES_JOIN_FAILED;
> > -		req->done(req);
> > -		return;
> > -	}
> > -
> >  	if (sys->status == SD_STATUS_WAIT_FOR_FORMAT ||
> >  	    sys->status == SD_STATUS_WAIT_FOR_JOIN) {
> > -		/* TODO: cleanup */
> > -		switch (hdr->opcode) {
> > -		case SD_OP_STAT_CLUSTER:
> > -		case SD_OP_MAKE_FS:
> > -		case SD_OP_GET_NODE_LIST:
> > -		case SD_OP_READ_VDIS:
> > -			break;
> > -		default:
> > +		if (!req->op->available_always) {
> >  			if (sys->status == SD_STATUS_WAIT_FOR_FORMAT)
> >  				rsp->result = SD_RES_WAIT_FOR_FORMAT;
> >  			else
> > @@ -267,33 +206,11 @@ static void queue_request(struct request *req)
> >  		}
> >  	}
> >  
> > -	switch (hdr->opcode) {
> > -	case SD_OP_CREATE_AND_WRITE_OBJ:
> > -	case SD_OP_REMOVE_OBJ:
> > -	case SD_OP_READ_OBJ:
> > -	case SD_OP_WRITE_OBJ:
> > -	case SD_OP_STAT_SHEEP:
> > -	case SD_OP_GET_OBJ_LIST:
> > +	if (is_io_op(req->op) || is_store_op(req->op))
> >  		req->work.fn = store_queue_request;
> > -		break;
> > -	case SD_OP_GET_NODE_LIST:
> > -	case SD_OP_NEW_VDI:
> > -	case SD_OP_DEL_VDI:
> > -	case SD_OP_LOCK_VDI:
> > -	case SD_OP_RELEASE_VDI:
> > -	case SD_OP_GET_VDI_INFO:
> > -	case SD_OP_MAKE_FS:
> > -	case SD_OP_SHUTDOWN:
> > -	case SD_OP_STAT_CLUSTER:
> > -	case SD_OP_GET_VDI_ATTR:
> > -	case SD_OP_GET_EPOCH:
> > +	else if (is_cluster_op(req->op))
> >  		req->work.fn = cluster_queue_request;
> > -		break;
> > -	case SD_OP_READ_VDIS:
> > -		rsp->result = read_vdis(req->data, hdr->data_length, &rsp->data_length);
> > -		req->done(req);
> > -		return;
> > -	default:
> > +	else {
> >  		eprintf("unknown operation %d\n", hdr->opcode);
> >  		rsp->result = SD_RES_SYSTEM_ERROR;
> >  		req->done(req);
> > @@ -314,7 +231,7 @@ static void queue_request(struct request *req)
> >  		hdr->epoch = sys->epoch;
> >  
> >  	setup_ordered_sd_vnode_list(req);
> > -	if (is_io_request(hdr->opcode))
> > +	if (is_io_op(req->op))
> >  		setup_access_to_local_objects(req);
> >  
> >  	cevent->ctype = CPG_EVENT_REQUEST;
> > @@ -410,6 +327,12 @@ static void client_rx_handler(struct client_info *ci)
> >  
> >  		/* use le_to_cpu */
> >  		memcpy(&req->rq, hdr, sizeof(req->rq));
> > +		req->op = get_sd_op(hdr->opcode);
> > +		if (!req->op) {
> > +			eprintf("invalid opcode, %d\n", hdr->opcode);
> > +			conn->c_rx_state = C_IO_CLOSED;
> > +			break;
> > +		}
> >  
> >  		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
> >  			conn->c_rx_state = C_IO_DATA;
> > diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> > index 65bc3aa..1fe2d4d 100644
> > --- a/sheep/sheep_priv.h
> > +++ b/sheep/sheep_priv.h
> > @@ -72,6 +72,8 @@ struct request {
> >  	struct sd_req rq;
> >  	struct sd_rsp rp;
> >  
> > +	struct sd_op_template *op;
> > +
> >  	void *data;
> >  
> >  	struct client_info *ci;
> > @@ -152,8 +154,6 @@ extern struct cluster_info *sys;
> >  
> >  int create_listen_port(int port, void *data);
> >  
> > -int is_io_request(unsigned op);
> > -int is_cluster_request(unsigned op);
> >  int init_store(const char *dir);
> >  int init_base_path(const char *dir);
> >  
> > @@ -173,6 +173,7 @@ int get_vdi_attr(uint32_t epoch, struct sheepdog_vdi_attr *vattr, int data_len,
> >  		 uint32_t vid, uint32_t *attrid, int copies, uint64_t ctime,
> >  		 int write, int excl, int delete);
> >  
> > +int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes);
> >  void setup_ordered_sd_vnode_list(struct request *req);
> >  void get_ordered_sd_vnode_list(struct sheepdog_vnode_list_entry *entries,
> >  			       int *nr_vnodes, int *nr_zones);
> > @@ -215,6 +216,8 @@ int get_latest_epoch(void);
> >  int remove_epoch(int epoch);
> >  int set_cluster_ctime(uint64_t ctime);
> >  uint64_t get_cluster_ctime(void);
> > +int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch);
> > +int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data);
> >  
> >  int start_recovery(uint32_t epoch);
> >  void resume_recovery_work(void);
> > @@ -235,6 +238,47 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
> >  int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
> >  		 uint32_t epoch, int worker_idx);
> >  
> > +/* Operations */
> > +
> > +enum sd_op_type {
> > +	SD_OP_TYPE_CLUSTER = 1, /* cluster operations */
> > +	SD_OP_TYPE_STORE,       /* store operations */
> > +	SD_OP_TYPE_IO,          /* io operations */
> > +};
> > +
> > +struct sd_op_template {
> > +	enum sd_op_type type;
> > +
> > +	/* process request even when cluster is not working */
> > +	int available_always;
> > +
> > +	/*
> > +	 * process() will be called in the worker thread, and
> > +	 * post_process() will be called in the main thread.
> > +	 *
> > +	 * If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only
> > +	 * one node processes a cluster operation at the same time.
> > +	 * We can use this for something like distributed locking.
> > +	 * process() will be called on the local node, and
> > +	 * post_process() will be called on every nodes.
> > +	 *
> > +	 * If type is SD_OP_TYPE_STORE, both process() and
> > +	 * post_process() will be called on the local node.
> > +	 *
> > +	 * If type is SD_OP_TYPE_IO, neither process() nor
> > +	 * post_process() is used because this type of operation is
> > +	 * heavily intertwined with Sheepdog core codes.  We will be
> > +	 * unlikely to add new operations of this type.
> > +	 */
> > +	int (*process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
> > +	int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
> > +};
> > +
> > +struct sd_op_template *get_sd_op(uint8_t opcode);
> > +int is_cluster_op(struct sd_op_template *op);
> > +int is_store_op(struct sd_op_template *op);
> > +int is_io_op(struct sd_op_template *op);
> > +
> >  /* Journal */
> >  int jrnl_perform(int fd, void *buf, size_t count, off_t offset,
> >  		 const char *path, const char *jrnl_dir);
> > diff --git a/sheep/store.c b/sheep/store.c
> > index ee8996e..83644db 100644
> > --- a/sheep/store.c
> > +++ b/sheep/store.c
> > @@ -52,7 +52,7 @@ static int obj_cmp(const void *oid1, const void *oid2)
> >  	return 0;
> >  }
> >  
> > -static int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
> > +int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
> >  {
> >  	struct statvfs vs;
> >  	int ret;
> > @@ -96,16 +96,14 @@ static int merge_objlist(struct sheepdog_vnode_list_entry *entries, int nr_entri
> >  			 uint64_t *list1, int nr_list1,
> >  			 uint64_t *list2, int nr_list2, int nr_objs);
> >  
> > -static int get_obj_list(struct request *req)
> > +int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data)
> >  {
> >  	DIR *dir;
> >  	struct dirent *d;
> > -	struct sd_list_req *hdr = (struct sd_list_req *)&req->rq;
> > -	struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp;
> >  	uint64_t oid;
> >  	uint32_t epoch;
> >  	char path[1024];
> > -	uint64_t *p = (uint64_t *)req->data;
> > +	uint64_t *p = (uint64_t *)data;
> >  	int nr = 0;
> >  	uint64_t *objlist = NULL;
> >  	int obj_nr, i;
> > @@ -778,20 +776,15 @@ void store_queue_request(struct work *work, int idx)
> >  	uint64_t oid = hdr->oid;
> >  	uint32_t opcode = hdr->opcode;
> >  	uint32_t epoch = hdr->epoch;
> > -	struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
> >  
> >  	dprintf("%"PRIu32", %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
> >  
> >  	if (hdr->flags & SD_FLAG_CMD_RECOVERY)
> >  		epoch = hdr->tgt_epoch;
> >  
> > -	if (opcode == SD_OP_STAT_SHEEP) {
> > -		ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch);
> > -		goto out;
> > -	}
> > -
> > -	if (opcode == SD_OP_GET_OBJ_LIST) {
> > -		ret = get_obj_list(req);
> > +	if (is_store_op(req->op)) {
> > +		if (req->op->process)
> > +			ret = req->op->process(&req->rq, &req->rp, req->data);
> >  		goto out;
> >  	}
> >  
> 
> 
> for i in 0 1; do ./sheep/sheep /store/$i -z $i -p 700$i;sleep 1;done
> collie/collie cluster format
> for i in 0 1; do ./collie/collie cluster info -p 700$i; done
> 
> Cluster status: The sheepdog is stopped doing IO, short of living nodes
> 
> Creation time        Epoch Nodes
> Cluster status: The sheepdog is stopped doing IO, short of living nodes
> 
> Creation time        Epoch Nodes
> 
> 
> It seems that we lose the epoch history if the cluster status is not OK.
> I have checked the code and found nothing wrong, which is weird.

Thanks for your review!

It seems that Sheepdog sets the data_length to zero when the request
doesn't return SD_RES_SUCCESS.

I'll fix this and address all of your comments in the v2 patch.

Thanks,

Kazutaka



More information about the sheepdog mailing list