[Sheepdog] [zookeeper][PATCH v2 05/11] Use atomic builtins to replace pthread mutex locks

Yunkai Zhang yunkai.me at gmail.com
Thu Apr 26 17:21:24 CEST 2012


From: Yunkai Zhang <qiushu.zyk at taobao.com>

There were two pthread mutex locks in the zookeeper driver:
1. pthread mutex lock in zk_dispatch/zk_lock, this lock was used to
prevent zk_lock popping data before zk_dispatch has pushed it.
2. pthread mutex lock in zk_queue_pop/add_event, this lock was used to
protect the leave event list.

This patch uses GNU atomic builtins (__sync_xxx) to replace these two
pthread mutex locks. It makes the zookeeper driver lighter and
faster.

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 sheep/cluster/zookeeper.c |  107 ++++++++++++++++++++++++---------------------
 1 files changed, 57 insertions(+), 50 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 2d7dd96..9d88f1d 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -82,24 +82,20 @@ struct zk_event {
 	int blocked; /* set non-zero when sheep must block this event */
 	int callbacked; /* set non-zero if sheep already called block_cb() */
 
-	struct list_head list; /* only used for leave event */
-
 	size_t buf_len;
 	uint8_t buf[MAX_EVENT_BUF_SIZE];
 };
 
-/* leave event list */
-static LIST_HEAD(zk_levent_list);
-
 static int zk_notify_blocked;
-static struct zk_node zk_nodes[SD_MAX_NODES];
-static size_t nr_zk_nodes;
 
-/* protect zk_notify_blocked */
-static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;
+/* leave event circular array */
+static struct zk_event zk_levents[SD_MAX_NODES];
+static int nr_zk_levents;
+static unsigned zk_levent_head;
+static unsigned zk_levent_tail;
 
-/* protect leave event list */
-static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct zk_node zk_nodes[SD_MAX_NODES];
+static size_t nr_zk_nodes;
 
 /* zookeeper API wrapper */
 inline ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
@@ -284,21 +280,47 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
 static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 {
 	int rc, len;
+	int nr_levents;
 	char path[256];
 	struct zk_event *lev;
 	eventfd_t value = 1;
 
 	/* process leave event */
-	if (!list_empty(&zk_levent_list)) {
-		dprintf("found a leave event.\n");
+	if (__sync_add_and_fetch(&nr_zk_levents, 0)) {
+		nr_levents = __sync_sub_and_fetch(&nr_zk_levents, 1) + 1;
+		dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
 
-		pthread_mutex_lock(&leave_lock);
-		lev = list_first_entry(&zk_levent_list, typeof(*lev), list);
-		list_del(&lev->list);
-		pthread_mutex_unlock(&leave_lock);
+		lev = &zk_levents[zk_levent_head%SD_MAX_NODES];
+
+	/* if the node pointed to by queue_pos was sent by this leaver,
+	 * and it has blocked the whole cluster, we should ignore it. */
+		len = sizeof(*ev);
+		sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+		rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+		if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) {
+			dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos);
+			queue_pos++;
+
+			/* watch next data */
+			sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+			rc = zk_exists(zh, path, 1, NULL);
+			dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
+			if (rc == ZOK) {
+				/* we lost this message, manual notify */
+				dprintf("write event to efd:%d\n", efd);
+				eventfd_write(efd, value);
+			}
+		}
 
 		memcpy(ev, lev, sizeof(*ev));
-		free(lev);
+		zk_levent_head++;
+
+		if (__sync_add_and_fetch(&nr_zk_levents, 0) || rc==ZOK) {
+			/* we have pending leave events
+			 * or queue nodes, manual notify */
+			dprintf("write event to efd:%d\n", efd);
+			eventfd_write(efd, value);
+		}
 
 		return 0;
 	}
@@ -407,11 +429,6 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
 			i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
 	}
 	memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
-
-	for (i=0; i<nr_nodes; i++) {
-		dprintf("zk_nodes[%d], seq:%d, value:%s\n",
-			i, znodes[i].seq, node_to_str(&zk_nodes[i].node));
-	}
 }
 
 static void build_node_list(struct zk_node *znodes, size_t nr_nodes,
@@ -520,6 +537,7 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 		     struct zk_node *znode, void *buf,
 		     size_t buf_len, void (*block_cb)(void *arg))
 {
+	int nr_levents;
 	struct zk_event ev, *lev;
 	eventfd_t value = 1;
 
@@ -536,16 +554,14 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 		ev.blocked = 1;
 		break;
 	case EVENT_LEAVE:
-		lev = (struct zk_event *)malloc(sizeof(*lev));
-		if (lev == NULL){
-			panic("failed to create LEAVE event, oom.\n");
-		}
+		lev = &zk_levents[zk_levent_tail%SD_MAX_NODES];
 
 		memcpy(lev, &ev, sizeof(ev));
 
-		pthread_mutex_lock(&leave_lock);
-		list_add_tail(&lev->list, &zk_levent_list);
-		pthread_mutex_unlock(&leave_lock);
+		nr_levents = __sync_add_and_fetch(&nr_zk_levents, 1);
+		dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
+
+		zk_levent_tail++;
 
 		/* manual notify */
 		dprintf("write event to efd:%d\n", efd);
@@ -743,6 +759,7 @@ static void zk_block(struct work *work)
 {
 	int rc;
 	struct zk_event ev;
+	eventfd_t value = 1;
 
 	rc = zk_queue_pop(zhandle, &ev);
 	assert(rc == 0);
@@ -752,11 +769,11 @@ static void zk_block(struct work *work)
 
 	zk_queue_push_back(zhandle, &ev);
 
-	pthread_mutex_lock(&blocked_lock);
-	dprintf("blocked_lock\n");
-	zk_notify_blocked = 0;
-	pthread_mutex_unlock(&blocked_lock);
-	dprintf("blocked_unlock\n");
+	__sync_sub_and_fetch(&zk_notify_blocked, 1);
+
+	/* this notify is necessary */
+	dprintf("write event to efd:%d\n", efd);
+	eventfd_write(efd, value);
 }
 
 static void zk_block_done(struct work *work)
@@ -765,7 +782,7 @@ static void zk_block_done(struct work *work)
 
 static int zk_dispatch(void)
 {
-	int i, ret, rc, len, idx;
+	int ret, rc, len, idx;
 	char path[256];
 	eventfd_t value;
 	struct zk_event ev;
@@ -782,15 +799,9 @@ static int zk_dispatch(void)
 	if (ret < 0)
 		return 0;
 
-	pthread_mutex_lock(&blocked_lock);
-	dprintf("blocked_lock\n");
-	if (zk_notify_blocked) {
-		pthread_mutex_unlock(&blocked_lock);
-		dprintf("blocked_unlock, return.\n");
+	if (__sync_add_and_fetch(&zk_notify_blocked, 0)) {
 		return 0;
 	}
-	pthread_mutex_unlock(&blocked_lock);
-	dprintf("blocked_unlock, process...\n");
 
 	ret = zk_queue_pop(zhandle, &ev);
 	if (ret < 0)
@@ -861,10 +872,9 @@ static int zk_dispatch(void)
 	case EVENT_LEAVE:
 		dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
 		/*reset master if necessary */
-		dprintf("find node:%s\n", node_to_str(&ev.sender.node));
 		n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
 		if (!n) {
-			dprintf("can't find this leave node, ignore it.\n");
+			dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
 			goto out;
 		}
 
@@ -873,10 +883,6 @@ static int zk_dispatch(void)
 
 		memmove(n, n + 1, sizeof(*n) * (nr_zk_nodes - idx));
 		dprintf("one sheep left, nr_nodes:%ld, idx:%d\n", nr_zk_nodes, idx);
-		for (i=0; i<nr_zk_nodes; i++) {
-			dprintf("zk_nodes[%d], seq:%d, value:%s\n",
-				i, zk_nodes[i].seq, node_to_str(&zk_nodes[i].node));
-		}
 
 		build_node_list(zk_nodes, nr_zk_nodes, entries);
 		sd_leave_handler(&ev.sender.node, entries, nr_zk_nodes);
@@ -886,7 +892,8 @@ static int zk_dispatch(void)
 		if (ev.blocked) {
 			if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) {
 				ev.callbacked = 1;
-				zk_notify_blocked = 1;
+
+				__sync_add_and_fetch(&zk_notify_blocked, 1);
 
 				zk_queue_push_back(zhandle, &ev);
 
-- 
1.7.7.6




More information about the sheepdog mailing list