[Sheepdog] [RFC PATCH] sheep: introduce sd_op_template

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Fri Oct 21 10:28:08 CEST 2011


When we want to add a new operation (SD_OP_xxxxx), it is not clear
which codes we should modify.  And in some cases, we need to modify
codes everywhere to implement one operation.  This is not a good
design.

This patch abstracts out Sheepdog operations into sd_op_template, and
moves all the request processing codes to sheep/ops.c.

The definition of sd_op_template is as follows:

struct sd_op_template {
        enum sd_op_type type;

        int available_always;

        int (*process)(const struct sd_req *req, struct sd_rsp *rsp,
                       void *data);
        int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp,
                            void *data);
};

'type' is the type of the operation; SD_OP_TYPE_CLUSTER,
SD_OP_TYPE_STORE, or SD_OP_TYPE_IO.

'available_always' is set to non-zero if the operations should be
processed even when the cluster is not working.

'process()' and 'post_process()' are the main functions of this
operation.  process() will be called in the worker thread, and
post_process() will be called in the main thread.

If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only one node
processes a cluster operation at the same time.  We can use this for
something like distributed locking.  process() will be called on the
local node, and post_process() will be called on every nodes.

If type is SD_OP_TYPE_STORE, both process() and post_process() will be
called on the local node.

If type is SD_OP_TYPE_IO, neither process() nor post_process() is used
because this type of operation is heavily intertwined with Sheepdog
core codes.  We will be unlikely to add new operations of this type.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/Makefile.am  |    2 +-
 sheep/group.c      |  286 +++--------------------------------
 sheep/ops.c        |  427 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/sdnet.c      |  111 ++------------
 sheep/sheep_priv.h |   48 ++++++-
 sheep/store.c      |   19 +--
 6 files changed, 522 insertions(+), 371 deletions(-)
 create mode 100644 sheep/ops.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 2b9d58f..3db914d 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -23,7 +23,7 @@ INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
 
 sbin_PROGRAMS		= sheep
 
-sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c \
+sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
 			  cluster/corosync.c
 sheep_LDADD	  	= $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
 sheep_DEPENDENCIES	= ../lib/libsheepdog.a
diff --git a/sheep/group.c b/sheep/group.c
index e22dabc..8664b6f 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -101,17 +101,7 @@ static size_t get_join_message_size(struct join_message *jm)
 	return sizeof(*jm) + jm->nr_nodes * sizeof(jm->nodes[0]);
 }
 
-static int get_node_idx(struct sheepdog_node_list_entry *ent,
-			struct sheepdog_node_list_entry *entries, int nr_nodes)
-{
-	ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
-	if (!ent)
-		return -1;
-
-	return ent - entries;
-}
-
-static int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
+int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
 {
 	int nr_zones = 0, i, j;
 	uint32_t zones[SD_MAX_REDUNDANCY];
@@ -146,116 +136,28 @@ void setup_ordered_sd_vnode_list(struct request *req)
 	get_ordered_sd_vnode_list(req->entry, &req->nr_vnodes, &req->nr_zones);
 }
 
-static void get_node_list(struct sd_node_req *req,
-			  struct sd_node_rsp *rsp, void *data)
+static void do_cluster_op(void *arg)
 {
-	int nr_nodes;
-
-	nr_nodes = sys->nr_nodes;
-	memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
-	rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
-	rsp->nr_nodes = nr_nodes;
-	rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
-	rsp->master_idx = -1;
-}
+	struct vdi_op_message *msg = arg;
+	int ret;
+	struct request *req;
 
-static int get_epoch(struct sd_obj_req *req,
-		      struct sd_obj_rsp *rsp, void *data)
-{
-	int epoch = req->tgt_epoch;
-	int len, ret;
-	dprintf("%d\n", epoch);
-	len = epoch_log_read(epoch, (char *)data, req->data_length);
-	if (len == -1) {
-		ret = SD_RES_NO_TAG;
-		rsp->data_length = 0;
-	} else {
-		ret = SD_RES_SUCCESS;
-		rsp->data_length = len;
-	}
-	return ret;
-}
+	req = list_first_entry(&sys->pending_list, struct request, pending_list);
+	ret = req->op->process((const struct sd_req *)&msg->req,
+			       (struct sd_rsp *)&msg->rsp, req->data);
 
-static void vdi_op(void *arg);
+	msg->rsp.result = ret;
+}
 
 void cluster_queue_request(struct work *work, int idx)
 {
 	struct request *req = container_of(work, struct request, work);
 	struct sd_req *hdr = (struct sd_req *)&req->rq;
-	struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
 	struct vdi_op_message *msg;
-	struct epoch_log *log;
-	int ret = SD_RES_SUCCESS, i, max_logs, epoch;
-	uint32_t sys_stat = sys_stat_get();
 	size_t size;
 
 	eprintf("%p %x\n", req, hdr->opcode);
 
-	switch (hdr->opcode) {
-	case SD_OP_GET_EPOCH:
-		ret = get_epoch((struct sd_obj_req *)hdr,
-			  (struct sd_obj_rsp *)rsp, req->data);
-		break;
-	case SD_OP_GET_NODE_LIST:
-		get_node_list((struct sd_node_req *)hdr,
-			      (struct sd_node_rsp *)rsp, req->data);
-		break;
-	case SD_OP_STAT_CLUSTER:
-		max_logs = rsp->data_length / sizeof(*log);
-		epoch = get_latest_epoch();
-		rsp->data_length = 0;
-		for (i = 0; i < max_logs; i++) {
-			if (epoch <= 0)
-				break;
-
-			log = (struct epoch_log *)req->data + i;
-			log->epoch = epoch;
-			log->ctime = get_cluster_ctime();
-			log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
-						       sizeof(log->nodes));
-			if (log->nr_nodes == -1)
-				log->nr_nodes = epoch_log_read_remote(epoch,
-								      (char *)log->nodes,
-								      sizeof(log->nodes));
-
-			rsp->data_length += sizeof(*log);
-			log->nr_nodes /= sizeof(log->nodes[0]);
-			epoch--;
-		}
-
-		switch (sys_stat) {
-		case SD_STATUS_OK:
-			ret = SD_RES_SUCCESS;
-			break;
-		case SD_STATUS_WAIT_FOR_FORMAT:
-			ret = SD_RES_WAIT_FOR_FORMAT;
-			break;
-		case SD_STATUS_WAIT_FOR_JOIN:
-			ret = SD_RES_WAIT_FOR_JOIN;
-			break;
-		case SD_STATUS_SHUTDOWN:
-			ret = SD_RES_SHUTDOWN;
-			break;
-		case SD_STATUS_JOIN_FAILED:
-			ret = SD_RES_JOIN_FAILED;
-			break;
-		case SD_STATUS_HALT:
-			ret = SD_RES_HALT;
-			break;
-		default:
-			ret = SD_RES_SYSTEM_ERROR;
-			break;
-		}
-		break;
-	default:
-		/* forward request to group */
-		goto forward;
-	}
-
-	rsp->result = ret;
-	return;
-
-forward:
 	if (hdr->flags & SD_FLAG_CMD_WRITE)
 		size = sizeof(*msg);
 	else
@@ -272,7 +174,12 @@ forward:
 
 	list_add(&req->pending_list, &sys->pending_list);
 
-	sys->cdrv->notify(msg, size, vdi_op);
+	if (req->op->process)
+		sys->cdrv->notify(msg, size, do_cluster_op);
+	else {
+		msg->rsp.result = SD_RES_SUCCESS;
+		sys->cdrv->notify(msg, size, NULL);
+	}
 
 	free(msg);
 }
@@ -628,85 +535,6 @@ join_finished:
 	return;
 }
 
-static void vdi_op(void *arg)
-{
-	struct vdi_op_message *msg = arg;
-	const struct sd_vdi_req *hdr = &msg->req;
-	struct sd_vdi_rsp *rsp = &msg->rsp;
-	void *data, *tag;
-	int ret = SD_RES_SUCCESS;
-	struct sheepdog_vdi_attr *vattr;
-	uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
-	uint64_t ctime = 0;
-	struct request *req;
-
-	req = list_first_entry(&sys->pending_list, struct request, pending_list);
-	data = req->data;
-
-	switch (hdr->opcode) {
-	case SD_OP_NEW_VDI:
-		ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
-			      hdr->base_vdi_id, hdr->copies,
-			      hdr->snapid, &nr_copies);
-		break;
-	case SD_OP_DEL_VDI:
-		ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
-			      hdr->snapid, &nr_copies);
-		break;
-	case SD_OP_LOCK_VDI:
-	case SD_OP_GET_VDI_INFO:
-		if (hdr->proto_ver != SD_PROTO_VER) {
-			ret = SD_RES_VER_MISMATCH;
-			break;
-		}
-		if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
-			tag = (char *)data + SD_MAX_VDI_LEN;
-		else if (hdr->data_length == SD_MAX_VDI_LEN)
-			tag = NULL;
-		else {
-			ret = SD_RES_INVALID_PARMS;
-			break;
-		}
-		ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
-				 &nr_copies, NULL);
-		if (ret != SD_RES_SUCCESS)
-			break;
-		break;
-	case SD_OP_GET_VDI_ATTR:
-		vattr = data;
-		ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
-				 &vid, hdr->snapid, &nr_copies, &ctime);
-		if (ret != SD_RES_SUCCESS)
-			break;
-		/* the curernt vdi id can change if we take the snapshot,
-		   so we use the hash value of the vdi name as the vdi id */
-		vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
-		vid &= SD_NR_VDIS - 1;
-		ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
-				   &attrid, nr_copies, ctime,
-				   hdr->flags & SD_FLAG_CMD_CREAT,
-				   hdr->flags & SD_FLAG_CMD_EXCL,
-				   hdr->flags & SD_FLAG_CMD_DEL);
-		break;
-	case SD_OP_RELEASE_VDI:
-		break;
-	case SD_OP_MAKE_FS:
-		ret = SD_RES_SUCCESS;
-		break;
-	case SD_OP_SHUTDOWN:
-		break;
-	default:
-		ret = SD_RES_SYSTEM_ERROR;
-		eprintf("opcode %d is not implemented\n", hdr->opcode);
-		break;
-	}
-
-	rsp->vdi_id = vid;
-	rsp->attr_id = attrid;
-	rsp->copies = nr_copies;
-	rsp->result = ret;
-}
-
 static void __sd_notify(struct cpg_event *cevent)
 {
 }
@@ -715,86 +543,22 @@ static void __sd_notify_done(struct cpg_event *cevent)
 {
 	struct work_notify *w = container_of(cevent, struct work_notify, cev);
 	struct vdi_op_message *msg = (struct vdi_op_message *)w->msg;
-	const struct sd_vdi_req *hdr = &msg->req;
-	struct sd_vdi_rsp *rsp = &msg->rsp;
-	void *data = msg->data;
 	struct request *req;
 	int ret = msg->rsp.result;
-	int i, latest_epoch;
-	uint64_t ctime;
-
-	if (ret != SD_RES_SUCCESS)
-		goto out;
-
-	switch (hdr->opcode) {
-	case SD_OP_NEW_VDI:
-	{
-		unsigned long nr = rsp->vdi_id;
-		vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
-		set_bit(nr, sys->vdi_inuse);
-		break;
-	}
-	case SD_OP_DEL_VDI:
-		break;
-	case SD_OP_LOCK_VDI:
-	case SD_OP_RELEASE_VDI:
-	case SD_OP_GET_VDI_INFO:
-	case SD_OP_GET_VDI_ATTR:
-		break;
-	case SD_OP_MAKE_FS:
-		sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies;
-		sys->flags = ((struct sd_so_req *)hdr)->flags;
-		if (!sys->nr_sobjs)
-			sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
-
-		ctime = ((struct sd_so_req *)hdr)->ctime;
-		set_cluster_ctime(ctime);
-
-		latest_epoch = get_latest_epoch();
-		for (i = 1; i <= latest_epoch; i++)
-			remove_epoch(i);
-		memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
-
-		sys->epoch = 1;
-		sys->recovered_epoch = 1;
-
-		dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
-		ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
-				      sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
-		if (ret < 0)
-			eprintf("can't write epoch %u\n", sys->epoch);
-		update_epoch_store(sys->epoch);
+	struct sd_op_template *op = get_sd_op(msg->req.opcode);
 
-		set_cluster_copies(sys->nr_sobjs);
-		set_cluster_flags(sys->flags);
+	if (ret == SD_RES_SUCCESS && op->post_process)
+		ret = op->post_process((const struct sd_req *)&msg->req,
+				       (struct sd_rsp *)&msg->rsp, msg->data);
 
-		if (sys_flag_nohalt())
-			sys_stat_set(SD_STATUS_OK);
-		else {
-			int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
-
-			if (nr_zones >= sys->nr_sobjs)
-				sys_stat_set(SD_STATUS_OK);
-			else
-				sys_stat_set(SD_STATUS_HALT);
-		}
-		break;
-	case SD_OP_SHUTDOWN:
-		sys_stat_set(SD_STATUS_SHUTDOWN);
-		break;
-	default:
-		eprintf("unknown operation %d\n", hdr->opcode);
-		ret = SD_RES_UNKNOWN;
-	}
-out:
 	if (!is_myself(w->sender.addr, w->sender.port))
 		return;
 
 	req = list_first_entry(&sys->pending_list, struct request, pending_list);
 
-	rsp->result = ret;
-	memcpy(req->data, data, rsp->data_length);
-	memcpy(&req->rp, rsp, sizeof(req->rp));
+	msg->rsp.result = ret;
+	memcpy(req->data, msg->data, msg->rsp.data_length);
+	memcpy(&req->rp, &msg->rsp, sizeof(req->rp));
 	list_del(&req->pending_list);
 	req->done(req);
 }
@@ -1226,7 +990,7 @@ do_retry:
 
 		list_del(&cevent->cpg_event_list);
 
-		if (is_io_request(req->rq.opcode)) {
+		if (is_io_op(req->op)) {
 			int copies = sys->nr_sobjs;
 
 			if (copies > req->nr_zones)
@@ -1282,7 +1046,7 @@ do_retry:
 			}
 		}
 
-		if (is_cluster_request(req->rq.opcode))
+		if (is_cluster_op(req->op))
 			queue_work(sys->cpg_wqueue, &req->work);
 		else if (req->rq.flags & SD_FLAG_CMD_IO_LOCAL)
 			queue_work(sys->io_wqueue, &req->work);
diff --git a/sheep/ops.c b/sheep/ops.c
new file mode 100644
index 0000000..0d38e7b
--- /dev/null
+++ b/sheep/ops.c
@@ -0,0 +1,427 @@
+/*
+ * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "sheep_priv.h"
+
+static int process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
+			   void *data)
+{
+	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
+	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
+	int ret;
+
+	ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
+		      hdr->base_vdi_id, hdr->copies,
+		      hdr->snapid, &nr_copies);
+
+	vdi_rsp->vdi_id = vid;
+	vdi_rsp->copies = nr_copies;
+
+	return ret;
+}
+
+static int post_process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
+				void *data)
+{
+	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+	unsigned long nr = vdi_rsp->vdi_id;
+	int ret = vdi_rsp->result;
+
+	vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
+	set_bit(nr, sys->vdi_inuse);
+
+	return SD_RES_SUCCESS;
+}
+
+static int process_del_vdi(const struct sd_req *req, struct sd_rsp *rsp,
+			   void *data)
+{
+	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
+	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
+	int ret;
+
+	ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
+		      hdr->snapid, &nr_copies);
+
+	vdi_rsp->vdi_id = vid;
+	vdi_rsp->copies = nr_copies;
+
+	return ret;
+}
+
+static int process_get_vdi_info(const struct sd_req *req, struct sd_rsp *rsp,
+				void *data)
+{
+	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
+	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+	uint32_t vid = 0, nr_copies = sys->nr_sobjs;
+	void *tag;
+	int ret;
+
+	if (hdr->proto_ver != SD_PROTO_VER)
+		return SD_RES_VER_MISMATCH;
+
+	if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
+		tag = (char *)data + SD_MAX_VDI_LEN;
+	else if (hdr->data_length == SD_MAX_VDI_LEN)
+		tag = NULL;
+	else
+		return SD_RES_INVALID_PARMS;
+
+	ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
+			 &nr_copies, NULL);
+	if (ret != SD_RES_SUCCESS)
+		return ret;
+
+	vdi_rsp->vdi_id = vid;
+	vdi_rsp->copies = nr_copies;
+
+	return ret;
+}
+
+static int post_process_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
+				void *data)
+{
+	const struct sd_so_req *hdr = (const struct sd_so_req *)req;
+	int i, latest_epoch, ret;
+	uint64_t ctime;
+
+	sys->nr_sobjs = hdr->copies;
+	sys->flags = hdr->flags;
+	if (!sys->nr_sobjs)
+		sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
+
+	ctime = hdr->ctime;
+	set_cluster_ctime(ctime);
+
+	latest_epoch = get_latest_epoch();
+	for (i = 1; i <= latest_epoch; i++)
+		remove_epoch(i);
+	memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
+
+	sys->epoch = 1;
+	sys->recovered_epoch = 1;
+
+	dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
+	ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
+			      sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
+	if (ret < 0) {
+		eprintf("can't write epoch %u\n", sys->epoch);
+		return SD_RES_EIO;
+	}
+	update_epoch_store(sys->epoch);
+
+	set_cluster_copies(sys->nr_sobjs);
+	set_cluster_flags(sys->flags);
+
+	if (sys_flag_nohalt())
+		sys_stat_set(SD_STATUS_OK);
+	else {
+		int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
+
+		if (nr_zones >= sys->nr_sobjs)
+			sys_stat_set(SD_STATUS_OK);
+		else
+			sys_stat_set(SD_STATUS_HALT);
+	}
+
+	return SD_RES_SUCCESS;
+}
+
+static int post_process_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
+				 void *data)
+{
+	sys_stat_set(SD_STATUS_SHUTDOWN);
+
+	return SD_RES_SUCCESS;
+}
+
+static int process_get_vdi_attr(const struct sd_req *req, struct sd_rsp *rsp,
+				void *data)
+{
+	const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
+	struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+	uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
+	uint64_t ctime = 0;
+	int ret;
+	struct sheepdog_vdi_attr *vattr;
+
+	vattr = data;
+	ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
+			 &vid, hdr->snapid, &nr_copies, &ctime);
+	if (ret != SD_RES_SUCCESS)
+		return ret;
+
+	/* the curernt vdi id can change if we take the snapshot,
+	   so we use the hash value of the vdi name as the vdi id */
+	vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
+	vid &= SD_NR_VDIS - 1;
+	ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
+			   &attrid, nr_copies, ctime,
+			   hdr->flags & SD_FLAG_CMD_CREAT,
+			   hdr->flags & SD_FLAG_CMD_EXCL,
+			   hdr->flags & SD_FLAG_CMD_DEL);
+
+	vdi_rsp->vdi_id = vid;
+	vdi_rsp->attr_id = attrid;
+	vdi_rsp->copies = nr_copies;
+
+	return ret;
+}
+
+static int post_process_read_vdis(const struct sd_req *req, struct sd_rsp *rsp,
+				  void *data)
+{
+	return read_vdis(data, req->data_length, &rsp->data_length);
+}
+
+static int get_node_idx(struct sheepdog_node_list_entry *ent,
+			struct sheepdog_node_list_entry *entries, int nr_nodes)
+{
+	ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
+	if (!ent)
+		return -1;
+
+	return ent - entries;
+}
+
+static int post_process_get_node_list(const struct sd_req *req, struct sd_rsp *rsp,
+				      void *data)
+{
+	struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
+	int nr_nodes;
+
+	nr_nodes = sys->nr_nodes;
+	memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
+	node_rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
+	node_rsp->nr_nodes = nr_nodes;
+	node_rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
+	node_rsp->master_idx = -1;
+
+	return SD_RES_SUCCESS;
+}
+
+static int process_stat_sheep(const struct sd_req *req, struct sd_rsp *rsp,
+			      void *data)
+{
+	struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
+	uint32_t epoch = req->epoch;
+
+	return stat_sheep(&node_rsp->store_size, &node_rsp->store_free, epoch);
+}
+
+static int process_stat_cluster(const struct sd_req *req, struct sd_rsp *rsp,
+				void *data)
+{
+	struct epoch_log *log;
+	int i, max_logs, epoch;
+	uint32_t sys_stat = sys_stat_get();
+
+	max_logs = rsp->data_length / sizeof(*log);
+	epoch = get_latest_epoch();
+	rsp->data_length = 0;
+	for (i = 0; i < max_logs; i++) {
+		if (epoch <= 0)
+			break;
+
+		log = (struct epoch_log *)data + i;
+		log->epoch = epoch;
+		log->ctime = get_cluster_ctime();
+		log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
+					       sizeof(log->nodes));
+		if (log->nr_nodes == -1)
+			log->nr_nodes = epoch_log_read_remote(epoch,
+							      (char *)log->nodes,
+							      sizeof(log->nodes));
+
+		rsp->data_length += sizeof(*log);
+		log->nr_nodes /= sizeof(log->nodes[0]);
+		epoch--;
+	}
+
+	switch (sys_stat) {
+	case SD_STATUS_OK:
+		return SD_RES_SUCCESS;
+	case SD_STATUS_WAIT_FOR_FORMAT:
+		return SD_RES_WAIT_FOR_FORMAT;
+	case SD_STATUS_WAIT_FOR_JOIN:
+		return SD_RES_WAIT_FOR_JOIN;
+	case SD_STATUS_SHUTDOWN:
+		return SD_RES_SHUTDOWN;
+	case SD_STATUS_JOIN_FAILED:
+		return SD_RES_JOIN_FAILED;
+	case SD_STATUS_HALT:
+		return SD_RES_HALT;
+	default:
+		return SD_RES_SYSTEM_ERROR;
+	}
+}
+
+static int process_kill_node(const struct sd_req *req, struct sd_rsp *rsp,
+			     void *data)
+{
+	exit(1);
+}
+
+static int process_get_obj_list(const struct sd_req *req, struct sd_rsp *rsp,
+				void *data)
+{
+	return get_obj_list((const struct sd_list_req *)req,
+			    (struct sd_list_rsp *)rsp, data);
+}
+
+static int process_get_epoch(const struct sd_req *req, struct sd_rsp *rsp,
+			     void *data)
+{
+	const struct sd_obj_req *obj_req = (const struct sd_obj_req *)req;
+	struct sd_obj_rsp *obj_rsp = (struct sd_obj_rsp *)rsp;
+	int epoch = obj_req->tgt_epoch;
+	int len, ret;
+	dprintf("%d\n", epoch);
+	len = epoch_log_read(epoch, (char *)data, obj_req->data_length);
+	if (len == -1) {
+		ret = SD_RES_NO_TAG;
+		obj_rsp->data_length = 0;
+	} else {
+		ret = SD_RES_SUCCESS;
+		obj_rsp->data_length = len;
+	}
+	return ret;
+}
+
+static struct sd_op_template sd_ops[] = {
+
+	/* cluster operations */
+	[SD_OP_NEW_VDI] = {
+		.type = SD_OP_TYPE_CLUSTER,
+		.process = process_new_vdi,
+		.post_process = post_process_new_vdi,
+	},
+
+	[SD_OP_DEL_VDI] = {
+		.type = SD_OP_TYPE_CLUSTER,
+		.process = process_del_vdi,
+	},
+
+	[SD_OP_GET_VDI_INFO] = {
+		.type = SD_OP_TYPE_CLUSTER,
+		.process = process_get_vdi_info,
+	},
+
+	[SD_OP_LOCK_VDI] = {
+		.type = SD_OP_TYPE_CLUSTER,
+		.process = process_get_vdi_info,
+	},
+
+	[SD_OP_MAKE_FS] = {
+		.type = SD_OP_TYPE_CLUSTER,
+		.available_always = 1,
+		.post_process = post_process_make_fs,
+	},
+
+	[SD_OP_SHUTDOWN] = {
+		.type = SD_OP_TYPE_CLUSTER,
+		.post_process = post_process_shutdown,
+	},
+
+	[SD_OP_GET_VDI_ATTR] = {
+		.type = SD_OP_TYPE_CLUSTER,
+		.process = process_get_vdi_attr,
+	},
+
+	[SD_OP_RELEASE_VDI] = {
+		.type = SD_OP_TYPE_CLUSTER,
+	},
+
+	/* store operations */
+	[SD_OP_READ_VDIS] = {
+		.type = SD_OP_TYPE_STORE,
+		.available_always = 1,
+		.post_process = post_process_read_vdis,
+	},
+
+	[SD_OP_GET_NODE_LIST] = {
+		.type = SD_OP_TYPE_STORE,
+		.available_always = 1,
+		.post_process = post_process_get_node_list,
+	},
+
+	[SD_OP_STAT_SHEEP] = {
+		.type = SD_OP_TYPE_STORE,
+		.process = process_stat_sheep,
+	},
+
+	[SD_OP_STAT_CLUSTER] = {
+		.type = SD_OP_TYPE_STORE,
+		.available_always = 1,
+		.process = process_stat_cluster,
+	},
+
+	[SD_OP_KILL_NODE] = {
+		.type = SD_OP_TYPE_STORE,
+		.available_always = 1,
+		.process = process_kill_node,
+	},
+
+	[SD_OP_GET_OBJ_LIST] = {
+		.type = SD_OP_TYPE_STORE,
+		.process = process_get_obj_list,
+	},
+
+	[SD_OP_GET_EPOCH] = {
+		.type = SD_OP_TYPE_STORE,
+		.process = process_get_epoch,
+	},
+
+	/* I/O operations */
+	[SD_OP_CREATE_AND_WRITE_OBJ] = {
+		.type = SD_OP_TYPE_IO,
+	},
+
+	[SD_OP_READ_OBJ] = {
+		.type = SD_OP_TYPE_IO,
+	},
+
+	[SD_OP_WRITE_OBJ] = {
+		.type = SD_OP_TYPE_IO,
+	},
+
+	[SD_OP_REMOVE_OBJ] = {
+		.type = SD_OP_TYPE_IO,
+	},
+};
+
+struct sd_op_template *get_sd_op(uint8_t opcode)
+{
+	if (sd_ops[opcode].type == 0)
+		return NULL;
+
+	return sd_ops + opcode;
+}
+
+int is_cluster_op(struct sd_op_template *op)
+{
+	return op->type == SD_OP_TYPE_CLUSTER;
+}
+
+int is_store_op(struct sd_op_template *op)
+{
+	return op->type == SD_OP_TYPE_STORE;
+}
+
+int is_io_op(struct sd_op_template *op)
+{
+	return op->type == SD_OP_TYPE_IO;
+}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 474b1be..7d7e142 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -19,46 +19,6 @@
 
 #include "sheep_priv.h"
 
-int is_io_request(unsigned op)
-{
-	int ret = 0;
-
-	switch (op) {
-	case SD_OP_CREATE_AND_WRITE_OBJ:
-	case SD_OP_REMOVE_OBJ:
-	case SD_OP_READ_OBJ:
-	case SD_OP_WRITE_OBJ:
-		ret = 1;
-		break;
-	default:
-		break;
-	}
-
-	return ret;
-}
-
-int is_cluster_request(unsigned op)
-{
-	int ret = 0;
-
-	switch (op) {
-	case SD_OP_NEW_VDI:
-	case SD_OP_DEL_VDI:
-	case SD_OP_LOCK_VDI:
-	case SD_OP_RELEASE_VDI:
-	case SD_OP_GET_VDI_INFO:
-	case SD_OP_MAKE_FS:
-	case SD_OP_SHUTDOWN:
-	case SD_OP_GET_VDI_ATTR:
-		ret = 1;
-		break;
-	default:
-		break;
-	}
-
-	return ret;
-}
-
 void resume_pending_requests(void)
 {
 	struct request *next, *tmp;
@@ -119,18 +79,21 @@ static void setup_access_to_local_objects(struct request *req)
 static void __done(struct work *work, int idx)
 {
 	struct request *req = container_of(work, struct request, work);
-	struct sd_req *hdr = (struct sd_req *)&req->rq;
 	int again = 0;
 	int copies = sys->nr_sobjs;
 
 	if (copies > req->nr_zones)
 		copies = req->nr_zones;
 
-	if (is_cluster_request(hdr->opcode))
+	if (is_cluster_op(req->op))
 		/* request is forwarded to cpg group */
 		return;
 
-	if (is_io_request(hdr->opcode)) {
+	if (is_store_op(req->op) && req->op->post_process)
+		req->rp.result = req->op->post_process(&req->rq, &req->rp,
+						       req->data);
+
+	if (is_io_op(req->op)) {
 		struct cpg_event *cevent = &req->cev;
 
 		list_del(&req->r_wlist);
@@ -225,39 +188,15 @@ static void queue_request(struct request *req)
 
 	dprintf("%x\n", hdr->opcode);
 
-	if (hdr->opcode == SD_OP_KILL_NODE) {
-		log_close();
-		exit(1);
-	}
-
 	if (sys->status == SD_STATUS_SHUTDOWN) {
 		rsp->result = SD_RES_SHUTDOWN;
 		req->done(req);
 		return;
 	}
 
-	/*
-	 * we can know why this node failed to join with
-	 * SD_OP_STAT_CLUSTER, so the request should be handled even
-	 * when the cluster status is SD_STATUS_JOIN_FAILED
-	 */
-	if (sys->status == SD_STATUS_JOIN_FAILED &&
-	    hdr->opcode != SD_OP_STAT_CLUSTER) {
-		rsp->result = SD_RES_JOIN_FAILED;
-		req->done(req);
-		return;
-	}
-
 	if (sys->status == SD_STATUS_WAIT_FOR_FORMAT ||
 	    sys->status == SD_STATUS_WAIT_FOR_JOIN) {
-		/* TODO: cleanup */
-		switch (hdr->opcode) {
-		case SD_OP_STAT_CLUSTER:
-		case SD_OP_MAKE_FS:
-		case SD_OP_GET_NODE_LIST:
-		case SD_OP_READ_VDIS:
-			break;
-		default:
+		if (!req->op->available_always) {
 			if (sys->status == SD_STATUS_WAIT_FOR_FORMAT)
 				rsp->result = SD_RES_WAIT_FOR_FORMAT;
 			else
@@ -267,33 +206,11 @@ static void queue_request(struct request *req)
 		}
 	}
 
-	switch (hdr->opcode) {
-	case SD_OP_CREATE_AND_WRITE_OBJ:
-	case SD_OP_REMOVE_OBJ:
-	case SD_OP_READ_OBJ:
-	case SD_OP_WRITE_OBJ:
-	case SD_OP_STAT_SHEEP:
-	case SD_OP_GET_OBJ_LIST:
+	if (is_io_op(req->op) || is_store_op(req->op))
 		req->work.fn = store_queue_request;
-		break;
-	case SD_OP_GET_NODE_LIST:
-	case SD_OP_NEW_VDI:
-	case SD_OP_DEL_VDI:
-	case SD_OP_LOCK_VDI:
-	case SD_OP_RELEASE_VDI:
-	case SD_OP_GET_VDI_INFO:
-	case SD_OP_MAKE_FS:
-	case SD_OP_SHUTDOWN:
-	case SD_OP_STAT_CLUSTER:
-	case SD_OP_GET_VDI_ATTR:
-	case SD_OP_GET_EPOCH:
+	else if (is_cluster_op(req->op))
 		req->work.fn = cluster_queue_request;
-		break;
-	case SD_OP_READ_VDIS:
-		rsp->result = read_vdis(req->data, hdr->data_length, &rsp->data_length);
-		req->done(req);
-		return;
-	default:
+	else {
 		eprintf("unknown operation %d\n", hdr->opcode);
 		rsp->result = SD_RES_SYSTEM_ERROR;
 		req->done(req);
@@ -314,7 +231,7 @@ static void queue_request(struct request *req)
 		hdr->epoch = sys->epoch;
 
 	setup_ordered_sd_vnode_list(req);
-	if (is_io_request(hdr->opcode))
+	if (is_io_op(req->op))
 		setup_access_to_local_objects(req);
 
 	cevent->ctype = CPG_EVENT_REQUEST;
@@ -410,6 +327,12 @@ static void client_rx_handler(struct client_info *ci)
 
 		/* use le_to_cpu */
 		memcpy(&req->rq, hdr, sizeof(req->rq));
+		req->op = get_sd_op(hdr->opcode);
+		if (!req->op) {
+			eprintf("invalid opcode, %d\n", hdr->opcode);
+			conn->c_rx_state = C_IO_CLOSED;
+			break;
+		}
 
 		if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
 			conn->c_rx_state = C_IO_DATA;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 65bc3aa..1fe2d4d 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -72,6 +72,8 @@ struct request {
 	struct sd_req rq;
 	struct sd_rsp rp;
 
+	struct sd_op_template *op;
+
 	void *data;
 
 	struct client_info *ci;
@@ -152,8 +154,6 @@ extern struct cluster_info *sys;
 
 int create_listen_port(int port, void *data);
 
-int is_io_request(unsigned op);
-int is_cluster_request(unsigned op);
 int init_store(const char *dir);
 int init_base_path(const char *dir);
 
@@ -173,6 +173,7 @@ int get_vdi_attr(uint32_t epoch, struct sheepdog_vdi_attr *vattr, int data_len,
 		 uint32_t vid, uint32_t *attrid, int copies, uint64_t ctime,
 		 int write, int excl, int delete);
 
+int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes);
 void setup_ordered_sd_vnode_list(struct request *req);
 void get_ordered_sd_vnode_list(struct sheepdog_vnode_list_entry *entries,
 			       int *nr_vnodes, int *nr_zones);
@@ -215,6 +216,8 @@ int get_latest_epoch(void);
 int remove_epoch(int epoch);
 int set_cluster_ctime(uint64_t ctime);
 uint64_t get_cluster_ctime(void);
+int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch);
+int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data);
 
 int start_recovery(uint32_t epoch);
 void resume_recovery_work(void);
@@ -235,6 +238,47 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
 int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
 		 uint32_t epoch, int worker_idx);
 
+/* Operations */
+
+enum sd_op_type {
+	SD_OP_TYPE_CLUSTER = 1, /* cluster operations */
+	SD_OP_TYPE_STORE,       /* store operations */
+	SD_OP_TYPE_IO,          /* io operations */
+};
+
+struct sd_op_template {
+	enum sd_op_type type;
+
+	/* process request even when cluster is not working */
+	int available_always;
+
+	/*
+	 * process() will be called in the worker thread, and
+	 * post_process() will be called in the main thread.
+	 *
+	 * If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only
+	 * one node processes a cluster operation at the same time.
+	 * We can use this for something like distributed locking.
+	 * process() will be called on the local node, and
+	 * post_process() will be called on every nodes.
+	 *
+	 * If type is SD_OP_TYPE_STORE, both process() and
+	 * post_process() will be called on the local node.
+	 *
+	 * If type is SD_OP_TYPE_IO, neither process() nor
+	 * post_process() is used because this type of operation is
+	 * heavily intertwined with Sheepdog core codes.  We will be
+	 * unlikely to add new operations of this type.
+	 */
+	int (*process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
+	int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
+};
+
+struct sd_op_template *get_sd_op(uint8_t opcode);
+int is_cluster_op(struct sd_op_template *op);
+int is_store_op(struct sd_op_template *op);
+int is_io_op(struct sd_op_template *op);
+
 /* Journal */
 int jrnl_perform(int fd, void *buf, size_t count, off_t offset,
 		 const char *path, const char *jrnl_dir);
diff --git a/sheep/store.c b/sheep/store.c
index ee8996e..83644db 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -52,7 +52,7 @@ static int obj_cmp(const void *oid1, const void *oid2)
 	return 0;
 }
 
-static int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
+int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
 {
 	struct statvfs vs;
 	int ret;
@@ -96,16 +96,14 @@ static int merge_objlist(struct sheepdog_vnode_list_entry *entries, int nr_entri
 			 uint64_t *list1, int nr_list1,
 			 uint64_t *list2, int nr_list2, int nr_objs);
 
-static int get_obj_list(struct request *req)
+int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data)
 {
 	DIR *dir;
 	struct dirent *d;
-	struct sd_list_req *hdr = (struct sd_list_req *)&req->rq;
-	struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp;
 	uint64_t oid;
 	uint32_t epoch;
 	char path[1024];
-	uint64_t *p = (uint64_t *)req->data;
+	uint64_t *p = (uint64_t *)data;
 	int nr = 0;
 	uint64_t *objlist = NULL;
 	int obj_nr, i;
@@ -778,20 +776,15 @@ void store_queue_request(struct work *work, int idx)
 	uint64_t oid = hdr->oid;
 	uint32_t opcode = hdr->opcode;
 	uint32_t epoch = hdr->epoch;
-	struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
 
 	dprintf("%"PRIu32", %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
 
 	if (hdr->flags & SD_FLAG_CMD_RECOVERY)
 		epoch = hdr->tgt_epoch;
 
-	if (opcode == SD_OP_STAT_SHEEP) {
-		ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch);
-		goto out;
-	}
-
-	if (opcode == SD_OP_GET_OBJ_LIST) {
-		ret = get_obj_list(req);
+	if (is_store_op(req->op)) {
+		if (req->op->process)
+			ret = req->op->process(&req->rq, &req->rp, req->data);
 		goto out;
 	}
 
-- 
1.7.2.5




More information about the sheepdog mailing list