[Sheepdog] [RFC PATCH] sheep: introduce sd_op_template
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Fri Oct 21 10:28:08 CEST 2011
When we want to add a new operation (SD_OP_xxxxx), it is not clear
which code we should modify, and in some cases we need to touch code
all over the tree to implement a single operation. This is not a good
design.
This patch abstracts Sheepdog operations into sd_op_template and
moves all the request processing code to sheep/ops.c.
The definition of sd_op_template is as follows:
struct sd_op_template {
enum sd_op_type type;
int available_always;
int (*process)(const struct sd_req *req, struct sd_rsp *rsp,
void *data);
int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp,
void *data);
};
'type' is the type of the operation: SD_OP_TYPE_CLUSTER,
SD_OP_TYPE_STORE, or SD_OP_TYPE_IO.
'available_always' is set to non-zero if the operation should be
processed even when the cluster is not working.
'process()' and 'post_process()' are the main handlers of the
operation. process() will be called in the worker thread, and
post_process() will be called in the main thread.
If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only one node
processes a cluster operation at a time. We can use this for
something like distributed locking. process() will be called on the
local node, and post_process() will be called on every node.
If type is SD_OP_TYPE_STORE, both process() and post_process() will be
called on the local node.
If type is SD_OP_TYPE_IO, neither process() nor post_process() is used
because this type of operation is heavily intertwined with the
Sheepdog core code. We are unlikely to add new operations of this
type.
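To illustrate the intended usage, a new cluster operation would only
need a handler pair in sheep/ops.c plus an entry in the sd_ops[]
table. The sketch below is hypothetical; SD_OP_EXAMPLE and its
handlers are not part of this patch and only show how the template is
meant to be filled in:

static int process_example(const struct sd_req *req, struct sd_rsp *rsp,
                           void *data)
{
        /* called in the worker thread, on the local node only */
        return SD_RES_SUCCESS;
}

static int post_process_example(const struct sd_req *req, struct sd_rsp *rsp,
                                void *data)
{
        /* called in the main thread, on every node */
        return SD_RES_SUCCESS;
}

/* entry added to sd_ops[] in sheep/ops.c */
[SD_OP_EXAMPLE] = {
        .type = SD_OP_TYPE_CLUSTER,
        .process = process_example,
        .post_process = post_process_example,
},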
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/Makefile.am | 2 +-
sheep/group.c | 286 +++--------------------------------
sheep/ops.c | 427 ++++++++++++++++++++++++++++++++++++++++++++++++++++
sheep/sdnet.c | 111 ++------------
sheep/sheep_priv.h | 48 ++++++-
sheep/store.c | 19 +--
6 files changed, 522 insertions(+), 371 deletions(-)
create mode 100644 sheep/ops.c
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 2b9d58f..3db914d 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -23,7 +23,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
sbin_PROGRAMS = sheep
-sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c \
+sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
cluster/corosync.c
sheep_LDADD = $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
sheep_DEPENDENCIES = ../lib/libsheepdog.a
diff --git a/sheep/group.c b/sheep/group.c
index e22dabc..8664b6f 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -101,17 +101,7 @@ static size_t get_join_message_size(struct join_message *jm)
return sizeof(*jm) + jm->nr_nodes * sizeof(jm->nodes[0]);
}
-static int get_node_idx(struct sheepdog_node_list_entry *ent,
- struct sheepdog_node_list_entry *entries, int nr_nodes)
-{
- ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
- if (!ent)
- return -1;
-
- return ent - entries;
-}
-
-static int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
+int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
{
int nr_zones = 0, i, j;
uint32_t zones[SD_MAX_REDUNDANCY];
@@ -146,116 +136,28 @@ void setup_ordered_sd_vnode_list(struct request *req)
get_ordered_sd_vnode_list(req->entry, &req->nr_vnodes, &req->nr_zones);
}
-static void get_node_list(struct sd_node_req *req,
- struct sd_node_rsp *rsp, void *data)
+static void do_cluster_op(void *arg)
{
- int nr_nodes;
-
- nr_nodes = sys->nr_nodes;
- memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
- rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
- rsp->nr_nodes = nr_nodes;
- rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
- rsp->master_idx = -1;
-}
+ struct vdi_op_message *msg = arg;
+ int ret;
+ struct request *req;
-static int get_epoch(struct sd_obj_req *req,
- struct sd_obj_rsp *rsp, void *data)
-{
- int epoch = req->tgt_epoch;
- int len, ret;
- dprintf("%d\n", epoch);
- len = epoch_log_read(epoch, (char *)data, req->data_length);
- if (len == -1) {
- ret = SD_RES_NO_TAG;
- rsp->data_length = 0;
- } else {
- ret = SD_RES_SUCCESS;
- rsp->data_length = len;
- }
- return ret;
-}
+ req = list_first_entry(&sys->pending_list, struct request, pending_list);
+ ret = req->op->process((const struct sd_req *)&msg->req,
+ (struct sd_rsp *)&msg->rsp, req->data);
-static void vdi_op(void *arg);
+ msg->rsp.result = ret;
+}
void cluster_queue_request(struct work *work, int idx)
{
struct request *req = container_of(work, struct request, work);
struct sd_req *hdr = (struct sd_req *)&req->rq;
- struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
struct vdi_op_message *msg;
- struct epoch_log *log;
- int ret = SD_RES_SUCCESS, i, max_logs, epoch;
- uint32_t sys_stat = sys_stat_get();
size_t size;
eprintf("%p %x\n", req, hdr->opcode);
- switch (hdr->opcode) {
- case SD_OP_GET_EPOCH:
- ret = get_epoch((struct sd_obj_req *)hdr,
- (struct sd_obj_rsp *)rsp, req->data);
- break;
- case SD_OP_GET_NODE_LIST:
- get_node_list((struct sd_node_req *)hdr,
- (struct sd_node_rsp *)rsp, req->data);
- break;
- case SD_OP_STAT_CLUSTER:
- max_logs = rsp->data_length / sizeof(*log);
- epoch = get_latest_epoch();
- rsp->data_length = 0;
- for (i = 0; i < max_logs; i++) {
- if (epoch <= 0)
- break;
-
- log = (struct epoch_log *)req->data + i;
- log->epoch = epoch;
- log->ctime = get_cluster_ctime();
- log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
- sizeof(log->nodes));
- if (log->nr_nodes == -1)
- log->nr_nodes = epoch_log_read_remote(epoch,
- (char *)log->nodes,
- sizeof(log->nodes));
-
- rsp->data_length += sizeof(*log);
- log->nr_nodes /= sizeof(log->nodes[0]);
- epoch--;
- }
-
- switch (sys_stat) {
- case SD_STATUS_OK:
- ret = SD_RES_SUCCESS;
- break;
- case SD_STATUS_WAIT_FOR_FORMAT:
- ret = SD_RES_WAIT_FOR_FORMAT;
- break;
- case SD_STATUS_WAIT_FOR_JOIN:
- ret = SD_RES_WAIT_FOR_JOIN;
- break;
- case SD_STATUS_SHUTDOWN:
- ret = SD_RES_SHUTDOWN;
- break;
- case SD_STATUS_JOIN_FAILED:
- ret = SD_RES_JOIN_FAILED;
- break;
- case SD_STATUS_HALT:
- ret = SD_RES_HALT;
- break;
- default:
- ret = SD_RES_SYSTEM_ERROR;
- break;
- }
- break;
- default:
- /* forward request to group */
- goto forward;
- }
-
- rsp->result = ret;
- return;
-
-forward:
if (hdr->flags & SD_FLAG_CMD_WRITE)
size = sizeof(*msg);
else
@@ -272,7 +174,12 @@ forward:
list_add(&req->pending_list, &sys->pending_list);
- sys->cdrv->notify(msg, size, vdi_op);
+ if (req->op->process)
+ sys->cdrv->notify(msg, size, do_cluster_op);
+ else {
+ msg->rsp.result = SD_RES_SUCCESS;
+ sys->cdrv->notify(msg, size, NULL);
+ }
free(msg);
}
@@ -628,85 +535,6 @@ join_finished:
return;
}
-static void vdi_op(void *arg)
-{
- struct vdi_op_message *msg = arg;
- const struct sd_vdi_req *hdr = &msg->req;
- struct sd_vdi_rsp *rsp = &msg->rsp;
- void *data, *tag;
- int ret = SD_RES_SUCCESS;
- struct sheepdog_vdi_attr *vattr;
- uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
- uint64_t ctime = 0;
- struct request *req;
-
- req = list_first_entry(&sys->pending_list, struct request, pending_list);
- data = req->data;
-
- switch (hdr->opcode) {
- case SD_OP_NEW_VDI:
- ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
- hdr->base_vdi_id, hdr->copies,
- hdr->snapid, &nr_copies);
- break;
- case SD_OP_DEL_VDI:
- ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
- hdr->snapid, &nr_copies);
- break;
- case SD_OP_LOCK_VDI:
- case SD_OP_GET_VDI_INFO:
- if (hdr->proto_ver != SD_PROTO_VER) {
- ret = SD_RES_VER_MISMATCH;
- break;
- }
- if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
- tag = (char *)data + SD_MAX_VDI_LEN;
- else if (hdr->data_length == SD_MAX_VDI_LEN)
- tag = NULL;
- else {
- ret = SD_RES_INVALID_PARMS;
- break;
- }
- ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
- &nr_copies, NULL);
- if (ret != SD_RES_SUCCESS)
- break;
- break;
- case SD_OP_GET_VDI_ATTR:
- vattr = data;
- ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
- &vid, hdr->snapid, &nr_copies, &ctime);
- if (ret != SD_RES_SUCCESS)
- break;
- /* the curernt vdi id can change if we take the snapshot,
- so we use the hash value of the vdi name as the vdi id */
- vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
- vid &= SD_NR_VDIS - 1;
- ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
- &attrid, nr_copies, ctime,
- hdr->flags & SD_FLAG_CMD_CREAT,
- hdr->flags & SD_FLAG_CMD_EXCL,
- hdr->flags & SD_FLAG_CMD_DEL);
- break;
- case SD_OP_RELEASE_VDI:
- break;
- case SD_OP_MAKE_FS:
- ret = SD_RES_SUCCESS;
- break;
- case SD_OP_SHUTDOWN:
- break;
- default:
- ret = SD_RES_SYSTEM_ERROR;
- eprintf("opcode %d is not implemented\n", hdr->opcode);
- break;
- }
-
- rsp->vdi_id = vid;
- rsp->attr_id = attrid;
- rsp->copies = nr_copies;
- rsp->result = ret;
-}
-
static void __sd_notify(struct cpg_event *cevent)
{
}
@@ -715,86 +543,22 @@ static void __sd_notify_done(struct cpg_event *cevent)
{
struct work_notify *w = container_of(cevent, struct work_notify, cev);
struct vdi_op_message *msg = (struct vdi_op_message *)w->msg;
- const struct sd_vdi_req *hdr = &msg->req;
- struct sd_vdi_rsp *rsp = &msg->rsp;
- void *data = msg->data;
struct request *req;
int ret = msg->rsp.result;
- int i, latest_epoch;
- uint64_t ctime;
-
- if (ret != SD_RES_SUCCESS)
- goto out;
-
- switch (hdr->opcode) {
- case SD_OP_NEW_VDI:
- {
- unsigned long nr = rsp->vdi_id;
- vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
- set_bit(nr, sys->vdi_inuse);
- break;
- }
- case SD_OP_DEL_VDI:
- break;
- case SD_OP_LOCK_VDI:
- case SD_OP_RELEASE_VDI:
- case SD_OP_GET_VDI_INFO:
- case SD_OP_GET_VDI_ATTR:
- break;
- case SD_OP_MAKE_FS:
- sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies;
- sys->flags = ((struct sd_so_req *)hdr)->flags;
- if (!sys->nr_sobjs)
- sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
-
- ctime = ((struct sd_so_req *)hdr)->ctime;
- set_cluster_ctime(ctime);
-
- latest_epoch = get_latest_epoch();
- for (i = 1; i <= latest_epoch; i++)
- remove_epoch(i);
- memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
-
- sys->epoch = 1;
- sys->recovered_epoch = 1;
-
- dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
- ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
- sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
- if (ret < 0)
- eprintf("can't write epoch %u\n", sys->epoch);
- update_epoch_store(sys->epoch);
+ struct sd_op_template *op = get_sd_op(msg->req.opcode);
- set_cluster_copies(sys->nr_sobjs);
- set_cluster_flags(sys->flags);
+ if (ret == SD_RES_SUCCESS && op->post_process)
+ ret = op->post_process((const struct sd_req *)&msg->req,
+ (struct sd_rsp *)&msg->rsp, msg->data);
- if (sys_flag_nohalt())
- sys_stat_set(SD_STATUS_OK);
- else {
- int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
-
- if (nr_zones >= sys->nr_sobjs)
- sys_stat_set(SD_STATUS_OK);
- else
- sys_stat_set(SD_STATUS_HALT);
- }
- break;
- case SD_OP_SHUTDOWN:
- sys_stat_set(SD_STATUS_SHUTDOWN);
- break;
- default:
- eprintf("unknown operation %d\n", hdr->opcode);
- ret = SD_RES_UNKNOWN;
- }
-out:
if (!is_myself(w->sender.addr, w->sender.port))
return;
req = list_first_entry(&sys->pending_list, struct request, pending_list);
- rsp->result = ret;
- memcpy(req->data, data, rsp->data_length);
- memcpy(&req->rp, rsp, sizeof(req->rp));
+ msg->rsp.result = ret;
+ memcpy(req->data, msg->data, msg->rsp.data_length);
+ memcpy(&req->rp, &msg->rsp, sizeof(req->rp));
list_del(&req->pending_list);
req->done(req);
}
@@ -1226,7 +990,7 @@ do_retry:
list_del(&cevent->cpg_event_list);
- if (is_io_request(req->rq.opcode)) {
+ if (is_io_op(req->op)) {
int copies = sys->nr_sobjs;
if (copies > req->nr_zones)
@@ -1282,7 +1046,7 @@ do_retry:
}
}
- if (is_cluster_request(req->rq.opcode))
+ if (is_cluster_op(req->op))
queue_work(sys->cpg_wqueue, &req->work);
else if (req->rq.flags & SD_FLAG_CMD_IO_LOCAL)
queue_work(sys->io_wqueue, &req->work);
diff --git a/sheep/ops.c b/sheep/ops.c
new file mode 100644
index 0000000..0d38e7b
--- /dev/null
+++ b/sheep/ops.c
@@ -0,0 +1,427 @@
+/*
+ * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "sheep_priv.h"
+
+static int process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
+ struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+ uint32_t vid = 0, nr_copies = sys->nr_sobjs;
+ int ret;
+
+ ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
+ hdr->base_vdi_id, hdr->copies,
+ hdr->snapid, &nr_copies);
+
+ vdi_rsp->vdi_id = vid;
+ vdi_rsp->copies = nr_copies;
+
+ return ret;
+}
+
+static int post_process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+ unsigned long nr = vdi_rsp->vdi_id;
+ int ret = vdi_rsp->result;
+
+ vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
+ set_bit(nr, sys->vdi_inuse);
+
+ return SD_RES_SUCCESS;
+}
+
+static int process_del_vdi(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
+ struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+ uint32_t vid = 0, nr_copies = sys->nr_sobjs;
+ int ret;
+
+ ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
+ hdr->snapid, &nr_copies);
+
+ vdi_rsp->vdi_id = vid;
+ vdi_rsp->copies = nr_copies;
+
+ return ret;
+}
+
+static int process_get_vdi_info(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
+ struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+ uint32_t vid = 0, nr_copies = sys->nr_sobjs;
+ void *tag;
+ int ret;
+
+ if (hdr->proto_ver != SD_PROTO_VER)
+ return SD_RES_VER_MISMATCH;
+
+ if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
+ tag = (char *)data + SD_MAX_VDI_LEN;
+ else if (hdr->data_length == SD_MAX_VDI_LEN)
+ tag = NULL;
+ else
+ return SD_RES_INVALID_PARMS;
+
+ ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
+ &nr_copies, NULL);
+ if (ret != SD_RES_SUCCESS)
+ return ret;
+
+ vdi_rsp->vdi_id = vid;
+ vdi_rsp->copies = nr_copies;
+
+ return ret;
+}
+
+static int post_process_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ const struct sd_so_req *hdr = (const struct sd_so_req *)req;
+ int i, latest_epoch, ret;
+ uint64_t ctime;
+
+ sys->nr_sobjs = hdr->copies;
+ sys->flags = hdr->flags;
+ if (!sys->nr_sobjs)
+ sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
+
+ ctime = hdr->ctime;
+ set_cluster_ctime(ctime);
+
+ latest_epoch = get_latest_epoch();
+ for (i = 1; i <= latest_epoch; i++)
+ remove_epoch(i);
+ memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
+
+ sys->epoch = 1;
+ sys->recovered_epoch = 1;
+
+ dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
+ ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
+ sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
+ if (ret < 0) {
+ eprintf("can't write epoch %u\n", sys->epoch);
+ return SD_RES_EIO;
+ }
+ update_epoch_store(sys->epoch);
+
+ set_cluster_copies(sys->nr_sobjs);
+ set_cluster_flags(sys->flags);
+
+ if (sys_flag_nohalt())
+ sys_stat_set(SD_STATUS_OK);
+ else {
+ int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
+
+ if (nr_zones >= sys->nr_sobjs)
+ sys_stat_set(SD_STATUS_OK);
+ else
+ sys_stat_set(SD_STATUS_HALT);
+ }
+
+ return SD_RES_SUCCESS;
+}
+
+static int post_process_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ sys_stat_set(SD_STATUS_SHUTDOWN);
+
+ return SD_RES_SUCCESS;
+}
+
+static int process_get_vdi_attr(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
+ struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
+ uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
+ uint64_t ctime = 0;
+ int ret;
+ struct sheepdog_vdi_attr *vattr;
+
+ vattr = data;
+ ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
+ &vid, hdr->snapid, &nr_copies, &ctime);
+ if (ret != SD_RES_SUCCESS)
+ return ret;
+
+ /* the current vdi id can change if we take the snapshot,
+ so we use the hash value of the vdi name as the vdi id */
+ vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
+ vid &= SD_NR_VDIS - 1;
+ ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
+ &attrid, nr_copies, ctime,
+ hdr->flags & SD_FLAG_CMD_CREAT,
+ hdr->flags & SD_FLAG_CMD_EXCL,
+ hdr->flags & SD_FLAG_CMD_DEL);
+
+ vdi_rsp->vdi_id = vid;
+ vdi_rsp->attr_id = attrid;
+ vdi_rsp->copies = nr_copies;
+
+ return ret;
+}
+
+static int post_process_read_vdis(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ return read_vdis(data, req->data_length, &rsp->data_length);
+}
+
+static int get_node_idx(struct sheepdog_node_list_entry *ent,
+ struct sheepdog_node_list_entry *entries, int nr_nodes)
+{
+ ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
+ if (!ent)
+ return -1;
+
+ return ent - entries;
+}
+
+static int post_process_get_node_list(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
+ int nr_nodes;
+
+ nr_nodes = sys->nr_nodes;
+ memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
+ node_rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
+ node_rsp->nr_nodes = nr_nodes;
+ node_rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
+ node_rsp->master_idx = -1;
+
+ return SD_RES_SUCCESS;
+}
+
+static int process_stat_sheep(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
+ uint32_t epoch = req->epoch;
+
+ return stat_sheep(&node_rsp->store_size, &node_rsp->store_free, epoch);
+}
+
+static int process_stat_cluster(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ struct epoch_log *log;
+ int i, max_logs, epoch;
+ uint32_t sys_stat = sys_stat_get();
+
+ max_logs = rsp->data_length / sizeof(*log);
+ epoch = get_latest_epoch();
+ rsp->data_length = 0;
+ for (i = 0; i < max_logs; i++) {
+ if (epoch <= 0)
+ break;
+
+ log = (struct epoch_log *)data + i;
+ log->epoch = epoch;
+ log->ctime = get_cluster_ctime();
+ log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
+ sizeof(log->nodes));
+ if (log->nr_nodes == -1)
+ log->nr_nodes = epoch_log_read_remote(epoch,
+ (char *)log->nodes,
+ sizeof(log->nodes));
+
+ rsp->data_length += sizeof(*log);
+ log->nr_nodes /= sizeof(log->nodes[0]);
+ epoch--;
+ }
+
+ switch (sys_stat) {
+ case SD_STATUS_OK:
+ return SD_RES_SUCCESS;
+ case SD_STATUS_WAIT_FOR_FORMAT:
+ return SD_RES_WAIT_FOR_FORMAT;
+ case SD_STATUS_WAIT_FOR_JOIN:
+ return SD_RES_WAIT_FOR_JOIN;
+ case SD_STATUS_SHUTDOWN:
+ return SD_RES_SHUTDOWN;
+ case SD_STATUS_JOIN_FAILED:
+ return SD_RES_JOIN_FAILED;
+ case SD_STATUS_HALT:
+ return SD_RES_HALT;
+ default:
+ return SD_RES_SYSTEM_ERROR;
+ }
+}
+
+static int process_kill_node(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ exit(1);
+}
+
+static int process_get_obj_list(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ return get_obj_list((const struct sd_list_req *)req,
+ (struct sd_list_rsp *)rsp, data);
+}
+
+static int process_get_epoch(const struct sd_req *req, struct sd_rsp *rsp,
+ void *data)
+{
+ const struct sd_obj_req *obj_req = (const struct sd_obj_req *)req;
+ struct sd_obj_rsp *obj_rsp = (struct sd_obj_rsp *)rsp;
+ int epoch = obj_req->tgt_epoch;
+ int len, ret;
+ dprintf("%d\n", epoch);
+ len = epoch_log_read(epoch, (char *)data, obj_req->data_length);
+ if (len == -1) {
+ ret = SD_RES_NO_TAG;
+ obj_rsp->data_length = 0;
+ } else {
+ ret = SD_RES_SUCCESS;
+ obj_rsp->data_length = len;
+ }
+ return ret;
+}
+
+static struct sd_op_template sd_ops[] = {
+
+ /* cluster operations */
+ [SD_OP_NEW_VDI] = {
+ .type = SD_OP_TYPE_CLUSTER,
+ .process = process_new_vdi,
+ .post_process = post_process_new_vdi,
+ },
+
+ [SD_OP_DEL_VDI] = {
+ .type = SD_OP_TYPE_CLUSTER,
+ .process = process_del_vdi,
+ },
+
+ [SD_OP_GET_VDI_INFO] = {
+ .type = SD_OP_TYPE_CLUSTER,
+ .process = process_get_vdi_info,
+ },
+
+ [SD_OP_LOCK_VDI] = {
+ .type = SD_OP_TYPE_CLUSTER,
+ .process = process_get_vdi_info,
+ },
+
+ [SD_OP_MAKE_FS] = {
+ .type = SD_OP_TYPE_CLUSTER,
+ .available_always = 1,
+ .post_process = post_process_make_fs,
+ },
+
+ [SD_OP_SHUTDOWN] = {
+ .type = SD_OP_TYPE_CLUSTER,
+ .post_process = post_process_shutdown,
+ },
+
+ [SD_OP_GET_VDI_ATTR] = {
+ .type = SD_OP_TYPE_CLUSTER,
+ .process = process_get_vdi_attr,
+ },
+
+ [SD_OP_RELEASE_VDI] = {
+ .type = SD_OP_TYPE_CLUSTER,
+ },
+
+ /* store operations */
+ [SD_OP_READ_VDIS] = {
+ .type = SD_OP_TYPE_STORE,
+ .available_always = 1,
+ .post_process = post_process_read_vdis,
+ },
+
+ [SD_OP_GET_NODE_LIST] = {
+ .type = SD_OP_TYPE_STORE,
+ .available_always = 1,
+ .post_process = post_process_get_node_list,
+ },
+
+ [SD_OP_STAT_SHEEP] = {
+ .type = SD_OP_TYPE_STORE,
+ .process = process_stat_sheep,
+ },
+
+ [SD_OP_STAT_CLUSTER] = {
+ .type = SD_OP_TYPE_STORE,
+ .available_always = 1,
+ .process = process_stat_cluster,
+ },
+
+ [SD_OP_KILL_NODE] = {
+ .type = SD_OP_TYPE_STORE,
+ .available_always = 1,
+ .process = process_kill_node,
+ },
+
+ [SD_OP_GET_OBJ_LIST] = {
+ .type = SD_OP_TYPE_STORE,
+ .process = process_get_obj_list,
+ },
+
+ [SD_OP_GET_EPOCH] = {
+ .type = SD_OP_TYPE_STORE,
+ .process = process_get_epoch,
+ },
+
+ /* I/O operations */
+ [SD_OP_CREATE_AND_WRITE_OBJ] = {
+ .type = SD_OP_TYPE_IO,
+ },
+
+ [SD_OP_READ_OBJ] = {
+ .type = SD_OP_TYPE_IO,
+ },
+
+ [SD_OP_WRITE_OBJ] = {
+ .type = SD_OP_TYPE_IO,
+ },
+
+ [SD_OP_REMOVE_OBJ] = {
+ .type = SD_OP_TYPE_IO,
+ },
+};
+
+struct sd_op_template *get_sd_op(uint8_t opcode)
+{
+ if (sd_ops[opcode].type == 0)
+ return NULL;
+
+ return sd_ops + opcode;
+}
+
+int is_cluster_op(struct sd_op_template *op)
+{
+ return op->type == SD_OP_TYPE_CLUSTER;
+}
+
+int is_store_op(struct sd_op_template *op)
+{
+ return op->type == SD_OP_TYPE_STORE;
+}
+
+int is_io_op(struct sd_op_template *op)
+{
+ return op->type == SD_OP_TYPE_IO;
+}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 474b1be..7d7e142 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -19,46 +19,6 @@
#include "sheep_priv.h"
-int is_io_request(unsigned op)
-{
- int ret = 0;
-
- switch (op) {
- case SD_OP_CREATE_AND_WRITE_OBJ:
- case SD_OP_REMOVE_OBJ:
- case SD_OP_READ_OBJ:
- case SD_OP_WRITE_OBJ:
- ret = 1;
- break;
- default:
- break;
- }
-
- return ret;
-}
-
-int is_cluster_request(unsigned op)
-{
- int ret = 0;
-
- switch (op) {
- case SD_OP_NEW_VDI:
- case SD_OP_DEL_VDI:
- case SD_OP_LOCK_VDI:
- case SD_OP_RELEASE_VDI:
- case SD_OP_GET_VDI_INFO:
- case SD_OP_MAKE_FS:
- case SD_OP_SHUTDOWN:
- case SD_OP_GET_VDI_ATTR:
- ret = 1;
- break;
- default:
- break;
- }
-
- return ret;
-}
-
void resume_pending_requests(void)
{
struct request *next, *tmp;
@@ -119,18 +79,21 @@ static void setup_access_to_local_objects(struct request *req)
static void __done(struct work *work, int idx)
{
struct request *req = container_of(work, struct request, work);
- struct sd_req *hdr = (struct sd_req *)&req->rq;
int again = 0;
int copies = sys->nr_sobjs;
if (copies > req->nr_zones)
copies = req->nr_zones;
- if (is_cluster_request(hdr->opcode))
+ if (is_cluster_op(req->op))
/* request is forwarded to cpg group */
return;
- if (is_io_request(hdr->opcode)) {
+ if (is_store_op(req->op) && req->op->post_process)
+ req->rp.result = req->op->post_process(&req->rq, &req->rp,
+ req->data);
+
+ if (is_io_op(req->op)) {
struct cpg_event *cevent = &req->cev;
list_del(&req->r_wlist);
@@ -225,39 +188,15 @@ static void queue_request(struct request *req)
dprintf("%x\n", hdr->opcode);
- if (hdr->opcode == SD_OP_KILL_NODE) {
- log_close();
- exit(1);
- }
-
if (sys->status == SD_STATUS_SHUTDOWN) {
rsp->result = SD_RES_SHUTDOWN;
req->done(req);
return;
}
- /*
- * we can know why this node failed to join with
- * SD_OP_STAT_CLUSTER, so the request should be handled even
- * when the cluster status is SD_STATUS_JOIN_FAILED
- */
- if (sys->status == SD_STATUS_JOIN_FAILED &&
- hdr->opcode != SD_OP_STAT_CLUSTER) {
- rsp->result = SD_RES_JOIN_FAILED;
- req->done(req);
- return;
- }
-
if (sys->status == SD_STATUS_WAIT_FOR_FORMAT ||
sys->status == SD_STATUS_WAIT_FOR_JOIN) {
- /* TODO: cleanup */
- switch (hdr->opcode) {
- case SD_OP_STAT_CLUSTER:
- case SD_OP_MAKE_FS:
- case SD_OP_GET_NODE_LIST:
- case SD_OP_READ_VDIS:
- break;
- default:
+ if (!req->op->available_always) {
if (sys->status == SD_STATUS_WAIT_FOR_FORMAT)
rsp->result = SD_RES_WAIT_FOR_FORMAT;
else
@@ -267,33 +206,11 @@ static void queue_request(struct request *req)
}
}
- switch (hdr->opcode) {
- case SD_OP_CREATE_AND_WRITE_OBJ:
- case SD_OP_REMOVE_OBJ:
- case SD_OP_READ_OBJ:
- case SD_OP_WRITE_OBJ:
- case SD_OP_STAT_SHEEP:
- case SD_OP_GET_OBJ_LIST:
+ if (is_io_op(req->op) || is_store_op(req->op))
req->work.fn = store_queue_request;
- break;
- case SD_OP_GET_NODE_LIST:
- case SD_OP_NEW_VDI:
- case SD_OP_DEL_VDI:
- case SD_OP_LOCK_VDI:
- case SD_OP_RELEASE_VDI:
- case SD_OP_GET_VDI_INFO:
- case SD_OP_MAKE_FS:
- case SD_OP_SHUTDOWN:
- case SD_OP_STAT_CLUSTER:
- case SD_OP_GET_VDI_ATTR:
- case SD_OP_GET_EPOCH:
+ else if (is_cluster_op(req->op))
req->work.fn = cluster_queue_request;
- break;
- case SD_OP_READ_VDIS:
- rsp->result = read_vdis(req->data, hdr->data_length, &rsp->data_length);
- req->done(req);
- return;
- default:
+ else {
eprintf("unknown operation %d\n", hdr->opcode);
rsp->result = SD_RES_SYSTEM_ERROR;
req->done(req);
@@ -314,7 +231,7 @@ static void queue_request(struct request *req)
hdr->epoch = sys->epoch;
setup_ordered_sd_vnode_list(req);
- if (is_io_request(hdr->opcode))
+ if (is_io_op(req->op))
setup_access_to_local_objects(req);
cevent->ctype = CPG_EVENT_REQUEST;
@@ -410,6 +327,12 @@ static void client_rx_handler(struct client_info *ci)
/* use le_to_cpu */
memcpy(&req->rq, hdr, sizeof(req->rq));
+ req->op = get_sd_op(hdr->opcode);
+ if (!req->op) {
+ eprintf("invalid opcode, %d\n", hdr->opcode);
+ conn->c_rx_state = C_IO_CLOSED;
+ break;
+ }
if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
conn->c_rx_state = C_IO_DATA;
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 65bc3aa..1fe2d4d 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -72,6 +72,8 @@ struct request {
struct sd_req rq;
struct sd_rsp rp;
+ struct sd_op_template *op;
+
void *data;
struct client_info *ci;
@@ -152,8 +154,6 @@ extern struct cluster_info *sys;
int create_listen_port(int port, void *data);
-int is_io_request(unsigned op);
-int is_cluster_request(unsigned op);
int init_store(const char *dir);
int init_base_path(const char *dir);
@@ -173,6 +173,7 @@ int get_vdi_attr(uint32_t epoch, struct sheepdog_vdi_attr *vattr, int data_len,
uint32_t vid, uint32_t *attrid, int copies, uint64_t ctime,
int write, int excl, int delete);
+int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes);
void setup_ordered_sd_vnode_list(struct request *req);
void get_ordered_sd_vnode_list(struct sheepdog_vnode_list_entry *entries,
int *nr_vnodes, int *nr_zones);
@@ -215,6 +216,8 @@ int get_latest_epoch(void);
int remove_epoch(int epoch);
int set_cluster_ctime(uint64_t ctime);
uint64_t get_cluster_ctime(void);
+int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch);
+int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data);
int start_recovery(uint32_t epoch);
void resume_recovery_work(void);
@@ -235,6 +238,47 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
uint32_t epoch, int worker_idx);
+/* Operations */
+
+enum sd_op_type {
+ SD_OP_TYPE_CLUSTER = 1, /* cluster operations */
+ SD_OP_TYPE_STORE, /* store operations */
+ SD_OP_TYPE_IO, /* io operations */
+};
+
+struct sd_op_template {
+ enum sd_op_type type;
+
+ /* process request even when cluster is not working */
+ int available_always;
+
+ /*
+ * process() will be called in the worker thread, and
+ * post_process() will be called in the main thread.
+ *
+ * If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only
+ * one node processes a cluster operation at the same time.
+ * We can use this for something like distributed locking.
+ * process() will be called on the local node, and
+ * post_process() will be called on every node.
+ *
+ * If type is SD_OP_TYPE_STORE, both process() and
+ * post_process() will be called on the local node.
+ *
+ * If type is SD_OP_TYPE_IO, neither process() nor
+ * post_process() is used because this type of operation is
+ * heavily intertwined with the Sheepdog core code. We are
+ * unlikely to add new operations of this type.
+ */
+ int (*process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
+ int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
+};
+
+struct sd_op_template *get_sd_op(uint8_t opcode);
+int is_cluster_op(struct sd_op_template *op);
+int is_store_op(struct sd_op_template *op);
+int is_io_op(struct sd_op_template *op);
+
/* Journal */
int jrnl_perform(int fd, void *buf, size_t count, off_t offset,
const char *path, const char *jrnl_dir);
diff --git a/sheep/store.c b/sheep/store.c
index ee8996e..83644db 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -52,7 +52,7 @@ static int obj_cmp(const void *oid1, const void *oid2)
return 0;
}
-static int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
+int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
{
struct statvfs vs;
int ret;
@@ -96,16 +96,14 @@ static int merge_objlist(struct sheepdog_vnode_list_entry *entries, int nr_entri
uint64_t *list1, int nr_list1,
uint64_t *list2, int nr_list2, int nr_objs);
-static int get_obj_list(struct request *req)
+int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data)
{
DIR *dir;
struct dirent *d;
- struct sd_list_req *hdr = (struct sd_list_req *)&req->rq;
- struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp;
uint64_t oid;
uint32_t epoch;
char path[1024];
- uint64_t *p = (uint64_t *)req->data;
+ uint64_t *p = (uint64_t *)data;
int nr = 0;
uint64_t *objlist = NULL;
int obj_nr, i;
@@ -778,20 +776,15 @@ void store_queue_request(struct work *work, int idx)
uint64_t oid = hdr->oid;
uint32_t opcode = hdr->opcode;
uint32_t epoch = hdr->epoch;
- struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
dprintf("%"PRIu32", %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
if (hdr->flags & SD_FLAG_CMD_RECOVERY)
epoch = hdr->tgt_epoch;
- if (opcode == SD_OP_STAT_SHEEP) {
- ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch);
- goto out;
- }
-
- if (opcode == SD_OP_GET_OBJ_LIST) {
- ret = get_obj_list(req);
+ if (is_store_op(req->op)) {
+ if (req->op->process)
+ ret = req->op->process(&req->rq, &req->rp, req->data);
goto out;
}
--
1.7.2.5