[sheepdog] [PATCH v4 1/3] sheep: rework update node logic
Liu Yuan
namei.unix at gmail.com
Wed Jul 10 10:15:01 CEST 2013
The new logic is:
1. A node whose size has changed calls cdrv->update_node() to send a
non-blocking message to all nodes.
2. On receiving that message, every node in the cluster:
- updates the sender's size in the cluster driver's internal state, if the driver keeps any
- has the cluster driver call sd_update_node_handler() to update the sender's size in sheep
SD_OP_UPDATE_SIZE is no longer used and has been removed.
Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
include/internal_proto.h | 1 -
sheep/cluster.h | 9 +++++++--
sheep/cluster/corosync.c | 42 +++++++++++++++++++++++++++++----------
sheep/cluster/local.c | 13 ++++++++----
sheep/cluster/shepherd.c | 5 ++---
sheep/cluster/zookeeper.c | 37 ++++++++++++++++++++++------------
sheep/group.c | 11 ++++++++---
sheep/ops.c | 48 +--------------------------------------------
sheep/sheep_priv.h | 2 --
9 files changed, 83 insertions(+), 85 deletions(-)
diff --git a/include/internal_proto.h b/include/internal_proto.h
index 149f8f8..317096e 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -74,7 +74,6 @@
#define SD_OP_MD_UNPLUG 0xB3
#define SD_OP_GET_HASH 0xB4
#define SD_OP_REWEIGHT 0xB5
-#define SD_OP_UPDATE_SIZE 0xB6
/* internal flags for hdr.flags, must be above 0x80 */
#define SD_FLAG_CMD_RECOVERY 0x0080
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 4851290..3760001 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -105,8 +105,12 @@ struct cluster_driver {
*/
int (*unblock)(void *msg, size_t msg_len);
- /* Update the specific node in the driver's private copy of nodes */
- void (*update_node)(struct sd_node *);
+ /*
+ * Update the specific node in the driver's private copy of nodes
+ *
+ * Returns SD_RES_XXX
+ */
+ int (*update_node)(struct sd_node *);
struct list_head list;
};
@@ -161,6 +165,7 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
void sd_notify_handler(const struct sd_node *sender, void *msg, size_t msg_len);
bool sd_block_handler(const struct sd_node *sender);
int sd_reconnect_handler(void);
+void sd_update_node_handler(struct sd_node *);
enum cluster_join_result sd_check_join_cb(const struct sd_node *joining,
const struct sd_node *nodes,
size_t nr_nodes, void *opaque);
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 9b59126..a9c2806 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -51,6 +51,7 @@ enum corosync_event_type {
COROSYNC_EVENT_TYPE_LEAVE,
COROSYNC_EVENT_TYPE_BLOCK,
COROSYNC_EVENT_TYPE_NOTIFY,
+ COROSYNC_EVENT_TYPE_UPDATE_NODE,
};
/* multicast message type */
@@ -61,6 +62,7 @@ enum corosync_message_type {
COROSYNC_MSG_TYPE_NOTIFY,
COROSYNC_MSG_TYPE_BLOCK,
COROSYNC_MSG_TYPE_UNBLOCK,
+ COROSYNC_MSG_TYPE_UPDATE_NODE,
};
struct corosync_event {
@@ -279,8 +281,9 @@ static void build_node_list(const struct cpg_node *nodes, size_t nr_nodes,
static bool __corosync_dispatch_one(struct corosync_event *cevent)
{
enum cluster_join_result res;
- struct sd_node entries[SD_MAX_NODES];
+ struct sd_node entries[SD_MAX_NODES], *node;
struct cpg_node *n;
+ int idx;
switch (cevent->type) {
case COROSYNC_EVENT_TYPE_JOIN_REQUEST:
@@ -343,6 +346,17 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent)
sd_notify_handler(&cevent->sender.ent, cevent->msg,
cevent->msg_len);
break;
+ case COROSYNC_EVENT_TYPE_UPDATE_NODE:
+ node = &cevent->sender.ent;
+
+ if (node_eq(node, &this_node.ent))
+ this_node.ent = *node;
+
+ idx = find_sd_node(cpg_nodes, nr_cpg_nodes, node);
+ assert(idx >= 0);
+ cpg_nodes[idx].ent = *node;
+ sd_update_node_handler(node);
+ break;
}
return true;
@@ -486,11 +500,19 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
/* fall through */
case COROSYNC_MSG_TYPE_BLOCK:
case COROSYNC_MSG_TYPE_NOTIFY:
+ case COROSYNC_MSG_TYPE_UPDATE_NODE:
cevent = xzalloc(sizeof(*cevent));
- if (cmsg->type == COROSYNC_MSG_TYPE_BLOCK)
+ switch (cmsg->type) {
+ case COROSYNC_MSG_TYPE_BLOCK:
cevent->type = COROSYNC_EVENT_TYPE_BLOCK;
- else
+ break;
+ case COROSYNC_MSG_TYPE_UPDATE_NODE:
+ cevent->type = COROSYNC_EVENT_TYPE_UPDATE_NODE;
+ break;
+ default:
cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
+ break;
+ }
cevent->sender = cmsg->sender;
cevent->msg_len = cmsg->msg_len;
@@ -544,7 +566,7 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
}
static void build_cpg_node_list(struct cpg_node *nodes,
- const struct cpg_address *list, size_t nr)
+ const struct cpg_address *list, size_t nr)
{
int i;
@@ -804,14 +826,14 @@ again:
return 0;
}
-static void corosync_update_node(struct sd_node *node)
+static int corosync_update_node(struct sd_node *node)
{
- int idx;
+ struct cpg_node cnode = {
+ .ent = *node,
+ };
- idx = find_sd_node(cpg_nodes, nr_cpg_nodes, node);
- if (idx < 0)
- return;
- cpg_nodes[idx].ent = *node;
+ return send_message(COROSYNC_MSG_TYPE_UPDATE_NODE, 0, &cnode,
+ NULL, 0, NULL, 0);
}
static struct cluster_driver cdrv_corosync = {
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index e6fa149..80228b3 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -478,6 +478,10 @@ static bool local_process_event(void)
sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
break;
case EVENT_UPDATE_NODE:
+ if (node_eq(&ev->sender.node, &this_node.node))
+ this_node.node = ev->sender.node;
+
+ sd_update_node_handler(&ev->sender.node);
break;
}
out:
@@ -555,18 +559,19 @@ static int local_init(const char *option)
return 0;
}
-/* FIXME: we have to call nr of nodes times to update nodes information */
-static void local_update_node(struct sd_node *node)
+static int local_update_node(struct sd_node *node)
{
- struct local_node n = {
+ struct local_node lnode = {
.node = *node,
};
shm_queue_lock();
- add_event(EVENT_UPDATE_NODE, &n, NULL, 0);
+ add_event(EVENT_UPDATE_NODE, &lnode, NULL, 0);
shm_queue_unlock();
+
+ return SD_RES_SUCCESS;
}
static struct cluster_driver cdrv_local = {
diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c
index df8737f..3aebfc1 100644
--- a/sheep/cluster/shepherd.c
+++ b/sheep/cluster/shepherd.c
@@ -666,10 +666,9 @@ static int shepherd_unblock(void *msg, size_t msg_len)
}
/* FIXME: shepherd server also has to udpate node information */
-static void shepherd_update_node(struct sd_node *node)
+static int shepherd_update_node(struct sd_node *node)
{
- struct sd_node *n = xlfind(node, nodes, nr_nodes, node_cmp);
- *n = *node;
+ return SD_RES_NO_SUPPORT;
}
static struct cluster_driver cdrv_shepherd = {
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index ac6347a..cb86024 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -47,6 +47,7 @@ enum zk_event_type {
EVENT_BLOCK,
EVENT_UNBLOCK,
EVENT_NOTIFY,
+ EVENT_UPDATE_NODE,
};
struct zk_node {
@@ -992,6 +993,25 @@ static void zk_handle_notify(struct zk_event *ev)
sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len);
}
+static void zk_handle_update_node(struct zk_event *ev)
+{
+ struct zk_node *t;
+ struct sd_node *snode = &ev->sender.node;
+
+ sd_dprintf("%s", node_to_str(snode));
+
+ if (node_eq(snode, &this_node.node))
+ this_node.node = *snode;
+
+ pthread_rwlock_rdlock(&zk_tree_lock);
+ t = zk_tree_search_nolock(&snode->nid);
+ assert(t);
+ t->node = *snode;
+ build_node_list();
+ pthread_rwlock_unlock(&zk_tree_lock);
+ sd_update_node_handler(snode);
+}
+
static void (*const zk_event_handlers[])(struct zk_event *ev) = {
[EVENT_JOIN_REQUEST] = zk_handle_join_request,
[EVENT_JOIN_RESPONSE] = zk_handle_join_response,
@@ -999,6 +1019,7 @@ static void (*const zk_event_handlers[])(struct zk_event *ev) = {
[EVENT_BLOCK] = zk_handle_block,
[EVENT_UNBLOCK] = zk_handle_unblock,
[EVENT_NOTIFY] = zk_handle_notify,
+ [EVENT_UPDATE_NODE] = zk_handle_update_node,
};
static const int zk_max_event_handlers = ARRAY_SIZE(zk_event_handlers);
@@ -1132,22 +1153,12 @@ static int zk_init(const char *option)
return 0;
}
-static void zk_update_node(struct sd_node *node)
+static int zk_update_node(struct sd_node *node)
{
- struct zk_node n = {
+ struct zk_node znode = {
.node = *node,
};
- struct zk_node *t;
-
- sd_dprintf("%s", node_to_str(&n.node));
-
- pthread_rwlock_rdlock(&zk_tree_lock);
- t = zk_tree_search_nolock(&n.node.nid);
- if (t) {
- t->node = n.node;
- build_node_list();
- }
- pthread_rwlock_unlock(&zk_tree_lock);
+ return add_event(EVENT_UPDATE_NODE, &znode, NULL, 0);
}
static struct cluster_driver cdrv_zookeeper = {
diff --git a/sheep/group.c b/sheep/group.c
index 370c625..936fc88 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -1090,16 +1090,15 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
sockfd_cache_del(&left->nid);
}
-void update_node_size(struct sd_node *node)
+static void update_node_size(struct sd_node *node)
{
struct vnode_info *cur_vinfo = main_thread_get(current_vnode_info);
int idx = get_node_idx(cur_vinfo, node);
assert(idx != -1);
cur_vinfo->nodes[idx].space = node->space;
- sys->cdrv->update_node(node);
}
-void kick_node_recover(void)
+static void kick_node_recover(void)
{
struct vnode_info *old = main_thread_get(current_vnode_info);
@@ -1111,6 +1110,12 @@ void kick_node_recover(void)
put_vnode_info(old);
}
+void sd_update_node_handler(struct sd_node *node)
+{
+ update_node_size(node);
+ kick_node_recover();
+}
+
int create_cluster(int port, int64_t zone, int nr_vnodes,
bool explicit_addr)
{
diff --git a/sheep/ops.c b/sheep/ops.c
index bf975bf..55c2deb 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -662,35 +662,6 @@ static int cluster_recovery_completion(const struct sd_req *req,
return SD_RES_SUCCESS;
}
-static void do_reweight(struct work *work)
-{
- struct sd_req hdr;
- int ret;
-
- sd_init_req(&hdr, SD_OP_UPDATE_SIZE);
- hdr.flags = SD_FLAG_CMD_WRITE;
- hdr.data_length = sizeof(sys->this_node);
-
- ret = exec_local_req(&hdr, &sys->this_node);
- if (ret != SD_RES_SUCCESS)
- sd_eprintf("failed to update node size");
-}
-
-static void reweight_done(struct work *work)
-{
- free(work);
-}
-
-static void reweight_node(void)
-{
- struct work *rw = xzalloc(sizeof(*rw));
-
- rw->fn = do_reweight;
- rw->done = reweight_done;
-
- queue_work(sys->recovery_wqueue, rw);
-}
-
static bool node_size_varied(void)
{
uint64_t new, used, old = sys->this_node.space;
@@ -724,18 +695,7 @@ static int cluster_reweight(const struct sd_req *req, struct sd_rsp *rsp,
void *data)
{
if (node_size_varied())
- reweight_node();
-
- return SD_RES_SUCCESS;
-}
-
-static int cluster_update_size(const struct sd_req *req, struct sd_rsp *rsp,
- void *data)
-{
- struct sd_node *node = (struct sd_node *)data;
-
- update_node_size(node);
- kick_node_recover();
+ return sys->cdrv->update_node(&sys->this_node);
return SD_RES_SUCCESS;
}
@@ -1091,12 +1051,6 @@ static struct sd_op_template sd_ops[] = {
.process_main = cluster_reweight,
},
- [SD_OP_UPDATE_SIZE] = {
- .name = "UPDATE_SIZE",
- .type = SD_OP_TYPE_CLUSTER,
- .process_main = cluster_update_size,
- },
-
[SD_OP_ENABLE_RECOVER] = {
.name = "ENABLE_RECOVER",
.type = SD_OP_TYPE_CLUSTER,
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 1f002bf..127ee5f 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -433,8 +433,6 @@ uint32_t md_get_info(struct sd_md_info *info);
int md_plug_disks(char *disks);
int md_unplug_disks(char *disks);
uint64_t md_get_size(uint64_t *used);
-void kick_node_recover(void);
-void update_node_size(struct sd_node *node);
/* http.c */
#ifdef HAVE_HTTP
--
1.7.9.5
More information about the sheepdog
mailing list