When we have update node info in sheep, we also have to update the corresponding information in the driver, since node join/leave will send the sd nodes in the corresponding handlers to sheep that will overload the nodes info in sheep. Signed-off-by: Liu Yuan <namei.unix at gmail.com> --- sheep/cluster.h | 3 +++ sheep/cluster/corosync.c | 27 +++++++++++++++++++++++++-- sheep/cluster/local.c | 22 ++++++++++++++++++++++ sheep/cluster/shepherd.c | 8 ++++++++ sheep/cluster/zookeeper.c | 19 +++++++++++++++++++ sheep/group.c | 1 + 6 files changed, 78 insertions(+), 2 deletions(-) diff --git a/sheep/cluster.h b/sheep/cluster.h index 0f6164c..a19a10b 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -101,6 +101,9 @@ struct cluster_driver { */ void (*unblock)(void *msg, size_t msg_len); + /* Update the specific node in the driver's private copy of nodes */ + void (*update_node)(struct sd_node *); + struct list_head list; }; diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index 8262a55..767c826 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -106,6 +106,18 @@ static inline int find_cpg_node(struct cpg_node *nodes, size_t nr_nodes, return -1; } +static inline int find_sd_node(struct cpg_node *nodes, size_t nr_nodes, + struct sd_node *key) +{ + int i; + + for (i = 0; i < nr_nodes; i++) + if (node_eq(&nodes[i].ent, key)) + return i; + + return -1; +} + static inline void add_cpg_node(struct cpg_node *nodes, size_t nr_nodes, struct cpg_node *added) { @@ -324,8 +336,8 @@ static bool __corosync_dispatch_one(struct corosync_event *cevent) case CJ_RES_FAIL: build_node_list(cpg_nodes, nr_cpg_nodes, entries); sd_join_handler(&cevent->sender.ent, entries, - nr_cpg_nodes, cevent->result, - cevent->msg); + nr_cpg_nodes, cevent->result, + cevent->msg); break; } break; @@ -814,6 +826,16 @@ again: return 0; } +static void corosync_update_node(struct sd_node *node) +{ + int idx; + + idx = find_sd_node(cpg_nodes, nr_cpg_nodes, node); + if (idx < 0) + return; + cpg_nodes[idx].ent = *node; +} + static struct cluster_driver cdrv_corosync = { .name = "corosync", @@ -824,6 +846,7 @@ static struct cluster_driver cdrv_corosync = { .notify = corosync_notify, .block = corosync_block, .unblock = corosync_unblock, + .update_node = corosync_update_node, }; cdrv_register(cdrv_corosync); diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c index 4624429..2d298d8 100644 --- a/sheep/cluster/local.c +++ b/sheep/cluster/local.c @@ -62,6 +62,7 @@ enum local_event_type { EVENT_GATEWAY, EVENT_BLOCK, EVENT_NOTIFY, + EVENT_UPDATE_NODE, }; struct local_event { @@ -286,6 +287,10 @@ static void add_event(enum local_event_type type, struct local_node *lnode, case EVENT_NOTIFY: case EVENT_BLOCK: break; + case EVENT_UPDATE_NODE: + n = find_lnode(lnode, ev.nr_lnodes, ev.lnodes); + n->node = lnode->node; + break; case EVENT_JOIN_RESPONSE: abort(); } @@ -493,6 +498,8 @@ static bool local_process_event(void) case EVENT_NOTIFY: sd_notify_handler(&ev->sender.node, ev->buf, ev->buf_len); break; + case EVENT_UPDATE_NODE: + break; } out: shm_queue_remove(ev); @@ -569,6 +576,20 @@ static int local_init(const char *option) return 0; } +/* FIXME: we have to call nr of nodes times to update nodes information */ +static void local_update_node(struct sd_node *node) +{ + struct local_node n = { + .node = *node, + }; + + shm_queue_lock(); + + add_event(EVENT_UPDATE_NODE, &n, NULL, 0); + + shm_queue_unlock(); +} + static struct cluster_driver cdrv_local = { .name = "local", @@ -579,6 +600,7 @@ static struct cluster_driver cdrv_local = { .notify = local_notify, .block = local_block, .unblock = local_unblock, + .update_node = local_update_node, }; cdrv_register(cdrv_local); diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c index ac57503..6f4f51f 100644 --- a/sheep/cluster/shepherd.c +++ b/sheep/cluster/shepherd.c @@ -676,6 +676,13 @@ static void shepherd_unblock(void *msg, size_t msg_len) do_shepherd_notify(true, msg, msg_len); } +static void shepherd_updaet_node(struct sd_node *node) +{ + for (int i = 0; i < nr_nodes; i++) + if (node_eq(node, &nodes[i])) + nodes[i] = *node; +} + static struct cluster_driver cdrv_shepherd = { .name = "shepherd", @@ -685,6 +692,7 @@ static struct cluster_driver cdrv_shepherd = { .notify = shepherd_notify, .block = shepherd_block, .unblock = shepherd_unblock, + .update_node = shepherd_updaet_node, }; cdrv_register(cdrv_shepherd); diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 476b863..2be3e4b 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -870,6 +870,24 @@ static int zk_init(const char *option) return 0; } +static void zk_update_node(struct sd_node *node) +{ + struct zk_node n = { + .node = *node, + }; + struct zk_node *t; + + sd_dprintf("%s", node_to_str(&n.node)); + + pthread_rwlock_rdlock(&zk_tree_lock); + t = zk_tree_search_nolock(&n.node.nid); + if (t) { + t->node = n.node; + build_node_list(); + } + pthread_rwlock_unlock(&zk_tree_lock); +} + static struct cluster_driver cdrv_zookeeper = { .name = "zookeeper", @@ -879,6 +897,7 @@ static struct cluster_driver cdrv_zookeeper = { .notify = zk_notify, .block = zk_block, .unblock = zk_unblock, + .update_node = zk_update_node, }; cdrv_register(cdrv_zookeeper); diff --git a/sheep/group.c b/sheep/group.c index b9399ee..f74ef10 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -1242,6 +1242,7 @@ void update_node_size(struct sd_node *node) int idx = get_node_idx(cur_vinfo, node); assert(idx != -1); cur_vinfo->nodes[idx].space = node->space; + sys->cdrv->update_node(node); } void kick_node_recover(void) -- 1.7.9.5 |