[sheepdog] [PATCH v7] sheep/cluster: add distributed-lock implemented by zookeeper for object-storage

Robin Dong robin.k.dong at gmail.com
Tue Dec 3 11:15:53 CET 2013


From: Robin Dong <sanbai at taobao.com>

Implement a distributed lock using zookeeper
(see: http://zookeeper.apache.org/doc/trunk/recipes.html)

The routine is:
        1. create a seq-ephemeral znode in the lock directory
           (use the lock-id as the directory name)
        2. the znode with the smallest sequence number becomes the owner of
           the lock; the other threads wait on a pthread_mutex_t
           (cluster_lock->wait_release)
        3. when the owner releases the lock (or is killed unexpectedly),
           zookeeper triggers zk_watcher(), which wakes up all waiting
           threads to compete for ownership of the lock (see the sketch
           right after this list)
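
For reference, here is a minimal sketch of that routine against the plain
ZooKeeper C client API. It is not part of the patch: lock_sketch(), the
lowest_child() helper and the buffer sizes are illustrative only, while the
real implementation below (zk_lock()/zk_watcher()) additionally maintains the
per-daemon lock table.

#include <zookeeper/zookeeper.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: find the child of 'dir' with the lowest sequence
 * suffix (sequence suffixes are zero-padded, so strcmp() orders them). */
static void lowest_child(zhandle_t *zh, const char *dir, char *out, int len)
{
	struct String_vector strs;
	const char *min = NULL;

	if (zoo_get_children(zh, dir, 0, &strs) != ZOK)
		return;
	for (int i = 0; i < strs.count; i++)
		if (!min || strcmp(strs.data[i], min) < 0)
			min = strs.data[i];
	if (min)
		snprintf(out, len, "%s/%s", dir, min);
	deallocate_String_vector(&strs);
}

static void lock_sketch(zhandle_t *zh, const char *lock_dir,
			char *my_path, int path_len,
			pthread_mutex_t *wait_release)
{
	char member[256], owner[256] = "";

	/* 1. create a seq-ephemeral znode under the lock directory */
	snprintf(member, sizeof(member), "%s/", lock_dir);
	zoo_create(zh, member, "", 0, &ZOO_OPEN_ACL_UNSAFE,
		   ZOO_SEQUENCE | ZOO_EPHEMERAL, my_path, path_len);

	while (true) {
		/* 2. the znode with the lowest sequence number owns the lock */
		lowest_child(zh, lock_dir, owner, sizeof(owner));
		if (strcmp(owner, my_path) == 0)
			return;	/* we are the owner */

		/* 3. watch the current owner; when its znode is deleted the
		 *    ZOO_DELETED_EVENT handed to the watcher unlocks
		 *    wait_release and we retry */
		if (zoo_exists(zh, owner, 1, NULL) == ZOK)
			pthread_mutex_lock(wait_release);
	}
}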

We use dlock_array to store the pointers of cluster_locks in this sheep daemon,
so when a ZOO_DELETED_EVENT arrives the program wakes up all waiters (in this
sheep daemon) that are sleeping on the lock id and lets them compete for
ownership.

dlock_array is just a plain array indexed by lock-id, so imagine this
scenario: two threads (A and B) in one sheep daemon call zk_lock() for the
same lock-id. They will create two znodes in zookeeper but set
dlock_array[lock_id] to only one of them (for example, to B). After that,
when a ZOO_DELETED_EVENT comes, zk_watcher() will only wake up thread B and
thread A will sleep on '->wait_release' forever because nobody can wake it up.

We have two methods to solve this problem:
	A. use a more complicated structure instead of dlock_array to store
	   both A's and B's lock handles.
	B. add a lock so that A and B cannot call zk_lock() at the same time.
We prefer method B because it also avoids creating too many files in a
zookeeper directory, which would put too much pressure on the zookeeper
servers if the number of sheep daemons is huge. Therefore we add 'id_lock'
to 'struct cluster_lock'; a hypothetical usage sketch follows below.
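
For illustration only (not part of the patch), a caller sharing one handle
between two threads would look roughly like this; 'cdrv' stands for whichever
struct cluster_driver the daemon selected, and worker()/example()/bucket_lock
are hypothetical names:

#include <pthread.h>

#include "cluster.h"	/* struct cluster_lock / struct cluster_driver */

static struct cluster_lock bucket_lock;

static void *worker(void *arg)
{
	struct cluster_driver *cdrv = arg;

	/*
	 * Both workers share one cluster_lock handle, so id_lock serializes
	 * them inside this daemon and at most one seq-ephemeral znode is
	 * created for this handle at a time.
	 */
	cdrv->lock(&bucket_lock);
	/* ... operate on the exclusive resource ... */
	cdrv->unlock(&bucket_lock);
	return NULL;
}

static void example(struct cluster_driver *cdrv)
{
	pthread_t a, b;

	cdrv->init_lock(&bucket_lock, 0xcafe);	/* lock_id chosen by the caller */
	pthread_create(&a, NULL, worker, cdrv);
	pthread_create(&b, NULL, worker, cdrv);
	pthread_join(a, NULL);
	pthread_join(b, NULL);
}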

v6 --> v7:
	1. change bucket number of lock table from 4097 to 1021
	2. use sd_hash() for lock table

v5 --> v6:
	1. change name of "wait" to "wait_release"
	2. change name of "local_lock" to "id_lock"
	3. change name of "ephemeral_path" to "lock_path"
	4. modify the names of functions for lock table

v4 --> v5:
	1. rename 'dlock' to 'cluster_lock'
	2. change dlock_array to hash table

v3 --> v4:
	1. change some comments
	2. change type of 'lock_id' from uint32_t to uint64_t

v2 --> v3:
	1. change cluster interface to init_lock()/lock()/unlock()
	2. change 'struct zk_mutex' to 'struct cluster_lock'
	3. add empty implementation to local/corosync

v1 --> v2:
	move code from sheep/http/lock.c into sheep/cluster/zookeeper.c using the cluster framework

Signed-off-by: Robin Dong <sanbai at taobao.com>
---
 include/sheep.h           |  12 +++
 sheep/cluster.h           |  29 +++++++
 sheep/cluster/corosync.c  |  16 ++++
 sheep/cluster/local.c     |  16 ++++
 sheep/cluster/zookeeper.c | 209 +++++++++++++++++++++++++++++++++++++++++++++-
 5 files changed, 280 insertions(+), 2 deletions(-)

diff --git a/include/sheep.h b/include/sheep.h
index 293e057..e5726e8 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -255,6 +255,18 @@ static inline void nodes_to_buffer(struct rb_root *nroot, void *buffer)
 
 #define MAX_NODE_STR_LEN 256
 
+/* structure for distributed lock */
+struct cluster_lock {
+	struct hlist_node hnode;
+	/* id is passed by users to represent a lock handle */
+	uint64_t id;
+	/* wait for the release of id by other lock owner */
+	pthread_mutex_t wait_release;
+	/* lock for different threads of the same node on the same id */
+	pthread_mutex_t id_lock;
+	char lock_path[MAX_NODE_STR_LEN];
+};
+
 static inline const char *node_to_str(const struct sd_node *id)
 {
 	static __thread char str[MAX_NODE_STR_LEN];
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 81b5ae4..08df91c 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -109,6 +109,35 @@ struct cluster_driver {
 	int (*unblock)(void *msg, size_t msg_len);
 
 	/*
+	 * Init a distributed mutually exclusive lock to avoid race conditions
+	 * when the whole sheepdog cluster processes one exclusive resource.
+	 *
+	 * This function uses 'lock_id' as the id of this distributed lock.
+	 * A thread can create many locks in one sheep daemon.
+	 *
+	 * Returns SD_RES_XXX
+	 */
+	int (*init_lock)(struct cluster_lock *lock, uint64_t lock_id);
+
+	/*
+	 * Acquire the distributed lock.
+	 *
+	 * The cluster_lock referenced by 'lock' shall be locked by calling
+	 * cluster->lock(). If the cluster_lock is already locked, the calling
+	 * thread shall block until the cluster_lock becomes available.
+	 */
+	void (*lock)(struct cluster_lock *lock);
+
+	/*
+	 * Release the distributed lock.
+	 *
+	 * If the owner of the cluster_lock releases it (or the owner is
+	 * killed unexpectedly), zookeeper triggers zk_watcher(), which wakes
+	 * up all waiting threads to compete for ownership of the lock.
+	 */
+	void (*unlock)(struct cluster_lock *lock);
+
+	/*
 	 * Update the specific node in the driver's private copy of nodes
 	 *
 	 * Returns SD_RES_XXX
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index ea4421b..19fc73c 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -774,6 +774,19 @@ again:
 	return 0;
 }
 
+static int corosync_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
+{
+	return -1;
+}
+
+static void corosync_lock(struct cluster_lock *cluster_lock)
+{
+}
+
+static void corosync_unlock(struct cluster_lock *cluster_lock)
+{
+}
+
 static int corosync_update_node(struct sd_node *node)
 {
 	struct cpg_node cnode = this_node;
@@ -794,6 +807,9 @@ static struct cluster_driver cdrv_corosync = {
 	.notify		= corosync_notify,
 	.block		= corosync_block,
 	.unblock	= corosync_unblock,
+	.init_lock	= corosync_init_lock,
+	.lock		= corosync_lock,
+	.unlock		= corosync_unlock,
 	.update_node	= corosync_update_node,
 };
 
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index b8cbb5c..4c4d83b 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -547,6 +547,19 @@ static int local_init(const char *option)
 	return 0;
 }
 
+static int local_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
+{
+	return -1;
+}
+
+static void local_lock(struct cluster_lock *cluster_lock)
+{
+}
+
+static void local_unlock(struct cluster_lock *cluster_lock)
+{
+}
+
 static int local_update_node(struct sd_node *node)
 {
 	struct local_node lnode = this_node;
@@ -566,6 +579,9 @@ static struct cluster_driver cdrv_local = {
 	.notify		= local_notify,
 	.block		= local_block,
 	.unblock	= local_unblock,
+	.init_lock	= local_init_lock,
+	.lock		= local_lock,
+	.unlock		= local_unlock,
 	.update_node    = local_update_node,
 };
 
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index fa89c46..debe18f 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -30,6 +30,72 @@
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 #define MASTER_ZNONE BASE_ZNODE "/master"
+#define LOCK_ZNODE BASE_ZNODE "/lock"
+
+#define WAIT_TIME	1		/* second */
+
+#define HASH_BUCKET_NR	1021
+static struct hlist_head *cluster_locks_table;
+static pthread_mutex_t table_locks[HASH_BUCKET_NR];
+
+/*
+ * Operations on the same cluster_lock are serialized by its id_lock; the
+ * per-bucket table_locks protect the hash chains themselves.
+ */
+
+static void lock_table_del(uint64_t lock_id)
+{
+	uint64_t hval = sd_hash(&lock_id, sizeof(lock_id)) % HASH_BUCKET_NR;
+	struct hlist_node *iter;
+	struct cluster_lock *lock;
+
+	pthread_mutex_lock(table_locks + hval);
+	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
+		if (lock->id == lock_id) {
+			hlist_del(iter);
+			break;
+		}
+	}
+	pthread_mutex_unlock(table_locks + hval);
+}
+
+static void lock_table_add(uint64_t lock_id,
+			   struct cluster_lock *cluster_lock)
+{
+	uint64_t hval = sd_hash(&lock_id, sizeof(lock_id)) % HASH_BUCKET_NR;
+
+	pthread_mutex_lock(table_locks + hval);
+	hlist_add_head(&(cluster_lock->hnode), cluster_locks_table + hval);
+	pthread_mutex_unlock(table_locks + hval);
+}
+
+static int lock_table_lookup_release(uint64_t lock_id)
+{
+	uint64_t hval = sd_hash(&lock_id, sizeof(lock_id)) % HASH_BUCKET_NR;
+	int res = -1;
+	struct hlist_node *iter;
+	struct cluster_lock *lock;
+
+	pthread_mutex_lock(table_locks + hval);
+	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
+		if (lock->id == lock_id) {
+			pthread_mutex_unlock(&lock->wait_release);
+			res = 0;
+			break;
+		}
+	}
+	pthread_mutex_unlock(table_locks + hval);
+	return res;
+}
+
+/*
+ * Wait a while when create, delete or get_children fails on the
+ * zookeeper lock so the retry loop does not flood the log
+ */
+static void zk_wait(void)
+{
+	sleep(WAIT_TIME);
+}
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
@@ -506,6 +572,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 {
 	struct zk_node znode;
 	char str[MAX_NODE_STR_LEN], *p;
+	uint64_t lock_id;
 	int ret;
 
 	if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
@@ -528,6 +595,15 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 	} else if (type == ZOO_DELETED_EVENT) {
 		struct zk_node *n;
 
+		/* process distributed lock */
+		ret = sscanf(path, LOCK_ZNODE "/%lu/%s", &lock_id, str);
+		if (ret == 2) {
+			ret = lock_table_lookup_release(lock_id);
+			if (ret)
+				sd_debug("no waiter for lock %lu %s",
+					 lock_id, str);
+			return;
+		}
+
 		ret = sscanf(path, MASTER_ZNONE "/%s", str);
 		if (ret == 1) {
 			zk_compete_master();
@@ -580,9 +656,17 @@ static int zk_get_least_seq(const char *parent, char *least_seq_path,
 {
 	char path[MAX_NODE_STR_LEN], *p, *tmp;
 	struct String_vector strs;
-	int rc, least_seq = INT_MAX , seq;
+	int rc, least_seq, seq;
 
 	while (true) {
+		/*
+		 * If a previous iteration failed, least_seq may hold a very
+		 * small sequence number already deleted in zookeeper; every
+		 * newly created znode is larger than it, which causes an
+		 * infinite loop. So reset least_seq to INT_MAX in every loop.
+		 */
+		least_seq = INT_MAX;
+
 		RETURN_IF_ERROR(zk_get_children(parent, &strs), "");
 
 		FOR_EACH_ZNODE(parent, path, &strs) {
@@ -1058,6 +1142,109 @@ kick_block_event:
 	kick_block_event();
 }
 
+static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id)
+{
+	int rc = 0;
+	char path[MAX_NODE_STR_LEN];
+
+	cluster_lock->id = lock_id;
+	snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id);
+	rc = zk_init_node(path);
+	if (rc)
+		goto out;
+
+	rc = pthread_mutex_init(&cluster_lock->wait_release, NULL);
+	if (rc) {
+		sd_err("failed to init cluster_lock->wait_release");
+		goto out;
+	}
+
+	rc = pthread_mutex_init(&cluster_lock->id_lock, NULL);
+	if (rc) {
+		sd_err("failed to init cluster_lock->id_lock");
+		goto out;
+	}
+out:
+	return rc;
+}
+
+/*
+ * This operation creates a seq-ephemeral znode in the lock directory
+ * of zookeeper (using the lock-id as the directory name). The znode with
+ * the smallest sequence number becomes the owner of the lock; the other
+ * threads wait on a pthread_mutex_t (cluster_lock->wait_release)
+ */
+static void zk_lock(struct cluster_lock *cluster_lock)
+{
+	int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
+	int rc, len = MAX_NODE_STR_LEN;
+	char *my_path;
+	char parent[MAX_NODE_STR_LEN];
+	char lowest_seq_path[MAX_NODE_STR_LEN];
+	char owner_name[MAX_NODE_STR_LEN];
+
+	/*
+	 * If many threads use locks with the same id, ->id_lock keeps
+	 * the single zookeeper handle from creating many seq-ephemeral
+	 * znodes for the same lock handle.
+	 */
+	pthread_mutex_lock(&cluster_lock->id_lock);
+
+	lock_table_add(cluster_lock->id, cluster_lock);
+
+	my_path = cluster_lock->lock_path;
+
+	/* competing for lock ownership works just like zk_compete_master() */
+	snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu/",
+		 cluster_lock->id);
+	while (true) {
+		rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+				flags, my_path, MAX_NODE_STR_LEN);
+		if (rc == ZOK)
+			break;
+		sd_err("failed to create path:%s, %s", my_path, zerror(rc));
+		zk_wait();
+	}
+	sd_debug("create path %s success", my_path);
+
+	/* the znode was created; now find the lowest sequence number */
+	snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id);
+	while (true) {
+		zk_get_least_seq(parent, lowest_seq_path, MAX_NODE_STR_LEN,
+				 owner_name, &len);
+
+		/* I got the lock */
+		if (!strncmp(lowest_seq_path, my_path, strlen(my_path))) {
+			sd_debug("I am the lock owner now. %s", lowest_seq_path);
+			return;
+		}
+
+		/* I failed to get the lock */
+		rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
+		if (rc == ZOK) {
+			sd_debug("call zoo_exists success %s", lowest_seq_path);
+			pthread_mutex_lock(&cluster_lock->wait_release);
+		} else {
+			sd_err("failed to call zoo_exists %s", zerror(rc));
+			if (rc != ZNONODE)
+				zk_wait();
+		}
+	}
+}
+
+static void zk_unlock(struct cluster_lock *cluster_lock)
+{
+	int rc;
+	rc = zk_delete_node(cluster_lock->lock_path, -1);
+	if (rc != ZOK)
+		sd_err("Failed to delete path: %s %s",
+		       cluster_lock->lock_path, zerror(rc));
+
+	lock_table_del(cluster_lock->id);
+	pthread_mutex_unlock(&cluster_lock->id_lock);
+}
+
+
 static int zk_init(const char *option)
 {
 	char *hosts, *to, *p;
@@ -1102,6 +1289,21 @@ static int zk_init(const char *option)
 		return -1;
 	}
 
+	/* init distributed lock structures */
+
+	cluster_locks_table = xzalloc(sizeof(struct hlist_head) *
+				      HASH_BUCKET_NR);
+	for (uint64_t i = 0; i < HASH_BUCKET_NR; i++) {
+		INIT_HLIST_HEAD(cluster_locks_table + i);
+		pthread_mutex_init(table_locks + i, NULL);
+	}
+
+	ret = zk_init_node(LOCK_ZNODE);
+	if (ret != ZOK) {
+		sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret));
+		free(cluster_locks_table);
+		return -1;
+	}
 	return 0;
 }
 
@@ -1122,7 +1324,10 @@ static struct cluster_driver cdrv_zookeeper = {
 	.notify     = zk_notify,
 	.block      = zk_block,
 	.unblock    = zk_unblock,
-	.update_node = zk_update_node,
+	.init_lock  = zk_init_lock,
+	.lock       = zk_lock,
+	.unlock     = zk_unlock,
+	.update_node  = zk_update_node,
 	.get_local_addr = get_local_addr,
 };
 
-- 
1.7.12.4



