[Sheepdog] [RFC PATCH] sheep: introduce sd_op_template
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Mon Oct 24 06:17:07 CEST 2011
At Sat, 22 Oct 2011 13:14:37 +0800,
Liu Yuan wrote:
>
> On 10/21/2011 04:28 PM, MORITA Kazutaka wrote:
>
> > When we want to add a new operation (SD_OP_xxxxx), it is not clear
> > which code we should modify, and in some cases we need to touch code
> > all over the tree to implement a single operation. This is not a
> > good design.
> >
> > This patch abstracts out Sheepdog operations into sd_op_template and
> > moves all the request processing code to sheep/ops.c.
> >
> > The definition of sd_op_template is as follows:
> >
> > struct sd_op_template {
> > enum sd_op_type type;
> >
> > int available_always;
> >
> > int (*process)(const struct sd_req *req, struct sd_rsp *rsp,
> > void *data);
> > int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp,
> > void *data);
> > };
> >
> > 'type' is the type of the operation: SD_OP_TYPE_CLUSTER,
> > SD_OP_TYPE_STORE, or SD_OP_TYPE_IO.
> >
> > 'available_always' is set to non-zero if the operation should be
> > processed even when the cluster is not working.
> >
> > 'process()' and 'post_process()' are the main functions of this
> > operation. process() will be called in the worker thread, and
> > post_process() will be called in the main thread.
> >
> > If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only one node
> > processes a cluster operation at the same time. We can use this for
> > something like distributed locking. process() will be called on the
> > local node, and post_process() will be called on every node.
> >
> > If type is SD_OP_TYPE_STORE, both process() and post_process() will be
> > called on the local node.
> >
> > If type is SD_OP_TYPE_IO, neither process() nor post_process() is used
> > because this type of operation is heavily intertwined with Sheepdog
> > core code. We are unlikely to add new operations of this type.
> >
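> > For illustration only (SD_OP_EXAMPLE and the two handlers below are
> > hypothetical names, not part of this patch), a new cluster operation
> > would then need nothing more than a process/post_process pair and one
> > entry in the sd_ops[] table in sheep/ops.c:
> >
> > static int process_example(const struct sd_req *req, struct sd_rsp *rsp,
> >                            void *data)
> > {
> >         /* called in the worker thread, on the local node only */
> >         return SD_RES_SUCCESS;
> > }
> >
> > static int post_process_example(const struct sd_req *req, struct sd_rsp *rsp,
> >                                 void *data)
> > {
> >         /* called in the main thread, on every node */
> >         return SD_RES_SUCCESS;
> > }
> >
> > /* entry added to sd_ops[] */
> > [SD_OP_EXAMPLE] = {
> >         .type = SD_OP_TYPE_CLUSTER,
> >         .process = process_example,
> >         .post_process = post_process_example,
> > },
> >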
> > Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> > ---
> > sheep/Makefile.am | 2 +-
> > sheep/group.c | 286 +++--------------------------------
> > sheep/ops.c | 427 ++++++++++++++++++++++++++++++++++++++++++++++++++++
> > sheep/sdnet.c | 111 ++------------
> > sheep/sheep_priv.h | 48 ++++++-
> > sheep/store.c | 19 +--
> > 6 files changed, 522 insertions(+), 371 deletions(-)
> > create mode 100644 sheep/ops.c
> >
> > diff --git a/sheep/Makefile.am b/sheep/Makefile.am
> > index 2b9d58f..3db914d 100644
> > --- a/sheep/Makefile.am
> > +++ b/sheep/Makefile.am
> > @@ -23,7 +23,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
> >
> > sbin_PROGRAMS = sheep
> >
> > -sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c \
> > +sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
> > cluster/corosync.c
> > sheep_LDADD = $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
> > sheep_DEPENDENCIES = ../lib/libsheepdog.a
> > diff --git a/sheep/group.c b/sheep/group.c
> > index e22dabc..8664b6f 100644
> > --- a/sheep/group.c
> > +++ b/sheep/group.c
> > @@ -101,17 +101,7 @@ static size_t get_join_message_size(struct join_message *jm)
> > return sizeof(*jm) + jm->nr_nodes * sizeof(jm->nodes[0]);
> > }
> >
> > -static int get_node_idx(struct sheepdog_node_list_entry *ent,
> > - struct sheepdog_node_list_entry *entries, int nr_nodes)
> > -{
> > - ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
> > - if (!ent)
> > - return -1;
> > -
> > - return ent - entries;
> > -}
> > -
> > -static int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
> > +int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes)
> > {
> > int nr_zones = 0, i, j;
> > uint32_t zones[SD_MAX_REDUNDANCY];
> > @@ -146,116 +136,28 @@ void setup_ordered_sd_vnode_list(struct request *req)
> > get_ordered_sd_vnode_list(req->entry, &req->nr_vnodes, &req->nr_zones);
> > }
> >
> > -static void get_node_list(struct sd_node_req *req,
> > - struct sd_node_rsp *rsp, void *data)
> > +static void do_cluster_op(void *arg)
> > {
> > - int nr_nodes;
> > -
> > - nr_nodes = sys->nr_nodes;
> > - memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
> > - rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
> > - rsp->nr_nodes = nr_nodes;
> > - rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
> > - rsp->master_idx = -1;
> > -}
> > + struct vdi_op_message *msg = arg;
> > + int ret;
> > + struct request *req;
> >
> > -static int get_epoch(struct sd_obj_req *req,
> > - struct sd_obj_rsp *rsp, void *data)
> > -{
> > - int epoch = req->tgt_epoch;
> > - int len, ret;
> > - dprintf("%d\n", epoch);
> > - len = epoch_log_read(epoch, (char *)data, req->data_length);
> > - if (len == -1) {
> > - ret = SD_RES_NO_TAG;
> > - rsp->data_length = 0;
> > - } else {
> > - ret = SD_RES_SUCCESS;
> > - rsp->data_length = len;
> > - }
> > - return ret;
> > -}
> > + req = list_first_entry(&sys->pending_list, struct request, pending_list);
> > + ret = req->op->process((const struct sd_req *)&msg->req,
> > + (struct sd_rsp *)&msg->rsp, req->data);
> >
> > -static void vdi_op(void *arg);
> > + msg->rsp.result = ret;
> > +}
> >
> > void cluster_queue_request(struct work *work, int idx)
> > {
> > struct request *req = container_of(work, struct request, work);
> > struct sd_req *hdr = (struct sd_req *)&req->rq;
> > - struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
> > struct vdi_op_message *msg;
> > - struct epoch_log *log;
> > - int ret = SD_RES_SUCCESS, i, max_logs, epoch;
> > - uint32_t sys_stat = sys_stat_get();
> > size_t size;
> >
> > eprintf("%p %x\n", req, hdr->opcode);
> >
> > - switch (hdr->opcode) {
> > - case SD_OP_GET_EPOCH:
> > - ret = get_epoch((struct sd_obj_req *)hdr,
> > - (struct sd_obj_rsp *)rsp, req->data);
> > - break;
> > - case SD_OP_GET_NODE_LIST:
> > - get_node_list((struct sd_node_req *)hdr,
> > - (struct sd_node_rsp *)rsp, req->data);
> > - break;
> > - case SD_OP_STAT_CLUSTER:
> > - max_logs = rsp->data_length / sizeof(*log);
> > - epoch = get_latest_epoch();
> > - rsp->data_length = 0;
> > - for (i = 0; i < max_logs; i++) {
> > - if (epoch <= 0)
> > - break;
> > -
> > - log = (struct epoch_log *)req->data + i;
> > - log->epoch = epoch;
> > - log->ctime = get_cluster_ctime();
> > - log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
> > - sizeof(log->nodes));
> > - if (log->nr_nodes == -1)
> > - log->nr_nodes = epoch_log_read_remote(epoch,
> > - (char *)log->nodes,
> > - sizeof(log->nodes));
> > -
> > - rsp->data_length += sizeof(*log);
> > - log->nr_nodes /= sizeof(log->nodes[0]);
> > - epoch--;
> > - }
> > -
> > - switch (sys_stat) {
> > - case SD_STATUS_OK:
> > - ret = SD_RES_SUCCESS;
> > - break;
> > - case SD_STATUS_WAIT_FOR_FORMAT:
> > - ret = SD_RES_WAIT_FOR_FORMAT;
> > - break;
> > - case SD_STATUS_WAIT_FOR_JOIN:
> > - ret = SD_RES_WAIT_FOR_JOIN;
> > - break;
> > - case SD_STATUS_SHUTDOWN:
> > - ret = SD_RES_SHUTDOWN;
> > - break;
> > - case SD_STATUS_JOIN_FAILED:
> > - ret = SD_RES_JOIN_FAILED;
> > - break;
> > - case SD_STATUS_HALT:
> > - ret = SD_RES_HALT;
> > - break;
> > - default:
> > - ret = SD_RES_SYSTEM_ERROR;
> > - break;
> > - }
> > - break;
> > - default:
> > - /* forward request to group */
> > - goto forward;
> > - }
> > -
> > - rsp->result = ret;
> > - return;
> > -
> > -forward:
> > if (hdr->flags & SD_FLAG_CMD_WRITE)
> > size = sizeof(*msg);
> > else
> > @@ -272,7 +174,12 @@ forward:
> >
> > list_add(&req->pending_list, &sys->pending_list);
> >
> > - sys->cdrv->notify(msg, size, vdi_op);
> > + if (req->op->process)
> > + sys->cdrv->notify(msg, size, do_cluster_op);
> > + else {
> > + msg->rsp.result = SD_RES_SUCCESS;
> > + sys->cdrv->notify(msg, size, NULL);
> > + }
> >
> > free(msg);
> > }
> > @@ -628,85 +535,6 @@ join_finished:
> > return;
> > }
> >
> > -static void vdi_op(void *arg)
> > -{
> > - struct vdi_op_message *msg = arg;
> > - const struct sd_vdi_req *hdr = &msg->req;
> > - struct sd_vdi_rsp *rsp = &msg->rsp;
> > - void *data, *tag;
> > - int ret = SD_RES_SUCCESS;
> > - struct sheepdog_vdi_attr *vattr;
> > - uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
> > - uint64_t ctime = 0;
> > - struct request *req;
> > -
> > - req = list_first_entry(&sys->pending_list, struct request, pending_list);
> > - data = req->data;
> > -
> > - switch (hdr->opcode) {
> > - case SD_OP_NEW_VDI:
> > - ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
> > - hdr->base_vdi_id, hdr->copies,
> > - hdr->snapid, &nr_copies);
> > - break;
> > - case SD_OP_DEL_VDI:
> > - ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
> > - hdr->snapid, &nr_copies);
> > - break;
> > - case SD_OP_LOCK_VDI:
> > - case SD_OP_GET_VDI_INFO:
> > - if (hdr->proto_ver != SD_PROTO_VER) {
> > - ret = SD_RES_VER_MISMATCH;
> > - break;
> > - }
> > - if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
> > - tag = (char *)data + SD_MAX_VDI_LEN;
> > - else if (hdr->data_length == SD_MAX_VDI_LEN)
> > - tag = NULL;
> > - else {
> > - ret = SD_RES_INVALID_PARMS;
> > - break;
> > - }
> > - ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
> > - &nr_copies, NULL);
> > - if (ret != SD_RES_SUCCESS)
> > - break;
> > - break;
> > - case SD_OP_GET_VDI_ATTR:
> > - vattr = data;
> > - ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
> > - &vid, hdr->snapid, &nr_copies, &ctime);
> > - if (ret != SD_RES_SUCCESS)
> > - break;
> > - /* the curernt vdi id can change if we take the snapshot,
> > - so we use the hash value of the vdi name as the vdi id */
> > - vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
> > - vid &= SD_NR_VDIS - 1;
> > - ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
> > - &attrid, nr_copies, ctime,
> > - hdr->flags & SD_FLAG_CMD_CREAT,
> > - hdr->flags & SD_FLAG_CMD_EXCL,
> > - hdr->flags & SD_FLAG_CMD_DEL);
> > - break;
> > - case SD_OP_RELEASE_VDI:
> > - break;
> > - case SD_OP_MAKE_FS:
> > - ret = SD_RES_SUCCESS;
> > - break;
> > - case SD_OP_SHUTDOWN:
> > - break;
> > - default:
> > - ret = SD_RES_SYSTEM_ERROR;
> > - eprintf("opcode %d is not implemented\n", hdr->opcode);
> > - break;
> > - }
> > -
> > - rsp->vdi_id = vid;
> > - rsp->attr_id = attrid;
> > - rsp->copies = nr_copies;
> > - rsp->result = ret;
> > -}
> > -
> > static void __sd_notify(struct cpg_event *cevent)
> > {
> > }
> > @@ -715,86 +543,22 @@ static void __sd_notify_done(struct cpg_event *cevent)
> > {
> > struct work_notify *w = container_of(cevent, struct work_notify, cev);
> > struct vdi_op_message *msg = (struct vdi_op_message *)w->msg;
> > - const struct sd_vdi_req *hdr = &msg->req;
> > - struct sd_vdi_rsp *rsp = &msg->rsp;
> > - void *data = msg->data;
> > struct request *req;
> > int ret = msg->rsp.result;
> > - int i, latest_epoch;
> > - uint64_t ctime;
> > -
> > - if (ret != SD_RES_SUCCESS)
> > - goto out;
> > -
> > - switch (hdr->opcode) {
> > - case SD_OP_NEW_VDI:
> > - {
> > - unsigned long nr = rsp->vdi_id;
> > - vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
> > - set_bit(nr, sys->vdi_inuse);
> > - break;
> > - }
> > - case SD_OP_DEL_VDI:
> > - break;
> > - case SD_OP_LOCK_VDI:
> > - case SD_OP_RELEASE_VDI:
> > - case SD_OP_GET_VDI_INFO:
> > - case SD_OP_GET_VDI_ATTR:
> > - break;
> > - case SD_OP_MAKE_FS:
> > - sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies;
> > - sys->flags = ((struct sd_so_req *)hdr)->flags;
> > - if (!sys->nr_sobjs)
> > - sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
> > -
> > - ctime = ((struct sd_so_req *)hdr)->ctime;
> > - set_cluster_ctime(ctime);
> > -
> > - latest_epoch = get_latest_epoch();
> > - for (i = 1; i <= latest_epoch; i++)
> > - remove_epoch(i);
> > - memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
> > -
> > - sys->epoch = 1;
> > - sys->recovered_epoch = 1;
> > -
> > - dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
> > - ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
> > - sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
> > - if (ret < 0)
> > - eprintf("can't write epoch %u\n", sys->epoch);
> > - update_epoch_store(sys->epoch);
> > + struct sd_op_template *op = get_sd_op(msg->req.opcode);
> >
> > - set_cluster_copies(sys->nr_sobjs);
> > - set_cluster_flags(sys->flags);
> > + if (ret == SD_RES_SUCCESS && op->post_process)
> > + ret = op->post_process((const struct sd_req *)&msg->req,
> > + (struct sd_rsp *)&msg->rsp, msg->data);
> >
> > - if (sys_flag_nohalt())
> > - sys_stat_set(SD_STATUS_OK);
> > - else {
> > - int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
> > -
> > - if (nr_zones >= sys->nr_sobjs)
> > - sys_stat_set(SD_STATUS_OK);
> > - else
> > - sys_stat_set(SD_STATUS_HALT);
> > - }
> > - break;
> > - case SD_OP_SHUTDOWN:
> > - sys_stat_set(SD_STATUS_SHUTDOWN);
> > - break;
> > - default:
> > - eprintf("unknown operation %d\n", hdr->opcode);
> > - ret = SD_RES_UNKNOWN;
> > - }
> > -out:
> > if (!is_myself(w->sender.addr, w->sender.port))
> > return;
> >
> > req = list_first_entry(&sys->pending_list, struct request, pending_list);
> >
> > - rsp->result = ret;
> > - memcpy(req->data, data, rsp->data_length);
> > - memcpy(&req->rp, rsp, sizeof(req->rp));
> > + msg->rsp.result = ret;
> > + memcpy(req->data, msg->data, msg->rsp.data_length);
> > + memcpy(&req->rp, &msg->rsp, sizeof(req->rp));
> > list_del(&req->pending_list);
> > req->done(req);
> > }
> > @@ -1226,7 +990,7 @@ do_retry:
> >
> > list_del(&cevent->cpg_event_list);
> >
> > - if (is_io_request(req->rq.opcode)) {
> > + if (is_io_op(req->op)) {
> > int copies = sys->nr_sobjs;
> >
> > if (copies > req->nr_zones)
> > @@ -1282,7 +1046,7 @@ do_retry:
> > }
> > }
> >
> > - if (is_cluster_request(req->rq.opcode))
> > + if (is_cluster_op(req->op))
> > queue_work(sys->cpg_wqueue, &req->work);
> > else if (req->rq.flags & SD_FLAG_CMD_IO_LOCAL)
> > queue_work(sys->io_wqueue, &req->work);
> > diff --git a/sheep/ops.c b/sheep/ops.c
> > new file mode 100644
> > index 0000000..0d38e7b
> > --- /dev/null
> > +++ b/sheep/ops.c
> > @@ -0,0 +1,427 @@
> > +/*
> > + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
> > + *
> > + * This program is free software; you can redistribute it and/or
> > + * modify it under the terms of the GNU General Public License version
> > + * 2 as published by the Free Software Foundation.
> > + *
> > + * You should have received a copy of the GNU General Public License
> > + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> > + */
> > +#include <stdio.h>
> > +#include <stdlib.h>
> > +
> > +#include "sheep_priv.h"
> > +
> > +static int process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> > + struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > + uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> > + int ret;
> > +
> > + ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid,
> > + hdr->base_vdi_id, hdr->copies,
> > + hdr->snapid, &nr_copies);
> > +
> > + vdi_rsp->vdi_id = vid;
> > + vdi_rsp->copies = nr_copies;
> > +
> > + return ret;
> > +}
> > +
> > +static int post_process_new_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > + unsigned long nr = vdi_rsp->vdi_id;
> > + int ret = vdi_rsp->result;
> > +
> > + vprintf(SDOG_INFO, "done %d %ld\n", ret, nr);
> > + set_bit(nr, sys->vdi_inuse);
> > +
> > + return SD_RES_SUCCESS;
> > +}
> > +
> > +static int process_del_vdi(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> > + struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > + uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> > + int ret;
> > +
> > + ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid,
> > + hdr->snapid, &nr_copies);
> > +
> > + vdi_rsp->vdi_id = vid;
> > + vdi_rsp->copies = nr_copies;
> > +
> > + return ret;
> > +}
> > +
> > +static int process_get_vdi_info(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> > + struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > + uint32_t vid = 0, nr_copies = sys->nr_sobjs;
> > + void *tag;
> > + int ret;
> > +
> > + if (hdr->proto_ver != SD_PROTO_VER)
> > + return SD_RES_VER_MISMATCH;
> > +
> > + if (hdr->data_length == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN)
> > + tag = (char *)data + SD_MAX_VDI_LEN;
> > + else if (hdr->data_length == SD_MAX_VDI_LEN)
> > + tag = NULL;
> > + else
> > + return SD_RES_INVALID_PARMS;
> > +
> > + ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid,
> > + &nr_copies, NULL);
> > + if (ret != SD_RES_SUCCESS)
> > + return ret;
> > +
> > + vdi_rsp->vdi_id = vid;
> > + vdi_rsp->copies = nr_copies;
> > +
> > + return ret;
> > +}
> > +
> > +static int post_process_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + const struct sd_so_req *hdr = (const struct sd_so_req *)req;
> > + int i, latest_epoch, ret;
> > + uint64_t ctime;
> > +
> > + sys->nr_sobjs = hdr->copies;
> > + sys->flags = hdr->flags;
> > + if (!sys->nr_sobjs)
> > + sys->nr_sobjs = SD_DEFAULT_REDUNDANCY;
> > +
> > + ctime = hdr->ctime;
> > + set_cluster_ctime(ctime);
> > +
> > + latest_epoch = get_latest_epoch();
> > + for (i = 1; i <= latest_epoch; i++)
> > + remove_epoch(i);
> > + memset(sys->vdi_inuse, 0, sizeof(sys->vdi_inuse));
> > +
> > + sys->epoch = 1;
> > + sys->recovered_epoch = 1;
> > +
> > + dprintf("write epoch log, %d, %d\n", sys->epoch, sys->nr_nodes);
> > + ret = epoch_log_write(sys->epoch, (char *)sys->nodes,
> > + sys->nr_nodes * sizeof(struct sheepdog_node_list_entry));
> > + if (ret < 0) {
> > + eprintf("can't write epoch %u\n", sys->epoch);
> > + return SD_RES_EIO;
> > + }
> > + update_epoch_store(sys->epoch);
> > +
> > + set_cluster_copies(sys->nr_sobjs);
> > + set_cluster_flags(sys->flags);
> > +
> > + if (sys_flag_nohalt())
> > + sys_stat_set(SD_STATUS_OK);
> > + else {
> > + int nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes);
> > +
> > + if (nr_zones >= sys->nr_sobjs)
> > + sys_stat_set(SD_STATUS_OK);
> > + else
> > + sys_stat_set(SD_STATUS_HALT);
> > + }
> > +
> > + return SD_RES_SUCCESS;
> > +}
> > +
> > +static int post_process_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + sys_stat_set(SD_STATUS_SHUTDOWN);
> > +
> > + return SD_RES_SUCCESS;
> > +}
> > +
> > +static int process_get_vdi_attr(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req;
> > + struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp;
> > + uint32_t vid = 0, attrid = 0, nr_copies = sys->nr_sobjs;
> > + uint64_t ctime = 0;
> > + int ret;
> > + struct sheepdog_vdi_attr *vattr;
> > +
> > + vattr = data;
> > + ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag,
> > + &vid, hdr->snapid, &nr_copies, &ctime);
> > + if (ret != SD_RES_SUCCESS)
> > + return ret;
> > +
> > + /* the current vdi id can change if we take the snapshot,
> > + so we use the hash value of the vdi name as the vdi id */
> > + vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT);
> > + vid &= SD_NR_VDIS - 1;
> > + ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid,
> > + &attrid, nr_copies, ctime,
> > + hdr->flags & SD_FLAG_CMD_CREAT,
> > + hdr->flags & SD_FLAG_CMD_EXCL,
> > + hdr->flags & SD_FLAG_CMD_DEL);
> > +
> > + vdi_rsp->vdi_id = vid;
> > + vdi_rsp->attr_id = attrid;
> > + vdi_rsp->copies = nr_copies;
> > +
> > + return ret;
> > +}
> > +
> > +static int post_process_read_vdis(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + return read_vdis(data, req->data_length, &rsp->data_length);
> > +}
> > +
> > +static int get_node_idx(struct sheepdog_node_list_entry *ent,
> > + struct sheepdog_node_list_entry *entries, int nr_nodes)
> > +{
> > + ent = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
> > + if (!ent)
> > + return -1;
> > +
> > + return ent - entries;
> > +}
> > +
> > +static int post_process_get_node_list(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
> > + int nr_nodes;
> > +
> > + nr_nodes = sys->nr_nodes;
> > + memcpy(data, sys->nodes, sizeof(*sys->nodes) * nr_nodes);
> > + node_rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
> > + node_rsp->nr_nodes = nr_nodes;
> > + node_rsp->local_idx = get_node_idx(&sys->this_node, data, nr_nodes);
> > + node_rsp->master_idx = -1;
> > +
> > + return SD_RES_SUCCESS;
> > +}
> > +
> > +static int process_stat_sheep(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
> > + uint32_t epoch = req->epoch;
> > +
> > + return stat_sheep(&node_rsp->store_size, &node_rsp->store_free, epoch);
> > +}
> > +
> > +static int process_stat_cluster(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + struct epoch_log *log;
> > + int i, max_logs, epoch;
> > + uint32_t sys_stat = sys_stat_get();
> > +
> > + max_logs = rsp->data_length / sizeof(*log);
> > + epoch = get_latest_epoch();
> > + rsp->data_length = 0;
> > + for (i = 0; i < max_logs; i++) {
> > + if (epoch <= 0)
> > + break;
> > +
> > + log = (struct epoch_log *)data + i;
> > + log->epoch = epoch;
> > + log->ctime = get_cluster_ctime();
> > + log->nr_nodes = epoch_log_read(epoch, (char *)log->nodes,
> > + sizeof(log->nodes));
> > + if (log->nr_nodes == -1)
> > + log->nr_nodes = epoch_log_read_remote(epoch,
> > + (char *)log->nodes,
> > + sizeof(log->nodes));
> > +
> > + rsp->data_length += sizeof(*log);
> > + log->nr_nodes /= sizeof(log->nodes[0]);
> > + epoch--;
> > + }
> > +
> > + switch (sys_stat) {
> > + case SD_STATUS_OK:
> > + return SD_RES_SUCCESS;
> > + case SD_STATUS_WAIT_FOR_FORMAT:
> > + return SD_RES_WAIT_FOR_FORMAT;
> > + case SD_STATUS_WAIT_FOR_JOIN:
> > + return SD_RES_WAIT_FOR_JOIN;
> > + case SD_STATUS_SHUTDOWN:
> > + return SD_RES_SHUTDOWN;
> > + case SD_STATUS_JOIN_FAILED:
> > + return SD_RES_JOIN_FAILED;
> > + case SD_STATUS_HALT:
> > + return SD_RES_HALT;
> > + default:
> > + return SD_RES_SYSTEM_ERROR;
> > + }
> > +}
> > +
> > +static int process_kill_node(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + exit(1);
> > +}
> > +
> > +static int process_get_obj_list(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + return get_obj_list((const struct sd_list_req *)req,
> > + (struct sd_list_rsp *)rsp, data);
> > +}
> > +
> > +static int process_get_epoch(const struct sd_req *req, struct sd_rsp *rsp,
> > + void *data)
> > +{
> > + const struct sd_obj_req *obj_req = (const struct sd_obj_req *)req;
> > + struct sd_obj_rsp *obj_rsp = (struct sd_obj_rsp *)rsp;
> > + int epoch = obj_req->tgt_epoch;
> > + int len, ret;
> > + dprintf("%d\n", epoch);
> > + len = epoch_log_read(epoch, (char *)data, obj_req->data_length);
> > + if (len == -1) {
> > + ret = SD_RES_NO_TAG;
> > + obj_rsp->data_length = 0;
> > + } else {
> > + ret = SD_RES_SUCCESS;
> > + obj_rsp->data_length = len;
> > + }
> > + return ret;
> > +}
> > +
> > +static struct sd_op_template sd_ops[] = {
> > +
> > + /* cluster operations */
> > + [SD_OP_NEW_VDI] = {
> > + .type = SD_OP_TYPE_CLUSTER,
> > + .process = process_new_vdi,
> > + .post_process = post_process_new_vdi,
> > + },
> > +
> > + [SD_OP_DEL_VDI] = {
> > + .type = SD_OP_TYPE_CLUSTER,
> > + .process = process_del_vdi,
> > + },
> > +
> > + [SD_OP_GET_VDI_INFO] = {
> > + .type = SD_OP_TYPE_CLUSTER,
> > + .process = process_get_vdi_info,
> > + },
> > +
> > + [SD_OP_LOCK_VDI] = {
> > + .type = SD_OP_TYPE_CLUSTER,
> > + .process = process_get_vdi_info,
> > + },
> > +
> > + [SD_OP_MAKE_FS] = {
> > + .type = SD_OP_TYPE_CLUSTER,
> > + .available_always = 1,
> > + .post_process = post_process_make_fs,
> > + },
> > +
> > + [SD_OP_SHUTDOWN] = {
> > + .type = SD_OP_TYPE_CLUSTER,
> > + .post_process = post_process_shutdown,
> > + },
> > +
> > + [SD_OP_GET_VDI_ATTR] = {
> > + .type = SD_OP_TYPE_CLUSTER,
> > + .process = process_get_vdi_attr,
> > + },
> > +
> > + [SD_OP_RELEASE_VDI] = {
> > + .type = SD_OP_TYPE_CLUSTER,
> > + },
> > +
> > + /* store operations */
> > + [SD_OP_READ_VDIS] = {
> > + .type = SD_OP_TYPE_STORE,
> > + .available_always = 1,
> > + .post_process = post_process_read_vdis,
> > + },
> > +
> > + [SD_OP_GET_NODE_LIST] = {
> > + .type = SD_OP_TYPE_STORE,
> > + .available_always = 1,
> > + .post_process = post_process_get_node_list,
> > + },
> > +
> > + [SD_OP_STAT_SHEEP] = {
> > + .type = SD_OP_TYPE_STORE,
> > + .process = process_stat_sheep,
> > + },
> > +
> > + [SD_OP_STAT_CLUSTER] = {
> > + .type = SD_OP_TYPE_STORE,
> > + .available_always = 1,
> > + .process = process_stat_cluster,
> > + },
> > +
> > + [SD_OP_KILL_NODE] = {
> > + .type = SD_OP_TYPE_STORE,
> > + .available_always = 1,
> > + .process = process_kill_node,
> > + },
> > +
> > + [SD_OP_GET_OBJ_LIST] = {
> > + .type = SD_OP_TYPE_STORE,
> > + .process = process_get_obj_list,
> > + },
> > +
> > + [SD_OP_GET_EPOCH] = {
> > + .type = SD_OP_TYPE_STORE,
> > + .process = process_get_epoch,
> > + },
> > +
> > + /* I/O operations */
> > + [SD_OP_CREATE_AND_WRITE_OBJ] = {
> > + .type = SD_OP_TYPE_IO,
> > + },
> > +
> > + [SD_OP_READ_OBJ] = {
> > + .type = SD_OP_TYPE_IO,
> > + },
> > +
> > + [SD_OP_WRITE_OBJ] = {
> > + .type = SD_OP_TYPE_IO,
> > + },
> > +
> > + [SD_OP_REMOVE_OBJ] = {
> > + .type = SD_OP_TYPE_IO,
> > + },
> > +};
> > +
> > +struct sd_op_template *get_sd_op(uint8_t opcode)
> > +{
> > + if (sd_ops[opcode].type == 0)
> > + return NULL;
> > +
> > + return sd_ops + opcode;
> > +}
> > +
> > +int is_cluster_op(struct sd_op_template *op)
> > +{
> > + return op->type == SD_OP_TYPE_CLUSTER;
> > +}
> > +
> > +int is_store_op(struct sd_op_template *op)
> > +{
> > + return op->type == SD_OP_TYPE_STORE;
> > +}
> > +
> > +int is_io_op(struct sd_op_template *op)
> > +{
> > + return op->type == SD_OP_TYPE_IO;
> > +}
> > diff --git a/sheep/sdnet.c b/sheep/sdnet.c
> > index 474b1be..7d7e142 100644
> > --- a/sheep/sdnet.c
> > +++ b/sheep/sdnet.c
> > @@ -19,46 +19,6 @@
> >
> > #include "sheep_priv.h"
> >
> > -int is_io_request(unsigned op)
> > -{
> > - int ret = 0;
> > -
> > - switch (op) {
> > - case SD_OP_CREATE_AND_WRITE_OBJ:
> > - case SD_OP_REMOVE_OBJ:
> > - case SD_OP_READ_OBJ:
> > - case SD_OP_WRITE_OBJ:
> > - ret = 1;
> > - break;
> > - default:
> > - break;
> > - }
> > -
> > - return ret;
> > -}
> > -
> > -int is_cluster_request(unsigned op)
> > -{
> > - int ret = 0;
> > -
> > - switch (op) {
> > - case SD_OP_NEW_VDI:
> > - case SD_OP_DEL_VDI:
> > - case SD_OP_LOCK_VDI:
> > - case SD_OP_RELEASE_VDI:
> > - case SD_OP_GET_VDI_INFO:
> > - case SD_OP_MAKE_FS:
> > - case SD_OP_SHUTDOWN:
> > - case SD_OP_GET_VDI_ATTR:
> > - ret = 1;
> > - break;
> > - default:
> > - break;
> > - }
> > -
> > - return ret;
> > -}
> > -
> > void resume_pending_requests(void)
> > {
> > struct request *next, *tmp;
> > @@ -119,18 +79,21 @@ static void setup_access_to_local_objects(struct request *req)
> > static void __done(struct work *work, int idx)
> > {
> > struct request *req = container_of(work, struct request, work);
> > - struct sd_req *hdr = (struct sd_req *)&req->rq;
> > int again = 0;
> > int copies = sys->nr_sobjs;
> >
> > if (copies > req->nr_zones)
> > copies = req->nr_zones;
> >
> > - if (is_cluster_request(hdr->opcode))
> > + if (is_cluster_op(req->op))
> > /* request is forwarded to cpg group */
> > return;
> >
> > - if (is_io_request(hdr->opcode)) {
> > + if (is_store_op(req->op) && req->op->post_process)
> > + req->rp.result = req->op->post_process(&req->rq, &req->rp,
> > + req->data);
> > +
> > + if (is_io_op(req->op)) {
> > struct cpg_event *cevent = &req->cev;
> >
> > list_del(&req->r_wlist);
> > @@ -225,39 +188,15 @@ static void queue_request(struct request *req)
> >
> > dprintf("%x\n", hdr->opcode);
> >
> > - if (hdr->opcode == SD_OP_KILL_NODE) {
> > - log_close();
> > - exit(1);
> > - }
> > -
> > if (sys->status == SD_STATUS_SHUTDOWN) {
> > rsp->result = SD_RES_SHUTDOWN;
> > req->done(req);
> > return;
> > }
> >
> > - /*
> > - * we can know why this node failed to join with
> > - * SD_OP_STAT_CLUSTER, so the request should be handled even
> > - * when the cluster status is SD_STATUS_JOIN_FAILED
> > - */
> > - if (sys->status == SD_STATUS_JOIN_FAILED &&
> > - hdr->opcode != SD_OP_STAT_CLUSTER) {
> > - rsp->result = SD_RES_JOIN_FAILED;
> > - req->done(req);
> > - return;
> > - }
> > -
> > if (sys->status == SD_STATUS_WAIT_FOR_FORMAT ||
> > sys->status == SD_STATUS_WAIT_FOR_JOIN) {
> > - /* TODO: cleanup */
> > - switch (hdr->opcode) {
> > - case SD_OP_STAT_CLUSTER:
> > - case SD_OP_MAKE_FS:
> > - case SD_OP_GET_NODE_LIST:
> > - case SD_OP_READ_VDIS:
> > - break;
> > - default:
> > + if (!req->op->available_always) {
> > if (sys->status == SD_STATUS_WAIT_FOR_FORMAT)
> > rsp->result = SD_RES_WAIT_FOR_FORMAT;
> > else
> > @@ -267,33 +206,11 @@ static void queue_request(struct request *req)
> > }
> > }
> >
> > - switch (hdr->opcode) {
> > - case SD_OP_CREATE_AND_WRITE_OBJ:
> > - case SD_OP_REMOVE_OBJ:
> > - case SD_OP_READ_OBJ:
> > - case SD_OP_WRITE_OBJ:
> > - case SD_OP_STAT_SHEEP:
> > - case SD_OP_GET_OBJ_LIST:
> > + if (is_io_op(req->op) || is_store_op(req->op))
> > req->work.fn = store_queue_request;
> > - break;
> > - case SD_OP_GET_NODE_LIST:
> > - case SD_OP_NEW_VDI:
> > - case SD_OP_DEL_VDI:
> > - case SD_OP_LOCK_VDI:
> > - case SD_OP_RELEASE_VDI:
> > - case SD_OP_GET_VDI_INFO:
> > - case SD_OP_MAKE_FS:
> > - case SD_OP_SHUTDOWN:
> > - case SD_OP_STAT_CLUSTER:
> > - case SD_OP_GET_VDI_ATTR:
> > - case SD_OP_GET_EPOCH:
> > + else if (is_cluster_op(req->op))
> > req->work.fn = cluster_queue_request;
> > - break;
> > - case SD_OP_READ_VDIS:
> > - rsp->result = read_vdis(req->data, hdr->data_length, &rsp->data_length);
> > - req->done(req);
> > - return;
> > - default:
> > + else {
> > eprintf("unknown operation %d\n", hdr->opcode);
> > rsp->result = SD_RES_SYSTEM_ERROR;
> > req->done(req);
> > @@ -314,7 +231,7 @@ static void queue_request(struct request *req)
> > hdr->epoch = sys->epoch;
> >
> > setup_ordered_sd_vnode_list(req);
> > - if (is_io_request(hdr->opcode))
> > + if (is_io_op(req->op))
> > setup_access_to_local_objects(req);
> >
> > cevent->ctype = CPG_EVENT_REQUEST;
> > @@ -410,6 +327,12 @@ static void client_rx_handler(struct client_info *ci)
> >
> > /* use le_to_cpu */
> > memcpy(&req->rq, hdr, sizeof(req->rq));
> > + req->op = get_sd_op(hdr->opcode);
> > + if (!req->op) {
> > + eprintf("invalid opcode, %d\n", hdr->opcode);
> > + conn->c_rx_state = C_IO_CLOSED;
> > + break;
> > + }
> >
> > if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
> > conn->c_rx_state = C_IO_DATA;
> > diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> > index 65bc3aa..1fe2d4d 100644
> > --- a/sheep/sheep_priv.h
> > +++ b/sheep/sheep_priv.h
> > @@ -72,6 +72,8 @@ struct request {
> > struct sd_req rq;
> > struct sd_rsp rp;
> >
> > + struct sd_op_template *op;
> > +
> > void *data;
> >
> > struct client_info *ci;
> > @@ -152,8 +154,6 @@ extern struct cluster_info *sys;
> >
> > int create_listen_port(int port, void *data);
> >
> > -int is_io_request(unsigned op);
> > -int is_cluster_request(unsigned op);
> > int init_store(const char *dir);
> > int init_base_path(const char *dir);
> >
> > @@ -173,6 +173,7 @@ int get_vdi_attr(uint32_t epoch, struct sheepdog_vdi_attr *vattr, int data_len,
> > uint32_t vid, uint32_t *attrid, int copies, uint64_t ctime,
> > int write, int excl, int delete);
> >
> > +int get_zones_nr_from(struct sheepdog_node_list_entry *nodes, int nr_nodes);
> > void setup_ordered_sd_vnode_list(struct request *req);
> > void get_ordered_sd_vnode_list(struct sheepdog_vnode_list_entry *entries,
> > int *nr_vnodes, int *nr_zones);
> > @@ -215,6 +216,8 @@ int get_latest_epoch(void);
> > int remove_epoch(int epoch);
> > int set_cluster_ctime(uint64_t ctime);
> > uint64_t get_cluster_ctime(void);
> > +int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch);
> > +int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data);
> >
> > int start_recovery(uint32_t epoch);
> > void resume_recovery_work(void);
> > @@ -235,6 +238,47 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
> > int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
> > uint32_t epoch, int worker_idx);
> >
> > +/* Operations */
> > +
> > +enum sd_op_type {
> > + SD_OP_TYPE_CLUSTER = 1, /* cluster operations */
> > + SD_OP_TYPE_STORE, /* store operations */
> > + SD_OP_TYPE_IO, /* io operations */
> > +};
> > +
> > +struct sd_op_template {
> > + enum sd_op_type type;
> > +
> > + /* process request even when cluster is not working */
> > + int available_always;
> > +
> > + /*
> > + * process() will be called in the worker thread, and
> > + * post_process() will be called in the main thread.
> > + *
> > + * If type is SD_OP_TYPE_CLUSTER, it is guaranteed that only
> > + * one node processes a cluster operation at the same time.
> > + * We can use this for something like distributed locking.
> > + * process() will be called on the local node, and
> > + * post_process() will be called on every node.
> > + *
> > + * If type is SD_OP_TYPE_STORE, both process() and
> > + * post_process() will be called on the local node.
> > + *
> > + * If type is SD_OP_TYPE_IO, neither process() nor
> > + * post_process() is used because this type of operation is
> > + * heavily intertwined with Sheepdog core codes. We will be
> > + * unlikely to add new operations of this type.
> > + */
> > + int (*process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
> > + int (*post_process)(const struct sd_req *req, struct sd_rsp *rsp, void *data);
> > +};
> > +
> > +struct sd_op_template *get_sd_op(uint8_t opcode);
> > +int is_cluster_op(struct sd_op_template *op);
> > +int is_store_op(struct sd_op_template *op);
> > +int is_io_op(struct sd_op_template *op);
> > +
> > /* Journal */
> > int jrnl_perform(int fd, void *buf, size_t count, off_t offset,
> > const char *path, const char *jrnl_dir);
> > diff --git a/sheep/store.c b/sheep/store.c
> > index ee8996e..83644db 100644
> > --- a/sheep/store.c
> > +++ b/sheep/store.c
> > @@ -52,7 +52,7 @@ static int obj_cmp(const void *oid1, const void *oid2)
> > return 0;
> > }
> >
> > -static int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
> > +int stat_sheep(uint64_t *store_size, uint64_t *store_free, uint32_t epoch)
> > {
> > struct statvfs vs;
> > int ret;
> > @@ -96,16 +96,14 @@ static int merge_objlist(struct sheepdog_vnode_list_entry *entries, int nr_entri
> > uint64_t *list1, int nr_list1,
> > uint64_t *list2, int nr_list2, int nr_objs);
> >
> > -static int get_obj_list(struct request *req)
> > +int get_obj_list(const struct sd_list_req *hdr, struct sd_list_rsp *rsp, void *data)
> > {
> > DIR *dir;
> > struct dirent *d;
> > - struct sd_list_req *hdr = (struct sd_list_req *)&req->rq;
> > - struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp;
> > uint64_t oid;
> > uint32_t epoch;
> > char path[1024];
> > - uint64_t *p = (uint64_t *)req->data;
> > + uint64_t *p = (uint64_t *)data;
> > int nr = 0;
> > uint64_t *objlist = NULL;
> > int obj_nr, i;
> > @@ -778,20 +776,15 @@ void store_queue_request(struct work *work, int idx)
> > uint64_t oid = hdr->oid;
> > uint32_t opcode = hdr->opcode;
> > uint32_t epoch = hdr->epoch;
> > - struct sd_node_rsp *nrsp = (struct sd_node_rsp *)&req->rp;
> >
> > dprintf("%"PRIu32", %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
> >
> > if (hdr->flags & SD_FLAG_CMD_RECOVERY)
> > epoch = hdr->tgt_epoch;
> >
> > - if (opcode == SD_OP_STAT_SHEEP) {
> > - ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch);
> > - goto out;
> > - }
> > -
> > - if (opcode == SD_OP_GET_OBJ_LIST) {
> > - ret = get_obj_list(req);
> > + if (is_store_op(req->op)) {
> > + if (req->op->process)
> > + ret = req->op->process(&req->rq, &req->rp, req->data);
> > goto out;
> > }
> >
>
>
> for i in 0 1; do ./sheep/sheep /store/$i -z $i -p 700$i;sleep 1;done
> collie/collie cluster format
> for i in 0 1; do ./collie/collie cluster info -p 700$i; done
>
> Cluster status: The sheepdog is stopped doing IO, short of living nodes
>
> Creation time Epoch Nodes
> Cluster status: The sheepdog is stopped doing IO, short of living nodes
>
> Creation time Epoch Nodes
>
>
> It seems that we lose the epoch history if the cluster status is not OK.
> I have checked the code but found nothing wrong, which is weird.
Thanks for your review!
It seems that Sheepdog sets the data_length to zero when the request
doesn't return SD_RES_SUCCESS.
I'll fix this and address all of your comments in the v2 patch.
Thanks,
Kazutaka