[sheepdog] [PATCH 12/13] zookeeper: rework block/unblock and leave event handling
Liu Yuan
namei.unix at gmail.com
Tue Dec 18 06:38:01 CET 2012
From: Liu Yuan <tailai.ly at taobao.com>
This patch kick-starts the zookeeper driver so that it works with the current master.
- Put all the events into the zk_queue, which results in a totally ordered event
sequence.
- Block/unblock events are linked into a dedicated list to implement blocking
semantics as in corosync.
- Use a better master-election method to allow multiple nodes to start concurrently.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/cluster/zookeeper.c | 302 ++++++++++++++++-----------------------------
1 file changed, 109 insertions(+), 193 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8b15f1b..feb5ec8 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -11,11 +11,9 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
-#include <search.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <zookeeper/zookeeper.h>
-#include <urcu/uatomic.h>
#include "cluster.h"
#include "event.h"
@@ -30,6 +28,7 @@
#define BASE_ZNODE "/sheepdog"
#define QUEUE_ZNODE BASE_ZNODE "/queue"
#define MEMBER_ZNODE BASE_ZNODE "/member"
+#define MASTER_ZNONE BASE_ZNODE "/master"
/* iterate child znodes */
#define FOR_EACH_ZNODE(parent, path, strs) \
@@ -45,6 +44,7 @@ enum zk_event_type {
EVENT_JOIN_RESPONSE,
EVENT_LEAVE,
EVENT_BLOCK,
+ EVENT_UNBLOCK,
EVENT_NOTIFY,
};
@@ -54,33 +54,19 @@ struct zk_node {
};
struct zk_event {
+ struct list_head list;
+ bool callbacked;
enum zk_event_type type;
struct zk_node sender;
-
enum cluster_join_result join_result;
-
size_t buf_len;
uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
};
-static uatomic_bool zk_notify_blocked;
-
-/* leave event circular array */
-static struct zk_event zk_levents[SD_MAX_NODES];
-static int nr_zk_levents;
-static unsigned zk_levent_head;
-static unsigned zk_levent_tail;
-static bool called_by_zk_unblock;
-
static struct sd_node sd_nodes[SD_MAX_NODES];
static size_t nr_sd_nodes;
-
struct rb_root zk_node_root = RB_ROOT;
-
-static inline bool is_blocking_event(struct zk_event *ev)
-{
- return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_REQUEST;
-}
+static LIST_HEAD(zk_block_event_list);
static struct zk_node *zk_tree_insert(struct zk_node *new)
{
@@ -103,10 +89,8 @@ static struct zk_node *zk_tree_insert(struct zk_node *new)
/* already has this entry */
return entry;
}
-
rb_link_node(&new->rb, parent, p);
rb_insert_color(&new->rb, &zk_node_root);
-
return NULL; /* insert successfully */
}
@@ -128,7 +112,6 @@ static struct zk_node *zk_tree_search(const struct node_id *nid)
else
return t; /* found it */
}
-
return NULL;
}
@@ -160,7 +143,7 @@ zk_init_node(const char *path)
panic("failed, path:%s, rc:%d\n", path, rc);
}
-static inline ZOOAPI void
+static inline ZOOAPI int
zk_create_node(const char *path, const char *value, int valuelen,
const struct ACL_vector *acl, int flags, char *path_buffer,
int path_buffer_len)
@@ -170,8 +153,7 @@ zk_create_node(const char *path, const char *value, int valuelen,
rc = zoo_create(zhandle, path, value, valuelen, acl,
flags, path_buffer, path_buffer_len);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- if (rc != ZOK)
- panic("failed, path:%s, rc:%d\n", path, rc);
+ return rc;
}
static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
@@ -218,11 +200,11 @@ static inline ZOOAPI void zk_get_children(const char *path,
panic("failed:%s, rc:%d\n", path, rc);
}
-/* ZooKeeper-based queue */
+/* ZooKeeper-based queue gives us a totally ordered event sequence */
static int efd;
static int32_t queue_pos;
-static bool zk_queue_empty(void)
+static bool zk_queue_peek(void)
{
int rc;
char path[256];
@@ -239,25 +221,26 @@ static bool zk_queue_empty(void)
static void zk_queue_push(struct zk_event *ev)
{
static bool first_push = true;
- int len;
+ int len, ret;
char path[256], buf[256];
- eventfd_t value = 1;
len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
sprintf(path, "%s/", QUEUE_ZNODE);
- zk_create_node(path, (char *)ev, len,
- &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
- dprintf("create:%s, queue_pos:%010"PRId32", len:%d\n", buf, queue_pos,
- len);
-
+ ret = zk_create_node(path, (char *)ev, len,
+ &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf,
+ sizeof(buf));
if (first_push) {
- uint32_t seq;
+ int32_t seq;
sscanf(buf, QUEUE_ZNODE "/%"PRId32, &seq);
queue_pos = seq;
- eventfd_write(efd, value);
+ eventfd_write(efd, 1);
first_push = false;
}
+
+ if (ret == ZOK)
+ dprintf("create:%s, queue_pos:%010"PRId32", len:%d\n",
+ buf, queue_pos, len);
}
/*
@@ -280,94 +263,26 @@ static int zk_queue_push_back(struct zk_event *ev)
return 0;
}
-/*
- * Peek next queue event and if it exists, we must watch it and manually notify
- * it in order not to lose it.
- */
-static void zk_queue_peek_next_notify(const char *path)
-{
- int rc = zk_node_exists(path);
- if (rc == ZOK)
- eventfd_write(efd, 1);
-}
-
static int zk_queue_pop(struct zk_event *ev)
{
- int rc, len;
- int nr_levents;
+ int len;
char path[256];
- struct zk_event *lev;
- eventfd_t value = 1;
-
- /*
- * Continue to process LEAVE event even if we have an unfinished BLOCK
- * event.
- */
- if (!called_by_zk_unblock && uatomic_read(&nr_zk_levents)) {
- nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1;
- dprintf("nr_levents:%d, head:%u\n", nr_levents, zk_levent_head);
- lev = &zk_levents[zk_levent_head%SD_MAX_NODES];
-
- /*
- * If the node pointed to by queue_pos was send by this leaver,
- * and it have blocked whole cluster, we should ignore it.
- */
- len = sizeof(*ev);
- sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
- rc = zk_get_data(path, ev, &len);
- if (rc == ZOK &&
- node_eq(&ev->sender.node, &lev->sender.node) &&
- is_blocking_event(ev)) {
- dprintf("this queue_pos:%010"PRId32" have blocked whole"
- " cluster, ignore it\n", queue_pos);
- queue_pos++;
-
- sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
- zk_queue_peek_next_notify(path);
- }
-
- memcpy(ev, lev, sizeof(*ev));
- zk_levent_head++;
-
- if (uatomic_read(&nr_zk_levents) || rc == ZOK) {
- /*
- * we have pending leave events or queue nodes,
- * manual notify
- */
- dprintf("write event to efd:%d\n", efd);
- eventfd_write(efd, value);
- }
-
- return 0;
- }
-
- if (!called_by_zk_unblock && uatomic_is_true(&zk_notify_blocked))
- return -1;
-
- if (zk_queue_empty())
+ if (zk_queue_peek())
return -1;
len = sizeof(*ev);
sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
- rc = zk_get_data(path, ev, &len);
- if (rc != ZOK)
- panic("failed to zk_get_data path:%s, rc:%d\n", path, rc);
- dprintf("read path:%s, type:%d, len:%d, rc:%d\n", path, ev->type,
- len, rc);
+ assert(zk_get_data(path, ev, &len) == ZOK);
+ dprintf("read path:%s, type:%d, len:%d\n", path, ev->type, len);
+ /* watch next and kick next event if any */
queue_pos++;
-
- /*
- * This event will be pushed back to the queue,
- * we just wait for the arrival of its updated,
- * not need to watch next data.
- */
- if (is_blocking_event(ev))
- return 0;
-
sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
- zk_queue_peek_next_notify(path);
+ if (zk_node_exists(path) == ZOK)
+ /* Someone has created this node, go kick event handler */
+ eventfd_write(efd, 1);
+
return 0;
}
@@ -418,20 +333,27 @@ static inline void build_node_list(void)
dprintf("nr_sd_nodes:%zu\n", nr_sd_nodes);
}
+static inline int zk_master_create(void)
+{
+ return zk_create_node(MASTER_ZNONE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+ ZOO_EPHEMERAL, NULL, 0);
+}
+
static bool is_master(void)
{
- struct rb_node *n;
struct zk_node *zk;
if (!nr_sd_nodes) {
- if (zk_member_empty())
- return true;
- else
+ if (zk_member_empty()) {
+ if (zk_master_create() == ZOK)
+ return true;
+ else
+ return false;
+ } else
return false;
}
- n = rb_first(&zk_node_root);
- zk = rb_entry(n, struct zk_node, rb);
+ zk = rb_entry(rb_first(&zk_node_root), struct zk_node, rb);
if (node_eq(&zk->node, &this_node.node))
return true;
@@ -486,39 +408,20 @@ static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
return 0;
}
-static int leave_event(struct zk_node *znode)
-{
- int nr_levents;
- struct zk_event *ev;
- const eventfd_t value = 1;
-
- ev = &zk_levents[zk_levent_tail % SD_MAX_NODES];
- ev->type = EVENT_LEAVE;
- ev->sender = *znode;
- ev->buf_len = 0;
-
- nr_levents = uatomic_add_return(&nr_zk_levents, 1);
- dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
-
- zk_levent_tail++;
-
- eventfd_write(efd, value);
- return 0;
-}
-
static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
void *ctx)
{
+ struct zk_node znode;
char str[256], *p;
int ret;
- struct zk_node znode;
+/* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
dprintf("path:%s, type:%d\n", path, type);
-
if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
ret = sscanf(path, MEMBER_ZNODE "/%s", str);
if (ret == 1)
zk_node_exists(path);
+ /* kick off the event handler */
eventfd_write(efd, 1);
} else if (type == ZOO_DELETED_EVENT) {
ret = sscanf(path, MEMBER_ZNODE "/%s", str);
@@ -526,12 +429,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
return;
p = strrchr(path, '/');
p++;
-
str_to_node(p, &znode.node);
- dprintf("zk_nodes leave:%s\n", node_to_str(&znode.node));
-
- leave_event(&znode);
- return;
+ /* FIXME: remove redundant leave events */
+ add_event(EVENT_LEAVE, &znode, NULL, 0);
}
}
@@ -547,21 +447,16 @@ static int zk_join(const struct sd_node *myself,
sprintf(path, MEMBER_ZNODE "/%s", node_to_str(myself));
rc = zk_node_exists(path);
if (rc == ZOK) {
- eprintf("Previous zookeeper session exist, shoot myself.\n"
- "Wait for a while and restart me again\n");
+ eprintf("Previous zookeeper session exist, shoot myself.\n");
exit(1);
}
- return add_event(EVENT_JOIN_REQUEST, &this_node,
- opaque, opaque_len);
+ return add_event(EVENT_JOIN_REQUEST, &this_node, opaque, opaque_len);
}
static int zk_leave(void)
{
- char path[256];
- sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&this_node.node));
- dprintf("try to delete member path:%s\n", path);
- return zk_delete_node(path, -1);
+ return add_event(EVENT_LEAVE, &this_node, NULL, 0);
}
static int zk_notify(void *msg, size_t msg_len)
@@ -576,27 +471,7 @@ static void zk_block(void)
static void zk_unblock(void *msg, size_t msg_len)
{
- int rc;
- struct zk_event ev;
- eventfd_t value = 1;
-
- called_by_zk_unblock = true;
- rc = zk_queue_pop(&ev);
- called_by_zk_unblock = false;
- assert(rc == 0);
-
- ev.type = EVENT_NOTIFY;
- ev.buf_len = msg_len;
- if (msg)
- memcpy(ev.buf, msg, msg_len);
-
- zk_queue_push_back(&ev);
-
- uatomic_set_false(&zk_notify_blocked);
-
- /* this notify is necessary */
- dprintf("write event to efd:%d\n", efd);
- eventfd_write(efd, value);
+ add_event(EVENT_UNBLOCK, &this_node, msg, msg_len);
}
static void zk_handle_join_request(struct zk_event *ev)
@@ -604,23 +479,19 @@ static void zk_handle_join_request(struct zk_event *ev)
enum cluster_join_result res;
dprintf("sender: %s\n", node_to_str(&ev->sender.node));
-
if (!is_master()) {
/* Let's await master acking the join-request */
queue_pos--;
return;
}
-
res = sd_check_join_cb(&ev->sender.node, ev->buf);
ev->join_result = res;
ev->type = EVENT_JOIN_RESPONSE;
-
zk_queue_push_back(ev);
-
if (res == CJ_RES_MASTER_TRANSFER) {
eprintf("failed to join sheepdog cluster: "
"please retry when master is up\n");
- zk_leave();
+ add_event(EVENT_LEAVE, &this_node, NULL, 0);
exit(1);
}
dprintf("I'm the master now\n");
@@ -634,12 +505,8 @@ static void zk_handle_join_response(struct zk_event *ev)
if (is_master() &&
!node_eq(&ev->sender.node, &this_node.node)) {
/* wait util the member node has been created */
- int retry =
- MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
-
- sprintf(path, MEMBER_ZNODE "/%s",
- node_to_str(&ev->sender.node));
-
+ int retry = MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
+ sprintf(path, MEMBER_ZNODE"/%s", node_to_str(&ev->sender.node));
while (retry && zk_node_exists(path) == ZNONODE) {
usleep(MEMBER_CREATE_INTERVAL * 1000);
retry--;
@@ -669,13 +536,11 @@ static void zk_handle_join_response(struct zk_event *ev)
case CJ_RES_SUCCESS:
case CJ_RES_JOIN_LATER:
case CJ_RES_MASTER_TRANSFER:
- sprintf(path, MEMBER_ZNODE "/%s",
- node_to_str(&ev->sender.node));
+ sprintf(path, MEMBER_ZNODE"/%s", node_to_str(&ev->sender.node));
if (node_eq(&ev->sender.node, &this_node.node)) {
dprintf("create path:%s\n", path);
zk_create_node(path, (char *)&ev->sender,
- sizeof(ev->sender),
- &ZOO_OPEN_ACL_UNSAFE,
+ sizeof(ev->sender), &ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL, NULL, 0);
} else {
zk_node_exists(path);
@@ -690,14 +555,45 @@ static void zk_handle_join_response(struct zk_event *ev)
ev->join_result, ev->buf);
}
+/* When block event is deleted from list, we should call this function */
+static void kick_next_block_event(void)
+{
+ struct zk_event *zke;
+
+ if (list_empty(&zk_block_event_list))
+ return;
+ zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
+ if (!zke->callbacked)
+ zke->callbacked = sd_block_handler(&zke->sender.node);
+}
+
+static bool block_event_list_del(struct zk_node *n)
+{
+ struct zk_event *ev, *t;
+ bool ret = false;
+
+ list_for_each_entry_safe(ev, t, &zk_block_event_list, list) {
+ if (node_eq(&ev->sender.node, &n->node)) {
+ list_del(&ev->list);
+ free(ev);
+ ret = true;
+ }
+ }
+
+ return ret;
+}
+
static void zk_handle_leave(struct zk_event *ev)
{
struct zk_node *n = zk_tree_search(&ev->sender.node.nid);
+
if (!n) {
dprintf("can't find this leave node:%s, ignore it.\n",
node_to_str(&ev->sender.node));
return;
}
+ if (block_event_list_del(n))
+ kick_next_block_event();
zk_tree_del(n);
build_node_list();
sd_leave_handler(&ev->sender.node, sd_nodes, nr_sd_nodes);
@@ -705,10 +601,29 @@ static void zk_handle_leave(struct zk_event *ev)
static void zk_handle_block(struct zk_event *ev)
{
+ struct zk_event *zke = xzalloc(sizeof(*zke));
+
dprintf("BLOCK\n");
- queue_pos--;
- if (sd_block_handler(&ev->sender.node))
- assert(uatomic_set_true(&zk_notify_blocked));
+ *zke = *ev;
+ zke->callbacked = false;
+ list_add_tail(&zke->list, &zk_block_event_list);
+ zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
+ if (!zke->callbacked)
+ zke->callbacked = sd_block_handler(&zke->sender.node);
+}
+
+static void zk_handle_unblock(struct zk_event *ev)
+{
+ struct zk_event *zke;
+
+ dprintf("UNBLOCK\n");
+ zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
+ if (zke->callbacked)
+ add_event(EVENT_NOTIFY, &zke->sender, ev->buf, ev->buf_len);
+
+ list_del(&zke->list);
+ free(zke);
+ kick_next_block_event();
}
static void zk_handle_notify(struct zk_event *ev)
@@ -722,6 +637,7 @@ static void (*const zk_event_handlers[])(struct zk_event *ev) = {
[EVENT_JOIN_RESPONSE] = zk_handle_join_response,
[EVENT_LEAVE] = zk_handle_leave,
[EVENT_BLOCK] = zk_handle_block,
+ [EVENT_UNBLOCK] = zk_handle_unblock,
[EVENT_NOTIFY] = zk_handle_notify,
};
--
1.7.9.5
More information about the sheepdog
mailing list