[sheepdog] [PATCH v2 1/2] introduce uatomic_bool

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Fri Nov 2 06:23:18 CET 2012


This introduces uatomic_bool, a boolean type for flags shared between
threads; its helpers wrap the complex uatomic_cmpxchg() calls that the
converted call sites used to open-code.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 include/util.h            |   23 +++++++++++++++++++++++
 sheep/cluster/zookeeper.c |   12 +++++++-----
 sheep/object_cache.c      |   18 +++++-------------
 sheep/sockfd_cache.c      |   14 ++++++--------
 sheepfs/volume.c          |    8 ++++----
 5 files changed, 45 insertions(+), 30 deletions(-)
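
Not part of the patch, just for reviewers: a minimal usage sketch of
the new helpers.  The flag name ("running") and the function around it
are hypothetical, chosen only for illustration; it assumes liburcu and
this patch's util.h are available.

    #include <assert.h>
    #include "util.h"

    static uatomic_bool running;

    static void try_start(void)
    {
            /* only one caller wins the false -> true transition */
            if (!uatomic_set_true(&running))
                    return; /* another thread already set the flag */

            /* ... exclusive work goes here ... */

            /* drop the flag again; asserts that it was true */
            uatomic_set_false(&running);
    }

Note that the helpers invert the raw convention: uatomic_cmpxchg()
returns the old value, so 0 meant "success", whereas uatomic_set_true()
returns true on success -- hence the negations at the converted call
sites.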

diff --git a/include/util.h b/include/util.h
index afab903..5fb19c2 100644
--- a/include/util.h
+++ b/include/util.h
@@ -8,6 +8,7 @@
 #include <limits.h>
 #include <stdint.h>
 #include <unistd.h>
+#include <urcu/uatomic.h>
 
 #include "bitops.h"
 #include "list.h"
@@ -103,4 +104,26 @@ void set_trimmed_sectors(void *buf, uint64_t offset, uint32_t len,
 
 #endif	/* NDEBUG */
 
+/* urcu helpers */
+
+/* Boolean data type which can be accessed by multiple threads */
+typedef unsigned long uatomic_bool;
+
+static inline bool uatomic_is_true(uatomic_bool *val)
+{
+	return uatomic_read(val) == 1;
+}
+
+/* succeeds (returns true) only if the old value was false */
+static inline bool uatomic_set_true(uatomic_bool *val)
+{
+	return uatomic_cmpxchg(val, 0, 1) == 0;
+}
+
+static inline void uatomic_set_false(uatomic_bool *val)
+{
+	assert(uatomic_is_true(val));
+	uatomic_set(val, 0);
+}
+
 #endif
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 388bebc..91ea608 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -64,7 +64,7 @@ struct zk_event {
 	uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
 };
 
-static int zk_notify_blocked;
+static uatomic_bool zk_notify_blocked;
 
 /* leave event circular array */
 static struct zk_event zk_levents[SD_MAX_NODES];
@@ -286,7 +286,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 		return 0;
 	}
 
-	if (!called_by_zk_unblock && uatomic_read(&zk_notify_blocked) > 0)
+	if (!called_by_zk_unblock && uatomic_is_true(&zk_notify_blocked))
 		return -1;
 
 	if (zk_queue_empty(zh))
@@ -639,7 +639,7 @@ static void zk_unblock(void *msg, size_t msg_len)
 
 	zk_queue_push_back(zhandle, &ev);
 
-	uatomic_dec(&zk_notify_blocked);
+	uatomic_set_false(&zk_notify_blocked);
 
 	/* this notify is necessary */
 	dprintf("write event to efd:%d\n", efd);
@@ -778,8 +778,10 @@ static void zk_handler(int listen_fd, int events, void *data)
 	case EVENT_BLOCK:
 		dprintf("BLOCK\n");
 		zk_queue_push_back(zhandle, NULL);
-		if (sd_block_handler(&ev.sender.node))
-			uatomic_inc(&zk_notify_blocked);
+		if (sd_block_handler(&ev.sender.node)) {
+			bool result = uatomic_set_true(&zk_notify_blocked);
+			assert(result);
+		}
 		break;
 	case EVENT_NOTIFY:
 		dprintf("NOTIFY\n");
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index ab6499d..d5b6264 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -49,7 +49,7 @@
 
 struct global_cache {
 	uint32_t cache_size;
-	int in_reclaim;
+	uatomic_bool in_reclaim;
 	struct cds_list_head cache_lru_list;
 };
 
@@ -90,15 +90,6 @@ static pthread_mutex_t hashtable_lock[HASH_SIZE] = {
 
 static struct hlist_head cache_hashtable[HASH_SIZE];
 
-/*
- * If the cache is already in reclaim, return 1, otherwise return 0
- * and set sys_cache.in_reclaim to 1
- */
-static inline int mark_cache_in_reclaim(void)
-{
-	return uatomic_cmpxchg(&sys_cache.in_reclaim, 0, 1);
-}
-
 static inline bool entry_is_dirty(const struct object_cache_entry *entry)
 {
 	return !!entry->bmap;
@@ -558,7 +549,7 @@ static void do_reclaim(struct work *work)
 
 static void reclaim_done(struct work *work)
 {
-	uatomic_set(&sys_cache.in_reclaim, 0);
+	uatomic_set_false(&sys_cache.in_reclaim);
 	free(work);
 }
 
@@ -626,7 +617,8 @@ void object_cache_try_to_reclaim(void)
 	if (uatomic_read(&sys_cache.cache_size) < sys->object_cache_size)
 		return;
 
-	if (mark_cache_in_reclaim())
+	if (!uatomic_set_true(&sys_cache.in_reclaim))
+		/* the cache is already in reclaim */
 		return;
 
 	work = xzalloc(sizeof(struct work));
@@ -1236,7 +1228,7 @@ int object_cache_init(const char *p)
 
 	CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list);
 	uatomic_set(&sys_cache.cache_size, 0);
-	uatomic_set(&sys_cache.in_reclaim, 0);
+	sys_cache.in_reclaim = false;
 
 	ret = load_existing_cache();
 err:
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index 6e5e289..d8b3c63 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -65,7 +65,7 @@ static int fds_count = DEFAULT_FDS_COUNT;
 
 struct sockfd_cache_fd {
 	int fd;
-	uint8_t in_use;
+	uatomic_bool in_use;
 };
 
 struct sockfd_cache_entry {
@@ -128,7 +128,7 @@ static inline int get_free_slot(struct sockfd_cache_entry *entry)
 	int idx = -1, i;
 
 	for (i = 0; i < fds_count; i++) {
-		if (uatomic_cmpxchg(&entry->fds[i].in_use, 0, 1))
+		if (!uatomic_set_true(&entry->fds[i].in_use))
 			continue;
 		idx = i;
 		break;
@@ -165,7 +165,7 @@ static inline bool slots_all_free(struct sockfd_cache_entry *entry)
 {
 	int i;
 	for (i = 0; i < fds_count; i++)
-		if (uatomic_read(&entry->fds[i].in_use))
+		if (uatomic_is_true(&entry->fds[i].in_use))
 			return false;
 	return true;
 }
@@ -299,7 +299,7 @@ static void do_grow_fds(struct work *work)
 		entry->fds = xrealloc(entry->fds, new_size);
 		for (i = old_fds_count; i < new_fds_count; i++) {
 			entry->fds[i].fd = -1;
-			entry->fds[i].in_use = 0;
+			entry->fds[i].in_use = false;
 		}
 	}
 	pthread_rwlock_unlock(&sockfd_cache.lock);
@@ -354,7 +354,7 @@ static struct sockfd *sockfd_cache_get(const struct node_id *nid, char *name)
 	dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
 	fd = connect_to(name, nid->port);
 	if (fd < 0) {
-		uatomic_dec(&entry->fds[idx].in_use);
+		uatomic_set_false(&entry->fds[idx].in_use);
 		return NULL;
 	}
 	entry->fds[idx].fd = fd;
@@ -370,7 +370,6 @@ static void sockfd_cache_put(const struct node_id *nid, int idx)
 {
 	struct sockfd_cache_entry *entry;
 	char name[INET6_ADDRSTRLEN];
-	int refcnt;
 
 	addr_to_str(name, sizeof(name), nid->addr, 0);
 	dprintf("%s:%d idx %d\n", name, nid->port, idx);
@@ -380,8 +379,7 @@ static void sockfd_cache_put(const struct node_id *nid, int idx)
 	pthread_rwlock_unlock(&sockfd_cache.lock);
 
 	assert(entry);
-	refcnt = uatomic_cmpxchg(&entry->fds[idx].in_use, 1, 0);
-	assert(refcnt == 1);
+	uatomic_set_false(&entry->fds[idx].in_use);
 }
 
 /*
diff --git a/sheepfs/volume.c b/sheepfs/volume.c
index 3f1391f..bce1ade 100644
--- a/sheepfs/volume.c
+++ b/sheepfs/volume.c
@@ -56,7 +56,7 @@ struct vdi_inode {
 * to simulate async read. All sockets point to the same gateway
  */
 	int socket_pool[SOCKET_POOL_SIZE];
-	char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */
+	uatomic_bool socket_in_use[SOCKET_POOL_SIZE];
 	unsigned socket_poll_adder;
 };
 
@@ -120,8 +120,8 @@ static inline int get_socket_fd(struct vdi_inode *vdi, int *idx)
 retry:
 	sock_idx = uatomic_add_return(&vdi->socket_poll_adder, 1) %
 		   SOCKET_POOL_SIZE;
-	/* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */
-	if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1))
+	/* if socket_in_use[sock_idx] is false, set it to true; otherwise retry */
+	if (!uatomic_set_true(&vdi->socket_in_use[sock_idx]))
 		goto retry;
 	fd = vdi->socket_pool[sock_idx];
 	*idx = sock_idx;
@@ -131,7 +131,7 @@ retry:
 
 static inline void put_socket_fd(struct vdi_inode *vdi, int idx)
 {
-	uatomic_dec(&vdi->socket_in_use[idx]);
+	uatomic_set_false(&vdi->socket_in_use[idx]);
 }
 
 static int volume_rw_object(char *buf, uint64_t oid, size_t size,
-- 
1.7.2.5