From: Yunkai Zhang <qiushu.zyk at taobao.com> There were two pthread mutex locks in the zookeeper driver: 1. The pthread mutex lock in zk_dispatch/zk_block; this lock was used to prevent zk_block from popping data before zk_dispatch has pushed it. 2. The pthread mutex lock in zk_queue_pop/add_event; this lock was used to protect the leave event list. This patch uses GNU atomic builtins (__sync_xxx) to replace these two pthread mutex locks. It makes the zookeeper driver lighter and faster. Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> --- sheep/cluster/zookeeper.c | 107 ++++++++++++++++++++++++--------------------- 1 files changed, 57 insertions(+), 50 deletions(-) diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 2d7dd96..9d88f1d 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -82,24 +82,20 @@ struct zk_event { int blocked; /* set non-zero when sheep must block this event */ int callbacked; /* set non-zero if sheep already called block_cb() */ - struct list_head list; /* only used for leave event */ - size_t buf_len; uint8_t buf[MAX_EVENT_BUF_SIZE]; }; -/* leave event list */ -static LIST_HEAD(zk_levent_list); - static int zk_notify_blocked; -static struct zk_node zk_nodes[SD_MAX_NODES]; -static size_t nr_zk_nodes; -/* protect zk_notify_blocked */ -static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER; +/* leave event circular array */ +static struct zk_event zk_levents[SD_MAX_NODES]; +static int nr_zk_levents; +static unsigned zk_levent_head; +static unsigned zk_levent_tail; -/* protect leave event list */ -static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER; +static struct zk_node zk_nodes[SD_MAX_NODES]; +static size_t nr_zk_nodes; /* zookeeper API wrapper */ inline ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value, @@ -284,21 +280,47 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev) static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev) { int rc, len; + int 
nr_levents; char path[256]; struct zk_event *lev; eventfd_t value = 1; /* process leave event */ - if (!list_empty(&zk_levent_list)) { - dprintf("found a leave event.\n"); + if (__sync_add_and_fetch(&nr_zk_levents, 0)) { + nr_levents = __sync_sub_and_fetch(&nr_zk_levents, 1) + 1; + dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head); - pthread_mutex_lock(&leave_lock); - lev = list_first_entry(&zk_levent_list, typeof(*lev), list); - list_del(&lev->list); - pthread_mutex_unlock(&leave_lock); + lev = &zk_levents[zk_levent_head%SD_MAX_NODES]; + + /* if the node pointed to by queue_pos was send by this leaver, + * and it have blocked whole cluster, we should ignore it. */ + len = sizeof(*ev); + sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); + rc = zk_get(zh, path, 1, (char *)ev, &len, NULL); + if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) { + dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos); + queue_pos++; + + /* watch next data */ + sprintf(path, QUEUE_ZNODE "/%010d", queue_pos); + rc = zk_exists(zh, path, 1, NULL); + dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK)); + if (rc == ZOK) { + /* we lost this message, manual notify */ + dprintf("write event to efd:%d\n", efd); + eventfd_write(efd, value); + } + } memcpy(ev, lev, sizeof(*ev)); - free(lev); + zk_levent_head++; + + if (__sync_add_and_fetch(&nr_zk_levents, 0) || rc==ZOK) { + /* we have pending leave events + * or queue nodes, manual notify */ + dprintf("write event to efd:%d\n", efd); + eventfd_write(efd, value); + } return 0; } @@ -407,11 +429,6 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes) i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node)); } memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes)); - - for (i=0; i<nr_nodes; i++) { - dprintf("zk_nodes[%d], seq:%d, value:%s\n", - i, znodes[i].seq, node_to_str(&zk_nodes[i].node)); - } } static void build_node_list(struct zk_node *znodes, size_t nr_nodes, 
@@ -520,6 +537,7 @@ static int add_event(zhandle_t *zh, enum zk_event_type type, struct zk_node *znode, void *buf, size_t buf_len, void (*block_cb)(void *arg)) { + int nr_levents; struct zk_event ev, *lev; eventfd_t value = 1; @@ -536,16 +554,14 @@ static int add_event(zhandle_t *zh, enum zk_event_type type, ev.blocked = 1; break; case EVENT_LEAVE: - lev = (struct zk_event *)malloc(sizeof(*lev)); - if (lev == NULL){ - panic("failed to create LEAVE event, oom.\n"); - } + lev = &zk_levents[zk_levent_tail%SD_MAX_NODES]; memcpy(lev, &ev, sizeof(ev)); - pthread_mutex_lock(&leave_lock); - list_add_tail(&lev->list, &zk_levent_list); - pthread_mutex_unlock(&leave_lock); + nr_levents = __sync_add_and_fetch(&nr_zk_levents, 1); + dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail); + + zk_levent_tail++; /* manual notify */ dprintf("write event to efd:%d\n", efd); @@ -743,6 +759,7 @@ static void zk_block(struct work *work) { int rc; struct zk_event ev; + eventfd_t value = 1; rc = zk_queue_pop(zhandle, &ev); assert(rc == 0); @@ -752,11 +769,11 @@ static void zk_block(struct work *work) zk_queue_push_back(zhandle, &ev); - pthread_mutex_lock(&blocked_lock); - dprintf("blocked_lock\n"); - zk_notify_blocked = 0; - pthread_mutex_unlock(&blocked_lock); - dprintf("blocked_unlock\n"); + __sync_sub_and_fetch(&zk_notify_blocked, 1); + + /* this notify is necessary */ + dprintf("write event to efd:%d\n", efd); + eventfd_write(efd, value); } static void zk_block_done(struct work *work) @@ -765,7 +782,7 @@ static void zk_block_done(struct work *work) static int zk_dispatch(void) { - int i, ret, rc, len, idx; + int ret, rc, len, idx; char path[256]; eventfd_t value; struct zk_event ev; @@ -782,15 +799,9 @@ static int zk_dispatch(void) if (ret < 0) return 0; - pthread_mutex_lock(&blocked_lock); - dprintf("blocked_lock\n"); - if (zk_notify_blocked) { - pthread_mutex_unlock(&blocked_lock); - dprintf("blocked_unlock, return.\n"); + if (__sync_add_and_fetch(&zk_notify_blocked, 0)) 
{ return 0; } - pthread_mutex_unlock(&blocked_lock); - dprintf("blocked_unlock, process...\n"); ret = zk_queue_pop(zhandle, &ev); if (ret < 0) @@ -861,10 +872,9 @@ static int zk_dispatch(void) case EVENT_LEAVE: dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked); /*reset master if necessary */ - dprintf("find node:%s\n", node_to_str(&ev.sender.node)); n = find_node(zk_nodes, nr_zk_nodes, &ev.sender); if (!n) { - dprintf("can't find this leave node, ignore it.\n"); + dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node)); goto out; } @@ -873,10 +883,6 @@ static int zk_dispatch(void) memmove(n, n + 1, sizeof(*n) * (nr_zk_nodes - idx)); dprintf("one sheep left, nr_nodes:%ld, idx:%d\n", nr_zk_nodes, idx); - for (i=0; i<nr_zk_nodes; i++) { - dprintf("zk_nodes[%d], seq:%d, value:%s\n", - i, zk_nodes[i].seq, node_to_str(&zk_nodes[i].node)); - } build_node_list(zk_nodes, nr_zk_nodes, entries); sd_leave_handler(&ev.sender.node, entries, nr_zk_nodes); @@ -886,7 +892,8 @@ static int zk_dispatch(void) if (ev.blocked) { if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) { ev.callbacked = 1; - zk_notify_blocked = 1; + + __sync_add_and_fetch(&zk_notify_blocked, 1); zk_queue_push_back(zhandle, &ev); -- 1.7.7.6 |