[sheepdog] [PATCH v2 1/2] cluster: add the distributed lock implemented by zookeeper for object-storage

Robin Dong robin.k.dong at gmail.com
Fri Nov 29 10:10:31 CET 2013


Implement a distributed lock on top of zookeeper (see the lock recipe in
http://zookeeper.apache.org/doc/trunk/recipes.html).
The routine is (an example layout follows the list):
        1. create a sequence-ephemeral znode in the lock directory
           (the lock-id is used as the directory name)
        2. the creator of the znode with the smallest sequence number
           owns the lock; every other thread waits on a pthread_mutex_t
        3. when the owner releases the lock (or is killed by accident),
           zookeeper triggers zk_watcher(), which wakes up all waiting
           threads to compete for new ownership of the lock

We add ->local_lock to struct dist_mutex so that many threads in one
sheepdog daemon do not create too many znodes in the lock directory.
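As a usage sketch (hypothetical caller, not part of this patch: it assumes
sys->cdrv points at the active cluster driver as elsewhere in sheep, and
example_swift_op() and lock-id 42 are made-up names):

	static struct dist_mutex bucket_mutex;

	static int example_swift_op(void)
	{
		/* one-time setup of the znode directory for this lock-id */
		if (sys->cdrv->init_mutex(&bucket_mutex, 42) != 0)
			return -1;

		sys->cdrv->lock_mutex(&bucket_mutex);
		/* ... add/delete/list objects without racing other sheep ... */
		sys->cdrv->unlock_mutex(&bucket_mutex);
		return 0;
	}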

Signed-off-by: Robin Dong <sanbai at taobao.com>
---
 sheep/cluster.h           |   15 +++++
 sheep/cluster/zookeeper.c |  140 ++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 154 insertions(+), 1 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 81b5ae4..0aa058c 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -23,6 +23,13 @@
 #include "sheep.h"
 #include "config.h"
 
+struct dist_mutex {
+	uint32_t id;		/* id of this mutex */
+	pthread_mutex_t wait;	/* waiters block here until zk_watcher() wakes them */
+	pthread_mutex_t local_lock;	/* serializes threads in one daemon */
+	char ephemeral_path[MAX_NODE_STR_LEN];	/* our znode under the lock dir */
+};
+
 /*
  * maximum payload size sent in ->notify and ->unblock, it should be large
  * enough to support COROSYNC_MAX_NODES * struct sd_node
@@ -109,6 +116,14 @@ struct cluster_driver {
 	int (*unblock)(void *msg, size_t msg_len);
 
 	/*
+	 * A distributed mutex lock to avoid races when the swift interface
+	 * adds, deletes or lists objects.
+	 */
+	int (*init_mutex)(struct dist_mutex *mutex, uint32_t id);
+	void (*lock_mutex)(struct dist_mutex *mutex);
+	void (*unlock_mutex)(struct dist_mutex *mutex);
+
+	/*
 	 * Update the specific node in the driver's private copy of nodes
 	 *
 	 * Returns SD_RES_XXX
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index fa89c46..ddf6a22 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -30,6 +30,21 @@
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 #define MASTER_ZNONE BASE_ZNODE "/master"
+#define LOCK_ZNODE BASE_ZNODE "/lock"
+
+#define MAX_MUTEX_NR	4096
+#define WAIT_TIME	1		/* second */
+
+static struct dist_mutex **mutex_array;
+
+/*
+ * Wait a while when create, delete or get_children fails on the
+ * zookeeper lock, so a tight retry loop does not flood the log.
+ */
+static void zk_wait(void)
+{
+	sleep(WAIT_TIME);
+}
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs)			       \
@@ -506,6 +521,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 {
 	struct zk_node znode;
 	char str[MAX_NODE_STR_LEN], *p;
+	uint32_t lock_id;
 	int ret;
 
 	if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
@@ -528,6 +544,14 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 	} else if (type == ZOO_DELETED_EVENT) {
 		struct zk_node *n;
 
+		/* distributed lock: a deleted znode wakes up local waiters */
+		ret = sscanf(path, LOCK_ZNODE "/%u/%s", &lock_id, str);
+		if (ret == 2 && lock_id < MAX_MUTEX_NR &&
+		    mutex_array && mutex_array[lock_id]) {
+			pthread_mutex_unlock(&(mutex_array[lock_id]->wait));
+			sd_debug("release lock %u %s", lock_id, str);
+		}
+
 		ret = sscanf(path, MASTER_ZNONE "/%s", str);
 		if (ret == 1) {
 			zk_compete_master();
@@ -1058,6 +1082,108 @@ kick_block_event:
 	kick_block_event();
 }
 
+static int zk_init_mutex(struct dist_mutex *mutex, uint32_t id)
+{
+	int rc;
+	char path[MAX_NODE_STR_LEN];
+
+	/* valid ids index mutex_array[0..MAX_MUTEX_NR - 1] */
+	if (id >= MAX_MUTEX_NR) {
+		sd_err("lock-id is too large!");
+		return -1;
+	}
+
+	mutex_array[id] = mutex;
+	mutex->id = id;
+	snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", mutex->id);
+	rc = zk_init_node(path);
+	if (rc)
+		goto err;
+
+	rc = pthread_mutex_init(&mutex->wait, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->wait");
+		goto err;
+	}
+
+	rc = pthread_mutex_init(&mutex->local_lock, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->local_lock");
+		goto err;
+	}
+
+	return 0;
+err:
+	mutex_array[id] = NULL;
+	return rc;
+}
+
+static void zk_lock_mutex(struct dist_mutex *mutex)
+{
+	int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
+	int rc, len = MAX_NODE_STR_LEN;
+	char *my_path;
+	char parent[MAX_NODE_STR_LEN];
+	char lowest_seq_path[MAX_NODE_STR_LEN];
+	char owner_name[MAX_NODE_STR_LEN];
+
+	/*
+	 * If many threads use locks with the same id, serialize them on
+	 * ->local_lock so the single zookeeper handle does not create
+	 * many sequence-ephemeral znodes at once.
+	 */
+	pthread_mutex_lock(&mutex->local_lock);
+
+	my_path = mutex->ephemeral_path;
+
+	/* competing for lock ownership works just like zk_compete_master() */
+	snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u/", mutex->id);
+	while (true) {
+		rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+				flags, my_path, MAX_NODE_STR_LEN);
+		if (rc == ZOK)
+			break;
+		sd_err("failed to create path %s, %s", parent, zerror(rc));
+		zk_wait();
+	}
+	sd_debug("created path %s", my_path);
+
+	/* our znode exists; find the one with the lowest sequence number */
+	snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", mutex->id);
+	while (true) {
+		zk_get_least_seq(parent, lowest_seq_path, MAX_NODE_STR_LEN,
+				 owner_name, &len);
+
+		/* I got the lock */
+		if (!strncmp(lowest_seq_path, my_path, strlen(my_path))) {
+			sd_debug("I am the lock owner now. %s", lowest_seq_path);
+			return;
+		}
+
+		/* not the owner: watch the current owner's znode and wait */
+		rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
+		if (rc == ZOK) {
+			sd_debug("zoo_exists succeeded, waiting on %s", lowest_seq_path);
+			pthread_mutex_lock(&mutex->wait);
+		} else {
+			sd_err("failed to call zoo_exists %s", zerror(rc));
+			if (rc != ZNONODE)
+				zk_wait();
+		}
+	}
+}
+
+static void zk_unlock_mutex(struct dist_mutex *mutex)
+{
+	int rc;
+	rc = zk_delete_node(mutex->ephemeral_path, -1);
+	if (rc != ZOK)
+		sd_err("Failed to delete path: %s %s", mutex->ephemeral_path,
+		       zerror(rc));
+	pthread_mutex_unlock(&mutex->local_lock);
+}
+
+
 static int zk_init(const char *option)
 {
 	char *hosts, *to, *p;
@@ -1102,6 +1228,15 @@ static int zk_init(const char *option)
 		return -1;
 	}
 
+	/* init distributed mutex lock */
+	mutex_array = xzalloc(sizeof(struct dist_mutex *) * MAX_MUTEX_NR);
+
+	ret = zk_init_node(LOCK_ZNODE);
+	if (ret != ZOK) {
+		sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret));
+		free(mutex_array);
+		return -1;
+	}
 	return 0;
 }
 
@@ -1122,7 +1257,10 @@ static struct cluster_driver cdrv_zookeeper = {
 	.notify     = zk_notify,
 	.block      = zk_block,
 	.unblock    = zk_unblock,
-	.update_node = zk_update_node,
+	.init_mutex   = zk_init_mutex,
+	.lock_mutex   = zk_lock_mutex,
+	.unlock_mutex = zk_unlock_mutex,
+	.update_node  = zk_update_node,
 	.get_local_addr = get_local_addr,
 };
 
-- 
1.7.1