[sheepdog] [PATCH] sheep: rework update node logic
Liu Yuan
namei.unix at gmail.com
Fri Jul 5 19:23:41 CEST 2013
The new logic is:
1. A node whose size has changed calls cdrv->update_node() to send a
non-blocking message to all nodes.
2. On receiving that message, every node in the cluster
- updates the cluster driver's internal record of the sender's size, if it keeps one, and
- has the cluster driver call sd_update_node_handler() to update the sender's size in sheep.
SD_OP_UPDATE_SIZE is no longer used and has been removed.
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
include/internal_proto.h | 1 -
sheep/cluster.h | 1 +
sheep/cluster/corosync.c | 39 +++++++++++++++++++++++++------------
sheep/cluster/local.c | 9 +++------
sheep/cluster/shepherd.c | 3 +--
sheep/cluster/zookeeper.c | 33 +++++++++++++++++--------------
sheep/group.c | 11 ++++++++---
sheep/ops.c | 47 +++------------------------------------------
sheep/sheep_priv.h | 2 --
9 files changed, 62 insertions(+), 84 deletions(-)
diff --git a/include/internal_proto.h b/include/internal_proto.h
index c5cd76d..bdcd04c 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -74,7 +74,6 @@
#define SD_OP_MD_UNPLUG 0xB3
#define SD_OP_GET_HASH 0xB4
#define SD_OP_REWEIGHT 0xB5
-#define SD_OP_UPDATE_SIZE 0xB6
/* internal flags for hdr.flags, must be above 0x80 */
#define SD_FLAG_CMD_RECOVERY 0x0080
diff --git a/sheep/cluster.h b/sheep/cluster.h
index a912985..33e033a 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -157,6 +157,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
void sd_notify_handler(const struct sd_node *sender, void *msg, size_t msg_len);
bool sd_block_handler(const struct sd_node *sender);
int sd_reconnect_handler(void);
+void sd_update_node_handler(struct sd_node *);
enum cluster_join_result sd_check_join_cb(const struct sd_node *joining,
const struct sd_node *nodes,
size_t nr_nodes, void *opaque);
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index bf90209..52adba8 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -51,6 +51,7 @@ enum corosync_event_type {
COROSYNC_EVENT_TYPE_LEAVE,
COROSYNC_EVENT_TYPE_BLOCK,
COROSYNC_EVENT_TYPE_NOTIFY,
+ COROSYNC_EVENT_TYPE_UPDATE_NODE,
};
/* multicast message type */
@@ -61,6 +62,7 @@ enum corosync_message_type {
COROSYNC_MSG_TYPE_NOTIFY,
COROSYNC_MSG_TYPE_BLOCK,
COROSYNC_MSG_TYPE_UNBLOCK,
+ COROSYNC_MSG_TYPE_UPDATE_NODE,
};
struct corosync_event {
@@ -279,8 +281,9 @@ static void build_node_list(const struct cpg_node *nodes, size_t nr_nodes,
static bool __corosync_dispatch_one(struct corosync_event *cevent)
{
enum cluster_join_result res;
- struct sd_node entries[SD_MAX_NODES];
+ struct sd_node entries[SD_MAX_NODES], *node;
struct cpg_node *n;
+ int idx;
switch (cevent->type) {
case COROSYNC_EVENT_TYPE_JOIN_REQUEST:
@@ -343,6 +346,13 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent)
sd_notify_handler(&cevent->sender.ent, cevent->msg,
cevent->msg_len);
break;
+ case COROSYNC_EVENT_TYPE_UPDATE_NODE:
+ node = &cevent->sender.ent;
+ idx = find_sd_node(cpg_nodes, nr_cpg_nodes, node);
+ assert(idx >= 0); /* negative means not found; index 0 is a valid slot */
+ cpg_nodes[idx].ent = *node;
+ sd_update_node_handler(node);
+ break;
}
return true;
@@ -486,11 +496,19 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
/* fall through */
case COROSYNC_MSG_TYPE_BLOCK:
case COROSYNC_MSG_TYPE_NOTIFY:
+ case COROSYNC_MSG_TYPE_UPDATE_NODE:
cevent = xzalloc(sizeof(*cevent));
- if (cmsg->type == COROSYNC_MSG_TYPE_BLOCK)
+ switch (cmsg->type) {
+ case COROSYNC_MSG_TYPE_BLOCK:
cevent->type = COROSYNC_EVENT_TYPE_BLOCK;
- else
+ break;
+ case COROSYNC_MSG_TYPE_UPDATE_NODE:
+ cevent->type = COROSYNC_EVENT_TYPE_UPDATE_NODE;
+ break;
+ default:
cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
+ break;
+ }
cevent->sender = cmsg->sender;
cevent->msg_len = cmsg->msg_len;
@@ -544,7 +562,7 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
}
static void build_cpg_node_list(struct cpg_node *nodes,
- const struct cpg_address *list, size_t nr)
+ const struct cpg_address *list, size_t nr)
{
int i;
@@ -707,7 +725,7 @@ static int corosync_leave(void)
static void corosync_block(void)
{
send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0,
- NULL, 0);
+ NULL, 0);
}
static void corosync_unblock(void *msg, size_t msg_len)
@@ -719,7 +737,7 @@ static void corosync_unblock(void *msg, size_t msg_len)
static int corosync_notify(void *msg, size_t msg_len)
{
return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
- NULL, 0, msg, msg_len);
+ NULL, 0, msg, msg_len);
}
static void corosync_handler(int listen_fd, int events, void *data)
@@ -806,12 +824,9 @@ again:
static void corosync_update_node(struct sd_node *node)
{
- int idx;
-
- idx = find_sd_node(cpg_nodes, nr_cpg_nodes, node);
- if (idx < 0)
- return;
- cpg_nodes[idx].ent = *node;
+ this_node.ent = *node;
+ send_message(COROSYNC_MSG_TYPE_UPDATE_NODE, 0, &this_node,
+ NULL, 0, NULL, 0);
}
static struct cluster_driver cdrv_corosync = {
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 307a69e..2c5d197 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -474,6 +474,7 @@ static bool local_process_event(void)
sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
break;
case EVENT_UPDATE_NODE:
+ sd_update_node_handler(&ev->sender.node);
break;
}
out:
@@ -551,16 +552,12 @@ static int local_init(const char *option)
return 0;
}
-/* FIXME: we have to call nr of nodes times to update nodes information */
static void local_update_node(struct sd_node *node)
{
- struct local_node n = {
- .node = *node,
- };
-
+ this_node.node = *node;
shm_queue_lock();
- add_event(EVENT_UPDATE_NODE, &n, NULL, 0);
+ add_event(EVENT_UPDATE_NODE, &this_node, NULL, 0);
shm_queue_unlock();
}
diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c
index fba329c..e0aa6ea 100644
--- a/sheep/cluster/shepherd.c
+++ b/sheep/cluster/shepherd.c
@@ -664,8 +664,7 @@ static void shepherd_unblock(void *msg, size_t msg_len)
/* FIXME: shepherd server also has to udpate node information */
static void shepherd_update_node(struct sd_node *node)
{
- struct sd_node *n = xlfind(node, nodes, nr_nodes, node_cmp);
- *n = *node;
+ panic("not implemented");
}
static struct cluster_driver cdrv_shepherd = {
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 0feb8cf..6c40b5f 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -47,6 +47,7 @@ enum zk_event_type {
EVENT_BLOCK,
EVENT_UNBLOCK,
EVENT_NOTIFY,
+ EVENT_UPDATE_NODE,
};
struct zk_node {
@@ -993,6 +994,21 @@ static void zk_handle_notify(struct zk_event *ev)
sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
}
+static void zk_handle_update_node(struct zk_event *ev)
+{
+ struct zk_node *t, *n = &ev->sender;
+
+ sd_dprintf("%s", node_to_str(&n->node));
+
+ pthread_rwlock_rdlock(&zk_tree_lock);
+ t = zk_tree_search_nolock(&n->node.nid);
+ assert(t);
+ t->node = n->node;
+ build_node_list();
+ pthread_rwlock_unlock(&zk_tree_lock);
+ sd_update_node_handler(&n->node);
+}
+
static void (*const zk_event_handlers[])(struct zk_event *ev) = {
[EVENT_JOIN_REQUEST] = zk_handle_join_request,
[EVENT_JOIN_RESPONSE] = zk_handle_join_response,
@@ -1000,6 +1016,7 @@ static void (*const zk_event_handlers[])(struct zk_event *ev) = {
[EVENT_BLOCK] = zk_handle_block,
[EVENT_UNBLOCK] = zk_handle_unblock,
[EVENT_NOTIFY] = zk_handle_notify,
+ [EVENT_UPDATE_NODE] = zk_handle_update_node,
};
static const int zk_max_event_handlers = ARRAY_SIZE(zk_event_handlers);
@@ -1135,20 +1152,8 @@ static int zk_init(const char *option)
static void zk_update_node(struct sd_node *node)
{
- struct zk_node n = {
- .node = *node,
- };
- struct zk_node *t;
-
- sd_dprintf("%s", node_to_str(&n.node));
-
- pthread_rwlock_rdlock(&zk_tree_lock);
- t = zk_tree_search_nolock(&n.node.nid);
- if (t) {
- t->node = n.node;
- build_node_list();
- }
- pthread_rwlock_unlock(&zk_tree_lock);
+ this_node.node = *node;
+ add_event(EVENT_UPDATE_NODE, &this_node, NULL, 0);
}
static struct cluster_driver cdrv_zookeeper = {
static struct cluster_driver cdrv_zookeeper = {
diff --git a/sheep/group.c b/sheep/group.c
index 6e01a8d..73a9ed4 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -985,16 +985,15 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
sockfd_cache_del(&left->nid);
}
-void update_node_size(struct sd_node *node)
+static void update_node_size(struct sd_node *node)
{
struct vnode_info *cur_vinfo = main_thread_get(current_vnode_info);
int idx = get_node_idx(cur_vinfo, node);
assert(idx != -1);
cur_vinfo->nodes[idx].space = node->space;
- sys->cdrv->update_node(node);
}
-void kick_node_recover(void)
+static void kick_node_recover(void)
{
struct vnode_info *old = main_thread_get(current_vnode_info);
@@ -1006,6 +1005,12 @@ void kick_node_recover(void)
put_vnode_info(old);
}
+void sd_update_node_handler(struct sd_node *node)
+{
+ update_node_size(node);
+ kick_node_recover();
+}
+
int create_cluster(int port, int64_t zone, int nr_vnodes,
bool explicit_addr)
{
diff --git a/sheep/ops.c b/sheep/ops.c
index bf975bf..49630a9 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -662,33 +662,9 @@ static int cluster_recovery_completion(const struct sd_req *req,
return SD_RES_SUCCESS;
}
-static void do_reweight(struct work *work)
+static void reweight_this_node(void)
{
- struct sd_req hdr;
- int ret;
-
- sd_init_req(&hdr, SD_OP_UPDATE_SIZE);
- hdr.flags = SD_FLAG_CMD_WRITE;
- hdr.data_length = sizeof(sys->this_node);
-
- ret = exec_local_req(&hdr, &sys->this_node);
- if (ret != SD_RES_SUCCESS)
- sd_eprintf("failed to update node size");
-}
-
-static void reweight_done(struct work *work)
-{
- free(work);
-}
-
-static void reweight_node(void)
-{
- struct work *rw = xzalloc(sizeof(*rw));
-
- rw->fn = do_reweight;
- rw->done = reweight_done;
-
- queue_work(sys->recovery_wqueue, rw);
+ sys->cdrv->update_node(&sys->this_node);
}
static bool node_size_varied(void)
@@ -724,18 +700,7 @@ static int cluster_reweight(const struct sd_req *req, struct sd_rsp *rsp,
void *data)
{
if (node_size_varied())
- reweight_node();
-
- return SD_RES_SUCCESS;
-}
-
-static int cluster_update_size(const struct sd_req *req, struct sd_rsp *rsp,
- void *data)
-{
- struct sd_node *node = (struct sd_node *)data;
-
- update_node_size(node);
- kick_node_recover();
+ reweight_this_node();
return SD_RES_SUCCESS;
}
@@ -1091,12 +1056,6 @@ static struct sd_op_template sd_ops[] = {
.process_main = cluster_reweight,
},
- [SD_OP_UPDATE_SIZE] = {
- .name = "UPDATE_SIZE",
- .type = SD_OP_TYPE_CLUSTER,
- .process_main = cluster_update_size,
- },
-
[SD_OP_ENABLE_RECOVER] = {
.name = "ENABLE_RECOVER",
.type = SD_OP_TYPE_CLUSTER,
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 382d246..54ae22f 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -425,8 +425,6 @@ uint32_t md_get_info(struct sd_md_info *info);
int md_plug_disks(char *disks);
int md_unplug_disks(char *disks);
uint64_t md_get_size(uint64_t *used);
-void kick_node_recover(void);
-void update_node_size(struct sd_node *node);
/* http.c */
#ifdef HAVE_HTTP
--
1.7.9.5
More information about the sheepdog
mailing list