[sheepdog] [PATCH 2/2] sheep: remove join_message
MORITA Kazutaka
morita.kazutaka at gmail.com
Wed Aug 7 23:29:13 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Now we have only a few fields in struct join_message. To simplify the
code, this patch moves those fields into struct cluster_info and makes
sheep send cluster_info instead of join_message.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
include/internal_proto.h | 33 +++++------
sheep/group.c | 148 ++++++++++++++++++++--------------------------
sheep/object_cache.c | 3 +-
sheep/ops.c | 12 ++--
sheep/request.c | 6 +-
sheep/sheep.c | 6 +-
sheep/sheep_priv.h | 1 -
7 files changed, 91 insertions(+), 118 deletions(-)
diff --git a/include/internal_proto.h b/include/internal_proto.h
index 7db954e..3ca46cf 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -104,10 +104,12 @@
#define SD_RES_STALE_OBJ 0x90 /* Object may be stale */
#define SD_RES_CLUSTER_ERROR 0x91 /* Cluster driver error */
-#define SD_STATUS_OK 0x00000001
-#define SD_STATUS_WAIT 0x00000004
-#define SD_STATUS_SHUTDOWN 0x00000008
-#define SD_STATUS_KILLED 0x00000040
+enum sd_status {
+ SD_STATUS_OK = 1,
+ SD_STATUS_WAIT,
+ SD_STATUS_SHUTDOWN,
+ SD_STATUS_KILLED,
+};
struct node_id {
uint8_t addr[16];
@@ -126,14 +128,20 @@ struct sd_node {
uint64_t space;
};
+/*
+ * A joining sheep multicasts its local cluster info. Then, the existing nodes
+ * reply with the latest cluster info, which is shared by all of the nodes.
+ */
struct cluster_info {
- uint8_t nr_copies;
+ uint8_t proto_ver; /* the version number of the internal protocol */
uint8_t disable_recovery;
int16_t nr_nodes;
uint32_t epoch;
uint64_t ctime;
uint16_t flags;
- uint16_t __pad[3];
+ uint8_t nr_copies;
+ enum sd_status status : 8;
+ uint32_t __pad;
uint8_t store[STORE_LEN];
/* node list at cluster_info->epoch */
@@ -150,19 +158,6 @@ struct epoch_log {
struct sd_node nodes[SD_MAX_NODES];
};
-struct join_message {
- uint8_t proto_ver;
- uint8_t __pad[3];
- uint32_t cluster_status;
-
- /*
- * A joining sheep puts the local cluster info here. After the master
- * replies it will contain the latest cluster info which is shared among
- * the existing nodes.
- */
- struct cluster_info cinfo;
-};
-
struct vdi_op_message {
struct sd_req req;
struct sd_rsp rsp;
diff --git a/sheep/group.c b/sheep/group.c
index c049970..59bb398 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -357,15 +357,14 @@ int epoch_log_read_remote(uint32_t epoch, struct sd_node *nodes, int len,
return 0;
}
-static bool cluster_ctime_check(const struct join_message *jm)
+static bool cluster_ctime_check(const struct cluster_info *cinfo)
{
- if (jm->cinfo.epoch == 0 || sys->cinfo.epoch == 0)
+ if (cinfo->epoch == 0 || sys->cinfo.epoch == 0)
return true;
- if (jm->cinfo.ctime != sys->cinfo.ctime) {
- sd_eprintf("joining node ctime doesn't match: %"
- PRIu64 " vs %" PRIu64, jm->cinfo.ctime,
- sys->cinfo.ctime);
+ if (cinfo->ctime != sys->cinfo.ctime) {
+ sd_eprintf("joining node ctime doesn't match: %" PRIu64 " vs %"
+ PRIu64, cinfo->ctime, sys->cinfo.ctime);
return false;
}
@@ -378,13 +377,13 @@ static bool cluster_ctime_check(const struct join_message *jm)
* Sheepdog can start automatically if and only if all the members in the latest
* epoch are gathered.
*/
-static bool enough_nodes_gathered(struct join_message *jm,
+static bool enough_nodes_gathered(struct cluster_info *cinfo,
const struct sd_node *joining,
const struct sd_node *nodes,
size_t nr_nodes)
{
- for (int i = 0; i < jm->cinfo.nr_nodes; i++) {
- const struct sd_node *key = jm->cinfo.nodes + i, *n;
+ for (int i = 0; i < cinfo->nr_nodes; i++) {
+ const struct sd_node *key = cinfo->nodes + i, *n;
n = xlfind(key, nodes, nr_nodes, node_cmp);
if (n == NULL && !node_eq(key, joining)) {
@@ -393,28 +392,25 @@ static bool enough_nodes_gathered(struct join_message *jm,
}
}
- sd_dprintf("all the nodes are gathered, %d, %zd", jm->cinfo.nr_nodes,
+ sd_dprintf("all the nodes are gathered, %d, %zd", cinfo->nr_nodes,
nr_nodes);
return true;
}
-static int cluster_wait_check(const struct sd_node *joining,
- const struct sd_node *nodes, size_t nr_nodes,
- struct join_message *jm)
+static enum sd_status cluster_wait_check(const struct sd_node *joining,
+ const struct sd_node *nodes,
+ size_t nr_nodes,
+ struct cluster_info *cinfo)
{
- if (!cluster_ctime_check(jm)) {
+ if (!cluster_ctime_check(cinfo)) {
sd_dprintf("joining node is invalid");
- return sys->status;
+ return sys->cinfo.status;
}
- if (jm->cinfo.epoch > sys->cinfo.epoch) {
+ if (cinfo->epoch > sys->cinfo.epoch) {
sd_dprintf("joining node has a larger epoch, %" PRIu32 ", %"
- PRIu32, jm->cinfo.epoch, sys->cinfo.epoch);
- sys->cinfo = jm->cinfo;
- } else if (jm->cinfo.epoch < sys->cinfo.epoch) {
- sd_dprintf("joining node has a smaller epoch, %" PRIu32 ", %"
- PRIu32, jm->cinfo.epoch, sys->cinfo.epoch);
- jm->cinfo = sys->cinfo;
+ PRIu32, cinfo->epoch, sys->cinfo.epoch);
+ sys->cinfo = *cinfo;
}
/*
@@ -422,10 +418,10 @@ static int cluster_wait_check(const struct sd_node *joining,
* node list, we can set the cluster live now.
*/
if (sys->cinfo.epoch > 0 &&
- enough_nodes_gathered(jm, joining, nodes, nr_nodes))
+ enough_nodes_gathered(&sys->cinfo, joining, nodes, nr_nodes))
return SD_STATUS_OK;
- return sys->status;
+ return sys->cinfo.status;
}
static int get_vdis_from(struct sd_node *node)
@@ -542,18 +538,17 @@ static struct vnode_info *alloc_old_vnode_info(const struct sd_node *joined,
return alloc_vnode_info(old_nodes, nr_nodes);
}
-static void setup_backend_store(const struct join_message *jm)
+static void setup_backend_store(const struct cluster_info *cinfo)
{
int ret;
- if (jm->cinfo.store[0] == '\0')
+ if (cinfo->store[0] == '\0')
return;
if (!sd_store) {
- sd_store = find_store_driver((char *)jm->cinfo.store);
+ sd_store = find_store_driver((char *)cinfo->store);
if (!sd_store)
- panic("backend store %s not supported",
- jm->cinfo.store);
+ panic("backend store %s not supported", cinfo->store);
ret = sd_store->init();
if (ret != SD_RES_SUCCESS)
@@ -564,7 +559,7 @@ static void setup_backend_store(const struct join_message *jm)
* We need to purge the stale objects for sheep joining back
* after crash
*/
- if (xlfind(&sys->this_node, jm->cinfo.nodes, jm->cinfo.nr_nodes,
+ if (xlfind(&sys->this_node, cinfo->nodes, cinfo->nr_nodes,
node_cmp) == NULL) {
ret = sd_store->purge_obj();
if (ret != SD_RES_SUCCESS)
@@ -572,9 +567,7 @@ static void setup_backend_store(const struct join_message *jm)
}
}
-static void finish_join(const struct join_message *msg,
- const struct sd_node *joined,
- const struct sd_node *nodes, size_t nr_nodes)
+static void finish_join(const struct sd_node *nodes, size_t nr_nodes)
{
sockfd_cache_add_group(nodes, nr_nodes);
}
@@ -636,21 +629,20 @@ void recalculate_vnodes(struct sd_node *nodes, int nr_nodes)
}
}
-static void update_cluster_info(const struct join_message *msg,
+static void update_cluster_info(const struct cluster_info *cinfo,
const struct sd_node *joined,
const struct sd_node *nodes,
size_t nr_nodes)
{
struct vnode_info *old_vnode_info;
- sd_dprintf("status = %d, epoch = %d", msg->cluster_status,
- msg->cinfo.epoch);
+ sd_dprintf("status = %d, epoch = %d", cinfo->status, cinfo->epoch);
if (!sys->gateway_only)
- setup_backend_store(msg);
+ setup_backend_store(cinfo);
if (node_is_local(joined))
- finish_join(msg, joined, nodes, nr_nodes);
+ finish_join(nodes, nr_nodes);
old_vnode_info = main_thread_get(current_vnode_info);
main_thread_set(current_vnode_info,
@@ -658,14 +650,12 @@ static void update_cluster_info(const struct join_message *msg,
get_vdis(nodes, nr_nodes, joined);
- if (msg->cluster_status == SD_STATUS_OK) {
- if (sys->status == SD_STATUS_WAIT) {
- if (!is_cluster_formatted())
- /* initialize config file */
- set_cluster_config(&sys->cinfo);
- }
+ if (cinfo->status == SD_STATUS_OK) {
+ if (!is_cluster_formatted())
+ /* initialize config file */
+ set_cluster_config(&sys->cinfo);
- if (nr_nodes != msg->cinfo.nr_nodes) {
+ if (nr_nodes != cinfo->nr_nodes) {
int ret = inc_and_log_epoch();
if (ret != 0)
panic("cannot log current epoch %d",
@@ -684,8 +674,6 @@ static void update_cluster_info(const struct join_message *msg,
false);
}
- sys->status = msg->cluster_status;
-
put_vnode_info(old_vnode_info);
sockfd_cache_add(&joined->nid);
@@ -749,7 +737,8 @@ bool sd_join_handler(const struct sd_node *joining,
const struct sd_node *nodes, size_t nr_nodes,
void *opaque)
{
- struct join_message *jm = opaque;
+ struct cluster_info *cinfo = opaque;
+ enum sd_status status;
char str[MAX_NODE_STR_LEN];
/*
@@ -762,40 +751,29 @@ bool sd_join_handler(const struct sd_node *joining,
return false;
}
- sd_dprintf("check %s, %d", node_to_str(joining), sys->status);
-
- jm->proto_ver = SD_SHEEP_PROTO_VER;
+ sd_dprintf("check %s, %d", node_to_str(joining), sys->cinfo.status);
- if (sys->status == SD_STATUS_WAIT)
- jm->cluster_status = cluster_wait_check(joining, nodes,
- nr_nodes, jm);
+ if (sys->cinfo.status == SD_STATUS_WAIT)
+ status = cluster_wait_check(joining, nodes, nr_nodes, cinfo);
else
- jm->cluster_status = sys->status;
+ status = sys->cinfo.status;
+
+ *cinfo = sys->cinfo;
+ cinfo->status = status;
+ cinfo->proto_ver = SD_SHEEP_PROTO_VER;
sd_dprintf("%s: cluster_status = 0x%x",
addr_to_str(str, sizeof(str), joining->nid.addr,
- joining->nid.port), jm->cluster_status);
-
- jm->cinfo = sys->cinfo;
+ joining->nid.port), cinfo->status);
return true;
}
static int send_join_request(struct sd_node *ent)
{
- struct join_message *msg;
- int ret;
-
- msg = xzalloc(sizeof(*msg));
- msg->cinfo = sys->cinfo;
-
- ret = sys->cdrv->join(ent, msg, sizeof(*msg));
-
sd_printf(SDOG_INFO, "%s", node_to_str(&sys->this_node));
- free(msg);
-
- return ret;
+ return sys->cdrv->join(ent, &sys->cinfo, sizeof(sys->cinfo));
}
static void requeue_cluster_request(void)
@@ -865,7 +843,7 @@ static void requeue_cluster_request(void)
int sd_reconnect_handler(void)
{
- sys->status = SD_STATUS_WAIT;
+ sys->cinfo.status = SD_STATUS_WAIT;
if (sys->cdrv->init(sys->cdrv_option) != 0)
return -1;
if (send_join_request(&sys->this_node) != 0)
@@ -874,20 +852,20 @@ int sd_reconnect_handler(void)
return 0;
}
-static bool cluster_join_check(const struct join_message *jm)
+static bool cluster_join_check(const struct cluster_info *cinfo)
{
- if (jm->proto_ver != SD_SHEEP_PROTO_VER) {
+ if (cinfo->proto_ver != SD_SHEEP_PROTO_VER) {
sd_eprintf("invalid protocol version: %d, %d",
- jm->proto_ver, SD_SHEEP_PROTO_VER);
+ cinfo->proto_ver, SD_SHEEP_PROTO_VER);
return false;
}
- if (!cluster_ctime_check(jm))
+ if (!cluster_ctime_check(cinfo))
return false;
- if (jm->cinfo.epoch == sys->cinfo.epoch &&
- memcmp(jm->cinfo.nodes, sys->cinfo.nodes,
- sizeof(jm->cinfo.nodes[0]) * jm->cinfo.nr_nodes) != 0) {
+ if (cinfo->epoch == sys->cinfo.epoch &&
+ memcmp(cinfo->nodes, sys->cinfo.nodes,
+ sizeof(cinfo->nodes[0]) * cinfo->nr_nodes) != 0) {
sd_printf(SDOG_ALERT, "epoch log entries does not match");
return false;
}
@@ -900,23 +878,23 @@ void sd_accept_handler(const struct sd_node *joined,
const void *opaque)
{
int i;
- const struct join_message *jm = opaque;
+ const struct cluster_info *cinfo = opaque;
- if (!cluster_join_check(jm)) {
+ if (!cluster_join_check(cinfo)) {
sd_eprintf("failed to join Sheepdog");
exit(1);
}
- sys->cinfo = jm->cinfo;
+ sys->cinfo = *cinfo;
sd_dprintf("join %s", node_to_str(joined));
for (i = 0; i < nr_members; i++)
sd_dprintf("[%x] %s", i, node_to_str(members + i));
- if (sys->status == SD_STATUS_SHUTDOWN)
+ if (sys->cinfo.status == SD_STATUS_SHUTDOWN)
return;
- update_cluster_info(jm, joined, members, nr_members);
+ update_cluster_info(cinfo, joined, members, nr_members);
if (node_is_local(joined))
/* this output is used for testing */
@@ -933,7 +911,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
for (i = 0; i < nr_members; i++)
sd_dprintf("[%x] %s", i, node_to_str(members + i));
- if (sys->status == SD_STATUS_SHUTDOWN)
+ if (sys->cinfo.status == SD_STATUS_SHUTDOWN)
return;
if (node_is_local(left))
@@ -943,7 +921,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
old_vnode_info = main_thread_get(current_vnode_info);
main_thread_set(current_vnode_info,
alloc_vnode_info(members, nr_members));
- if (sys->status == SD_STATUS_OK) {
+ if (sys->cinfo.status == SD_STATUS_OK) {
ret = inc_and_log_epoch();
if (ret != 0)
panic("cannot log current epoch %d", sys->cinfo.epoch);
@@ -1026,7 +1004,7 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
if (sys->cinfo.nr_nodes == -1)
return -1;
}
- sys->status = SD_STATUS_WAIT;
+ sys->cinfo.status = SD_STATUS_WAIT;
main_thread_set(pending_block_list,
xzalloc(sizeof(struct list_head)));
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index dcf6972..c2b2ccb 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -279,7 +279,8 @@ static void add_to_dirty_list(struct object_cache_entry *entry)
list_add_tail(&entry->dirty_list, &oc->dirty_head);
/* FIXME read sys->status atomically */
if (uatomic_add_return(&oc->dirty_count, 1) > MAX_DIRTY_OBJECT_COUNT
- && !uatomic_is_true(&oc->in_push) && sys->status == SD_STATUS_OK)
+ && !uatomic_is_true(&oc->in_push)
+ && sys->cinfo.status == SD_STATUS_OK)
kick_background_pusher(oc);
}
diff --git a/sheep/ops.c b/sheep/ops.c
index 1f1f702..f0d89df 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -264,7 +264,7 @@ static int cluster_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
if (ret)
return SD_RES_EIO;
- sys->status = SD_STATUS_OK;
+ sys->cinfo.status = SD_STATUS_OK;
return SD_RES_SUCCESS;
}
@@ -272,7 +272,7 @@ static int cluster_make_fs(const struct sd_req *req, struct sd_rsp *rsp,
static int cluster_shutdown(const struct sd_req *req, struct sd_rsp *rsp,
void *data)
{
- sys->status = SD_STATUS_SHUTDOWN;
+ sys->cinfo.status = SD_STATUS_SHUTDOWN;
return SD_RES_SUCCESS;
}
@@ -435,7 +435,7 @@ static int local_stat_cluster(struct request *req)
epoch--;
}
out:
- switch (sys->status) {
+ switch (sys->cinfo.status) {
case SD_STATUS_OK:
return SD_RES_SUCCESS;
case SD_STATUS_WAIT:
@@ -488,7 +488,7 @@ static int cluster_force_recover_work(struct request *req)
* 2) some nodes are physically down (same epoch condition).
* In both case, the nodes(s) stat is WAIT_FOR_JOIN.
*/
- if (sys->status != SD_STATUS_WAIT || req->vinfo == NULL)
+ if (sys->cinfo.status != SD_STATUS_WAIT || req->vinfo == NULL)
return SD_RES_FORCE_RECOVER;
old_vnode_info = get_vnode_info_epoch(epoch, req->vinfo);
@@ -539,7 +539,7 @@ static int cluster_force_recover_main(const struct sd_req *req,
/* initialize config file */
set_cluster_config(&sys->cinfo);
- sys->status = SD_STATUS_OK;
+ sys->cinfo.status = SD_STATUS_OK;
vnode_info = get_vnode_info();
old_vnode_info = alloc_vnode_info(nodes, nr_nodes);
@@ -804,7 +804,7 @@ static int local_trace_read_buf(struct request *request)
static int local_kill_node(const struct sd_req *req, struct sd_rsp *rsp,
void *data)
{
- sys->status = SD_STATUS_KILLED;
+ sys->cinfo.status = SD_STATUS_KILLED;
return SD_RES_SUCCESS;
}
diff --git a/sheep/request.c b/sheep/request.c
index ee1e987..386390b 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -344,9 +344,9 @@ static void queue_request(struct request *req)
goto done;
}
- sd_dprintf("%s, %d", op_name(req->op), sys->status);
+ sd_dprintf("%s, %d", op_name(req->op), sys->cinfo.status);
- switch (sys->status) {
+ switch (sys->cinfo.status) {
case SD_STATUS_KILLED:
rsp->result = SD_RES_KILLED;
goto done;
@@ -805,7 +805,7 @@ static void listen_handler(int listen_fd, int events, void *data)
struct client_info *ci;
bool is_inet_socket = *(bool *)data;
- if (sys->status == SD_STATUS_SHUTDOWN) {
+ if (sys->cinfo.status == SD_STATUS_SHUTDOWN) {
sd_dprintf("unregistering connection %d", listen_fd);
unregister_event(listen_fd);
return;
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 39288a8..6a1efe5 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -212,7 +212,7 @@ static void signal_handler(int listen_fd, int events, void *data)
sd_dprintf("signal %d", siginfo.ssi_signo);
switch (siginfo.ssi_signo) {
case SIGTERM:
- sys->status = SD_STATUS_KILLED;
+ sys->cinfo.status = SD_STATUS_KILLED;
break;
default:
sd_eprintf("signal %d unhandled", siginfo.ssi_signo);
@@ -874,8 +874,8 @@ int main(int argc, char **argv)
PACKAGE_VERSION);
while (sys->nr_outstanding_reqs != 0 ||
- (sys->status != SD_STATUS_KILLED &&
- sys->status != SD_STATUS_SHUTDOWN))
+ (sys->cinfo.status != SD_STATUS_KILLED &&
+ sys->cinfo.status != SD_STATUS_SHUTDOWN))
event_loop(-1);
sd_printf(SDOG_INFO, "shutdown");
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 1652218..6c3cc50 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -94,7 +94,6 @@ struct system_info {
struct sd_node this_node;
struct cluster_info cinfo;
- uint32_t status;
uint64_t disk_space;
--
1.7.9.5
More information about the sheepdog
mailing list