[Sheepdog] [zookeeper][PATCH 6/6] Use atomic builtins to replace pthread mutex locks
Yunkai Zhang
yunkai.me at gmail.com
Thu Apr 5 12:23:06 CEST 2012
There were two pthread mutex locks in the zookeeper driver:
1. pthread mutex lock in zk_dispatch/zk_lock, this lock was used to
prevent zk_lock popping data before zk_dispatch has pushed it.
2. pthread mutex lock in zk_queue_pop/add_event, this lock was used to
protect the leave event list.
This patch uses GNU atomic builtins (__sync_xxx) to replace these two
pthread mutex locks. It makes the zookeeper driver lighter
and faster.
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
sheep/cluster/zookeeper.c | 84 ++++++++++++++++++--------------------------
1 files changed, 34 insertions(+), 50 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 527c6e4..49a8ade 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -82,25 +82,21 @@ struct zk_event {
int blocked; /* set non-zero when sheep must block this event */
int callbacked; /* set non-zero if sheep already called block_cb() */
- struct list_head list; /* only used for leave event */
-
size_t buf_len;
uint8_t buf[MAX_EVENT_BUF_SIZE];
};
-/* leave event list */
-static LIST_HEAD(zk_levent_list);
-
-static int zk_notify_blocked;
-static struct zk_node zk_nodes[SD_MAX_NODES];
-static size_t nr_zk_nodes;
static char *zk_option;
+static int zk_notify_blocked;
-/* protect zk_notify_blocked */
-static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;
+/* leave event circular array */
+static struct zk_event zk_levents[SD_MAX_NODES];
+static int nr_zk_levents;
+static unsigned zk_levent_head;
+static unsigned zk_levent_tail;
-/* protect leave event list */
-static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct zk_node zk_nodes[SD_MAX_NODES];
+static size_t nr_zk_nodes;
/* zookeeper API wrapper */
inline ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
@@ -284,13 +280,11 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
eventfd_t value = 1;
/* process leave event */
- if (!list_empty(&zk_levent_list)) {
+ if (__sync_add_and_fetch(&nr_zk_levents, 0)) {
dprintf("found a leave event.\n");
+ dprintf("nr_zk_levents:%d, head:%u\n", __sync_sub_and_fetch(&nr_zk_levents, 1) + 1, zk_levent_head);
- pthread_mutex_lock(&leave_lock);
- lev = list_first_entry(&zk_levent_list, typeof(*lev), list);
- list_del(&lev->list);
- pthread_mutex_unlock(&leave_lock);
+ lev = &zk_levents[zk_levent_head%SD_MAX_NODES];
/* if the node pointed to by queue_pos was send by this leaver,
* and it have blocked whole cluster, we should ignore it. */
@@ -313,7 +307,14 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
}
memcpy(ev, lev, sizeof(*ev));
- free(lev);
+ zk_levent_head++;
+
+ if (__sync_add_and_fetch(&nr_zk_levents, 0) || rc==ZOK) {
+ /* we have pending leave events
+ * or queue nodes, manual notify */
+ dprintf("write event to efd:%d\n", efd);
+ eventfd_write(efd, value);
+ }
return 0;
}
@@ -422,11 +423,6 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
}
memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
-
- for (i=0; i<nr_nodes; i++) {
- dprintf("zk_nodes[%d], seq:%d, value:%s\n",
- i, znodes[i].seq, node_to_str(&zk_nodes[i].node));
- }
}
static void build_node_list(struct zk_node *znodes, size_t nr_nodes,
@@ -555,16 +551,13 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
ev.blocked = 1;
break;
case EVENT_LEAVE:
- lev = (struct zk_event *)malloc(sizeof(*lev));
- if (lev == NULL){
- panic("failed to create LEAVE event, oom.\n");
- }
+ lev = &zk_levents[zk_levent_tail%SD_MAX_NODES];
memcpy(lev, &ev, sizeof(ev));
- pthread_mutex_lock(&leave_lock);
- list_add_tail(&lev->list, &zk_levent_list);
- pthread_mutex_unlock(&leave_lock);
+ dprintf("nr_zk_levents:%d, tail:%u\n", __sync_add_and_fetch(&nr_zk_levents, 1), zk_levent_tail);
+
+ zk_levent_tail++;
/* manual notify */
dprintf("write event to efd:%d\n", efd);
@@ -787,6 +780,7 @@ static void zk_block(struct work *work)
{
int rc;
struct zk_event ev;
+ eventfd_t value = 1;
rc = zk_queue_pop(zhandle, &ev);
assert(rc == 0);
@@ -796,11 +790,11 @@ static void zk_block(struct work *work)
zk_queue_push_back(zhandle, &ev);
- pthread_mutex_lock(&blocked_lock);
- dprintf("blocked_lock\n");
- zk_notify_blocked = 0;
- pthread_mutex_unlock(&blocked_lock);
- dprintf("blocked_unlock\n");
+ __sync_sub_and_fetch(&zk_notify_blocked, 1);
+
+ /* this notify is necessary */
+ dprintf("write event to efd:%d\n", efd);
+ eventfd_write(efd, value);
}
static void zk_block_done(struct work *work)
@@ -809,7 +803,7 @@ static void zk_block_done(struct work *work)
static int zk_dispatch(void)
{
- int i, ret, rc, len, idx;
+ int ret, rc, len, idx;
char path[256];
eventfd_t value;
struct zk_event ev;
@@ -826,15 +820,9 @@ static int zk_dispatch(void)
if (ret < 0)
return 0;
- pthread_mutex_lock(&blocked_lock);
- dprintf("blocked_lock\n");
- if (zk_notify_blocked) {
- pthread_mutex_unlock(&blocked_lock);
- dprintf("blocked_unlock, return.\n");
+ if (__sync_add_and_fetch(&zk_notify_blocked, 0)) {
return 0;
}
- pthread_mutex_unlock(&blocked_lock);
- dprintf("blocked_unlock, process...\n");
ret = zk_queue_pop(zhandle, &ev);
if (ret < 0)
@@ -905,10 +893,9 @@ static int zk_dispatch(void)
case EVENT_LEAVE:
dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
/*reset master if necessary */
- dprintf("find node:%s\n", node_to_str(&ev.sender.node));
n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
if (!n) {
- dprintf("can't find this leave node, ignore it.\n");
+ dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
goto out;
}
@@ -917,10 +904,6 @@ static int zk_dispatch(void)
memmove(n, n + 1, sizeof(*n) * (nr_zk_nodes - idx));
dprintf("one sheep left, nr_nodes:%ld, idx:%d\n", nr_zk_nodes, idx);
- for (i=0; i<nr_zk_nodes; i++) {
- dprintf("zk_nodes[%d], seq:%d, value:%s\n",
- i, zk_nodes[i].seq, node_to_str(&zk_nodes[i].node));
- }
build_node_list(zk_nodes, nr_zk_nodes, entries);
zk_hdlrs.leave_handler(&ev.sender.node, entries, nr_zk_nodes);
@@ -930,7 +913,8 @@ static int zk_dispatch(void)
if (ev.blocked) {
if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) {
ev.callbacked = 1;
- zk_notify_blocked = 1;
+
+ __sync_add_and_fetch(&zk_notify_blocked, 1);
zk_queue_push_back(zhandle, &ev);
--
1.7.7.6
More information about the sheepdog
mailing list