There were two pthread mutex locks in the zookeeper driver:

1. The pthread mutex lock shared by zk_dispatch/zk_block: this lock was
   used to prevent zk_block from popping data before zk_dispatch had
   pushed it.

2. The pthread mutex lock shared by zk_queue_pop/add_event: this lock
   was used to protect the leave event list.

This patch uses GNU atomic builtins (__sync_xxx) to replace these two
pthread mutex locks, turning the leave event list into a lock-free
circular array. This makes the zookeeper driver lighter and faster.

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
Note: an illustrative, standalone sketch of the lock-free pattern is
appended after the patch.

 sheep/cluster/zookeeper.c |   84 ++++++++++++++++++--------------------
 1 files changed, 34 insertions(+), 50 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 527c6e4..49a8ade 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -82,25 +82,21 @@ struct zk_event {
 	int blocked; /* set non-zero when sheep must block this event */
 	int callbacked; /* set non-zero if sheep already called block_cb() */
 
-	struct list_head list; /* only used for leave event */
-
 	size_t buf_len;
 	uint8_t buf[MAX_EVENT_BUF_SIZE];
 };
 
-/* leave event list */
-static LIST_HEAD(zk_levent_list);
-
-static int zk_notify_blocked;
-static struct zk_node zk_nodes[SD_MAX_NODES];
-static size_t nr_zk_nodes;
 static char *zk_option;
+static int zk_notify_blocked;
 
-/* protect zk_notify_blocked */
-static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;
+/* leave event circular array */
+static struct zk_event zk_levents[SD_MAX_NODES];
+static int nr_zk_levents;
+static unsigned zk_levent_head;
+static unsigned zk_levent_tail;
 
-/* protect leave event list */
-static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct zk_node zk_nodes[SD_MAX_NODES];
+static size_t nr_zk_nodes;
 
 /* zookeeper API wrapper */
 inline ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
@@ -284,13 +280,11 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 	eventfd_t value = 1;
 
 	/* process leave event */
-	if (!list_empty(&zk_levent_list)) {
+	if (__sync_add_and_fetch(&nr_zk_levents, 0)) {
 		dprintf("found a leave event.\n");
+		dprintf("nr_zk_levents:%d, head:%u\n", __sync_sub_and_fetch(&nr_zk_levents, 1) + 1, zk_levent_head);
 
-		pthread_mutex_lock(&leave_lock);
-		lev = list_first_entry(&zk_levent_list, typeof(*lev), list);
-		list_del(&lev->list);
-		pthread_mutex_unlock(&leave_lock);
+		lev = &zk_levents[zk_levent_head%SD_MAX_NODES];
 
 		/* if the node pointed to by queue_pos was send by this leaver,
 		 * and it have blocked whole cluster, we should ignore it.
 		 */
@@ -313,7 +307,14 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 		}
 
 		memcpy(ev, lev, sizeof(*ev));
-		free(lev);
+		zk_levent_head++;
+
+		if (__sync_add_and_fetch(&nr_zk_levents, 0) || rc==ZOK) {
+			/* we have pending leave events
+			 * or queue nodes, manual notify */
+			dprintf("write event to efd:%d\n", efd);
+			eventfd_write(efd, value);
+		}
 
 		return 0;
 	}
@@ -422,11 +423,6 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
 			i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
 	}
 	memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
-
-	for (i=0; i<nr_nodes; i++) {
-		dprintf("zk_nodes[%d], seq:%d, value:%s\n",
-			i, znodes[i].seq, node_to_str(&zk_nodes[i].node));
-	}
 }
 
 static void build_node_list(struct zk_node *znodes, size_t nr_nodes,
@@ -555,16 +551,13 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 		ev.blocked = 1;
 		break;
 	case EVENT_LEAVE:
-		lev = (struct zk_event *)malloc(sizeof(*lev));
-		if (lev == NULL){
-			panic("failed to create LEAVE event, oom.\n");
-		}
+		lev = &zk_levents[zk_levent_tail%SD_MAX_NODES];
 		memcpy(lev, &ev, sizeof(ev));
 
-		pthread_mutex_lock(&leave_lock);
-		list_add_tail(&lev->list, &zk_levent_list);
-		pthread_mutex_unlock(&leave_lock);
+		dprintf("nr_zk_levents:%d, tail:%u\n", __sync_add_and_fetch(&nr_zk_levents, 1), zk_levent_tail);
+
+		zk_levent_tail++;
 
 		/* manual notify */
 		dprintf("write event to efd:%d\n", efd);
@@ -787,6 +780,7 @@ static void zk_block(struct work *work)
 {
 	int rc;
 	struct zk_event ev;
+	eventfd_t value = 1;
 
 	rc = zk_queue_pop(zhandle, &ev);
 	assert(rc == 0);
@@ -796,11 +790,11 @@ static void zk_block(struct work *work)
 
 	zk_queue_push_back(zhandle, &ev);
 
-	pthread_mutex_lock(&blocked_lock);
-	dprintf("blocked_lock\n");
-	zk_notify_blocked = 0;
-	pthread_mutex_unlock(&blocked_lock);
-	dprintf("blocked_unlock\n");
+	__sync_sub_and_fetch(&zk_notify_blocked, 1);
+
+	/* this notify is necessary */
+	dprintf("write event to efd:%d\n", efd);
+	eventfd_write(efd, value);
 }
 
 static void zk_block_done(struct work *work)
@@ -809,7 +803,7 @@ static int zk_dispatch(void)
 {
-	int i, ret, rc, len, idx;
+	int ret, rc, len, idx;
 	char path[256];
 	eventfd_t value;
 	struct zk_event ev;
@@ -826,15 +820,9 @@ static int zk_dispatch(void)
 	if (ret < 0)
 		return 0;
 
-	pthread_mutex_lock(&blocked_lock);
-	dprintf("blocked_lock\n");
-	if (zk_notify_blocked) {
-		pthread_mutex_unlock(&blocked_lock);
-		dprintf("blocked_unlock, return.\n");
+	if (__sync_add_and_fetch(&zk_notify_blocked, 0)) {
 		return 0;
 	}
-	pthread_mutex_unlock(&blocked_lock);
-	dprintf("blocked_unlock, process...\n");
 
 	ret = zk_queue_pop(zhandle, &ev);
 	if (ret < 0)
@@ -905,10 +893,9 @@ static int zk_dispatch(void)
 	case EVENT_LEAVE:
 		dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
 		/*reset master if necessary */
-		dprintf("find node:%s\n", node_to_str(&ev.sender.node));
 		n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
 		if (!n) {
-			dprintf("can't find this leave node, ignore it.\n");
+			dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
 			goto out;
 		}
@@ -917,10 +904,6 @@
 		memmove(n, n + 1, sizeof(*n) * (nr_zk_nodes - idx));
 		dprintf("one sheep left, nr_nodes:%ld, idx:%d\n", nr_zk_nodes, idx);
 
-		for (i=0; i<nr_zk_nodes; i++) {
-			dprintf("zk_nodes[%d], seq:%d, value:%s\n",
-				i, zk_nodes[i].seq, node_to_str(&zk_nodes[i].node));
-		}
 		build_node_list(zk_nodes, nr_zk_nodes, entries);
 		zk_hdlrs.leave_handler(&ev.sender.node, entries, nr_zk_nodes);
@@ -930,7 +913,8 @@ static int zk_dispatch(void)
 	if (ev.blocked) {
 		if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) {
 			ev.callbacked = 1;
-			zk_notify_blocked = 1;
+
+			__sync_add_and_fetch(&zk_notify_blocked, 1);
 			zk_queue_push_back(zhandle, &ev);
-- 
1.7.7.6
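
For readers who want the pattern in isolation: the patch reduces to a
single-producer/single-consumer circular array whose only shared
variable is an element counter manipulated with the GNU __sync
builtins (each of which implies a full memory barrier). The sketch
below is illustrative only and not part of the patch; ring, ring_push,
ring_pop and RING_SIZE are invented names, and it assumes exactly one
producer thread and one consumer thread, mirroring add_event() and
zk_queue_pop().

#include <stdio.h>

#define RING_SIZE 64

static int ring[RING_SIZE];
static int nr_items;        /* shared; only touched via __sync_* */
static unsigned ring_tail;  /* written by the producer thread only */
static unsigned ring_head;  /* written by the consumer thread only */

/* producer side, cf. add_event(): fill the slot first, then publish
 * it by bumping the counter (the builtin is a full memory barrier) */
static void ring_push(int v)
{
	ring[ring_tail % RING_SIZE] = v;
	__sync_add_and_fetch(&nr_items, 1);
	ring_tail++;
}

/* consumer side, cf. zk_queue_pop(): "add 0 and fetch" is an atomic
 * read of the counter; returns -1 when the ring is empty */
static int ring_pop(int *v)
{
	if (!__sync_add_and_fetch(&nr_items, 0))
		return -1;
	*v = ring[ring_head % RING_SIZE];
	ring_head++;
	__sync_sub_and_fetch(&nr_items, 1);
	return 0;
}

int main(void)
{
	int v;

	ring_push(42);
	if (ring_pop(&v) == 0)
		printf("popped %d\n", v);
	return 0;
}

As in the driver, nothing here checks for overflow: the caller must
guarantee that no more than RING_SIZE entries (SD_MAX_NODES in the
patch) are ever outstanding at once. That guarantee is what lets head
and tail stay thread-local plain integers, leaving the counter as the
only atomic state.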