[sheepdog] [PATCH 6/8] zookeeper: fix kick_block_event logic
Liu Yuan
namei.unix at gmail.com
Sun Dec 23 16:26:30 CET 2012
From: Liu Yuan <tailai.ly at taobao.com>
Kick a block event only if there is no nonblock event pending. We prefer to
handle nonblock events first because:
1. Sheep assumes that unblock() and notify() form a transaction, so we
can only kick the next block event after sd_notify_handler() has been called.
2. We should process leave/join events as soon as possible.
- use zk_node for block events
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/cluster/zookeeper.c | 106 +++++++++++++++++++++++----------------------
1 file changed, 54 insertions(+), 52 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index fe4093d..336c827 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -49,13 +49,13 @@ enum zk_event_type {
};
struct zk_node {
+ struct list_head list;
struct rb_node rb;
struct sd_node node;
+ bool callbacked;
};
struct zk_event {
- struct list_head list;
- bool callbacked;
enum zk_event_type type;
struct zk_node sender;
enum cluster_join_result join_result;
@@ -66,7 +66,7 @@ struct zk_event {
static struct sd_node sd_nodes[SD_MAX_NODES];
static size_t nr_sd_nodes;
struct rb_root zk_node_root = RB_ROOT;
-static LIST_HEAD(zk_block_event_list);
+static LIST_HEAD(zk_block_list);
static bool joined;
static struct zk_node *zk_tree_insert(struct zk_node *new)
@@ -214,9 +214,9 @@ static bool zk_queue_peek(void)
rc = zk_node_exists(path);
if (rc == ZOK)
- return false;
+ return true;
- return true;
+ return false;
}
static void zk_queue_push(struct zk_event *ev)
@@ -264,29 +264,17 @@ static int zk_queue_push_back(struct zk_event *ev)
return 0;
}
-static int zk_queue_pop(struct zk_event *ev)
+static void zk_queue_pop_advance(struct zk_event *ev)
{
int len;
char path[256];
- if (zk_queue_peek())
- return -1;
-
len = sizeof(*ev);
sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
assert(zk_get_data(path, ev, &len) == ZOK);
dprintf("%s, type:%d, len:%d, pos:%"PRId32"\n",
path, ev->type, len, queue_pos);
-
- /* watch next */
queue_pos++;
- sprintf(path, QUEUE_ZNODE "/%010"PRId32, queue_pos);
- /* If not joined, we must wait for join response. No kick. */
- if (zk_node_exists(path) == ZOK && joined)
- /* Someone has created this node, go kick event handler */
- eventfd_write(efd, 1);
-
- return 0;
}
static int zk_member_empty(void)
@@ -550,32 +538,27 @@ static void zk_handle_join_response(struct zk_event *ev)
ev->join_result, ev->buf);
}
-/* When block event is deleted from list, we should call this function */
-static void kick_next_block_event(void)
+static void kick_block_event(void)
{
- struct zk_event *zke;
+ struct zk_node *block;
- if (list_empty(&zk_block_event_list))
+ if (list_empty(&zk_block_list))
return;
- zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
- if (!zke->callbacked)
- zke->callbacked = sd_block_handler(&zke->sender.node);
+ block = list_first_entry(&zk_block_list, typeof(*block), list);
+ if (!block->callbacked)
+ block->callbacked = sd_block_handler(&block->node);
}
-static bool block_event_list_del(struct zk_node *n)
+static void block_event_list_del(struct zk_node *n)
{
- struct zk_event *ev, *t;
- bool ret = false;
+ struct zk_node *ev, *t;
- list_for_each_entry_safe(ev, t, &zk_block_event_list, list) {
- if (node_eq(&ev->sender.node, &n->node)) {
+ list_for_each_entry_safe(ev, t, &zk_block_list, list) {
+ if (node_eq(&ev->node, &n->node)) {
list_del(&ev->list);
free(ev);
- ret = true;
}
}
-
- return ret;
}
static void zk_handle_leave(struct zk_event *ev)
@@ -587,8 +570,7 @@ static void zk_handle_leave(struct zk_event *ev)
node_to_str(&ev->sender.node));
return;
}
- if (block_event_list_del(n))
- kick_next_block_event();
+ block_event_list_del(n);
zk_tree_del(n);
build_node_list();
sd_leave_handler(&ev->sender.node, sd_nodes, nr_sd_nodes);
@@ -596,29 +578,30 @@ static void zk_handle_leave(struct zk_event *ev)
static void zk_handle_block(struct zk_event *ev)
{
- struct zk_event *zke = xzalloc(sizeof(*zke));
+ struct zk_node *block = xmalloc(sizeof(*block));
dprintf("BLOCK\n");
- *zke = *ev;
- zke->callbacked = false;
- list_add_tail(&zke->list, &zk_block_event_list);
- zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
- if (!zke->callbacked)
- zke->callbacked = sd_block_handler(&zke->sender.node);
+ block->node = ev->sender.node;
+ block->callbacked = false;
+ list_add_tail(&block->list, &zk_block_list);
+ block = list_first_entry(&zk_block_list, typeof(*block), list);
+ if (!block->callbacked)
+ block->callbacked = sd_block_handler(&block->node);
}
static void zk_handle_unblock(struct zk_event *ev)
{
- struct zk_event *zke;
+ struct zk_node *block;
dprintf("UNBLOCK\n");
- zke = list_first_entry(&zk_block_event_list, typeof(*zke), list);
- if (zke->callbacked)
- add_event(EVENT_NOTIFY, &zke->sender, ev->buf, ev->buf_len);
+ if (list_empty(&zk_block_list))
+ return;
+ block = list_first_entry(&zk_block_list, typeof(*block), list);
+ if (block->callbacked)
+ add_event(EVENT_NOTIFY, block, ev->buf, ev->buf_len);
- list_del(&zke->list);
- free(zke);
- kick_next_block_event();
+ list_del(&block->list);
+ free(block);
}
static void zk_handle_notify(struct zk_event *ev)
@@ -643,6 +626,7 @@ static void zk_event_handler(int listen_fd, int events, void *data)
eventfd_t value;
struct zk_event ev;
+ dprintf("%d, %d\n", events, queue_pos);
if (events & EPOLLHUP) {
eprintf("zookeeper driver received EPOLLHUP event, exiting.\n");
log_close();
@@ -654,13 +638,31 @@ static void zk_event_handler(int listen_fd, int events, void *data)
return;
}
- if (zk_queue_pop(&ev) < 0)
- return;
+ if (!zk_queue_peek())
+ goto kick_block_event;
+ zk_queue_pop_advance(&ev);
if (ev.type < zk_max_event_handlers && zk_event_handlers[ev.type])
zk_event_handlers[ev.type](&ev);
else
- eprintf("unhandled type %d\n", ev.type);
+ panic("unhandled type %d\n", ev.type);
+
+ /* Someone has created next event, go kick event handler. */
+ if (zk_queue_peek() && joined) {
+ eventfd_write(efd, 1);
+ return;
+ }
+
+kick_block_event:
+ /*
+ * Kick block event only if there is no nonblock event. We prefer to
+ * handle nonblock events first because:
+ *
+ * 1. Sheep assumes that unblock() and notify() form a transaction, so we
+ * can only kick next block event after sd_notify_handler() is called
+ * 2. We should process leave/join event as soon as possible.
+ */
+ kick_block_event();
}
static int zk_init(const char *option)
--
1.7.9.5
More information about the sheepdog
mailing list