[sheepdog] [PATCH 3/4] sheep: add the distributed lock implemented by zookeeper for object-storage
Robin Dong
robin.k.dong at gmail.com
Sun Nov 24 09:40:25 CET 2013
Implement a distributed lock on top of ZooKeeper (see: http://zookeeper.apache.org/doc/trunk/recipes.html)
The routine is:
1. create a sequential ephemeral znode in the lock directory (the lock-id is used as the directory name)
2. the znode with the smallest sequence number owns the lock; the other threads wait on a pthread_mutex_t
3. if the owner of the lock releases it (or the owner is killed by accident), ZooKeeper will
trigger zk_watcher(), which wakes up the waiting threads so they can compete to become the new owner of the lock
We add ->local_lock to zk_mutex to prevent many threads in one sheepdog daemon from creating too many files
in the lock directory.
Signed-off-by: Robin Dong <sanbai at taobao.com>
---
sheep/Makefile.am | 2 +-
sheep/http/http.c | 37 ++++++++-
sheep/http/lock.c | 218 ++++++++++++++++++++++++++++++++++++++++++++++++++++
sheep/http/lock.h | 35 ++++++++
sheep/sheep.c | 2 +-
sheep/sheep_priv.h | 4 +-
6 files changed, 291 insertions(+), 7 deletions(-)
create mode 100644 sheep/http/lock.c
create mode 100644 sheep/http/lock.h
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 552e86a..578e183 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -30,7 +30,7 @@ sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c \
plain_store.c config.c migrate.c md.c
if BUILD_HTTP
-sheep_SOURCES += http/http.c http/kv.c http/s3.c http/swift.c
+sheep_SOURCES += http/http.c http/kv.c http/s3.c http/swift.c http/lock.c
endif
if BUILD_COROSYNC
sheep_SOURCES += cluster/corosync.c
diff --git a/sheep/http/http.c b/sheep/http/http.c
index 04ef364..c1fc5a8 100644
--- a/sheep/http/http.c
+++ b/sheep/http/http.c
@@ -14,6 +14,7 @@
/* This files implement RESTful interface to sheepdog storage via fastcgi */
#include "http.h"
+#include "lock.h"
#include "sheep_priv.h"
#include "option.h"
@@ -342,11 +343,12 @@ static struct option_parser http_opt_parsers[] = {
{ NULL, NULL },
};
-int http_init(const char *options)
+int http_init(const char *options, const char *cdrv_option)
{
pthread_t t;
- int err;
- char *s, address[HOST_NAME_MAX + 8];
+ int err, timeout = LOCKS_TIMEOUT;
+ char *s, address[HOST_NAME_MAX + 8], *hosts, *to;
+ const char *p = cdrv_option;
s = strdup(options);
if (s == NULL) {
@@ -354,6 +356,35 @@ int http_init(const char *options)
return -1;
}
+ if (!cdrv_option) {
+ sd_err("You must specify zookeeper servers for zk_locks_init");
+ return -1;
+ }
+
+ to = strstr(cdrv_option, "=");
+ if (to) {
+ if (sscanf(++to, "%u", &timeout) != 1) {
+ sd_err("Invalid paramter for timeout");
+ return -1;
+ }
+ p = strstr(cdrv_option, "timeout");
+ if (!p) {
+ sd_err("Invalid parameter for timeout");
+ return -1;
+ }
+ p--;
+ }
+ hosts = strndup(cdrv_option, p - cdrv_option);
+ if (hosts == NULL) {
+ sd_emerg("OOM");
+ return -1;
+ }
+
+ if (zk_locks_init(hosts, timeout)) {
+ sd_debug("Failed to init zookeeper locks");
+ return -1;
+ }
+
if (option_parse(s, ",", http_opt_parsers) < 0)
return -1;
diff --git a/sheep/http/lock.c b/sheep/http/lock.c
new file mode 100644
index 0000000..790c6c1
--- /dev/null
+++ b/sheep/http/lock.c
@@ -0,0 +1,218 @@
+#include <zookeeper/zookeeper.h>
+#include "lock.h"
+
+/* number of slots in mutex_array; valid lock ids are 0..MAX_MUTEX_NR-1 */
+#define MAX_MUTEX_NR 4096
+/* root znode; lock <id> keeps its sequence znodes under LOCK_ZNODE "/<id>" */
+#define LOCK_ZNODE "/sheepdog_lock"
+#define WAIT_TIME 1 /* seconds */
+
+/*
+ * Iterate over the child znodes in @strs (as filled by zoo_get_children()),
+ * last entry first.  For each child, @path - which must be a char array,
+ * because the macro takes sizeof(path) - receives "<parent>/<child>".
+ *
+ * The loop consumes @strs: each child string is freed after its iteration
+ * (in the for-increment), and the data array itself is freed once the
+ * vector is exhausted (->count ends at -1).  "continue" is therefore safe,
+ * but "break" leaks the remaining entries and the array.
+ */
+#define FOR_EACH_ZNODE(parent, path, strs) \
+ for ((strs)->data += (strs)->count; \
+ (strs)->count-- ? \
+ snprintf(path, sizeof(path), "%s/%s", parent, \
+ *--(strs)->data) : (free((strs)->data), 0); \
+ free(*(strs)->data))
+
+/* the single ZooKeeper session shared by every lock; set in zk_locks_init() */
+static zhandle_t *zhandle;
+/*
+ * Lock id -> registered zk_mutex.  When a sequence znode under
+ * LOCK_ZNODE "/<id>" is deleted, zk_watcher() looks up the zk_mutex here
+ * to wake the thread waiting on that lock id.
+ */
+static struct zk_mutex **mutex_array;
+
+/*
+ * Back-off used between retries of a failed ZooKeeper call (create,
+ * delete, get_children, exists): sleep for WAIT_TIME seconds.
+ */
+static void zk_wait(void)
+{
+	unsigned int seconds = WAIT_TIME;
+
+	sleep(seconds);
+}
+
+/*
+ * Global watcher passed to zookeeper_init(); the ZooKeeper client library
+ * calls it whenever a watch set by this session fires.
+ *
+ * Only deletion events are acted on: when a sequence znode
+ * LOCK_ZNODE "/<lock-id>/<seq>" disappears, the zk_mutex registered for
+ * <lock-id> has its ->wait mutex unlocked so the waiter in zk_lock_mutex()
+ * re-checks ownership.
+ *
+ * The path is parsed without a fixed-size copy, and <lock-id> is
+ * range-checked before it indexes mutex_array, so unexpected paths (the
+ * lock directory itself, LOCK_ZNODE, foreign znodes) are simply ignored.
+ */
+static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
+		       void *ctx)
+{
+	uint32_t lock_id;
+	int seq_off = 0;
+
+	sd_debug("watch event type: %d state: %d", type, state);
+
+	if (type != ZOO_DELETED_EVENT || !path)
+		return;
+
+	/* %n is only stored if the '/' after the id matched, i.e. a child node */
+	if (sscanf(path, LOCK_ZNODE "/%" SCNu32 "/%n", &lock_id, &seq_off) != 1 ||
+	    seq_off == 0 || lock_id >= MAX_MUTEX_NR)
+		return;
+
+	if (mutex_array && mutex_array[lock_id]) {
+		/*
+		 * NOTE(review): ->wait was locked by the waiting thread, not by
+		 * this one; POSIX leaves unlocking a default mutex from a
+		 * non-owner undefined.  A semaphore or condition variable in
+		 * struct zk_mutex would be the portable primitive.
+		 */
+		pthread_mutex_unlock(&(mutex_array[lock_id]->wait));
+		sd_debug("release lock %u %s", lock_id, path + seq_off);
+	}
+}
+
+/*
+ * Find the sequence znode with the smallest number under @parent and write
+ * its full path ("<parent>/%010d", ZooKeeper's sequence-suffix format) into
+ * @lowest_seq_path, truncated to @path_len bytes.
+ *
+ * zoo_get_children() is retried every WAIT_TIME seconds until it succeeds;
+ * it is called with watch=1, so it also leaves a child watch on @parent.
+ *
+ * Child names that are not complete decimal numbers are skipped: strtol()
+ * would turn them into 0 and point the caller at a node that never exists,
+ * which makes zk_lock_mutex() spin forever.
+ *
+ * Returns 0 when a sequence node was found.  Returns -1 when @parent has no
+ * valid sequence child; the path is then still written (with INT32_MAX),
+ * naming a node that should not exist.
+ */
+static int zk_get_lowest_seq(const char *parent, char *lowest_seq_path,
+			     int path_len)
+{
+	char path[MAX_NODE_STR_LEN], *name, *end;
+	struct String_vector strs;
+	int32_t lowest_seq = INT32_MAX;
+	bool found = false;
+	long seq;
+	int rc;
+
+	while (true) {
+		rc = zoo_get_children(zhandle, parent, 1, &strs);
+		if (rc == ZOK)
+			break;
+		sd_err("failed to get children %s, %s", parent, zerror(rc));
+		zk_wait();
+	}
+
+	/* "continue" is safe here: the macro frees each entry in its increment */
+	FOR_EACH_ZNODE(parent, path, &strs) {
+		name = strrchr(path, '/') + 1;
+		seq = strtol(name, &end, 10);
+		if (end == name || *end != '\0' ||
+		    seq < INT32_MIN || seq > INT32_MAX)
+			continue;
+		found = true;
+		if (seq < lowest_seq)
+			lowest_seq = (int32_t)seq;
+	}
+
+	snprintf(lowest_seq_path, path_len, "%s/%010"PRId32,
+		 parent, lowest_seq);
+	sd_debug("get lowest path: %s", lowest_seq_path);
+	return found ? 0 : -1;
+}
+
+/*
+ * Set up the lock subsystem:
+ *  - allocate the lock-id -> zk_mutex table (mutex_array),
+ *  - open the ZooKeeper session to @hosts, passing @timeout straight to
+ *    zookeeper_init() as the session timeout (LOCKS_TIMEOUT is the
+ *    millisecond default used by http_init()),
+ *  - make sure the persistent root znode LOCK_ZNODE exists.
+ *
+ * Returns 0 on success and -1 on failure.  On failure the session is closed
+ * and the table freed, so nothing is leaked and the globals are left NULL.
+ */
+int zk_locks_init(const char *hosts, int timeout)
+{
+	int rc;
+
+	sd_debug("hosts %s timeout %d", hosts, timeout);
+	mutex_array = xzalloc(sizeof(struct zk_mutex *) * MAX_MUTEX_NR);
+
+	zhandle = zookeeper_init(hosts, zk_watcher, timeout, NULL, NULL, 0);
+	if (!zhandle) {
+		sd_err("failed to connect to zk server for mutex");
+		goto err_free;
+	}
+
+	rc = zoo_create(zhandle, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+			0, NULL, 0);
+	if (rc != ZOK && rc != ZNODEEXISTS) {
+		sd_err("failed to create %s %s", LOCK_ZNODE, zerror(rc));
+		goto err_close;
+	}
+
+	return 0;
+
+err_close:
+	zookeeper_close(zhandle);
+	zhandle = NULL;
+err_free:
+	free(mutex_array);
+	mutex_array = NULL;
+	return -1;
+}
+
+/*
+ * Tear down the lock subsystem: close the shared ZooKeeper session (the
+ * ephemeral lock znodes it created go away with it) and free the lock-id
+ * table.
+ *
+ * Both globals are reset to NULL, so a second call is harmless instead of
+ * a double free.  zk_watcher() skips lookups once mutex_array is NULL.
+ */
+void zk_locks_close(void)
+{
+	if (zhandle) {
+		zookeeper_close(zhandle);
+		zhandle = NULL;
+	}
+	free(mutex_array);
+	mutex_array = NULL;
+}
+
+/*
+ * Make sure the per-lock directory LOCK_ZNODE "/<mutex->id>" exists as a
+ * persistent znode (flags 0).  A directory that already exists counts as
+ * success.
+ *
+ * Returns 0 on success, otherwise the ZooKeeper error code from
+ * zoo_create().
+ */
+static int zk_create_lock_path(struct zk_mutex *mutex)
+{
+	char dir[MAX_NODE_STR_LEN];
+	int ret;
+
+	snprintf(dir, sizeof(dir), LOCK_ZNODE "/%u", mutex->id);
+
+	ret = zoo_create(zhandle, dir, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+			 0, NULL, 0);
+	if (ret == ZOK || ret == ZNODEEXISTS)
+		return 0;
+
+	sd_err("failed to create path: %s, %s", dir, zerror(ret));
+	return ret;
+}
+
+/*
+ * Initialise @mutex as this daemon's handle for distributed lock @id and
+ * register it in mutex_array so zk_watcher() can wake its waiters.
+ *
+ * @id indexes mutex_array, which has MAX_MUTEX_NR slots, so it must be
+ * smaller than MAX_MUTEX_NR.
+ *
+ * Returns 0 on success.  On failure it returns one of:
+ *  - -1 for an out-of-range id,
+ *  - the pthread_mutex_init() error,
+ *  - the ZooKeeper error from creating LOCK_ZNODE "/<id>".
+ * Anything initialised so far is destroyed and @mutex is not registered.
+ */
+int zk_init_mutex(struct zk_mutex *mutex, uint32_t id)
+{
+	int rc;
+
+	if (id >= MAX_MUTEX_NR) {
+		sd_err("lock-id is too large!");
+		return -1;
+	}
+
+	mutex->id = id;
+
+	rc = pthread_mutex_init(&mutex->wait, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->wait");
+		return rc;
+	}
+
+	rc = pthread_mutex_init(&mutex->local_lock, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->local_lock");
+		goto err_wait;
+	}
+
+	rc = zk_create_lock_path(mutex);
+	if (rc)
+		goto err_local_lock;
+
+	/* publish last: zk_watcher() may unlock ->wait as soon as it sees it */
+	mutex_array[id] = mutex;
+	return 0;
+
+err_local_lock:
+	pthread_mutex_destroy(&mutex->local_lock);
+err_wait:
+	pthread_mutex_destroy(&mutex->wait);
+	return rc;
+}
+
+/*
+ * Acquire distributed lock mutex->id, following the ZooKeeper lock recipe:
+ *  1. take ->local_lock, so threads sharing this zk_mutex create at most
+ *     one sequence znode at a time;
+ *  2. create an ephemeral sequence znode under LOCK_ZNODE "/<id>/" and keep
+ *     its path in ->ephemeral_path;
+ *  3. if that znode has the lowest sequence number the lock is ours;
+ *     otherwise watch the lowest znode and block on ->wait until
+ *     zk_watcher() reports a deletion, then check again.
+ *
+ * Returns with ->local_lock still held; zk_unlock_mutex() releases it.
+ * ZooKeeper errors are retried every WAIT_TIME seconds, so this returns
+ * only once the lock is held.
+ */
+void zk_lock_mutex(struct zk_mutex *mutex)
+{
+	int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
+	int rc;
+	char *my_path;
+	char parent[MAX_NODE_STR_LEN];
+	char lowest_seq_path[MAX_NODE_STR_LEN];
+
+	/*
+	 * If many threads use locks with the same id, ->local_lock keeps them
+	 * from making the single ZooKeeper handle create many
+	 * seq-ephemeral files.
+	 */
+	pthread_mutex_lock(&mutex->local_lock);
+
+	my_path = mutex->ephemeral_path;
+
+	/* trailing '/' makes the sequence number itself the child's name */
+	snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u/", mutex->id);
+	while (true) {
+		rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+				flags, my_path, MAX_NODE_STR_LEN);
+		if (rc == ZOK)
+			break;
+		/* my_path is only filled in on success, so report the parent */
+		sd_err("failed to create path:%s, %s", parent, zerror(rc));
+		/*
+		 * NOTE(review): if the create reached the server but the reply
+		 * was lost, this retry leaves an orphan sequence node that
+		 * blocks other waiters until the session ends.
+		 */
+		zk_wait();
+	}
+	sd_debug("create path %s success", my_path);
+
+	/* create node ok now */
+	snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", mutex->id);
+	while (true) {
+		zk_get_lowest_seq(parent, lowest_seq_path, MAX_NODE_STR_LEN);
+
+		/* exact match: the lowest sequence node is ours, we own the lock */
+		if (!strcmp(lowest_seq_path, my_path)) {
+			sd_debug("I am master now. %s", lowest_seq_path);
+			return;
+		}
+
+		/* watch the current owner; zk_watcher() unlocks ->wait on delete */
+		rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
+		if (rc == ZOK) {
+			sd_debug("call zoo_exits success %s", lowest_seq_path);
+			/*
+			 * NOTE(review): ->wait is used as a binary semaphore.
+			 * The first lock after pthread_mutex_init() returns at
+			 * once (one extra pass of this loop).  Later locks block
+			 * until zk_watcher() unlocks it from another thread,
+			 * which POSIX leaves undefined for default mutexes.
+			 */
+			pthread_mutex_lock(&mutex->wait);
+		} else {
+			sd_err("failed to call zoo_exists %s", zerror(rc));
+			/* ZNONODE: the owner is already gone, re-check at once */
+			if (rc != ZNONODE)
+				zk_wait();
+		}
+	}
+}
+
+/*
+ * Release a lock taken with zk_lock_mutex().  The function:
+ *  - deletes this daemon's sequence znode (->ephemeral_path), retrying every
+ *    WAIT_TIME seconds (a node that is already gone counts as deleted),
+ *  - then drops ->local_lock so the next local thread can compete.
+ */
+void zk_unlock_mutex(struct zk_mutex *mutex)
+{
+	const char *node = mutex->ephemeral_path;
+	int err;
+
+	/* version -1: delete regardless of the znode's current version */
+	while ((err = zoo_delete(zhandle, node, -1)) != ZOK && err != ZNONODE) {
+		sd_err("failed to delete %s %s", node, zerror(err));
+		zk_wait();
+	}
+
+	pthread_mutex_unlock(&mutex->local_lock);
+}
diff --git a/sheep/http/lock.h b/sheep/http/lock.h
new file mode 100644
index 0000000..2dc47f6
--- /dev/null
+++ b/sheep/http/lock.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2013 Robin Dong <sanbai at taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef __SHEEP_HTTP_LOCK_H__
+#define __SHEEP_HTTP_LOCK_H__
+
+/* NOTE(review): MAX_LOCK_NAME is not referenced anywhere else in this patch */
+#define MAX_LOCK_NAME 64
+/* default ZooKeeper session timeout that http_init() passes to zk_locks_init() */
+#define LOCKS_TIMEOUT 30000 /* millisecond */
+
+#include <stdint.h>
+#include <pthread.h>
+#include "sheep.h"
+
+/*
+ * This daemon's handle for one distributed lock.  Set it up with
+ * zk_init_mutex(), then pair zk_lock_mutex() / zk_unlock_mutex().
+ */
+struct zk_mutex {
+ uint32_t id; /* id of this mutex */
+ /*
+ * locked by zk_lock_mutex() while another owner holds the lock; unlocked
+ * by the ZooKeeper watcher when a sequence znode under this id is deleted
+ */
+ pthread_mutex_t wait;
+ /* serialises local threads: held from zk_lock_mutex() to zk_unlock_mutex() */
+ pthread_mutex_t local_lock;
+ /* full path of the ephemeral sequence znode created by zk_lock_mutex() */
+ char ephemeral_path[MAX_NODE_STR_LEN];
+};
+
+/* Open the ZooKeeper session (@timeout in ms); returns 0 or -1. */
+int zk_locks_init(const char *hosts, int timeout);
+/* Close the ZooKeeper session and free the lock-id table. */
+void zk_locks_close(void);
+
+/* Set up @mutex for lock @id (must be below MAX_MUTEX_NR); returns 0 on success. */
+int zk_init_mutex(struct zk_mutex *mutex, uint32_t id);
+/* Block until the distributed lock is held; returns holding ->local_lock. */
+void zk_lock_mutex(struct zk_mutex *mutex);
+/* Delete our sequence znode and release ->local_lock. */
+void zk_unlock_mutex(struct zk_mutex *mutex);
+
+#endif /* __SHEEP_HTTP_LOCK_H__ */
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 9d9afa0..f465fa6 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -820,7 +820,7 @@ int main(int argc, char **argv)
if (ret)
exit(1);
- if (http_options && http_init(http_options) != 0)
+ if (http_options && http_init(http_options, sys->cdrv_option) != 0)
exit(1);
if (pid_file && (create_pidfile(pid_file) != 0)) {
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index d333573..8887497 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -472,9 +472,9 @@ uint64_t md_get_size(uint64_t *used);
/* http.c */
#ifdef HAVE_HTTP
-int http_init(const char *options);
+int http_init(const char *options, const char *cdrv_option);
#else
-static inline int http_init(const char *options)
+static inline int http_init(const char *options, const char *cdev_option)
{
sd_notice("http service is not complied");
return 0;
--
1.7.1
More information about the sheepdog
mailing list