[Sheepdog] [zookeeper][PATCH 5/6] Fix two bugs:
Yunkai Zhang
yunkai.me at gmail.com
Thu Apr 5 12:23:05 CEST 2012
1. Rework the pthread locking in zk_dispatch/zk_block so that
it no longer blocks I/O requests between sheep.
2. If the znode pointed to by queue_pos was pushed by the leaver
and it has blocked the whole cluster, ignore it.
See the sketch after the '---' line below for an illustration of both changes.
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
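Not part of the patch itself: a minimal, self-contained C sketch of the
intent behind the two changes. All names (dispatch, block_done,
skip_stale_blocked, notify_blocked) and the in-memory queue array are
illustrative stand-ins only; the real code walks the QUEUE_ZNODE children
through the ZooKeeper wrappers (zk_get()/zk_exists()) in zookeeper.c.

#include <pthread.h>
#include <stdio.h>
#include <string.h>

struct event {
	char sender[32];	/* node that pushed the event */
	int blocked;		/* needs a block callback before others proceed */
};

static struct event queue[8];	/* stand-in for the QUEUE_ZNODE children */
static int queue_pos;

/* Fix 1: only this small flag is shared between the dispatcher and the
 * block work queue; holding one big queue lock across the block callback
 * would stall every other event (and the I/O traffic behind them). */
static int notify_blocked;
static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;

static int dispatch(void)
{
	pthread_mutex_lock(&blocked_lock);
	if (notify_blocked) {
		/* block callback still running; retry on the next wakeup */
		pthread_mutex_unlock(&blocked_lock);
		return 0;
	}
	pthread_mutex_unlock(&blocked_lock);

	/* ... pop and handle the next event without holding any lock ... */
	return 0;
}

static void block_done(void)
{
	pthread_mutex_lock(&blocked_lock);
	notify_blocked = 0;	/* dispatch() may process events again */
	pthread_mutex_unlock(&blocked_lock);
}

/* Fix 2: if the entry at queue_pos was pushed by a node that has already
 * left and is marked blocked, nobody will ever unblock it, so skip it. */
static void skip_stale_blocked(const char *leaver)
{
	struct event *ev = &queue[queue_pos];

	if (strcmp(ev->sender, leaver) == 0 && ev->blocked) {
		printf("entry %d is a blocked event from leaver %s, skipping\n",
		       queue_pos, leaver);
		queue_pos++;
	}
}

int main(void)
{
	strcpy(queue[0].sender, "node-A");
	queue[0].blocked = 1;
	skip_stale_blocked("node-A");	/* advances queue_pos to 1 */
	dispatch();
	block_done();
	return 0;
}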
sheep/cluster/zookeeper.c | 66 +++++++++++++++++++++++++++++++-------------
1 files changed, 46 insertions(+), 20 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 16b50e3..527c6e4 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -91,12 +91,13 @@ struct zk_event {
/* leave event list */
static LIST_HEAD(zk_levent_list);
+static int zk_notify_blocked;
static struct zk_node zk_nodes[SD_MAX_NODES];
static size_t nr_zk_nodes;
static char *zk_option;
-/* protect queue_start_pos */
-static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
+/* protect zk_notify_blocked */
+static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;
/* protect leave event list */
static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -291,8 +292,29 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
list_del(&lev->list);
pthread_mutex_unlock(&leave_lock);
+ /* if the node pointed to by queue_pos was sent by this leaver
+ * and it has blocked the whole cluster, we should ignore it. */
+ len = sizeof(*ev);
+ sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+ rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+ if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) {
+ dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos);
+ queue_pos++;
+
+ /* watch next data */
+ sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+ rc = zk_exists(zh, path, 1, NULL);
+ dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
+ if (rc == ZOK) {
+ /* we lost this message, notify manually */
+ dprintf("write event to efd:%d\n", efd);
+ eventfd_write(efd, value);
+ }
+ }
+
memcpy(ev, lev, sizeof(*ev));
free(lev);
+
return 0;
}
@@ -753,14 +775,7 @@ static int zk_join(struct sd_node *myself,
static int zk_leave(void)
{
- int rc;
-
- /* protect zk_levent_list */
- pthread_mutex_lock(&queue_lock);
- rc = add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
- pthread_mutex_unlock(&queue_lock);
-
- return rc;
+ return add_event(zhandle, EVENT_LEAVE, &this_node, NULL, 0, NULL);
}
static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
@@ -773,9 +788,6 @@ static void zk_block(struct work *work)
int rc;
struct zk_event ev;
- /* get lock only after zk_dispatch finished */
- pthread_mutex_lock(&queue_lock);
-
rc = zk_queue_pop(zhandle, &ev);
assert(rc == 0);
@@ -784,7 +796,11 @@ static void zk_block(struct work *work)
zk_queue_push_back(zhandle, &ev);
- pthread_mutex_unlock(&queue_lock);
+ pthread_mutex_lock(&blocked_lock);
+ dprintf("blocked_lock\n");
+ zk_notify_blocked = 0;
+ pthread_mutex_unlock(&blocked_lock);
+ dprintf("blocked_unlock\n");
}
static void zk_block_done(struct work *work)
@@ -810,8 +826,15 @@ static int zk_dispatch(void)
if (ret < 0)
return 0;
- /* protect zk_levent_list/nr_zk_nodes and prevent zk_block working */
- pthread_mutex_lock(&queue_lock);
+ pthread_mutex_lock(&blocked_lock);
+ dprintf("blocked_lock\n");
+ if (zk_notify_blocked) {
+ pthread_mutex_unlock(&blocked_lock);
+ dprintf("blocked_unlock, return.\n");
+ return 0;
+ }
+ pthread_mutex_unlock(&blocked_lock);
+ dprintf("blocked_unlock, process...\n");
ret = zk_queue_pop(zhandle, &ev);
if (ret < 0)
@@ -819,6 +842,7 @@ static int zk_dispatch(void)
switch (ev.type) {
case EVENT_JOIN:
+ dprintf("JOIN EVENT, blocked:%d\n", ev.blocked);
if (ev.blocked) {
dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
@@ -879,6 +903,7 @@ static int zk_dispatch(void)
ev.join_result, ev.buf);
break;
case EVENT_LEAVE:
+ dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
/*reset master if necessary */
dprintf("find node:%s\n", node_to_str(&ev.sender.node));
n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
@@ -901,13 +926,15 @@ static int zk_dispatch(void)
zk_hdlrs.leave_handler(&ev.sender.node, entries, nr_zk_nodes);
break;
case EVENT_NOTIFY:
+ dprintf("NOTIFY, blocked:%d\n", ev.blocked);
if (ev.blocked) {
- if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
- queue_work(zk_block_wq, &work);
-
+ if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) {
ev.callbacked = 1;
+ zk_notify_blocked = 1;
zk_queue_push_back(zhandle, &ev);
+
+ queue_work(zk_block_wq, &work);
} else
zk_queue_push_back(zhandle, NULL);
@@ -920,7 +947,6 @@ static int zk_dispatch(void)
break;
}
out:
- pthread_mutex_unlock(&queue_lock);
return 0;
}
--
1.7.7.6