[sheepdog] [PATCH 3/4] sheep: add the distributed lock implemented by zookeeper for object-storage

Robin Dong robin.k.dong at gmail.com
Sun Nov 24 09:40:25 CET 2013


Implement a distributed lock on top of ZooKeeper (see the lock recipe at http://zookeeper.apache.org/doc/trunk/recipes.html).
The algorithm is:
	1. create a sequential ephemeral znode in the lock directory (the lock-id is used as the directory name);
	2. the znode with the smallest sequence number owns the lock; the other threads wait on a pthread_mutex_t;
	3. when the owner releases the lock (or is killed unexpectedly), ZooKeeper triggers zk_watcher(),
           which wakes up all waiting threads to compete to become the new owner of the lock.

We add ->local_lock to struct zk_mutex so that many threads in one sheepdog daemon do not create
too many znodes in the lock directory.

Signed-off-by: Robin Dong <sanbai at taobao.com>
---
 sheep/Makefile.am  |    2 +-
 sheep/http/http.c  |   37 ++++++++-
 sheep/http/lock.c  |  218 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/http/lock.h  |   35 ++++++++
 sheep/sheep.c      |    2 +-
 sheep/sheep_priv.h |    4 +-
 6 files changed, 291 insertions(+), 7 deletions(-)
 create mode 100644 sheep/http/lock.c
 create mode 100644 sheep/http/lock.h

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 552e86a..578e183 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -30,7 +30,7 @@ sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c \
 			  plain_store.c config.c migrate.c md.c
 
 if BUILD_HTTP
-sheep_SOURCES		+= http/http.c http/kv.c http/s3.c http/swift.c
+sheep_SOURCES		+= http/http.c http/kv.c http/s3.c http/swift.c http/lock.c
 endif
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
diff --git a/sheep/http/http.c b/sheep/http/http.c
index 04ef364..c1fc5a8 100644
--- a/sheep/http/http.c
+++ b/sheep/http/http.c
@@ -14,6 +14,7 @@
 /* This files implement RESTful interface to sheepdog storage via fastcgi */
 
 #include "http.h"
+#include "lock.h"
 #include "sheep_priv.h"
 #include "option.h"
 
@@ -342,11 +343,12 @@ static struct option_parser http_opt_parsers[] = {
 	{ NULL, NULL },
 };
 
-int http_init(const char *options)
+int http_init(const char *options, const char *cdrv_option)
 {
 	pthread_t t;
-	int err;
-	char *s, address[HOST_NAME_MAX + 8];
+	int err, timeout = LOCKS_TIMEOUT;
+	char *s, address[HOST_NAME_MAX + 8], *hosts, *to;
+	const char *p = cdrv_option;
 
 	s = strdup(options);
 	if (s == NULL) {
@@ -354,6 +356,52 @@ int http_init(const char *options)
 		return -1;
 	}
 
+	if (!cdrv_option) {
+		sd_err("You must specify zookeeper servers for zk_locks_init");
+		free(s);
+		return -1;
+	}
+
+	/*
+	 * Split cdrv_option into the ZooKeeper host list and an optional
+	 * timeout.  "timeout=N" is expected to come last, preceded by one
+	 * separator character (presumably ','); without it the whole string
+	 * is the host list.
+	 */
+	to = strstr(cdrv_option, "=");
+	if (to) {
+		if (sscanf(++to, "%d", &timeout) != 1 || timeout <= 0) {
+			sd_err("Invalid parameter for timeout");
+			free(s);
+			return -1;
+		}
+		p = strstr(cdrv_option, "timeout");
+		/* a host list and a separator must precede "timeout" */
+		if (!p || p == cdrv_option) {
+			sd_err("Invalid parameter for timeout");
+			free(s);
+			return -1;
+		}
+		p--;	/* leave the separator out of the host list */
+	} else {
+		p = cdrv_option + strlen(cdrv_option);
+	}
+	hosts = strndup(cdrv_option, p - cdrv_option);
+	if (hosts == NULL) {
+		sd_emerg("OOM");
+		free(s);
+		return -1;
+	}
+
+	/* zookeeper_init() keeps its own copy of the host string */
+	if (zk_locks_init(hosts, timeout)) {
+		sd_err("Failed to init zookeeper locks");
+		free(hosts);
+		free(s);
+		return -1;
+	}
+	free(hosts);
+
 	if (option_parse(s, ",", http_opt_parsers) < 0)
 		return -1;
 
diff --git a/sheep/http/lock.c b/sheep/http/lock.c
new file mode 100644
index 0000000..790c6c1
--- /dev/null
+++ b/sheep/http/lock.c
@@ -0,0 +1,322 @@
+/*
+ * Copyright (C) 2013 Robin Dong <sanbai at taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/*
+ * Distributed lock built on ZooKeeper, after the "Locks" recipe in
+ * http://zookeeper.apache.org/doc/trunk/recipes.html:
+ *
+ *  1. each contender creates a sequential ephemeral znode under
+ *     LOCK_ZNODE/<lock-id>/;
+ *  2. the contender whose znode has the lowest sequence number owns the lock;
+ *  3. the others watch that lowest znode and sleep until zk_watcher() reports
+ *     its deletion, then check again.
+ */
+
+#include <zookeeper/zookeeper.h>
+#include "lock.h"
+
+#define MAX_MUTEX_NR	4096
+#define LOCK_ZNODE	"/sheepdog_lock"
+#define WAIT_TIME	1			/* seconds */
+
+/*
+ * Iterate over the child znodes in @strs, formatting "@parent/<child>" into
+ * the char array @path.  Each child name is freed after its iteration and the
+ * vector's array is freed when the loop ends, so @strs is consumed.
+ */
+#define FOR_EACH_ZNODE(parent, path, strs)			       \
+	for ((strs)->data += (strs)->count;			       \
+	     (strs)->count-- ?					       \
+		     snprintf(path, sizeof(path), "%s/%s", parent,     \
+			      *--(strs)->data) : (free((strs)->data), 0); \
+	     free(*(strs)->data))
+
+static zhandle_t *zhandle;
+
+/* Registered mutexes indexed by lock-id; entries are set under wake_lock. */
+static struct zk_mutex **mutex_array;
+
+/*
+ * Wake-up state shared with zk_watcher(), which the ZooKeeper client library
+ * invokes outside the waiting thread.  wake_gen[id] is bumped whenever a
+ * znode under lock-id @id is reported deleted.  A waiter samples it before
+ * arming its watch and sleeps on wake_cond until it changes, so an event that
+ * arrives before the waiter goes to sleep is not lost.
+ */
+static pthread_mutex_t wake_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t wake_cond = PTHREAD_COND_INITIALIZER;
+static uint32_t wake_gen[MAX_MUTEX_NR];
+
+/* Wait a while when create, delete or get_children fail on zookeeper */
+static void zk_wait(void)
+{
+	sleep(WAIT_TIME);
+}
+
+/*
+ * Global watcher passed to zookeeper_init().  A ZOO_DELETED_EVENT for a
+ * path under LOCK_ZNODE/<lock-id>/ wakes the threads waiting on that lock-id.
+ */
+static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
+		       void *ctx)
+{
+	uint32_t lock_id;
+
+	sd_debug("watch event type: %d state: %d", type, state);
+
+	if (type != ZOO_DELETED_EVENT || !path)
+		return;
+
+	/* ignore foreign paths and ids that would overrun the arrays */
+	if (sscanf(path, LOCK_ZNODE "/%u", &lock_id) != 1 ||
+	    lock_id >= MAX_MUTEX_NR)
+		return;
+
+	pthread_mutex_lock(&wake_lock);
+	if (mutex_array && mutex_array[lock_id]) {
+		wake_gen[lock_id]++;
+		pthread_cond_broadcast(&wake_cond);
+		sd_debug("release lock %u %s", lock_id, path);
+	}
+	pthread_mutex_unlock(&wake_lock);
+}
+
+/*
+ * Write "@parent/<seq>" for the child of @parent with the smallest sequence
+ * number into @lowest_seq_path (at most @path_len bytes).  Listing children is
+ * retried until it succeeds.  If @parent has no children the path names
+ * sequence INT_MAX.  Always returns 0.
+ */
+static int zk_get_lowest_seq(const char *parent, char *lowest_seq_path,
+			    int path_len)
+{
+	char path[MAX_NODE_STR_LEN], *p;
+	struct String_vector strs;
+	int rc, lowest_seq = INT_MAX, seq;
+
+	while (true) {
+		/*
+		 * No child watch: zk_watcher() ignores child events; waiters
+		 * rely on the exists-watch set in zk_lock_mutex().
+		 */
+		rc = zoo_get_children(zhandle, parent, 0, &strs);
+		if (rc == ZOK)
+			break;
+		sd_err("failed to get children %s, %s", parent, zerror(rc));
+		zk_wait();
+	}
+	FOR_EACH_ZNODE(parent, path, &strs) {
+		p = strrchr(path, '/');
+		seq = (int)strtol(p + 1, NULL, 10);
+		if (seq < lowest_seq)
+			lowest_seq = seq;
+	}
+	snprintf(lowest_seq_path, path_len, "%s/%010d", parent, lowest_seq);
+	sd_debug("get lowest path: %s", lowest_seq_path);
+	return 0;
+}
+
+/*
+ * Connect to the ZooKeeper servers in @hosts (@timeout in milliseconds is
+ * passed to zookeeper_init()) and create LOCK_ZNODE if it does not exist.
+ * Returns 0 on success; on failure returns -1 and releases what it set up.
+ */
+int zk_locks_init(const char *hosts, int timeout)
+{
+	int rc;
+
+	sd_debug("hosts %s timeout %d", hosts, timeout);
+	mutex_array = xzalloc(sizeof(*mutex_array) * MAX_MUTEX_NR);
+
+	zhandle = zookeeper_init(hosts, zk_watcher, timeout, NULL, NULL, 0);
+	if (!zhandle) {
+		sd_err("failed to connect to zk server for mutex");
+		goto err_free;
+	}
+
+	rc = zoo_create(zhandle, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+			0, NULL, 0);
+	if (rc != ZOK && rc != ZNODEEXISTS) {
+		sd_err("failed to create %s %s", LOCK_ZNODE, zerror(rc));
+		goto err_close;
+	}
+
+	return 0;
+
+err_close:
+	zookeeper_close(zhandle);
+	zhandle = NULL;
+err_free:
+	pthread_mutex_lock(&wake_lock);
+	free(mutex_array);
+	mutex_array = NULL;
+	pthread_mutex_unlock(&wake_lock);
+	return -1;
+}
+
+/* Close the ZooKeeper handle and drop all registered mutexes. */
+void zk_locks_close(void)
+{
+	zookeeper_close(zhandle);
+	zhandle = NULL;
+
+	pthread_mutex_lock(&wake_lock);
+	free(mutex_array);
+	mutex_array = NULL;
+	pthread_mutex_unlock(&wake_lock);
+}
+
+/* Create the directory znode LOCK_ZNODE/<id>; an existing one is fine. */
+static int zk_create_lock_path(struct zk_mutex *mutex)
+{
+	char path[MAX_NODE_STR_LEN];
+	int rc;
+
+	snprintf(path, sizeof(path), LOCK_ZNODE "/%u", mutex->id);
+	rc = zoo_create(zhandle, path, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+			0, NULL, 0);
+	if (rc != ZOK && rc != ZNODEEXISTS) {
+		sd_err("failed to create path: %s, %s", path, zerror(rc));
+		return rc;
+	}
+
+	return 0;
+}
+
+/*
+ * Bind @mutex to lock-id @id (must be below MAX_MUTEX_NR), create its lock
+ * directory in ZooKeeper and initialise its pthread mutexes.  Returns 0 on
+ * success, -1 for an out-of-range id, or the ZooKeeper/pthread error code.
+ */
+int zk_init_mutex(struct zk_mutex *mutex, uint32_t id)
+{
+	int rc;
+
+	/* id indexes mutex_array[] and wake_gen[], both MAX_MUTEX_NR long */
+	if (id >= MAX_MUTEX_NR) {
+		sd_err("lock-id is too large!");
+		return -1;
+	}
+
+	mutex->id = id;
+	rc = zk_create_lock_path(mutex);
+	if (rc)
+		return rc;
+
+	/* ->wait is still initialised, though waiters block on wake_cond */
+	rc = pthread_mutex_init(&mutex->wait, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->wait");
+		return rc;
+	}
+
+	rc = pthread_mutex_init(&mutex->local_lock, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->local_lock");
+		pthread_mutex_destroy(&mutex->wait);
+		return rc;
+	}
+
+	/* publish only when fully initialised: zk_watcher() looks it up */
+	pthread_mutex_lock(&wake_lock);
+	mutex_array[id] = mutex;
+	pthread_mutex_unlock(&wake_lock);
+
+	return 0;
+}
+
+/*
+ * Acquire @mutex.  Threads of this daemon first serialise on ->local_lock so
+ * that only one of them per lock-id queues a znode in ZooKeeper; that thread
+ * creates a sequential ephemeral znode (path kept in ->ephemeral_path) and
+ * waits until it is the lowest child.  ZooKeeper errors are retried forever.
+ */
+void zk_lock_mutex(struct zk_mutex *mutex)
+{
+	int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
+	int rc;
+	uint32_t gen;
+	char *my_path;
+	char parent[MAX_NODE_STR_LEN];
+	char lowest_seq_path[MAX_NODE_STR_LEN];
+
+	pthread_mutex_lock(&mutex->local_lock);
+
+	my_path = mutex->ephemeral_path;
+
+	/* trailing '/': with ZOO_SEQUENCE the znode name is just the counter */
+	snprintf(parent, sizeof(parent), LOCK_ZNODE "/%u/", mutex->id);
+	while (true) {
+		rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+				flags, my_path, MAX_NODE_STR_LEN);
+		if (rc == ZOK)
+			break;
+		/* my_path is not filled in on failure, so log the parent */
+		sd_err("failed to create path:%s, %s", parent, zerror(rc));
+		zk_wait();
+	}
+	sd_debug("create path %s success", my_path);
+
+	snprintf(parent, sizeof(parent), LOCK_ZNODE "/%u", mutex->id);
+	while (true) {
+		/*
+		 * Sample the wake-up counter before listing children and
+		 * arming the watch, so a deletion reported in between still
+		 * ends the wait below.
+		 */
+		pthread_mutex_lock(&wake_lock);
+		gen = wake_gen[mutex->id];
+		pthread_mutex_unlock(&wake_lock);
+
+		zk_get_lowest_seq(parent, lowest_seq_path,
+				  sizeof(lowest_seq_path));
+
+		/* I got the lock */
+		if (!strcmp(lowest_seq_path, my_path)) {
+			sd_debug("I am master now. %s", lowest_seq_path);
+			return;
+		}
+
+		rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
+		if (rc == ZOK) {
+			sd_debug("call zoo_exists success %s", lowest_seq_path);
+			pthread_mutex_lock(&wake_lock);
+			while (wake_gen[mutex->id] == gen)
+				pthread_cond_wait(&wake_cond, &wake_lock);
+			pthread_mutex_unlock(&wake_lock);
+		} else {
+			sd_err("failed to call zoo_exists %s", zerror(rc));
+			if (rc != ZNONODE)
+				zk_wait();
+		}
+	}
+}
+
+/*
+ * Release @mutex: delete this daemon's znode, retrying until ZooKeeper reports
+ * it gone (ZNONODE counts as gone), then admit the next local thread.
+ * Deleting the znode triggers the watches other contenders set on it.
+ */
+void zk_unlock_mutex(struct zk_mutex *mutex)
+{
+	int rc;
+
+	while (true) {
+		rc = zoo_delete(zhandle, mutex->ephemeral_path, -1);
+		if (rc == ZOK || rc == ZNONODE)
+			break;
+		sd_err("failed to delete %s %s", mutex->ephemeral_path,
+		       zerror(rc));
+		zk_wait();
+	}
+
+	pthread_mutex_unlock(&mutex->local_lock);
+}
diff --git a/sheep/http/lock.h b/sheep/http/lock.h
new file mode 100644
index 0000000..2dc47f6
--- /dev/null
+++ b/sheep/http/lock.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2013 Robin Dong <sanbai at taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef __SHEEP_HTTP_LOCK_H__
+#define __SHEEP_HTTP_LOCK_H__
+/* Distributed mutex backed by ZooKeeper (implemented in lock.c). */
+#define MAX_LOCK_NAME 64	/* NOTE(review): not referenced by lock.c */
+#define LOCKS_TIMEOUT 30000		/* default zk_locks_init() timeout, ms */
+
+#include <stdint.h>
+#include <pthread.h>
+#include "sheep.h"
+/* One lock; set up with zk_init_mutex() before zk_lock_mutex(). */
+struct zk_mutex {
+	uint32_t id; /* lock-id; also names its ZooKeeper lock directory */
+	pthread_mutex_t wait; /* internal to lock.c */
+	pthread_mutex_t local_lock; /* serialises this daemon's threads */
+	char ephemeral_path[MAX_NODE_STR_LEN]; /* znode made by zk_lock_mutex() */
+};
+/* Connect to ZooKeeper @hosts (@timeout in ms); returns 0 on success. */
+int zk_locks_init(const char *hosts, int timeout);
+void zk_locks_close(void);	/* tears down zk_locks_init() state */
+/* Per-lock API; lock and unlock retry ZooKeeper errors instead of failing. */
+int zk_init_mutex(struct zk_mutex *mutex, uint32_t id);	/* 0 on success */
+void zk_lock_mutex(struct zk_mutex *mutex);
+void zk_unlock_mutex(struct zk_mutex *mutex);
+
+#endif /* __SHEEP_HTTP_LOCK_H__ */
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 9d9afa0..f465fa6 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -820,7 +820,7 @@ int main(int argc, char **argv)
 	if (ret)
 		exit(1);
 
-	if (http_options && http_init(http_options) != 0)
+	if (http_options && http_init(http_options, sys->cdrv_option) != 0)
 		exit(1);
 
 	if (pid_file && (create_pidfile(pid_file) != 0)) {
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index d333573..8887497 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -472,9 +472,9 @@ uint64_t md_get_size(uint64_t *used);
 
 /* http.c */
 #ifdef HAVE_HTTP
-int http_init(const char *options);
+int http_init(const char *options, const char *cdrv_option);
 #else
-static inline int http_init(const char *options)
+static inline int http_init(const char *options, const char *cdrv_option)
 {
 	sd_notice("http service is not complied");
 	return 0;
-- 
1.7.1




More information about the sheepdog mailing list