[Sheepdog] [zookeeper][PATCH 3/5] Fix bug: lose zookeeper event
Yunkai Zhang
yunkai.me at gmail.com
Tue Mar 6 19:10:34 CET 2012
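Use the ZooKeeper distributed lock to prevent queue events from being lost:
the queue push, push-back and pop operations now take the lock, and pop
notifies the eventfd manually when the next queue node already exists at the
time the watch is set.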
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
sheep/cluster/zookeeper.c | 63 +++++++++++++++++++++++++++++++++++----------
1 files changed, 49 insertions(+), 14 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 23ce4ab..c2592d2 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -72,8 +72,10 @@ static void zk_lock(zhandle_t *zh)
again:
rc = zoo_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL, NULL, 0);
- if (rc == ZOK)
+ if (rc == ZOK){
+ dprintf("locked\n");
return;
+ }
else if (rc == ZNODEEXISTS) {
dprintf("retry\n");
usleep(10000); /* FIXME: use watch notification */
@@ -89,11 +91,14 @@ static void zk_unlock(zhandle_t *zh)
rc = zoo_delete(zh, LOCK_ZNODE, -1);
if (rc != ZOK)
panic("failed to release lock\n");
+
+ dprintf("unlocked\n");
}
/* ZooKeeper-based queue */
+static int efd;
static int queue_pos;
static int zk_queue_empty(zhandle_t *zh)
@@ -114,19 +119,27 @@ static void zk_queue_push(zhandle_t *zh, struct zk_event *ev)
{
int rc;
char path[256], buf[256];
+ eventfd_t value = 1;
+
+ zk_lock(zh);
sprintf(path, "%s/", QUEUE_ZNODE);
rc = zoo_create(zh, path, (char *)ev, sizeof(*ev),
&ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
+ assert(rc == ZOK);
+ dprintf("create path:%s, queue_pos:%d\n", buf, queue_pos);
if (queue_pos < 0) {
/* the first pushed data should be EVENT_JOIN */
assert(ev->type == EVENT_JOIN);
sscanf(buf, QUEUE_ZNODE "/%010d", &queue_pos);
- /* watch */
- zoo_exists(zh, buf, 1, NULL);
+ /* manual notify */
+ dprintf("write event to efd:%d\n", efd);
+ eventfd_write(efd, value);
}
+
+ zk_unlock(zh);
}
static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
@@ -134,33 +147,57 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
int rc;
char path[256];
+ zk_lock(zh);
+
queue_pos--;
if (ev) {
/* update the last popped data */
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+ dprintf("update path:%s, queue_pos:%d\n", path, queue_pos);
rc = zoo_set(zh, path, (char *)ev, sizeof(*ev), -1);
+ assert(rc == ZOK);
+
}
+ zk_unlock(zh);
+
return 0;
}
static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
{
- int rc, len;
+ int rc, len, pos;
char path[256];
+ eventfd_t value = 1;
if (zk_queue_empty(zh))
return -1;
+ zk_lock(zh);
+
sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
len = sizeof(*ev);
+ dprintf("read path:%s\n", path);
rc = zoo_get(zh, path, 1, (char *)ev, &len, NULL);
+ assert(rc == ZOK);
- /* watch next data */
- queue_pos++;
- sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
- zoo_exists(zh, path, 1, NULL);
+ pos = queue_pos++;
+ do {
+ /* watch next data */
+ pos++;
+ sprintf(path, QUEUE_ZNODE "/%010d", pos);
+ dprintf("watch path:%s\n", path);
+
+ rc = zoo_exists(zh, path, 1, NULL);
+ if (rc == ZOK) {
+ /* we lost this message, manual notify */
+ dprintf("write event to efd:%d\n", efd);
+ eventfd_write(efd, value);
+ }
+ } while (rc == ZOK);
+
+ zk_unlock(zh);
return 0;
}
@@ -219,7 +256,6 @@ static void zk_queue_init(zhandle_t *zh)
/* ZooKeeper driver APIs */
static zhandle_t *zhandle;
-static int efd;
static struct work_queue *zk_block_wq;
@@ -262,8 +298,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
struct sd_node *n;
struct zk_event ev;
- zk_lock(zh);
-
ev.type = type;
ev.sender = *node;
ev.buf_len = buf_len;
@@ -272,6 +306,7 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
memcpy(ev.buf, buf, buf_len);
ev.nr_nodes = get_nodes(zh, ev.nodes);
+ dprintf("get_nodes, nr_nodes:%d\n", ev.nr_nodes);
switch (type) {
case EVENT_JOIN:
@@ -296,8 +331,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
zk_queue_push(zh, &ev);
out:
- zk_unlock(zh);
-
return 0;
}
@@ -447,11 +480,13 @@ static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
static void zk_block(struct work *work)
{
+ int rc;
struct zk_event ev;
pthread_mutex_lock(&queue_lock);
- zk_queue_pop(zhandle, &ev);
+ rc = zk_queue_pop(zhandle, &ev);
+ assert(rc == 0);
ev.block_cb(ev.buf);
ev.blocked = 0;
--
1.7.7.6
More information about the sheepdog
mailing list