[sheepdog] [PATCH v2] cluster: change the interface of distributed lock

Robin Dong robin.k.dong at gmail.com
Mon Dec 9 05:42:38 CET 2013


From: Robin Dong <sanbai at taobao.com>

Using only lock_id as parameter of lock interface and reduce function to only two:

	lock(uint64_t lock_id)
	unlock(uint64_t lock_id)

The ->lock will create and acquire the distributed lock and ->unlock will release it.
It is more convient for end user.
We use existed hash table to store the
"<lock_id, cluster_lock*>" and release all the resources of a distributed lock after
all threads call "->unlock".

Signed-off-by: Robin Dong <sanbai at taobao.com>
---
 include/sheep.h           |  12 ----
 sheep/cluster.h           |  31 +++++----
 sheep/cluster/corosync.c  |  10 +--
 sheep/cluster/local.c     |  10 +--
 sheep/cluster/zookeeper.c | 167 ++++++++++++++++++++++++++++------------------
 5 files changed, 121 insertions(+), 109 deletions(-)

diff --git a/include/sheep.h b/include/sheep.h
index e5726e8..293e057 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -255,18 +255,6 @@ static inline void nodes_to_buffer(struct rb_root *nroot, void *buffer)
 
 #define MAX_NODE_STR_LEN 256
 
-/* structure for distributed lock */
-struct cluster_lock {
-	struct hlist_node hnode;
-	/* id is passed by users to represent a lock handle */
-	uint64_t id;
-	/* wait for the release of id by other lock owner */
-	pthread_mutex_t wait_release;
-	/* lock for different threads of the same node on the same id */
-	pthread_mutex_t id_lock;
-	char lock_path[MAX_NODE_STR_LEN];
-};
-
 static inline const char *node_to_str(const struct sd_node *id)
 {
 	static __thread char str[MAX_NODE_STR_LEN];
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 08df91c..0693633 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -109,33 +109,32 @@ struct cluster_driver {
 	int (*unblock)(void *msg, size_t msg_len);
 
 	/*
-	 * Init a distributed mutually exclusive lock to avoid race condition
-	 * when the whole sheepdog cluster process one exclusive resource.
+	 * Acquire the distributed lock.
 	 *
-	 * This function use 'lock_id' as the id of this distributed lock.
-	 * A thread can create many locks in one sheep daemon.
+	 * Create a distributed mutually exclusive lock to avoid race condition
+	 * and try to acquire the lock.
 	 *
-	 * Returns SD_RES_XXX
-	 */
-	int (*init_lock)(struct cluster_lock *lock, uint64_t lock_id);
-
-	/*
-	 * Acquire the distributed lock.
+	 * This function use 'lock_id' as the id of this distributed lock.
+	 * A thread can acquire many locks with different lock_id in one
+	 * sheep daemon.
 	 *
-	 * The cluster_lock referenced by 'lock' shall be locked by calling
-	 * cluster->lock(). If the cluster_lock is already locked, the calling
-	 * thread shall block until the cluster_lock becomes available.
+	 * The cluster lock referenced by 'lock' shall be locked by calling
+	 * cluster->lock(). If the cluster lock is already locked, the calling
+	 * thread shall block until the cluster lock becomes available.
 	 */
-	void (*lock)(struct cluster_lock *lock);
+	void (*lock)(uint64_t lock_id);
 
 	/*
 	 * Release the distributed lock.
 	 *
-	 * If the owner of the cluster_lock release it (or the owner is
+	 * If the owner of the cluster lock release it (or the owner is
 	 * killed by accident), zookeeper will trigger zk_watch() which will
 	 * wake up all waiting threads to compete new owner of the lock
+	 *
+	 * After all thread unlock, all the resource of this distributed lock
+	 * will be released.
 	 */
-	void (*unlock)(struct cluster_lock *lock);
+	void (*unlock)(uint64_t lock_id);
 
 	/*
 	 * Update the specific node in the driver's private copy of nodes
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 19fc73c..6974dd9 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -774,16 +774,11 @@ again:
 	return 0;
 }
 
-static int corosync_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
-{
-	return -1;
-}
-
-static void corosync_lock(struct cluster_lock *cluster_lock)
+static void corosync_lock(uint64_t lock_id)
 {
 }
 
-static void corosync_unlock(struct cluster_lock *cluster_lock)
+static void corosync_unlock(uint64_t lock_id)
 {
 }
 
@@ -807,7 +802,6 @@ static struct cluster_driver cdrv_corosync = {
 	.notify		= corosync_notify,
 	.block		= corosync_block,
 	.unblock	= corosync_unblock,
-	.init_lock	= corosync_init_lock,
 	.lock		= corosync_lock,
 	.unlock		= corosync_unlock,
 	.update_node	= corosync_update_node,
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 4c4d83b..6d0af68 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -547,16 +547,11 @@ static int local_init(const char *option)
 	return 0;
 }
 
-static int local_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
+static void local_lock(uint64_t lock_id)
 {
-	return -1;
 }
 
-static void local_lock(struct cluster_lock *cluster_lock)
-{
-}
-
-static void local_unlock(struct cluster_lock *cluster_lock)
+static void local_unlock(uint64_t lock_id)
 {
 }
 
@@ -579,7 +574,6 @@ static struct cluster_driver cdrv_local = {
 	.notify		= local_notify,
 	.block		= local_block,
 	.unblock	= local_unblock,
-	.init_lock	= local_init_lock,
 	.lock		= local_lock,
 	.unlock		= local_unlock,
 	.update_node    = local_update_node,
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 9e4ffa5..3af20fd 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -32,6 +32,23 @@
 #define MASTER_ZNONE BASE_ZNODE "/master"
 #define LOCK_ZNODE BASE_ZNODE "/lock"
 
+static inline ZOOAPI int zk_init_node(const char *path);
+static inline ZOOAPI int zk_delete_node(const char *path, int version);
+
+/* structure for distributed lock */
+struct cluster_lock {
+	struct hlist_node hnode;
+	/* id is passed by users to represent a lock handle */
+	uint64_t id;
+	/* referenced by different threads in one sheepdog daemon */
+	uint64_t ref;
+	/* wait for the release of id by other lock owner */
+	pthread_mutex_t wait_wakeup;
+	/* lock for different threads of the same node on the same id */
+	pthread_mutex_t id_lock;
+	char lock_path[MAX_NODE_STR_LEN];
+};
+
 #define WAIT_TIME	1		/* second */
 
 #define HASH_BUCKET_NR	1021
@@ -43,49 +60,109 @@ static pthread_mutex_t table_locks[HASH_BUCKET_NR];
  * cluster_lock->id_lock so we don't need to add lock here
  */
 
-static void lock_table_del(uint64_t lock_id)
+static int lock_table_lookup_wakeup(uint64_t lock_id)
 {
 	uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
+	int res = -1;
 	struct hlist_node *iter;
 	struct cluster_lock *lock;
 
 	pthread_mutex_lock(table_locks + hval);
 	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
 		if (lock->id == lock_id) {
-			hlist_del(iter);
+			pthread_mutex_unlock(&lock->wait_wakeup);
+			res = 0;
 			break;
 		}
 	}
 	pthread_mutex_unlock(table_locks + hval);
+	return res;
 }
 
-static void lock_table_add(uint64_t lock_id,
-			   struct cluster_lock *cluster_lock)
+static struct cluster_lock *lock_table_lookup_acquire(uint64_t lock_id)
 {
 	uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
+	int rc;
+	struct hlist_node *iter;
+	struct cluster_lock *lock, *ret_lock = NULL;
+	char path[MAX_NODE_STR_LEN];
 
 	pthread_mutex_lock(table_locks + hval);
-	hlist_add_head(&(cluster_lock->hnode), cluster_locks_table + hval);
+	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
+		if (lock->id == lock_id) {
+			ret_lock = lock;
+			ret_lock->ref++;
+			break;
+		}
+	}
+
+	if (!ret_lock) {
+		/* create lock and add it to hash table */
+		ret_lock = xzalloc(sizeof(*ret_lock));
+		ret_lock->id = lock_id;
+		ret_lock->ref = 1;
+		snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu",
+			 ret_lock->id);
+		rc = zk_init_node(path);
+		if (rc)
+			panic("Failed to init node %s", path);
+
+		rc = pthread_mutex_init(&ret_lock->wait_wakeup, NULL);
+		if (rc)
+			panic("failed to init cluster_lock->wait_wakeup");
+
+		rc = pthread_mutex_init(&ret_lock->id_lock, NULL);
+		if (rc)
+			panic("failed to init cluster_lock->id_lock");
+
+		hlist_add_head(&(ret_lock->hnode), cluster_locks_table + hval);
+	}
 	pthread_mutex_unlock(table_locks + hval);
+
+	/*
+	 * if many threads use locks with same id, we should use
+	 * ->id_lock to avoid the only zookeeper handler to
+	 * create many seq-ephemeral files.
+	 */
+	pthread_mutex_lock(&ret_lock->id_lock);
+	return ret_lock;
 }
 
-static int lock_table_lookup_release(uint64_t lock_id)
+static void lock_table_lookup_release(uint64_t lock_id)
 {
 	uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
-	int res = -1;
+	int rc;
 	struct hlist_node *iter;
 	struct cluster_lock *lock;
+	char path[MAX_NODE_STR_LEN];
 
 	pthread_mutex_lock(table_locks + hval);
 	hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
-		if (lock->id == lock_id) {
-			pthread_mutex_unlock(&lock->wait_release);
-			res = 0;
-			break;
+		if (lock->id != lock_id)
+			continue;
+		rc = zk_delete_node(lock->lock_path, -1);
+		if (rc != ZOK)
+			sd_err("Failed to delete path: %s %s", lock->lock_path,
+			       zerror(rc));
+		lock->lock_path[0] = '\0';
+		pthread_mutex_unlock(&lock->id_lock);
+		lock->ref--;
+		if (!lock->ref) {
+			hlist_del(iter);
+			/* free all resource used by this lock */
+			pthread_mutex_destroy(&lock->id_lock);
+			pthread_mutex_destroy(&lock->wait_wakeup);
+			snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu",
+				 lock->id);
+			rc = zk_delete_node(path, -1);
+			if (rc != ZOK)
+				sd_err("Failed to delete path: %s %s", path,
+				      zerror(rc));
+			free(lock);
 		}
+		break;
 	}
 	pthread_mutex_unlock(table_locks + hval);
-	return res;
 }
 
 /*
@@ -183,6 +260,7 @@ static struct zk_node this_node;
 	switch (rc) {							\
 	case ZNONODE:							\
 	case ZNODEEXISTS:						\
+	case ZNOTEMPTY:							\
 		break;							\
 	case ZINVALIDSTATE:						\
 	case ZSESSIONEXPIRED:						\
@@ -585,7 +663,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 	}
 
 /* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
-	sd_debug("path:%s, type:%d", path, type);
+	sd_debug("path:%s, type:%d, state:%d", path, type, state);
 	if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
 		ret = sscanf(path, MEMBER_ZNODE "/%s", str);
 		if (ret == 1)
@@ -598,7 +676,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 		/* process distributed lock */
 		ret = sscanf(path, LOCK_ZNODE "/%lu/%s", &lock_id, str);
 		if (ret == 2) {
-			ret = lock_table_lookup_release(lock_id);
+			ret = lock_table_lookup_wakeup(lock_id);
 			if (ret)
 				sd_debug("release lock %lu %s", lock_id, str);
 			return;
@@ -1142,39 +1220,13 @@ kick_block_event:
 	kick_block_event();
 }
 
-static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id)
-{
-	int rc = 0;
-	char path[MAX_NODE_STR_LEN];
-
-	cluster_lock->id = lock_id;
-	snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id);
-	rc = zk_init_node(path);
-	if (rc)
-		goto out;
-
-	rc = pthread_mutex_init(&cluster_lock->wait_release, NULL);
-	if (rc) {
-		sd_err("failed to init cluster_lock->wait_release");
-		goto out;
-	}
-
-	rc = pthread_mutex_init(&cluster_lock->id_lock, NULL);
-	if (rc) {
-		sd_err("failed to init cluster_lock->id_lock");
-		goto out;
-	}
-out:
-	return rc;
-}
-
 /*
  * This operation will create a seq-ephemeral znode in lock directory
  * of zookeeper (use lock-id as dir name). The smallest file path in
  * this directory wil be the owner of the lock; the other threads will
- * wait on a pthread_mutex_t (cluster_lock->wait_release)
+ * wait on a pthread_mutex_t (cluster_lock->wait_wakeup)
  */
-static void zk_lock(struct cluster_lock *cluster_lock)
+static void zk_lock(uint64_t lock_id)
 {
 	int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
 	int rc, len = MAX_NODE_STR_LEN;
@@ -1182,15 +1234,9 @@ static void zk_lock(struct cluster_lock *cluster_lock)
 	char parent[MAX_NODE_STR_LEN];
 	char lowest_seq_path[MAX_NODE_STR_LEN];
 	char owner_name[MAX_NODE_STR_LEN];
+	struct cluster_lock *cluster_lock;
 
-	/*
-	 * if many threads use locks with same id, we should use
-	 * ->id_lock to avoid the only zookeeper handler to
-	 * create many seq-ephemeral files.
-	 */
-	pthread_mutex_lock(&cluster_lock->id_lock);
-
-	lock_table_add(cluster_lock->id, cluster_lock);
+	cluster_lock = lock_table_lookup_acquire(lock_id);
 
 	my_path = cluster_lock->lock_path;
 
@@ -1223,28 +1269,20 @@ static void zk_lock(struct cluster_lock *cluster_lock)
 		rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
 		if (rc == ZOK) {
 			sd_debug("call zoo_exits success %s", lowest_seq_path);
-			pthread_mutex_lock(&cluster_lock->wait_release);
+			pthread_mutex_lock(&cluster_lock->wait_wakeup);
 		} else {
-			sd_err("failed to call zoo_exists %s", zerror(rc));
+			sd_debug("failed to call zoo_exists %s", zerror(rc));
 			if (rc != ZNONODE)
 				zk_wait();
 		}
 	}
 }
 
-static void zk_unlock(struct cluster_lock *cluster_lock)
+static void zk_unlock(uint64_t lock_id)
 {
-	int rc;
-	rc = zk_delete_node(cluster_lock->lock_path, -1);
-	if (rc != ZOK)
-		sd_err("Failed to delete path: %s %s",
-		       cluster_lock->lock_path, zerror(rc));
-
-	lock_table_del(cluster_lock->id);
-	pthread_mutex_unlock(&cluster_lock->id_lock);
+	lock_table_lookup_release(lock_id);
 }
 
-
 static int zk_init(const char *option)
 {
 	char *hosts, *to, *p;
@@ -1324,9 +1362,8 @@ static struct cluster_driver cdrv_zookeeper = {
 	.notify     = zk_notify,
 	.block      = zk_block,
 	.unblock    = zk_unblock,
-	.init_lock  = zk_init_lock,
-	.lock       = zk_lock,
-	.unlock     = zk_unlock,
+	.lock         = zk_lock,
+	.unlock       = zk_unlock,
 	.update_node  = zk_update_node,
 	.get_local_addr = get_local_addr,
 };
-- 
1.7.12.4




More information about the sheepdog mailing list