From: Robin Dong <sanbai at taobao.com>

Implement the distributed lock by zookeeper
(refer: http://zookeeper.apache.org/doc/trunk/recipes.html)

The routine is:
1. create a seq-ephemeral znode in the lock directory
   (use lock-id as the directory name)
2. the thread holding the smallest sequence path becomes the owner of the
   lock; the other threads wait on a pthread_mutex_t (cluster_lock->wait)
3. when the owner of the lock releases it (or the owner is killed by
   accident), zookeeper will trigger zk_watch(), which wakes up all waiting
   threads to compete for new ownership of the lock

We use dlock_array to store pointers to the cluster_locks in this sheep
daemon, so when the ZOO_DELETED_EVENT arrives the program wakes up all
waiters (in this sheep daemon) that are sleeping on the lock id and lets
them compete for new ownership. dlock_array is just a plain array indexed
by lock-id, so imagine this scenario: two threads (A and B) in one sheep
daemon call zk_lock() for the same lock-id. They will create two znodes in
zookeeper, but dlock_array[lock_id] is set to only one of them (for
example, to B). After that, when ZOO_DELETED_EVENT comes, zk_waiter() will
only wake up thread B, and thread A will sleep on '->wait' forever because
no one will ever wake it up.

We have two methods to solve this problem:
  A. use a more complicated structure instead of dlock_array to store both
     A's and B's lock handles.
  B. add a lock to prevent A and B from calling zk_lock() at the same time.

We prefer method B because it also avoids creating too many files in a
zookeeper directory, which would put too much pressure on the zookeeper
server if the number of sheep daemons is huge. Therefore we add
'local_lock' to 'struct cluster_lock'.

v6 --> v7:
1. change bucket number of the lock table from 4097 to 1021
2. use sd_hash() for the lock table

v5 --> v6:
1. change name of "wait" to "wait_release"
2. change name of "local_lock" to "id_lock"
3. change name of "ephemeral_path" to "lock_path"
4. modify the names of the lock table functions

v4 --> v5:
1. rename 'dlock' to 'cluster_lock'
2. change dlock_array to a hash table

v3 --> v4:
1. change some comments
2. change type of 'lock_id' from uint32_t to uint64_t

v2 --> v3:
1. change the cluster interface to init_lock()/lock()/unlock()
2. change 'struct zk_mutex' to 'struct cluster_lock'
3.
add empty implementation to local/corosync v1 --> v2: move code from sheep/http/lock.c into sheep/cluster/zookeeper.c using cluster framework Signed-off-by: Robin Dong <sanbai at taobao.com> --- include/sheep.h | 12 +++ sheep/cluster.h | 29 +++++++ sheep/cluster/corosync.c | 16 ++++ sheep/cluster/local.c | 16 ++++ sheep/cluster/zookeeper.c | 209 +++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 280 insertions(+), 2 deletions(-) diff --git a/include/sheep.h b/include/sheep.h index 293e057..e5726e8 100644 --- a/include/sheep.h +++ b/include/sheep.h @@ -255,6 +255,18 @@ static inline void nodes_to_buffer(struct rb_root *nroot, void *buffer) #define MAX_NODE_STR_LEN 256 +/* structure for distributed lock */ +struct cluster_lock { + struct hlist_node hnode; + /* id is passed by users to represent a lock handle */ + uint64_t id; + /* wait for the release of id by other lock owner */ + pthread_mutex_t wait_release; + /* lock for different threads of the same node on the same id */ + pthread_mutex_t id_lock; + char lock_path[MAX_NODE_STR_LEN]; +}; + static inline const char *node_to_str(const struct sd_node *id) { static __thread char str[MAX_NODE_STR_LEN]; diff --git a/sheep/cluster.h b/sheep/cluster.h index 81b5ae4..08df91c 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -109,6 +109,35 @@ struct cluster_driver { int (*unblock)(void *msg, size_t msg_len); /* + * Init a distributed mutually exclusive lock to avoid race condition + * when the whole sheepdog cluster process one exclusive resource. + * + * This function use 'lock_id' as the id of this distributed lock. + * A thread can create many locks in one sheep daemon. + * + * Returns SD_RES_XXX + */ + int (*init_lock)(struct cluster_lock *lock, uint64_t lock_id); + + /* + * Acquire the distributed lock. + * + * The cluster_lock referenced by 'lock' shall be locked by calling + * cluster->lock(). If the cluster_lock is already locked, the calling + * thread shall block until the cluster_lock becomes available. + */ + void (*lock)(struct cluster_lock *lock); + + /* + * Release the distributed lock. 
+ * + * If the owner of the cluster_lock release it (or the owner is + * killed by accident), zookeeper will trigger zk_watch() which will + * wake up all waiting threads to compete new owner of the lock + */ + void (*unlock)(struct cluster_lock *lock); + + /* * Update the specific node in the driver's private copy of nodes * * Returns SD_RES_XXX diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index ea4421b..19fc73c 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -774,6 +774,19 @@ again: return 0; } +static int corosync_init_lock(struct cluster_lock *cluster_lock, uint64_t id) +{ + return -1; +} + +static void corosync_lock(struct cluster_lock *cluster_lock) +{ +} + +static void corosync_unlock(struct cluster_lock *cluster_lock) +{ +} + static int corosync_update_node(struct sd_node *node) { struct cpg_node cnode = this_node; @@ -794,6 +807,9 @@ static struct cluster_driver cdrv_corosync = { .notify = corosync_notify, .block = corosync_block, .unblock = corosync_unblock, + .init_lock = corosync_init_lock, + .lock = corosync_lock, + .unlock = corosync_unlock, .update_node = corosync_update_node, }; diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c index b8cbb5c..4c4d83b 100644 --- a/sheep/cluster/local.c +++ b/sheep/cluster/local.c @@ -547,6 +547,19 @@ static int local_init(const char *option) return 0; } +static int local_init_lock(struct cluster_lock *cluster_lock, uint64_t id) +{ + return -1; +} + +static void local_lock(struct cluster_lock *cluster_lock) +{ +} + +static void local_unlock(struct cluster_lock *cluster_lock) +{ +} + static int local_update_node(struct sd_node *node) { struct local_node lnode = this_node; @@ -566,6 +579,9 @@ static struct cluster_driver cdrv_local = { .notify = local_notify, .block = local_block, .unblock = local_unblock, + .init_lock = local_init_lock, + .lock = local_lock, + .unlock = local_unlock, .update_node = local_update_node, }; diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index fa89c46..debe18f 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -30,6 +30,72 @@ #define QUEUE_ZNODE BASE_ZNODE "/queue" #define MEMBER_ZNODE BASE_ZNODE "/member" #define MASTER_ZNONE BASE_ZNODE "/master" +#define LOCK_ZNODE BASE_ZNODE "/lock" + +#define WAIT_TIME 1 /* second */ + +#define HASH_BUCKET_NR 1021 +static struct hlist_head *cluster_locks_table; +static pthread_mutex_t table_locks[HASH_BUCKET_NR]; + +/* + * All the operations of the lock table is protected by + * cluster_lock->id_lock so we don't need to add lock here + */ + +static void lock_table_del(uint64_t lock_id) +{ + uint64_t hval = sd_hash(&lock_id, sizeof(lock_id)) % HASH_BUCKET_NR; + struct hlist_node *iter; + struct cluster_lock *lock; + + pthread_mutex_lock(table_locks + hval); + hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) { + if (lock->id == lock_id) { + hlist_del(iter); + break; + } + } + pthread_mutex_unlock(table_locks + hval); +} + +static void lock_table_add(uint64_t lock_id, + struct cluster_lock *cluster_lock) +{ + uint64_t hval = sd_hash(&lock_id, sizeof(lock_id)) % HASH_BUCKET_NR; + + pthread_mutex_lock(table_locks + hval); + hlist_add_head(&(cluster_lock->hnode), cluster_locks_table + hval); + pthread_mutex_unlock(table_locks + hval); +} + +static int lock_table_lookup_release(uint64_t lock_id) +{ + uint64_t hval = sd_hash(&lock_id, sizeof(lock_id)) % HASH_BUCKET_NR; + int res = -1; + struct hlist_node *iter; + struct cluster_lock *lock; + + 
pthread_mutex_lock(table_locks + hval); + hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) { + if (lock->id == lock_id) { + pthread_mutex_unlock(&lock->wait_release); + res = 0; + break; + } + } + pthread_mutex_unlock(table_locks + hval); + return res; +} + +/* + * Wait a while when create, delete or get_children fail on + * zookeeper lock so it will not print too much loop log + */ +static void zk_wait(void) +{ + sleep(WAIT_TIME); +} /* iterate child znodes */ #define FOR_EACH_ZNODE(parent, path, strs) \ @@ -506,6 +572,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, { struct zk_node znode; char str[MAX_NODE_STR_LEN], *p; + uint64_t lock_id; int ret; if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) { @@ -528,6 +595,15 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, } else if (type == ZOO_DELETED_EVENT) { struct zk_node *n; + /* process distributed lock */ + ret = sscanf(path, LOCK_ZNODE "/%lu/%s", &lock_id, str); + if (ret == 2) { + ret = lock_table_lookup_release(lock_id); + if (ret) + sd_debug("release lock %lu %s", lock_id, str); + return; + } + ret = sscanf(path, MASTER_ZNONE "/%s", str); if (ret == 1) { zk_compete_master(); @@ -580,9 +656,17 @@ static int zk_get_least_seq(const char *parent, char *least_seq_path, { char path[MAX_NODE_STR_LEN], *p, *tmp; struct String_vector strs; - int rc, least_seq = INT_MAX , seq; + int rc, least_seq, seq; while (true) { + /* + * If first loop fail, the least_seq may be a very small number + * which had been deleted in zookeeper, the new create file will + * be all larger than it and it will cause dead loop. + * Therefore we need to set least_seq to INT_MAX in every loop. + */ + least_seq = INT_MAX; + RETURN_IF_ERROR(zk_get_children(parent, &strs), ""); FOR_EACH_ZNODE(parent, path, &strs) { @@ -1058,6 +1142,109 @@ kick_block_event: kick_block_event(); } +static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id) +{ + int rc = 0; + char path[MAX_NODE_STR_LEN]; + + cluster_lock->id = lock_id; + snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id); + rc = zk_init_node(path); + if (rc) + goto out; + + rc = pthread_mutex_init(&cluster_lock->wait_release, NULL); + if (rc) { + sd_err("failed to init cluster_lock->wait_release"); + goto out; + } + + rc = pthread_mutex_init(&cluster_lock->id_lock, NULL); + if (rc) { + sd_err("failed to init cluster_lock->id_lock"); + goto out; + } +out: + return rc; +} + +/* + * This operation will create a seq-ephemeral znode in lock directory + * of zookeeper (use lock-id as dir name). The smallest file path in + * this directory wil be the owner of the lock; the other threads will + * wait on a pthread_mutex_t (cluster_lock->wait_release) + */ +static void zk_lock(struct cluster_lock *cluster_lock) +{ + int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL; + int rc, len = MAX_NODE_STR_LEN; + char *my_path; + char parent[MAX_NODE_STR_LEN]; + char lowest_seq_path[MAX_NODE_STR_LEN]; + char owner_name[MAX_NODE_STR_LEN]; + + /* + * if many threads use locks with same id, we should use + * ->id_lock to avoid the only zookeeper handler to + * create many seq-ephemeral files. 
+ */ + pthread_mutex_lock(&cluster_lock->id_lock); + + lock_table_add(cluster_lock->id, cluster_lock); + + my_path = cluster_lock->lock_path; + + /* compete owner of lock is just like zk_compete_master() */ + snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu/", + cluster_lock->id); + while (true) { + rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE, + flags, my_path, MAX_NODE_STR_LEN); + if (rc == ZOK) + break; + sd_err("failed to create path:%s, %s", my_path, zerror(rc)); + zk_wait(); + } + sd_debug("create path %s success", my_path); + + /* create node ok now */ + snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id); + while (true) { + zk_get_least_seq(parent, lowest_seq_path, MAX_NODE_STR_LEN, + owner_name, &len); + + /* I got the lock */ + if (!strncmp(lowest_seq_path, my_path, strlen(my_path))) { + sd_debug("I am master now. %s", lowest_seq_path); + return; + } + + /* I failed to get the lock */ + rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL); + if (rc == ZOK) { + sd_debug("call zoo_exits success %s", lowest_seq_path); + pthread_mutex_lock(&cluster_lock->wait_release); + } else { + sd_err("failed to call zoo_exists %s", zerror(rc)); + if (rc != ZNONODE) + zk_wait(); + } + } +} + +static void zk_unlock(struct cluster_lock *cluster_lock) +{ + int rc; + rc = zk_delete_node(cluster_lock->lock_path, -1); + if (rc != ZOK) + sd_err("Failed to delete path: %s %s", + cluster_lock->lock_path, zerror(rc)); + + lock_table_del(cluster_lock->id); + pthread_mutex_unlock(&cluster_lock->id_lock); +} + + static int zk_init(const char *option) { char *hosts, *to, *p; @@ -1102,6 +1289,21 @@ static int zk_init(const char *option) return -1; } + /* init distributed lock structures */ + + cluster_locks_table = xzalloc(sizeof(struct list_head) * + HASH_BUCKET_NR); + for (uint64_t i = 0; i < HASH_BUCKET_NR; i++) { + INIT_HLIST_HEAD(cluster_locks_table + i); + pthread_mutex_init(table_locks + i, NULL); + } + + ret = zk_init_node(LOCK_ZNODE); + if (ret != ZOK) { + sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret)); + free(cluster_locks_table); + return -1; + } return 0; } @@ -1122,7 +1324,10 @@ static struct cluster_driver cdrv_zookeeper = { .notify = zk_notify, .block = zk_block, .unblock = zk_unblock, - .update_node = zk_update_node, + .init_lock = zk_init_lock, + .lock = zk_lock, + .unlock = zk_unlock, + .update_node = zk_update_node, .get_local_addr = get_local_addr, }; -- 1.7.12.4 |
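
For readers new to the interface, the following is a minimal usage sketch
(not part of the patch) of how a caller might serialize access to a
cluster-wide resource through the new init_lock()/lock()/unlock() hooks.
The function name update_shared_resource, the cdrv pointer, the lock id 42
and the include paths are illustrative assumptions only; error handling is
simplified.

	#include <stdint.h>
	#include "sheep.h"	/* struct cluster_lock */
	#include "cluster.h"	/* struct cluster_driver */

	/* 'cdrv' is whatever cluster_driver pointer the caller holds
	 * (with the corosync/local drivers init_lock() just returns -1). */
	static int update_shared_resource(struct cluster_driver *cdrv)
	{
		struct cluster_lock lock;
		uint64_t lock_id = 42;	/* arbitrary example id */
		int ret;

		/* with the zookeeper driver this creates the ".../lock/<id>"
		 * directory and initializes the two pthread mutexes */
		ret = cdrv->init_lock(&lock, lock_id);
		if (ret != 0)
			return ret;

		/* blocks until this thread owns the smallest seq-ephemeral
		 * znode under the lock directory */
		cdrv->lock(&lock);

		/* ... exclusive section: touch the shared resource ... */

		/* deletes our znode; the next waiter is woken up through
		 * zk_watcher() on ZOO_DELETED_EVENT */
		cdrv->unlock(&lock);
		return 0;
	}

Because lock() also takes cluster_lock->id_lock, two threads of the same
sheep daemon calling this function with the same lock_id are serialized
locally first, which is exactly the method B described in the commit
message.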