[sheepdog] [PATCH 1/2] zookeeper: update queue_pos to zookeeper-server
Meng Lingkun
starmlk at 163.com
Mon Mar 9 05:55:49 CET 2015
From: Meng Lingkun <menglingkun at cmss.chinamobile.com>
When sheep connects to zookeeper server, create and initialize
queue_pos/$sheep_id. And in the running time, zookeeper driver
will update its queue_pos to zookeeper-server.
Signed-off-by: Meng Lingkun <menglingkun at cmss.chinamobile.com>
---
sheep/cluster/zookeeper.c | 32 ++++++++++++++++++++++++++++++++
1 file changed, 32 insertions(+)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index bf94871..b346592 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -32,6 +32,7 @@
#define MEMBER_ZNODE "/member"
#define MASTER_ZNODE "/master"
#define LOCK_ZNODE "/lock"
+#define QUEUE_POS_ZNODE "/queue_pos"
static int zk_timeout = SESSION_TIMEOUT;
static int my_master_seq;
@@ -452,6 +453,7 @@ static void lock_table_remove_znodes(void)
/* ZooKeeper-based queue give us an totally ordered events */
static int efd;
static int32_t queue_pos;
+#define QUEUE_DEL_BATCH 1000
static int zk_queue_peek(bool *peek)
{
@@ -580,6 +582,7 @@ static int zk_queue_pop_advance(struct zk_event *ev)
{
int len;
char path[MAX_NODE_STR_LEN];
+ char queue_pos_path[MAX_NODE_STR_LEN];
len = sizeof(*ev);
snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
@@ -587,6 +590,16 @@ static int zk_queue_pop_advance(struct zk_event *ev)
RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path);
sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len,
queue_pos);
+
+ if (queue_pos % QUEUE_DEL_BATCH == 0 && ev->type != EVENT_JOIN) {
+ snprintf(queue_pos_path, sizeof(queue_pos_path),
+ QUEUE_POS_ZNODE"/%s", node_to_str(&this_node.node));
+ RETURN_IF_ERROR(zk_set_data(queue_pos_path, (char *)&queue_pos,
+ sizeof(queue_pos), -1), "");
+ sd_debug("update queue pos %s to pos %" PRId32,
+ queue_pos_path, queue_pos);
+ }
+
queue_pos++;
return ZOK;
}
@@ -644,6 +657,8 @@ static int zk_queue_init(void)
RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE);
RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE);
RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE);
+ RETURN_IF_ERROR(zk_init_node(QUEUE_POS_ZNODE), "path %s",
+ QUEUE_POS_ZNODE);
return ZOK;
}
@@ -749,6 +764,12 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
return;
}
+ ret = sscanf(path, QUEUE_ZNODE "/%s", str);
+ if (ret == 1) {
+ sd_debug("deleted queue %s", str);
+ return;
+ }
+
ret = sscanf(path, MEMBER_ZNODE "/%s", str);
if (ret != 1)
return;
@@ -1085,6 +1106,8 @@ static void init_node_list(struct zk_event *ev)
static void zk_handle_accept(struct zk_event *ev)
{
char path[MAX_NODE_STR_LEN];
+ char queue_pos_path[MAX_NODE_STR_LEN];
+ uint32_t pos = -1;
int rc;
sd_debug("ACCEPT");
@@ -1096,6 +1119,8 @@ static void zk_handle_accept(struct zk_event *ev)
snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
node_to_str(&ev->sender.node));
+ snprintf(queue_pos_path, sizeof(queue_pos_path), QUEUE_POS_ZNODE"/%s",
+ node_to_str(&ev->sender.node));
if (node_eq(&ev->sender.node, &this_node.node)) {
joined = true;
sd_debug("create path:%s", path);
@@ -1105,6 +1130,13 @@ static void zk_handle_accept(struct zk_event *ev)
&ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL, NULL, 0);
RETURN_VOID_IF_ERROR(rc, "");
+ sd_debug("create path:%s", queue_pos_path);
+ rc = zk_create_node(queue_pos_path,
+ (char *)&pos,
+ sizeof(pos),
+ &ZOO_OPEN_ACL_UNSAFE,
+ ZOO_EPHEMERAL, NULL, 0);
+ RETURN_VOID_IF_ERROR(rc, "");
} else
zk_node_exists(path);
--
1.9.1
More information about the sheepdog
mailing list