[sheepdog] [PATCH] zookeeper: do not overload event types
Christoph Hellwig
hch at infradead.org
Tue May 29 11:37:21 CEST 2012
Use different event types for join requests vs. responses, and for block vs.
notify events, instead of overloading the type with the blocked field.
Signed-off-by: Christoph Hellwig <hch at lst.de>
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 859f2b0..471c721 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -42,8 +42,10 @@
free(*(strs)->data))
enum zk_event_type {
- EVENT_JOIN = 1,
+ EVENT_JOIN_REQUEST = 1, /* queued by zk_join() */
+ EVENT_JOIN_RESPONSE, /* join request after the master ran sd_check_join_cb() */
EVENT_LEAVE,
+ EVENT_BLOCK, /* queued by zk_block() */
EVENT_NOTIFY,
};
@@ -59,7 +61,6 @@ struct zk_event {
enum cluster_join_result join_result;
- int blocked; /* set non-zero when sheep must block this event */
int callbacked; /* set non-zero after sd_block_handler() was called */
size_t buf_len;
@@ -80,6 +81,11 @@ static struct sd_node sd_nodes[SD_MAX_NODES];
static size_t nr_sd_nodes;
static size_t nr_zk_nodes;
+/* pending join requests and blocks hold the queue until rewritten */
+static inline int is_blocking_event(const struct zk_event *ev)
+{
+ return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_REQUEST;
+}
/* zookeeper API wrapper */
static inline ZOOAPI int zk_create(zhandle_t *zh, const char *path,
const char *value, int valuelen, const struct ACL_vector *acl,
@@ -251,7 +257,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
if (rc == ZOK &&
node_eq(&ev->sender.node, &lev->sender.node) &&
- ev->blocked) {
+ is_blocking_event(ev)) { /* NOTE(review): lev is presumably this sender's leave event */
dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos);
queue_pos++;
@@ -294,7 +300,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
/* this event will be pushed back to the queue,
* we just wait for the arrival of its updated,
* not need to watch next data. */
- if (ev->blocked)
+ if (is_blocking_event(ev))
goto out;
/* watch next data */
@@ -485,7 +491,7 @@ static struct zk_node this_node;
static int add_event(zhandle_t *zh, enum zk_event_type type,
struct zk_node *znode, void *buf,
- size_t buf_len, int blocked)
+ size_t buf_len)
{
struct zk_event ev;
@@ -493,7 +499,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
ev.sender = *znode;
ev.buf_len = buf_len;
ev.callbacked = 0;
- ev.blocked = blocked;
if (buf)
memcpy(ev.buf, buf, buf_len);
zk_queue_push(zh, &ev);
@@ -511,7 +516,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
ev->sender = *znode;
ev->buf_len = 0;
ev->callbacked = 0;
- ev->blocked = 0;
nr_levents = uatomic_add_return(&nr_zk_levents, 1);
dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
@@ -644,9 +648,8 @@ static int zk_join(struct sd_node *myself,
dprintf("clientid:%ld\n", cid->client_id);
- rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
-
- return rc;
+ return add_event(zhandle, EVENT_JOIN_REQUEST, &this_node,
+ opaque, opaque_len); /* the master answers with EVENT_JOIN_RESPONSE */
}
static int zk_leave(void)
@@ -659,12 +662,12 @@ static int zk_leave(void)
static int zk_notify(void *msg, size_t msg_len)
{
- return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
+ return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len);
}
static void zk_block(void)
{
- add_event(zhandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
+ add_event(zhandle, EVENT_BLOCK, &this_node, NULL, 0);
}
static void zk_unblock(void *msg, size_t msg_len)
@@ -676,7 +679,7 @@ static void zk_unblock(void *msg, size_t msg_len)
rc = zk_queue_pop(zhandle, &ev);
assert(rc == 0);
- ev.blocked = 0;
+ ev.type = EVENT_NOTIFY; /* presumably the EVENT_BLOCK queued by zk_block() */
ev.buf_len = msg_len;
if (msg)
memcpy(ev.buf, msg, msg_len);
@@ -692,7 +695,7 @@ static void zk_unblock(void *msg, size_t msg_len)
static void zk_handler(int listen_fd, int events, void *data)
{
- int ret, rc, retry;
+ int ret, rc;
char path[256];
eventfd_t value;
struct zk_event ev;
@@ -719,37 +722,46 @@ static void zk_handler(int listen_fd, int events, void *data)
goto out;
switch (ev.type) {
- case EVENT_JOIN:
- dprintf("JOIN EVENT, blocked:%d\n", ev.blocked);
- if (ev.blocked) {
- dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
- nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
- if (is_master(zhandle, &this_node)) {
- res = sd_check_join_cb(&ev.sender.node, ev.buf);
- ev.join_result = res;
- ev.blocked = 0;
- ev.sender.joined = 1;
-
- dprintf("I'm master, push back join event\n");
- zk_queue_push_back(zhandle, &ev);
-
- if (res == CJ_RES_MASTER_TRANSFER) {
- eprintf("failed to join sheepdog cluster: "
- "please retry when master is up\n");
- zk_leave();
- exit(1);
- }
- } else
- zk_queue_push_back(zhandle, NULL);
+ case EVENT_JOIN_REQUEST:
+ dprintf("JOIN REQUEST nr_nodes: %zu, sender: %s, joined: %d\n",
+ nr_zk_nodes, node_to_str(&ev.sender.node),
+ ev.sender.joined);
+
+ if (!is_master(zhandle, &this_node)) {
+ zk_queue_push_back(zhandle, NULL);
+ break;
+ }
- goto out;
- } else if (is_master(zhandle, &this_node)
- && !node_eq(&ev.sender.node, &this_node.node)) {
- /* wait util member have been created */
- sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
- retry = MEMBER_CREATE_TIMEOUT/MEMBER_CREATE_INTERVAL;
- while (retry && zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
- usleep(MEMBER_CREATE_INTERVAL*1000);
+ res = sd_check_join_cb(&ev.sender.node, ev.buf);
+ ev.join_result = res;
+ ev.type = EVENT_JOIN_RESPONSE;
+ ev.sender.joined = 1;
+
+ dprintf("I'm master, push back join event\n");
+ zk_queue_push_back(zhandle, &ev);
+
+ if (res == CJ_RES_MASTER_TRANSFER) {
+ eprintf("failed to join sheepdog cluster: "
+ "please retry when master is up\n");
+ zk_leave();
+ exit(1);
+ }
+ break;
+ case EVENT_JOIN_RESPONSE:
+ dprintf("JOIN RESPONSE\n");
+
+ if (is_master(zhandle, &this_node) &&
+ !node_eq(&ev.sender.node, &this_node.node)) {
+ /* wait until the member node has been created */
+ int retry =
+ MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
+
+ snprintf(path, sizeof(path), MEMBER_ZNODE "/%s",
+ node_to_str(&ev.sender.node));
+
+ while (retry &&
+ zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
+ usleep(MEMBER_CREATE_INTERVAL * 1000);
retry--;
}
if (retry <= 0) {
@@ -793,7 +805,7 @@ static void zk_handler(int listen_fd, int events, void *data)
ev.join_result, ev.buf);
break;
case EVENT_LEAVE:
- dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
+ dprintf("LEAVE EVENT\n");
n = node_btree_find(&zk_node_btroot, &ev.sender);
if (!n) {
dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
@@ -806,21 +818,21 @@ static void zk_handler(int listen_fd, int events, void *data)
build_node_list(zk_node_btroot);
sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
break;
- case EVENT_NOTIFY:
- dprintf("NOTIFY, blocked:%d\n", ev.blocked);
- if (ev.blocked) {
- if (node_eq(&ev.sender.node, &this_node.node)
- && !ev.callbacked) {
- uatomic_inc(&zk_notify_blocked);
- ev.callbacked = 1;
- zk_queue_push_back(zhandle, &ev);
- sd_block_handler();
- } else
- zk_queue_push_back(zhandle, NULL);
-
- goto out;
+ case EVENT_BLOCK:
+ dprintf("BLOCK\n");
+ if (node_eq(&ev.sender.node, &this_node.node)
+ && !ev.callbacked) { /* our own block; sd_block_handler() not yet called */
+ uatomic_inc(&zk_notify_blocked);
+ ev.callbacked = 1;
+ zk_queue_push_back(zhandle, &ev);
+ sd_block_handler();
+ } else {
+ zk_queue_push_back(zhandle, NULL);
}
+ break;
+ case EVENT_NOTIFY:
+ dprintf("NOTIFY\n");
sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
break;
}
More information about the sheepdog
mailing list