[sheepdog] [PATCH v1] cluster: change the interface of distributed lock

Robin Dong robin.k.dong at gmail.com
Fri Dec 6 09:22:53 CET 2013


From: Robin Dong <sanbai at taobao.com>

Using only lock_id as the parameter of the lock interface, as in:

	create_lock(uint64_t lock_id)
	lock(uint64_t lock_id)
	unlock(uint64_t lock_id)
	destroy_lock(uint64_t lock_id)

is more convenient for end users. So we use the existing hash table to store the
"<lock_id, cluster_lock*>" mapping and add a destroy method to release all the
resources of a distributed lock.

Signed-off-by: Robin Dong <sanbai at taobao.com>
---
 include/sheep.h           |  12 ----
 sheep/cluster.h           |  23 +++++---
 sheep/cluster/corosync.c  |  13 +++--
 sheep/cluster/local.c     |  13 +++--
 sheep/cluster/zookeeper.c | 141 +++++++++++++++++++++++++++++++++++-----------
 5 files changed, 141 insertions(+), 61 deletions(-)

diff --git a/include/sheep.h b/include/sheep.h
index e5726e8..293e057 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -255,18 +255,6 @@ static inline void nodes_to_buffer(struct rb_root *nroot, void *buffer)
 
 #define MAX_NODE_STR_LEN 256
 
-/* structure for distributed lock */
-struct cluster_lock {
-	struct hlist_node hnode;
-	/* id is passed by users to represent a lock handle */
-	uint64_t id;
-	/* wait for the release of id by other lock owner */
-	pthread_mutex_t wait_release;
-	/* lock for different threads of the same node on the same id */
-	pthread_mutex_t id_lock;
-	char lock_path[MAX_NODE_STR_LEN];
-};
-
 static inline const char *node_to_str(const struct sd_node *id)
 {
 	static __thread char str[MAX_NODE_STR_LEN];
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 08df91c..3e9adb9 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -109,7 +109,7 @@ struct cluster_driver {
 	int (*unblock)(void *msg, size_t msg_len);
 
 	/*
-	 * Init a distributed mutually exclusive lock to avoid race condition
+	 * Create a distributed mutually exclusive lock to avoid race condition
 	 * when the whole sheepdog cluster process one exclusive resource.
 	 *
 	 * This function use 'lock_id' as the id of this distributed lock.
@@ -117,25 +117,32 @@ struct cluster_driver {
 	 *
 	 * Returns SD_RES_XXX
 	 */
-	int (*init_lock)(struct cluster_lock *lock, uint64_t lock_id);
+	int (*create_lock)(uint64_t lock_id);
 
 	/*
 	 * Acquire the distributed lock.
 	 *
-	 * The cluster_lock referenced by 'lock' shall be locked by calling
-	 * cluster->lock(). If the cluster_lock is already locked, the calling
-	 * thread shall block until the cluster_lock becomes available.
+	 * The cluster lock referenced by 'lock' shall be locked by calling
+	 * cluster->lock(). If the cluster lock is already locked, the calling
+	 * thread shall block until the cluster lock becomes available.
 	 */
-	void (*lock)(struct cluster_lock *lock);
+	void (*lock)(uint64_t lock_id);
 
 	/*
 	 * Release the distributed lock.
 	 *
-	 * If the owner of the cluster_lock release it (or the owner is
+	 * If the owner of the cluster lock release it (or the owner is
 	 * killed by accident), zookeeper will trigger zk_watch() which will
 	 * wake up all waiting threads to compete new owner of the lock
 	 */
-	void (*unlock)(struct cluster_lock *lock);
+	void (*unlock)(uint64_t lock_id);
+
+	/*
+	 * Destroy the distributed lock.
+	 *
+	 * All the resources of this distributed lock will be released.
+	 */
+	void (*destroy_lock)(uint64_t lock_id);
 
 	/*
 	 * Update the specific node in the driver's private copy of nodes
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 19fc73c..899ef4b 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -774,16 +774,20 @@ again:
 	return 0;
 }
 
-static int corosync_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
+static int corosync_create_lock(uint64_t lock_id)
 {
 	return -1;
 }
 
-static void corosync_lock(struct cluster_lock *cluster_lock)
+static void corosync_lock(uint64_t lock_id)
 {
 }
 
-static void corosync_unlock(struct cluster_lock *cluster_lock)
+static void corosync_unlock(uint64_t lock_id)
+{
+}
+
+static void corosync_destroy_lock(uint64_t lock_id)
 {
 }
 
@@ -807,9 +811,10 @@ static struct cluster_driver cdrv_corosync = {
 	.notify		= corosync_notify,
 	.block		= corosync_block,
 	.unblock	= corosync_unblock,
-	.init_lock	= corosync_init_lock,
+	.create_lock	= corosync_create_lock,
 	.lock		= corosync_lock,
 	.unlock		= corosync_unlock,
+	.destroy_lock   = corosync_destroy_lock,
 	.update_node	= corosync_update_node,
 };
 
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 4c4d83b..36279f8 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -547,16 +547,20 @@ static int local_init(const char *option)
 	return 0;
 }
 
-static int local_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
+static int local_create_lock(uint64_t lock_id)
 {
 	return -1;
 }
 
-static void local_lock(struct cluster_lock *cluster_lock)
+static void local_lock(uint64_t lock_id)
 {
 }
 
-static void local_unlock(struct cluster_lock *cluster_lock)
+static void local_unlock(uint64_t lock_id)
+{
+}
+
+static void local_destroy_lock(uint64_t lock_id)
 {
 }
 
@@ -579,9 +583,10 @@ static struct cluster_driver cdrv_local = {
 	.notify		= local_notify,
 	.block		= local_block,
 	.unblock	= local_unblock,
-	.init_lock	= local_init_lock,
+	.create_lock	= local_create_lock,
 	.lock		= local_lock,
 	.unlock		= local_unlock,
+	.destroy_lock   = local_destroy_lock,
 	.update_node    = local_update_node,
 };
 
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 9e4ffa5..22b477e 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -32,6 +32,20 @@
 #define MASTER_ZNONE BASE_ZNODE "/master"
 #define LOCK_ZNODE BASE_ZNODE "/lock"
 
+static inline ZOOAPI int zk_delete_node(const char *path, int version);
+
+/* structure for distributed lock */
+struct cluster_lock {
+	struct hlist_node hnode;
+	/* id is passed by users to represent a lock handle */
+	uint64_t id;
+	/* wait for the release of id by other lock owner */
+	pthread_mutex_t wait_wakeup;
+	/* lock for different threads of the same node on the same id */
+	pthread_mutex_t id_lock;
+	char lock_path[MAX_NODE_STR_LEN];
+};
+
 #define WAIT_TIME	1		/* second */
 
 #define HASH_BUCKET_NR	1021
@@ -43,33 +57,42 @@ static pthread_mutex_t table_locks[HASH_BUCKET_NR];
  * cluster_lock->id_lock so we don't need to add lock here
  */
 
-static void lock_table_del(uint64_t lock_id)
+static struct cluster_lock *lock_table_del(uint64_t lock_id)
 {
 	uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
 	struct hlist_node *iter;
-	struct cluster_lock *lock;
+	struct cluster_lock *lock, *ret_lock = NULL;
 
 	pthread_mutex_lock(table_locks + hval);
 	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
 		if (lock->id == lock_id) {
 			hlist_del(iter);
+			ret_lock = lock;
 			break;
 		}
 	}
 	pthread_mutex_unlock(table_locks + hval);
+	return ret_lock;
 }
 
 static void lock_table_add(uint64_t lock_id,
 			   struct cluster_lock *cluster_lock)
 {
 	uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
+	struct hlist_node *iter;
+	struct cluster_lock *lock;
 
 	pthread_mutex_lock(table_locks + hval);
+	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
+		if (lock->id == lock_id)
+			goto out;
+	}
 	hlist_add_head(&(cluster_lock->hnode), cluster_locks_table + hval);
+out:
 	pthread_mutex_unlock(table_locks + hval);
 }
 
-static int lock_table_lookup_release(uint64_t lock_id)
+static int lock_table_lookup_wakeup(uint64_t lock_id)
 {
 	uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
 	int res = -1;
@@ -79,7 +102,7 @@ static int lock_table_lookup_release(uint64_t lock_id)
 	pthread_mutex_lock(table_locks + hval);
 	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
 		if (lock->id == lock_id) {
-			pthread_mutex_unlock(&lock->wait_release);
+			pthread_mutex_unlock(&lock->wait_wakeup);
 			res = 0;
 			break;
 		}
@@ -88,6 +111,51 @@ static int lock_table_lookup_release(uint64_t lock_id)
 	return res;
 }
 
+static struct cluster_lock *lock_table_lookup_acquire(uint64_t lock_id)
+{
+	uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
+	struct hlist_node *iter;
+	struct cluster_lock *lock, *ret_lock = NULL;
+
+	pthread_mutex_lock(table_locks + hval);
+	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
+		if (lock->id == lock_id) {
+			ret_lock = lock;
+			break;
+		}
+	}
+	pthread_mutex_unlock(table_locks + hval);
+	/*
+	 * if many threads use locks with same id, we should use
+	 * ->id_lock to avoid the only zookeeper handler to
+	 * create many seq-ephemeral files.
+	 */
+	pthread_mutex_lock(&ret_lock->id_lock);
+	return ret_lock;
+}
+
+static void lock_table_lookup_release(uint64_t lock_id)
+{
+	uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
+	int rc;
+	struct hlist_node *iter;
+	struct cluster_lock *lock;
+
+	pthread_mutex_lock(table_locks + hval);
+	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
+		if (lock->id == lock_id) {
+			rc = zk_delete_node(lock->lock_path, -1);
+			if (rc != ZOK)
+				sd_err("Failed to delete path: %s %s",
+				       lock->lock_path, zerror(rc));
+			pthread_mutex_unlock(&lock->id_lock);
+			goto out;
+		}
+	}
+out:
+	pthread_mutex_unlock(table_locks + hval);
+}
+
 /*
  * Wait a while when create, delete or get_children fail on
  * zookeeper lock so it will not print too much loop log
@@ -598,7 +666,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 		/* process distributed lock */
 		ret = sscanf(path, LOCK_ZNODE "/%lu/%s", &lock_id, str);
 		if (ret == 2) {
-			ret = lock_table_lookup_release(lock_id);
+			ret = lock_table_lookup_wakeup(lock_id);
 			if (ret)
 				sd_debug("release lock %lu %s", lock_id, str);
 			return;
@@ -1142,8 +1210,9 @@ kick_block_event:
 	kick_block_event();
 }
 
-static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id)
+static int zk_create_lock(uint64_t lock_id)
 {
+	struct cluster_lock *cluster_lock = xmalloc(sizeof(*cluster_lock));
 	int rc = 0;
 	char path[MAX_NODE_STR_LEN];
 
@@ -1153,9 +1222,9 @@ static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id)
 	if (rc)
 		goto out;
 
-	rc = pthread_mutex_init(&cluster_lock->wait_release, NULL);
+	rc = pthread_mutex_init(&cluster_lock->wait_wakeup, NULL);
 	if (rc) {
-		sd_err("failed to init cluster_lock->wait_release");
+		sd_err("failed to init cluster_lock->wait_wakeup");
 		goto out;
 	}
 
@@ -1164,17 +1233,34 @@ static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id)
 		sd_err("failed to init cluster_lock->id_lock");
 		goto out;
 	}
+	lock_table_add(lock_id, cluster_lock);
 out:
 	return rc;
 }
 
+static void zk_destroy_lock(uint64_t lock_id)
+{
+	struct cluster_lock *cluster_lock;
+	char path[MAX_NODE_STR_LEN];
+
+	cluster_lock = lock_table_del(lock_id);
+	if (cluster_lock) {
+		pthread_mutex_destroy(&cluster_lock->id_lock);
+		pthread_mutex_destroy(&cluster_lock->wait_wakeup);
+		snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu",
+			 cluster_lock->id);
+		zk_delete_node(path, -1);
+		free(cluster_lock);
+	}
+}
+
 /*
  * This operation will create a seq-ephemeral znode in lock directory
  * of zookeeper (use lock-id as dir name). The smallest file path in
  * this directory wil be the owner of the lock; the other threads will
- * wait on a pthread_mutex_t (cluster_lock->wait_release)
+ * wait on a pthread_mutex_t (cluster_lock->wait_wakeup)
  */
-static void zk_lock(struct cluster_lock *cluster_lock)
+static void zk_lock(uint64_t lock_id)
 {
 	int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
 	int rc, len = MAX_NODE_STR_LEN;
@@ -1182,15 +1268,11 @@ static void zk_lock(struct cluster_lock *cluster_lock)
 	char parent[MAX_NODE_STR_LEN];
 	char lowest_seq_path[MAX_NODE_STR_LEN];
 	char owner_name[MAX_NODE_STR_LEN];
+	struct cluster_lock *cluster_lock;
 
-	/*
-	 * if many threads use locks with same id, we should use
-	 * ->id_lock to avoid the only zookeeper handler to
-	 * create many seq-ephemeral files.
-	 */
-	pthread_mutex_lock(&cluster_lock->id_lock);
-
-	lock_table_add(cluster_lock->id, cluster_lock);
+	cluster_lock = lock_table_lookup_acquire(lock_id);
+	if (!cluster_lock)
+		panic("Failed to lock %lu", lock_id);
 
 	my_path = cluster_lock->lock_path;
 
@@ -1223,28 +1305,20 @@ static void zk_lock(struct cluster_lock *cluster_lock)
 		rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
 		if (rc == ZOK) {
 			sd_debug("call zoo_exits success %s", lowest_seq_path);
-			pthread_mutex_lock(&cluster_lock->wait_release);
+			pthread_mutex_lock(&cluster_lock->wait_wakeup);
 		} else {
-			sd_err("failed to call zoo_exists %s", zerror(rc));
+			sd_debug("failed to call zoo_exists %s", zerror(rc));
 			if (rc != ZNONODE)
 				zk_wait();
 		}
 	}
 }
 
-static void zk_unlock(struct cluster_lock *cluster_lock)
+static void zk_unlock(uint64_t lock_id)
 {
-	int rc;
-	rc = zk_delete_node(cluster_lock->lock_path, -1);
-	if (rc != ZOK)
-		sd_err("Failed to delete path: %s %s",
-		       cluster_lock->lock_path, zerror(rc));
-
-	lock_table_del(cluster_lock->id);
-	pthread_mutex_unlock(&cluster_lock->id_lock);
+	lock_table_lookup_release(lock_id);
 }
 
-
 static int zk_init(const char *option)
 {
 	char *hosts, *to, *p;
@@ -1324,9 +1398,10 @@ static struct cluster_driver cdrv_zookeeper = {
 	.notify     = zk_notify,
 	.block      = zk_block,
 	.unblock    = zk_unblock,
-	.init_lock  = zk_init_lock,
-	.lock       = zk_lock,
-	.unlock     = zk_unlock,
+	.create_lock  = zk_create_lock,
+	.lock         = zk_lock,
+	.unlock       = zk_unlock,
+	.destroy_lock = zk_destroy_lock,
 	.update_node  = zk_update_node,
 	.get_local_addr = get_local_addr,
 };
-- 
1.7.12.4




More information about the sheepdog mailing list