Add uatomic_bool and its helpers (uatomic_is_true, uatomic_set_true, uatomic_set_false), a boolean wrapper that hides the complexity of uatomic_cmpxchg. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- include/util.h | 23 +++++++++++++++++++++++ sheep/cluster/zookeeper.c | 12 +++++++----- sheep/object_cache.c | 18 +++++------------- sheep/sockfd_cache.c | 14 ++++++-------- sheepfs/volume.c | 8 ++++---- 5 files changed, 45 insertions(+), 30 deletions(-) diff --git a/include/util.h b/include/util.h index afab903..5fb19c2 100644 --- a/include/util.h +++ b/include/util.h @@ -8,6 +8,7 @@ #include <limits.h> #include <stdint.h> #include <unistd.h> +#include <urcu/uatomic.h> #include "bitops.h" #include "list.h" @@ -103,4 +104,26 @@ void set_trimmed_sectors(void *buf, uint64_t offset, uint32_t len, #endif /* NDEBUG */ +/* urcu helpers */ + +/* Boolean data type which can be accessed by multiple threads */ +typedef unsigned long uatomic_bool; + +static inline bool uatomic_is_true(uatomic_bool *val) +{ + return uatomic_read(val) == 1; +} + +/* success if the old value is false */ +static inline bool uatomic_set_true(uatomic_bool *val) +{ + return uatomic_cmpxchg(val, 0, 1) == 0; +} + +static inline void uatomic_set_false(uatomic_bool *val) +{ + assert(uatomic_is_true(val)); + uatomic_set(val, 0); +} + #endif diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 388bebc..91ea608 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -64,7 +64,7 @@ struct zk_event { uint8_t buf[SD_MAX_EVENT_BUF_SIZE]; }; -static int zk_notify_blocked; +static uatomic_bool zk_notify_blocked; /* leave event circular array */ static struct zk_event zk_levents[SD_MAX_NODES]; @@ -286,7 +286,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev) return 0; } - if (!called_by_zk_unblock && uatomic_read(&zk_notify_blocked) > 0) + if (!called_by_zk_unblock && uatomic_is_true(&zk_notify_blocked)) return -1; if (zk_queue_empty(zh)) @@ -639,7 +639,7 @@ static void zk_unblock(void *msg, size_t msg_len) zk_queue_push_back(zhandle, &ev); - 
uatomic_dec(&zk_notify_blocked); + uatomic_set_false(&zk_notify_blocked); /* this notify is necessary */ dprintf("write event to efd:%d\n", efd); @@ -778,8 +778,10 @@ static void zk_handler(int listen_fd, int events, void *data) case EVENT_BLOCK: dprintf("BLOCK\n"); zk_queue_push_back(zhandle, NULL); - if (sd_block_handler(&ev.sender.node)) - uatomic_inc(&zk_notify_blocked); + if (sd_block_handler(&ev.sender.node)) { + bool result = uatomic_set_true(&zk_notify_blocked); + assert(result); + } break; case EVENT_NOTIFY: dprintf("NOTIFY\n"); diff --git a/sheep/object_cache.c b/sheep/object_cache.c index ab6499d..d5b6264 100644 --- a/sheep/object_cache.c +++ b/sheep/object_cache.c @@ -49,7 +49,7 @@ struct global_cache { uint32_t cache_size; - int in_reclaim; + uatomic_bool in_reclaim; struct cds_list_head cache_lru_list; }; @@ -90,15 +90,6 @@ static pthread_mutex_t hashtable_lock[HASH_SIZE] = { static struct hlist_head cache_hashtable[HASH_SIZE]; -/* - * If the cache is already in reclaim, return 1, otherwise return 0 - * and set sys_cache.in_reclaim to 1 - */ -static inline int mark_cache_in_reclaim(void) -{ - return uatomic_cmpxchg(&sys_cache.in_reclaim, 0, 1); -} - static inline bool entry_is_dirty(const struct object_cache_entry *entry) { return !!entry->bmap; @@ -558,7 +549,7 @@ static void do_reclaim(struct work *work) static void reclaim_done(struct work *work) { - uatomic_set(&sys_cache.in_reclaim, 0); + uatomic_set_false(&sys_cache.in_reclaim); free(work); } @@ -626,7 +617,8 @@ void object_cache_try_to_reclaim(void) if (uatomic_read(&sys_cache.cache_size) < sys->object_cache_size) return; - if (mark_cache_in_reclaim()) + if (!uatomic_set_true(&sys_cache.in_reclaim)) + /* the cache is already in reclaim, */ return; work = xzalloc(sizeof(struct work)); @@ -1236,7 +1228,7 @@ int object_cache_init(const char *p) CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list); uatomic_set(&sys_cache.cache_size, 0); - uatomic_set(&sys_cache.in_reclaim, 0); + sys_cache.in_reclaim = 
false; ret = load_existing_cache(); err: diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c index 6e5e289..d8b3c63 100644 --- a/sheep/sockfd_cache.c +++ b/sheep/sockfd_cache.c @@ -65,7 +65,7 @@ static int fds_count = DEFAULT_FDS_COUNT; struct sockfd_cache_fd { int fd; - uint8_t in_use; + uatomic_bool in_use; }; struct sockfd_cache_entry { @@ -128,7 +128,7 @@ static inline int get_free_slot(struct sockfd_cache_entry *entry) int idx = -1, i; for (i = 0; i < fds_count; i++) { - if (uatomic_cmpxchg(&entry->fds[i].in_use, 0, 1)) + if (!uatomic_set_true(&entry->fds[i].in_use)) continue; idx = i; break; @@ -165,7 +165,7 @@ static inline bool slots_all_free(struct sockfd_cache_entry *entry) { int i; for (i = 0; i < fds_count; i++) - if (uatomic_read(&entry->fds[i].in_use)) + if (uatomic_is_true(&entry->fds[i].in_use)) return false; return true; } @@ -299,7 +299,7 @@ static void do_grow_fds(struct work *work) entry->fds = xrealloc(entry->fds, new_size); for (i = old_fds_count; i < new_fds_count; i++) { entry->fds[i].fd = -1; - entry->fds[i].in_use = 0; + entry->fds[i].in_use = false; } } pthread_rwlock_unlock(&sockfd_cache.lock); @@ -354,7 +354,7 @@ static struct sockfd *sockfd_cache_get(const struct node_id *nid, char *name) dprintf("create connection %s:%d idx %d\n", name, nid->port, idx); fd = connect_to(name, nid->port); if (fd < 0) { - uatomic_dec(&entry->fds[idx].in_use); + uatomic_set_false(&entry->fds[idx].in_use); return NULL; } entry->fds[idx].fd = fd; @@ -370,7 +370,6 @@ static void sockfd_cache_put(const struct node_id *nid, int idx) { struct sockfd_cache_entry *entry; char name[INET6_ADDRSTRLEN]; - int refcnt; addr_to_str(name, sizeof(name), nid->addr, 0); dprintf("%s:%d idx %d\n", name, nid->port, idx); @@ -380,8 +379,7 @@ static void sockfd_cache_put(const struct node_id *nid, int idx) pthread_rwlock_unlock(&sockfd_cache.lock); assert(entry); - refcnt = uatomic_cmpxchg(&entry->fds[idx].in_use, 1, 0); - assert(refcnt == 1); + 
uatomic_set_false(&entry->fds[idx].in_use); } /* diff --git a/sheepfs/volume.c b/sheepfs/volume.c index 3f1391f..bce1ade 100644 --- a/sheepfs/volume.c +++ b/sheepfs/volume.c @@ -56,7 +56,7 @@ struct vdi_inode { * to simulate aysnc read. All sockets point to the same gateway */ int socket_pool[SOCKET_POOL_SIZE]; - char socket_in_use[SOCKET_POOL_SIZE]; /* 1 means in use */ + uatomic_bool socket_in_use[SOCKET_POOL_SIZE]; unsigned socket_poll_adder; }; @@ -120,8 +120,8 @@ static inline int get_socket_fd(struct vdi_inode *vdi, int *idx) retry: sock_idx = uatomic_add_return(&vdi->socket_poll_adder, 1) % SOCKET_POOL_SIZE; - /* if socket_in_use[sock_idx] == 0, set it to 1, otherwise, retry */ - if (uatomic_cmpxchg(&vdi->socket_in_use[sock_idx], 0, 1)) + /* if socket_in_use[sock_idx] is false, set it to true, otherwise, retry */ + if (!uatomic_set_true(&vdi->socket_in_use[sock_idx])) goto retry; fd = vdi->socket_pool[sock_idx]; *idx = sock_idx; @@ -131,7 +131,7 @@ retry: static inline void put_socket_fd(struct vdi_inode *vdi, int idx) { - uatomic_dec(&vdi->socket_in_use[idx]); + uatomic_set_false(&vdi->socket_in_use[idx]); } static int volume_rw_object(char *buf, uint64_t oid, size_t size, -- 1.7.2.5 |