[sheepdog] [PATCH] Modified zookeeper driver to support multi-sheep cluster in one zookeeper cluster. In order to distinguish different sheep cluster in one zookeeper, one more level directory is added in znode as domain. The same domain means the same sheep cluster. For example, the current directory tree seems like /sheepdog/(queue, member, master, lock), in this patch, it seems like /sheepdog/domain/(queue, member, master, lock). In sheep.c, cluster_help is modified for tips, In zookeeper.c, all codes related to znode path is modified.
Yu Yang
yuyang at cmss.chinamobile.com
Tue Feb 24 11:17:57 CET 2015
Signed-off-by: Yu Yang <yuyang at cmss.chinamobile.com>
---
sheep/cluster/zookeeper.c | 197 +++++++++++++++++++++++++++++++--------------
sheep/sheep.c | 8 +-
2 files changed, 142 insertions(+), 63 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 3248af2..87d28bd 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -26,15 +26,22 @@
#include "rbtree.h"
#define SESSION_TIMEOUT 30000 /* millisecond */
+#define SD_DEFAULT_DOMAIN "sd_domain_default"
+
+static char base_znode[] = "/sheepdog";
+static char queue_znode[MAX_NODE_STR_LEN] = "";
+static char queue_znode_post[] = "/queue";
+static char member_znode[MAX_NODE_STR_LEN] = "";
+static char member_znode_post[] = "/member";
+static char master_znode[MAX_NODE_STR_LEN] = "";
+static char master_znode_post[] = "/master";
+static char lock_znode[MAX_NODE_STR_LEN] = "";
+static char lock_znode_post[] = "/lock";
-#define BASE_ZNODE "/sheepdog"
-#define QUEUE_ZNODE BASE_ZNODE "/queue"
-#define MEMBER_ZNODE BASE_ZNODE "/member"
-#define MASTER_ZNODE BASE_ZNODE "/master"
-#define LOCK_ZNODE BASE_ZNODE "/lock"
static int zk_timeout = SESSION_TIMEOUT;
static int my_master_seq;
+static char sd_domain[MAX_NODE_STR_LEN] = SD_DEFAULT_DOMAIN;
/* structure for distributed lock */
struct cluster_lock {
@@ -347,7 +354,7 @@ static struct cluster_lock *lock_table_lookup_acquire(uint64_t lock_id)
ret_lock = xzalloc(sizeof(*ret_lock));
ret_lock->id = lock_id;
ret_lock->ref = 1;
- snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
+ snprintf(path, MAX_NODE_STR_LEN, "%s/%"PRIu64,lock_znode,
ret_lock->id);
rc = zk_init_node(path);
if (rc)
@@ -399,8 +406,8 @@ static void lock_table_lookup_release(uint64_t lock_id)
/* free all resource used by this lock */
sd_destroy_mutex(&lock->id_lock);
sem_destroy(&lock->wait_wakeup);
- snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
- lock->id);
+ snprintf(path, MAX_NODE_STR_LEN, "%s/%"PRIu64,
+ lock_znode, lock->id);
/*
* If deletion of directory 'lock_id' fail, we only get
* a * empty directory in zookeeper. That's unharmful
@@ -458,7 +465,7 @@ static int zk_queue_peek(bool *peek)
int rc;
char path[MAX_NODE_STR_LEN];
- snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+ snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
rc = zk_node_exists(path);
switch (rc) {
@@ -483,7 +490,7 @@ static int zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len,
for (int seq = queue_pos; ; seq++) {
struct zk_event ev;
- snprintf(seq_path, seq_path_len, QUEUE_ZNODE"/%010"PRId32, seq);
+ snprintf(seq_path, seq_path_len, "%s/%010"PRId32, queue_znode, seq);
len = offsetof(typeof(ev), id) + sizeof(ev.id);
rc = zk_get_data(seq_path, &ev, &len);
switch (rc) {
@@ -513,7 +520,7 @@ static int zk_queue_push(struct zk_event *ev)
bool found;
len = offsetof(typeof(*ev), buf) + ev->buf_len;
- snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
+ snprintf(path, sizeof(path), "%s/", queue_znode);
again:
rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf), false);
switch (rc) {
@@ -536,8 +543,11 @@ again:
}
if (first_push) {
int32_t seq;
-
- sscanf(buf, QUEUE_ZNODE "/%"PRId32, &seq);
+
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
+ strcpy(temp_char_form,queue_znode);
+ strcat(temp_char_form,"/%"PRId32);
+ sscanf(buf, temp_char_form, &seq);
queue_pos = seq;
eventfd_xwrite(efd, 1);
first_push = false;
@@ -568,7 +578,7 @@ static int push_join_response(struct zk_event *ev)
queue_pos--;
len = offsetof(typeof(*ev), buf) + ev->buf_len;
- snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+ snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
RETURN_IF_ERROR(zk_set_data(path, (char *)ev, len, -1), "");
sd_debug("update path:%s, queue_pos:%010" PRId32 ", len:%d", path,
@@ -582,7 +592,7 @@ static int zk_queue_pop_advance(struct zk_event *ev)
char path[MAX_NODE_STR_LEN];
len = sizeof(*ev);
- snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+ snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path);
sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len,
@@ -641,10 +651,15 @@ static inline void build_node_list(void)
static int zk_queue_init(void)
{
- RETURN_IF_ERROR(zk_init_node(BASE_ZNODE), "path %s", BASE_ZNODE);
- RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE);
- RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE);
- RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE);
+ char sd_domain_znode[128] = "";
+ strcpy(sd_domain_znode, base_znode);
+ strcat(sd_domain_znode, "/");
+ strcat(sd_domain_znode, sd_domain);
+ RETURN_IF_ERROR(zk_init_node(base_znode), "path %s", base_znode);
+ RETURN_IF_ERROR(zk_init_node(sd_domain_znode), "path %s", sd_domain_znode);
+ RETURN_IF_ERROR(zk_init_node(master_znode), "path %s", master_znode);
+ RETURN_IF_ERROR(zk_init_node(queue_znode), "path %s", queue_znode);
+ RETURN_IF_ERROR(zk_init_node(member_znode), "path %s", member_znode);
return ZOK;
}
@@ -692,6 +707,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
char str[MAX_NODE_STR_LEN], *p;
uint64_t lock_id;
int ret;
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
/*
@@ -705,7 +721,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
/* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
sd_debug("path:%s, type:%d, state:%d", path, type, state);
if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
- ret = sscanf(path, MEMBER_ZNODE "/%s", str);
+ strcpy(temp_char_form,member_znode);
+ strcat(temp_char_form,"/%s");
+ ret = sscanf(path, temp_char_form, str);
if (ret == 1)
zk_node_exists(path);
/* kick off the event handler */
@@ -714,7 +732,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
struct zk_node *n;
/* process distributed lock */
- ret = sscanf(path, LOCK_ZNODE "/%"PRIu64"/%s", &lock_id, str);
+ strcpy(temp_char_form,lock_znode);
+ strcat(temp_char_form,"/%"PRIu64"/%s");
+ ret = sscanf(path, temp_char_form, &lock_id, str);
if (ret == 2) {
ret = lock_table_lookup_wakeup(lock_id);
if (ret)
@@ -722,14 +742,18 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
lock_id, str);
return;
}
-
- ret = sscanf(path, MASTER_ZNODE "/%s", str);
+
+ strcpy(temp_char_form,master_znode);
+ strcat(temp_char_form,"/%s");
+ ret = sscanf(path, temp_char_form, str);
if (ret == 1) {
zk_compete_master();
return;
}
- ret = sscanf(path, MEMBER_ZNODE "/%s", str);
+ strcpy(temp_char_form,member_znode);
+ strcat(temp_char_form,"/%s");
+ ret = sscanf(path, temp_char_form, str);
if (ret != 1)
return;
p = strrchr(path, '/');
@@ -815,19 +839,23 @@ static int zk_find_master(int *master_seq, char *master_name)
{
int rc, len = MAX_NODE_STR_LEN;
char master_compete_path[MAX_NODE_STR_LEN];
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
if (*master_seq < 0) {
- RETURN_IF_ERROR(zk_get_least_seq(MASTER_ZNODE,
+ RETURN_IF_ERROR(zk_get_least_seq(master_znode,
master_compete_path,
MAX_NODE_STR_LEN, master_name,
&len), "");
- sscanf(master_compete_path, MASTER_ZNODE "/%"PRId32,
- master_seq);
+ strcpy(temp_char_form,master_znode);
+ strcat(temp_char_form,"/%"PRId32);
+ sscanf(master_compete_path, temp_char_form, master_seq);
return ZOK;
} else {
while (true) {
+ strcpy(temp_char_form,master_znode);
+ strcat(temp_char_form,"/%010"PRId32);
snprintf(master_compete_path, len,
- MASTER_ZNODE "/%010"PRId32, *master_seq);
+ temp_char_form, *master_seq);
rc = zk_get_data(master_compete_path, master_name,
&len);
switch (rc) {
@@ -854,10 +882,12 @@ static int zk_verify_last_sheep_join(int seq, int *last_sheep)
{
int rc, len = MAX_NODE_STR_LEN;
char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN];
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
for (*last_sheep = seq - 1; *last_sheep >= 0; (*last_sheep)--) {
- snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNODE "/%010"PRId32,
- *last_sheep);
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%010"PRId32 );
+ snprintf(path, MAX_NODE_STR_LEN, temp_char_form, *last_sheep);
rc = zk_get_data(path, name, &len);
switch (rc) {
case ZNONODE:
@@ -871,8 +901,9 @@ static int zk_verify_last_sheep_join(int seq, int *last_sheep)
if (!strcmp(name, node_to_str(&this_node.node)))
continue;
-
- snprintf(path, MAX_NODE_STR_LEN, MEMBER_ZNODE "/%s", name);
+ strcpy(temp_char_form, member_znode);
+ strcat(temp_char_form, "/%s");
+ snprintf(path, MAX_NODE_STR_LEN, temp_char_form, name);
rc = zk_node_exists(path);
switch (rc) {
case ZOK:
@@ -898,7 +929,7 @@ static void zk_compete_master(void)
char master_name[MAX_NODE_STR_LEN];
char my_compete_path[MAX_NODE_STR_LEN];
static int master_seq = -1, my_seq;
-
+
/*
* This is to protect master_seq and my_seq because this function will
* be called by both main thread and zookeeper's event thread.
@@ -909,23 +940,28 @@ static void zk_compete_master(void)
goto out_unlock;
if (!joined) {
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/" );
sd_debug("start to compete master for the first time");
do {
if (uatomic_is_true(&stop))
goto out_unlock;
/* duplicate sequential node has no side-effect */
- rc = zk_create_seq_node(MASTER_ZNODE "/",
+ rc = zk_create_seq_node(temp_char_form,
node_to_str(&this_node.node),
MAX_NODE_STR_LEN,
my_compete_path,
MAX_NODE_STR_LEN, true);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- CHECK_ZK_RC(rc, MASTER_ZNODE "/");
+ CHECK_ZK_RC(rc, temp_char_form);
if (rc != ZOK)
goto out_unlock;
-
+
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%"PRId32 );
sd_debug("my compete path: %s", my_compete_path);
- sscanf(my_compete_path, MASTER_ZNODE "/%"PRId32,
+ sscanf(my_compete_path, temp_char_form,
&my_seq);
}
@@ -964,10 +1000,12 @@ static int zk_join(const struct sd_node *myself,
{
int rc;
char path[MAX_NODE_STR_LEN];
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
this_node.node = *myself;
-
- snprintf(path, sizeof(path), MEMBER_ZNODE "/%s", node_to_str(myself));
+ strcpy(temp_char_form, member_znode );
+ strcat(temp_char_form, "/%s");
+ snprintf(path, sizeof(path), temp_char_form, node_to_str(myself));
rc = zk_node_exists(path);
if (rc == ZOK) {
sd_err("Previous zookeeper session exist, shoot myself. Please "
@@ -985,17 +1023,21 @@ static int zk_join(const struct sd_node *myself,
static int zk_leave(void)
{
char path[PATH_MAX];
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
sd_info("leaving from cluster");
uatomic_set_true(&stop);
if (uatomic_is_true(&is_master)) {
- snprintf(path, sizeof(path), MASTER_ZNODE "/%010"PRId32,
- my_master_seq);
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%010"PRId32 );
+ snprintf(path, sizeof(path), temp_char_form, my_master_seq);
zk_delete_node(path, -1);
}
- snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
+ strcpy(temp_char_form, member_znode);
+ strcat(temp_char_form, "/%s" );
+ snprintf(path, sizeof(path), temp_char_form,
node_to_str(&this_node.node));
add_event(EVENT_LEAVE, &this_node, NULL, 0);
lock_table_remove_znodes();
@@ -1038,9 +1080,9 @@ static void watch_all_nodes(void)
struct String_vector strs;
char path[MAX_NODE_STR_LEN];
- RETURN_VOID_IF_ERROR(zk_get_children(MEMBER_ZNODE, &strs), "");
+ RETURN_VOID_IF_ERROR(zk_get_children(member_znode, &strs), "");
- FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
+ FOR_EACH_ZNODE(member_znode, path, &strs) {
RETURN_VOID_IF_ERROR(zk_node_exists(path), "");
}
}
@@ -1066,6 +1108,7 @@ static void zk_handle_accept(struct zk_event *ev)
{
char path[MAX_NODE_STR_LEN];
int rc;
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
sd_debug("ACCEPT");
if (node_eq(&ev->sender.node, &this_node.node))
@@ -1074,7 +1117,9 @@ static void zk_handle_accept(struct zk_event *ev)
sd_debug("%s", node_to_str(&ev->sender.node));
- snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
+ strcpy(temp_char_form,member_znode);
+ strcat(temp_char_form, "/%s");
+ snprintf(path, sizeof(path), temp_char_form,
node_to_str(&ev->sender.node));
if (node_eq(&ev->sender.node, &this_node.node)) {
joined = true;
@@ -1286,18 +1331,22 @@ static void zk_lock(uint64_t lock_id)
char lowest_seq_path[MAX_NODE_STR_LEN];
char owner_name[MAX_NODE_STR_LEN];
struct cluster_lock *cluster_lock;
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
cluster_lock = lock_table_lookup_acquire(lock_id);
my_path = cluster_lock->lock_path;
- snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64"/",
+ strcpy(temp_char_form, lock_znode );
+ strcat(temp_char_form, "/%"PRIu64"/" );
+ snprintf(parent, MAX_NODE_STR_LEN, temp_char_form,
cluster_lock->id);
/*
* It need using path without end of '/' to create node of lock_id in
* zookeeper's API, so we use 'parent_node'.
*/
- snprintf(parent_node, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
+ temp_char_form[strlen(temp_char_form)-1] = '\0';
+ snprintf(parent_node, MAX_NODE_STR_LEN, temp_char_form,
cluster_lock->id);
create_seq_node:
/* compete owner of lock is just like zk_compete_master() */
@@ -1359,7 +1408,8 @@ static void zk_unlock(uint64_t lock_id)
static int zk_init(const char *option)
{
- char *hosts, *to, *p;
+ char hosts[MAX_NODE_STR_LEN];
+ const char *pt, *pd;
int ret, interval, retry = 0, max_retry;
if (!option) {
@@ -1367,17 +1417,28 @@ static int zk_init(const char *option)
return -1;
}
- hosts = strtok((char *)option, "=");
- if ((to = strtok(NULL, "="))) {
- if (sscanf(to, "%u", &zk_timeout) != 1) {
- sd_err("Invalid parameter for timeout");
- return -1;
- }
- p = strstr(hosts, "timeout");
- *--p = '\0';
+ pt = strstr(option,"timeout=");
+ pd = strstr(option,"domain=");
+ if( pt==NULL && pd==NULL ){
+ strcpy(hosts,option);
+ }else if( pt ){
+ int i = 0;
+ while(option != pt)
+ hosts[i++] = *option++;
+ hosts[i-1] = '\0';
+ sscanf(pt,"timeout=%d",&zk_timeout);
+ if( pd )
+ sscanf(pd,"domain=%s",sd_domain);
+ }else{
+ int i = 0;
+ while(option != pd)
+ hosts[i++] = *option++;
+ hosts[i-1] = '\0';
+ sscanf(pd,"domain=%s",sd_domain);
}
- sd_debug("version %d.%d.%d, address %s, timeout %d", ZOO_MAJOR_VERSION,
- ZOO_MINOR_VERSION, ZOO_PATCH_VERSION, hosts, zk_timeout);
+
+ sd_debug("version %d.%d.%d, address %s, timeout %d, sheepdog domain %s", ZOO_MAJOR_VERSION,
+ ZOO_MINOR_VERSION, ZOO_PATCH_VERSION, hosts, zk_timeout, sd_domain);
zhandle = zookeeper_init(hosts, zk_watcher, zk_timeout, NULL, NULL, 0);
if (!zhandle) {
sd_err("failed to initialize zk server %s", option);
@@ -1398,6 +1459,18 @@ static int zk_init(const char *option)
uatomic_set_false(&stop);
uatomic_set_false(&is_master);
+ strcpy(master_znode,base_znode);
+ strcat(master_znode,"/");
+ strcat(master_znode,sd_domain);
+ strcat(master_znode,master_znode_post);
+ strcpy(queue_znode,base_znode);
+ strcat(queue_znode,"/");
+ strcat(queue_znode,sd_domain);
+ strcat(queue_znode,queue_znode_post);
+ strcpy(member_znode,base_znode);
+ strcat(member_znode,"/");
+ strcat(member_znode,sd_domain);
+ strcat(member_znode,member_znode_post);
if (zk_queue_init() != ZOK)
return -1;
@@ -1421,9 +1494,13 @@ static int zk_init(const char *option)
sd_init_mutex(table_locks + i);
}
- ret = zk_init_node(LOCK_ZNODE);
+ strcpy(lock_znode,base_znode);
+ strcat(lock_znode,"/");
+ strcat(lock_znode,sd_domain);
+ strcat(lock_znode,lock_znode_post);
+ ret = zk_init_node(lock_znode);
if (ret != ZOK) {
- sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret));
+ sd_err("Failed to create %s %s", lock_znode, zerror(ret));
free(cluster_locks_table);
return -1;
}
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 7d5fa0f..a5ba208 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -68,12 +68,14 @@ static const char cluster_help[] =
"\tlocal: use local driver\n"
"\tcorosync: use corosync driver\n"
"\tzookeeper: use zookeeper driver, need extra arguments\n"
-"\n\tzookeeper arguments: address-list,timeout=value (default as 3000)\n"
+"\n\tzookeeper arguments: address-list,timeout=value(default as 3000),domain=value(default as sd_domain_default)\n"
"\nExample:\n\t"
-"$ sheep -c zookeeper:IP1:PORT1,IP2:PORT2,IP3:PORT3,timeout=1000 ...\n"
+"$ sheep -c zookeeper:IP1:PORT1,IP2:PORT2,IP3:PORT3,timeout=1000,domain=sheep_domain ...\n"
"This tries to use 3 node zookeeper cluster, which can be reached by\n"
"IP1:PORT1, IP2:PORT2, IP3:PORT3 to manage membership and broadcast message\n"
-"and set the timeout of node heartbeat as 1000 milliseconds\n";
+"and set the timeout of node heartbeat as 1000 milliseconds\n"
+"and join the domain sheep_domain.\n"
+"Notice that timeout should be followed by domain if both are given explicitly.\n";
static const char cache_help[] =
"Available arguments:\n"
--
1.7.9.5
More information about the sheepdog
mailing list