[sheepdog] [PATCH v5] sheep/cluster: add the distributed lock implemented by zookeeper for object-storage
Robin Dong
robin.k.dong at gmail.com
Mon Dec 2 05:53:54 CET 2013
Implement a distributed lock with zookeeper (see: http://zookeeper.apache.org/doc/trunk/recipes.html)
The routine is:
1. create a seq-ephemeral znode in the lock directory (using the lock-id as the directory name)
2. the thread whose znode has the smallest sequence number becomes the owner of the lock; the other threads wait on a pthread_mutex_t (cluster_lock->wait)
3. when the owner releases the lock (or is killed by accident), zookeeper triggers
zk_watcher(), which wakes up all waiting threads to compete for new ownership of the lock
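For clarity, here is a minimal caller-side sketch of the new init_lock()/lock()/unlock() interface (the caller and its names are hypothetical and not part of this patch; it assumes the driver is reached through sys->cdrv as for the other cluster callbacks):

/* hypothetical caller: serialize updates to one cluster-wide resource */
static int update_exclusive_resource(uint64_t lock_id)
{
        struct cluster_lock lock;
        int ret;

        /* creates the per-lock directory znode and inits the local mutexes */
        ret = sys->cdrv->init_lock(&lock, lock_id);
        if (ret)
                return ret;

        /* blocks until our seq-ephemeral znode has the smallest sequence */
        sys->cdrv->lock(&lock);

        /* ... operate on the exclusive resource here ... */

        /* deletes our znode; zookeeper then wakes up the next waiter */
        sys->cdrv->unlock(&lock);

        return 0;
}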
We use dlock_array to store pointers to the cluster_locks in this sheep daemon, so when a ZOO_DELETED_EVENT arrives
the program wakes up all waiters (in this sheep daemon) sleeping on that lock id and lets them compete for new
ownership.
dlock_array is just a plain array indexed by lock-id, so imagine this scenario: two threads (A and B) in one sheep daemon
call zk_lock() for the same lock-id. They create two znodes in zookeeper, but dlock_array[lock_id] can point to only one of
them (say, B). When the ZOO_DELETED_EVENT comes, zk_watcher() will wake up only thread B, and thread A
will sleep on '->wait' forever because nobody can wake it up.
There are two ways to solve this problem:
A. use a more complicated structure instead of dlock_array to store both A's and B's lock handles.
B. add a lock so that A and B cannot call zk_lock() at the same time.
We prefer method B because it also avoids creating too many files in one zookeeper directory, which would put too much pressure
on the zookeeper servers when the number of sheep daemons is huge. Therefore we add 'local_lock' to 'struct cluster_lock'.
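As an illustration of method B (again a hypothetical sketch, not part of this patch): threads in one daemon share a single 'struct cluster_lock' per lock-id, so the second caller of zk_lock() sleeps on ->local_lock until the first caller has run zk_unlock(). Each daemon therefore keeps at most one seq-ephemeral znode per lock-id, and the hash table entry always points at a thread that is really waiting.

static struct cluster_lock shared_lock; /* init_lock()-ed once with some lock_id */

static void *worker(void *arg)
{
        /*
         * The first thread takes ->local_lock, creates its znode and
         * eventually owns the lock; the second thread sleeps inside
         * zk_lock() on ->local_lock instead of creating another znode
         * for the same lock-id.
         */
        sys->cdrv->lock(&shared_lock);
        /* ... critical section ... */
        sys->cdrv->unlock(&shared_lock); /* releases ->local_lock */
        return NULL;
}

With this pattern the number of znodes created per daemon is bounded by the number of distinct lock-ids, not by the number of threads.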
v4 --> v5:
1. rename 'dlock' to 'cluster_lock'
2. change dlock_array to hash table
v3 --> v4:
1. change some comments
2. change type of 'lock_id' from uint32_t to uint64_t
v2 --> v3:
1. change cluster interface to init_lock()/lock()/unlock()
2. change 'struct zk_mutex' to 'struct cluster_lock'
3. add empty implementation to local/corosync
v1 --> v2:
move code from sheep/http/lock.c into sheep/cluster/zookeeper.c using the cluster framework
Signed-off-by: Robin Dong <sanbai at taobao.com>
---
include/sheep.h | 9 ++
sheep/cluster.h | 29 +++++++
sheep/cluster/corosync.c | 16 ++++
sheep/cluster/local.c | 16 ++++
sheep/cluster/zookeeper.c | 187 ++++++++++++++++++++++++++++++++++++++++++++-
5 files changed, 256 insertions(+), 1 deletions(-)
diff --git a/include/sheep.h b/include/sheep.h
index 293e057..aab69ef 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -255,6 +255,15 @@ static inline void nodes_to_buffer(struct rb_root *nroot, void *buffer)
#define MAX_NODE_STR_LEN 256
+/* structure for distributed lock */
+struct cluster_lock {
+ struct hlist_node hnode;
+ uint64_t id; /* id of this mutex */
+ pthread_mutex_t wait;
+ pthread_mutex_t local_lock;
+ char ephemeral_path[MAX_NODE_STR_LEN];
+};
+
static inline const char *node_to_str(const struct sd_node *id)
{
static __thread char str[MAX_NODE_STR_LEN];
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 81b5ae4..08df91c 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -109,6 +109,35 @@ struct cluster_driver {
int (*unblock)(void *msg, size_t msg_len);
/*
+ * Init a distributed mutually exclusive lock to avoid race conditions
+ * when the whole sheepdog cluster processes one exclusive resource.
+ *
+ * This function uses 'lock_id' as the id of this distributed lock.
+ * A thread can create many locks in one sheep daemon.
+ *
+ * Returns SD_RES_XXX
+ */
+ int (*init_lock)(struct cluster_lock *lock, uint64_t lock_id);
+
+ /*
+ * Acquire the distributed lock.
+ *
+ * The cluster_lock referenced by 'lock' shall be locked by calling
+ * cluster->lock(). If the cluster_lock is already locked, the calling
+ * thread shall block until the cluster_lock becomes available.
+ */
+ void (*lock)(struct cluster_lock *lock);
+
+ /*
+ * Release the distributed lock.
+ *
+ * If the owner of the cluster_lock releases it (or the owner is
+ * killed by accident), zookeeper will trigger zk_watcher(), which will
+ * wake up all waiting threads to compete for new ownership of the lock
+ */
+ void (*unlock)(struct cluster_lock *lock);
+
+ /*
* Update the specific node in the driver's private copy of nodes
*
* Returns SD_RES_XXX
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index ea4421b..7382d9a 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -774,6 +774,19 @@ again:
return 0;
}
+static int corosync_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
+{
+ return -1;
+}
+
+static void corosync_lock(struct cluster_lock *cluster_lock)
+{
+}
+
+static void corosync_unlock(struct cluster_lock *cluster_lock)
+{
+}
+
static int corosync_update_node(struct sd_node *node)
{
struct cpg_node cnode = this_node;
@@ -794,6 +807,9 @@ static struct cluster_driver cdrv_corosync = {
.notify = corosync_notify,
.block = corosync_block,
.unblock = corosync_unblock,
+ .init_lock = corosync_init_lock,
+ .lock = corosync_lock,
+ .unlock = corosync_unlock,
.update_node = corosync_update_node,
};
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index b8cbb5c..4c4d83b 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -547,6 +547,19 @@ static int local_init(const char *option)
return 0;
}
+static int local_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
+{
+ return -1;
+}
+
+static void local_lock(struct cluster_lock *cluster_lock)
+{
+}
+
+static void local_unlock(struct cluster_lock *cluster_lock)
+{
+}
+
static int local_update_node(struct sd_node *node)
{
struct local_node lnode = this_node;
@@ -566,6 +579,9 @@ static struct cluster_driver cdrv_local = {
.notify = local_notify,
.block = local_block,
.unblock = local_unblock,
+ .init_lock = local_init_lock,
+ .lock = local_lock,
+ .unlock = local_unlock,
.update_node = local_update_node,
};
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index fa89c46..fab31f4 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -30,6 +30,60 @@
#define QUEUE_ZNODE BASE_ZNODE "/queue"
#define MEMBER_ZNODE BASE_ZNODE "/member"
#define MASTER_ZNONE BASE_ZNODE "/master"
+#define LOCK_ZNODE BASE_ZNODE "/lock"
+
+#define WAIT_TIME 1 /* second */
+
+#define HASH_BUCKET_NR 4097
+static struct hlist_head *cluster_locks_table;
+
+/*
+ * All the operations on the lock table are protected by
+ * cluster_lock->local_lock, so we don't need an extra lock here
+ */
+
+static void del_lock_in_table(uint64_t lock_id)
+{
+ uint64_t hval = lock_id % HASH_BUCKET_NR;
+ struct hlist_node *iter;
+ struct cluster_lock *lock;
+
+ hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
+ if (lock->id == lock_id) {
+ hlist_del(iter);
+ break;
+ }
+ }
+}
+
+static void set_lock_in_table(uint64_t lock_id,
+ struct cluster_lock *cluster_lock)
+{
+ uint64_t hval = lock_id % HASH_BUCKET_NR;
+ hlist_add_head(&(cluster_lock->hnode), cluster_locks_table + hval);
+}
+
+static struct cluster_lock *get_lock_in_table(uint64_t lock_id)
+{
+ uint64_t hval = lock_id % HASH_BUCKET_NR;
+ struct hlist_node *iter;
+ struct cluster_lock *lock;
+
+ hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode)
+ if (lock->id == lock_id)
+ return lock;
+
+ return NULL;
+}
+
+/*
+ * Wait a while when create, delete or get_children fail on the
+ * zookeeper lock so that we don't flood the log while looping
+ */
+static void zk_wait(void)
+{
+ sleep(WAIT_TIME);
+}
/* iterate child znodes */
#define FOR_EACH_ZNODE(parent, path, strs) \
@@ -505,7 +559,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
void *ctx)
{
struct zk_node znode;
+ struct cluster_lock *cluster_lock;
char str[MAX_NODE_STR_LEN], *p;
+ uint64_t lock_id;
int ret;
if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
@@ -528,6 +584,16 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
} else if (type == ZOO_DELETED_EVENT) {
struct zk_node *n;
+ /* process distributed lock */
+ ret = sscanf(path, LOCK_ZNODE "/%lu/%s", &lock_id, str);
+ if (ret == 2) {
+ cluster_lock = get_lock_in_table(lock_id);
+ if (cluster_lock) {
+ pthread_mutex_unlock(&(cluster_lock->wait));
+ sd_debug("release lock %lu %s", lock_id, str);
+ }
+ }
+
ret = sscanf(path, MASTER_ZNONE "/%s", str);
if (ret == 1) {
zk_compete_master();
@@ -1058,6 +1124,109 @@ kick_block_event:
kick_block_event();
}
+static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id)
+{
+ int rc = 0;
+ char path[MAX_NODE_STR_LEN];
+
+ cluster_lock->id = lock_id;
+ snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id);
+ rc = zk_init_node(path);
+ if (rc)
+ goto out;
+
+ rc = pthread_mutex_init(&cluster_lock->wait, NULL);
+ if (rc) {
+ sd_err("failed to init cluster_lock->wait");
+ goto out;
+ }
+
+ rc = pthread_mutex_init(&cluster_lock->local_lock, NULL);
+ if (rc) {
+ sd_err("failed to init cluster_lock->local_lock");
+ goto out;
+ }
+out:
+ return rc;
+}
+
+/*
+ * This operation creates a seq-ephemeral znode in the lock directory
+ * of zookeeper (using the lock-id as the directory name). The znode
+ * with the smallest sequence number in this directory is the owner of
+ * the lock; the other threads wait on a pthread_mutex_t (cluster_lock->wait)
+ */
+static void zk_lock(struct cluster_lock *cluster_lock)
+{
+ int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
+ int rc, len = MAX_NODE_STR_LEN;
+ char *my_path;
+ char parent[MAX_NODE_STR_LEN];
+ char lowest_seq_path[MAX_NODE_STR_LEN];
+ char owner_name[MAX_NODE_STR_LEN];
+
+ /*
+ * If many threads use locks with the same id, we need
+ * ->local_lock to keep the single zookeeper handle from
+ * creating many seq-ephemeral znodes for this daemon.
+ */
+ pthread_mutex_lock(&cluster_lock->local_lock);
+
+ set_lock_in_table(cluster_lock->id, cluster_lock);
+
+ my_path = cluster_lock->ephemeral_path;
+
+ /* competing for lock ownership works just like zk_compete_master() */
+ snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu/",
+ cluster_lock->id);
+ while (true) {
+ rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+ flags, my_path, MAX_NODE_STR_LEN);
+ if (rc == ZOK)
+ break;
+ sd_err("failed to create path:%s, %s", my_path, zerror(rc));
+ zk_wait();
+ }
+ sd_debug("create path %s success", my_path);
+
+ /* the znode has been created; now check who owns the lock */
+ snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id);
+ while (true) {
+ zk_get_least_seq(parent, lowest_seq_path, MAX_NODE_STR_LEN,
+ owner_name, &len);
+
+ /* I got the lock */
+ if (!strncmp(lowest_seq_path, my_path, strlen(my_path))) {
+ sd_debug("I am the lock owner now. %s", lowest_seq_path);
+ return;
+ }
+
+ /* I failed to get the lock */
+ rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
+ if (rc == ZOK) {
+ sd_debug("call zoo_exists success %s", lowest_seq_path);
+ pthread_mutex_lock(&cluster_lock->wait);
+ } else {
+ sd_err("failed to call zoo_exists %s", zerror(rc));
+ if (rc != ZNONODE)
+ zk_wait();
+ }
+ }
+}
+
+static void zk_unlock(struct cluster_lock *cluster_lock)
+{
+ int rc;
+ rc = zk_delete_node(cluster_lock->ephemeral_path, -1);
+ if (rc != ZOK)
+ sd_err("Failed to delete path: %s %s",
+ cluster_lock->ephemeral_path, zerror(rc));
+
+ del_lock_in_table(cluster_lock->id);
+ pthread_mutex_unlock(&cluster_lock->local_lock);
+}
+
+
static int zk_init(const char *option)
{
char *hosts, *to, *p;
@@ -1102,6 +1271,19 @@ static int zk_init(const char *option)
return -1;
}
+ /* init distributed lock structures */
+
+ cluster_locks_table = xzalloc(sizeof(struct hlist_head) *
+ HASH_BUCKET_NR);
+ for (uint64_t i = 0; i < HASH_BUCKET_NR; i++)
+ INIT_HLIST_HEAD(cluster_locks_table + i);
+
+ ret = zk_init_node(LOCK_ZNODE);
+ if (ret != ZOK) {
+ sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret));
+ free(cluster_locks_table);
+ return -1;
+ }
return 0;
}
@@ -1122,7 +1304,10 @@ static struct cluster_driver cdrv_zookeeper = {
.notify = zk_notify,
.block = zk_block,
.unblock = zk_unblock,
- .update_node = zk_update_node,
+ .init_lock = zk_init_lock,
+ .lock = zk_lock,
+ .unlock = zk_unlock,
+ .update_node = zk_update_node,
.get_local_addr = get_local_addr,
};
--
1.7.1