[Sheepdog] [zookeeper][PATCH v2 05/11] Use atomic builtins to replace pthread mutex locks
Yunkai Zhang
yunkai.me at gmail.com
Thu Apr 26 17:21:24 CEST 2012
From: Yunkai Zhang <qiushu.zyk at taobao.com>
There were two pthread mutex locks in the zookeeper driver:
1. pthread mutex lock in zk_dispatch/zk_lock; this lock was used to
prevent zk_lock from popping data before zk_dispatch has pushed it.
2. pthread mutex lock in zk_queue_pop/add_event; this lock was used to
protect the leave event list.
This patch uses GNU atomic builtins (__sync_xxx) to replace these two
pthread mutex locks. It makes the zookeeper driver lighter
and faster.
Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
sheep/cluster/zookeeper.c | 107 ++++++++++++++++++++++++---------------------
1 files changed, 57 insertions(+), 50 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 2d7dd96..9d88f1d 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -82,24 +82,20 @@ struct zk_event {
int blocked; /* set non-zero when sheep must block this event */
int callbacked; /* set non-zero if sheep already called block_cb() */
- struct list_head list; /* only used for leave event */
-
size_t buf_len;
uint8_t buf[MAX_EVENT_BUF_SIZE];
};
-/* leave event list */
-static LIST_HEAD(zk_levent_list);
-
static int zk_notify_blocked;
-static struct zk_node zk_nodes[SD_MAX_NODES];
-static size_t nr_zk_nodes;
-/* protect zk_notify_blocked */
-static pthread_mutex_t blocked_lock = PTHREAD_MUTEX_INITIALIZER;
+/* leave event circular array */
+static struct zk_event zk_levents[SD_MAX_NODES];
+static int nr_zk_levents;
+static unsigned zk_levent_head;
+static unsigned zk_levent_tail;
-/* protect leave event list */
-static pthread_mutex_t leave_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct zk_node zk_nodes[SD_MAX_NODES];
+static size_t nr_zk_nodes;
/* zookeeper API wrapper */
inline ZOOAPI int zk_create(zhandle_t *zh, const char *path, const char *value,
@@ -284,21 +280,47 @@ static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
{
int rc, len;
+ int nr_levents;
char path[256];
struct zk_event *lev;
eventfd_t value = 1;
/* process leave event */
- if (!list_empty(&zk_levent_list)) {
- dprintf("found a leave event.\n");
+ if (__sync_add_and_fetch(&nr_zk_levents, 0)) {
+ nr_levents = __sync_sub_and_fetch(&nr_zk_levents, 1) + 1;
+ dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
- pthread_mutex_lock(&leave_lock);
- lev = list_first_entry(&zk_levent_list, typeof(*lev), list);
- list_del(&lev->list);
- pthread_mutex_unlock(&leave_lock);
+ lev = &zk_levents[zk_levent_head%SD_MAX_NODES];
+
+ /* if the node pointed to by queue_pos was sent by this leaver,
+ * and it has blocked the whole cluster, we should ignore it. */
+ len = sizeof(*ev);
+ sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+ rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
+ if (rc == ZOK && node_cmp(&ev->sender.node, &lev->sender.node) == 0 && ev->blocked) {
+ dprintf("this queue_pos:%d have blocked whole cluster, ignore it\n", queue_pos);
+ queue_pos++;
+
+ /* watch next data */
+ sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+ rc = zk_exists(zh, path, 1, NULL);
+ dprintf("watch path:%s, exists:%d\n", path, (rc==ZOK));
+ if (rc == ZOK) {
+ /* we lost this message, manual notify */
+ dprintf("write event to efd:%d\n", efd);
+ eventfd_write(efd, value);
+ }
+ }
memcpy(ev, lev, sizeof(*ev));
- free(lev);
+ zk_levent_head++;
+
+ if (__sync_add_and_fetch(&nr_zk_levents, 0) || rc==ZOK) {
+ /* we have pending leave events
+ * or queue nodes, manual notify */
+ dprintf("write event to efd:%d\n", efd);
+ eventfd_write(efd, value);
+ }
return 0;
}
@@ -407,11 +429,6 @@ static void sort_zk_nodes(struct zk_node *znodes, size_t nr_nodes)
i, znodes[idxs[i].idx].seq, node_to_str(&N[i].node));
}
memcpy(zk_nodes, N, nr_nodes*sizeof(*zk_nodes));
-
- for (i=0; i<nr_nodes; i++) {
- dprintf("zk_nodes[%d], seq:%d, value:%s\n",
- i, znodes[i].seq, node_to_str(&zk_nodes[i].node));
- }
}
static void build_node_list(struct zk_node *znodes, size_t nr_nodes,
@@ -520,6 +537,7 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
struct zk_node *znode, void *buf,
size_t buf_len, void (*block_cb)(void *arg))
{
+ int nr_levents;
struct zk_event ev, *lev;
eventfd_t value = 1;
@@ -536,16 +554,14 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
ev.blocked = 1;
break;
case EVENT_LEAVE:
- lev = (struct zk_event *)malloc(sizeof(*lev));
- if (lev == NULL){
- panic("failed to create LEAVE event, oom.\n");
- }
+ lev = &zk_levents[zk_levent_tail%SD_MAX_NODES];
memcpy(lev, &ev, sizeof(ev));
- pthread_mutex_lock(&leave_lock);
- list_add_tail(&lev->list, &zk_levent_list);
- pthread_mutex_unlock(&leave_lock);
+ nr_levents = __sync_add_and_fetch(&nr_zk_levents, 1);
+ dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
+
+ zk_levent_tail++;
/* manual notify */
dprintf("write event to efd:%d\n", efd);
@@ -743,6 +759,7 @@ static void zk_block(struct work *work)
{
int rc;
struct zk_event ev;
+ eventfd_t value = 1;
rc = zk_queue_pop(zhandle, &ev);
assert(rc == 0);
@@ -752,11 +769,11 @@ static void zk_block(struct work *work)
zk_queue_push_back(zhandle, &ev);
- pthread_mutex_lock(&blocked_lock);
- dprintf("blocked_lock\n");
- zk_notify_blocked = 0;
- pthread_mutex_unlock(&blocked_lock);
- dprintf("blocked_unlock\n");
+ __sync_sub_and_fetch(&zk_notify_blocked, 1);
+
+ /* this notify is necessary */
+ dprintf("write event to efd:%d\n", efd);
+ eventfd_write(efd, value);
}
static void zk_block_done(struct work *work)
@@ -765,7 +782,7 @@ static void zk_block_done(struct work *work)
static int zk_dispatch(void)
{
- int i, ret, rc, len, idx;
+ int ret, rc, len, idx;
char path[256];
eventfd_t value;
struct zk_event ev;
@@ -782,15 +799,9 @@ static int zk_dispatch(void)
if (ret < 0)
return 0;
- pthread_mutex_lock(&blocked_lock);
- dprintf("blocked_lock\n");
- if (zk_notify_blocked) {
- pthread_mutex_unlock(&blocked_lock);
- dprintf("blocked_unlock, return.\n");
+ if (__sync_add_and_fetch(&zk_notify_blocked, 0)) {
return 0;
}
- pthread_mutex_unlock(&blocked_lock);
- dprintf("blocked_unlock, process...\n");
ret = zk_queue_pop(zhandle, &ev);
if (ret < 0)
@@ -861,10 +872,9 @@ static int zk_dispatch(void)
case EVENT_LEAVE:
dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
/*reset master if necessary */
- dprintf("find node:%s\n", node_to_str(&ev.sender.node));
n = find_node(zk_nodes, nr_zk_nodes, &ev.sender);
if (!n) {
- dprintf("can't find this leave node, ignore it.\n");
+ dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
goto out;
}
@@ -873,10 +883,6 @@ static int zk_dispatch(void)
memmove(n, n + 1, sizeof(*n) * (nr_zk_nodes - idx));
dprintf("one sheep left, nr_nodes:%ld, idx:%d\n", nr_zk_nodes, idx);
- for (i=0; i<nr_zk_nodes; i++) {
- dprintf("zk_nodes[%d], seq:%d, value:%s\n",
- i, zk_nodes[i].seq, node_to_str(&zk_nodes[i].node));
- }
build_node_list(zk_nodes, nr_zk_nodes, entries);
sd_leave_handler(&ev.sender.node, entries, nr_zk_nodes);
@@ -886,7 +892,8 @@ static int zk_dispatch(void)
if (ev.blocked) {
if (node_cmp(&ev.sender.node, &this_node.node) == 0 && !ev.callbacked) {
ev.callbacked = 1;
- zk_notify_blocked = 1;
+
+ __sync_add_and_fetch(&zk_notify_blocked, 1);
zk_queue_push_back(zhandle, &ev);
--
1.7.7.6
More information about the sheepdog
mailing list