[sheepdog] [PATCH] zookeeper: do not overload event types
Yunkai Zhang
yunkai.me at gmail.com
Wed May 30 15:55:01 CEST 2012
It looks nice to me.
On Tue, May 29, 2012 at 5:37 PM, Christoph Hellwig <hch at infradead.org> wrote:
> Use different types for join requests vs responses, and block vs notify
> events instead of using the blocked field to overload the type.
>
> Signed-off-by: Christoph Hellwig <hch at lst.de>
>
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> index 859f2b0..471c721 100644
> --- a/sheep/cluster/zookeeper.c
> +++ b/sheep/cluster/zookeeper.c
> @@ -42,8 +42,10 @@
> free(*(strs)->data))
>
> enum zk_event_type {
> - EVENT_JOIN = 1,
> + EVENT_JOIN_REQUEST = 1,
> + EVENT_JOIN_RESPONSE,
> EVENT_LEAVE,
> + EVENT_BLOCK,
> EVENT_NOTIFY,
> };
>
> @@ -59,7 +61,6 @@ struct zk_event {
>
> enum cluster_join_result join_result;
>
> - int blocked; /* set non-zero when sheep must block this event */
> int callbacked; /* set non-zero after sd_block_handler() was called */
>
> size_t buf_len;
> @@ -80,6 +81,11 @@ static struct sd_node sd_nodes[SD_MAX_NODES];
> static size_t nr_sd_nodes;
> static size_t nr_zk_nodes;
>
> +static inline int is_blocking_event(struct zk_event *ev)
> +{
> + return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_RESPONSE;
> +}
> +
> /* zookeeper API wrapper */
> static inline ZOOAPI int zk_create(zhandle_t *zh, const char *path,
> const char *value, int valuelen, const struct ACL_vector *acl,
> @@ -251,7 +257,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
> rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
> if (rc == ZOK &&
> node_eq(&ev->sender.node, &lev->sender.node) &&
> - ev->blocked) {
> + is_blocking_event(ev)) {
> dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos);
> queue_pos++;
>
> @@ -294,7 +300,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
> /* this event will be pushed back to the queue,
> * we just wait for the arrival of its updated,
> * not need to watch next data. */
> - if (ev->blocked)
> + if (is_blocking_event(ev))
> goto out;
>
> /* watch next data */
> @@ -485,7 +491,7 @@ static struct zk_node this_node;
>
> static int add_event(zhandle_t *zh, enum zk_event_type type,
> struct zk_node *znode, void *buf,
> - size_t buf_len, int blocked)
> + size_t buf_len)
> {
> struct zk_event ev;
>
> @@ -493,7 +499,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
> ev.sender = *znode;
> ev.buf_len = buf_len;
> ev.callbacked = 0;
> - ev.blocked = blocked;
> if (buf)
> memcpy(ev.buf, buf, buf_len);
> zk_queue_push(zh, &ev);
> @@ -511,7 +516,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
> ev->sender = *znode;
> ev->buf_len = 0;
> ev->callbacked = 0;
> - ev->blocked = 0;
>
> nr_levents = uatomic_add_return(&nr_zk_levents, 1);
> dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
> @@ -644,9 +648,8 @@ static int zk_join(struct sd_node *myself,
>
> dprintf("clientid:%ld\n", cid->client_id);
>
> - rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
> -
> - return rc;
> + return add_event(zhandle, EVENT_JOIN_REQUEST, &this_node,
> + opaque, opaque_len);
> }
>
> static int zk_leave(void)
> @@ -659,12 +662,12 @@ static int zk_leave(void)
>
> static int zk_notify(void *msg, size_t msg_len)
> {
> - return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
> + return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len);
> }
>
> static void zk_block(void)
> {
> - add_event(zhandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
> + add_event(zhandle, EVENT_BLOCK, &this_node, NULL, 0);
> }
>
> static void zk_unblock(void *msg, size_t msg_len)
> @@ -676,7 +679,7 @@ static void zk_unblock(void *msg, size_t msg_len)
> rc = zk_queue_pop(zhandle, &ev);
> assert(rc == 0);
>
> - ev.blocked = 0;
> + ev.type = EVENT_NOTIFY;
> ev.buf_len = msg_len;
> if (msg)
> memcpy(ev.buf, msg, msg_len);
> @@ -692,7 +695,7 @@ static void zk_unblock(void *msg, size_t msg_len)
>
> static void zk_handler(int listen_fd, int events, void *data)
> {
> - int ret, rc, retry;
> + int ret, rc;
> char path[256];
> eventfd_t value;
> struct zk_event ev;
> @@ -719,37 +722,46 @@ static void zk_handler(int listen_fd, int events, void *data)
> goto out;
>
> switch (ev.type) {
> - case EVENT_JOIN:
> - dprintf("JOIN EVENT, blocked:%d\n", ev.blocked);
> - if (ev.blocked) {
> - dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
> - nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
> - if (is_master(zhandle, &this_node)) {
> - res = sd_check_join_cb(&ev.sender.node, ev.buf);
> - ev.join_result = res;
> - ev.blocked = 0;
> - ev.sender.joined = 1;
> -
> - dprintf("I'm master, push back join event\n");
> - zk_queue_push_back(zhandle, &ev);
> -
> - if (res == CJ_RES_MASTER_TRANSFER) {
> - eprintf("failed to join sheepdog cluster: "
> - "please retry when master is up\n");
> - zk_leave();
> - exit(1);
> - }
> - } else
> - zk_queue_push_back(zhandle, NULL);
> + case EVENT_JOIN_REQUEST:
> + dprintf("JOIN REQUEST nr_nodes: %ld, sender: %s, joined: %d\n",
> + nr_zk_nodes, node_to_str(&ev.sender.node),
> + ev.sender.joined);
> +
> + if (!is_master(zhandle, &this_node)) {
> + zk_queue_push_back(zhandle, NULL);
> + break;
> + }
>
> - goto out;
> - } else if (is_master(zhandle, &this_node)
> - && !node_eq(&ev.sender.node, &this_node.node)) {
> - /* wait util member have been created */
> - sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
> - retry = MEMBER_CREATE_TIMEOUT/MEMBER_CREATE_INTERVAL;
> - while (retry && zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
> - usleep(MEMBER_CREATE_INTERVAL*1000);
> + res = sd_check_join_cb(&ev.sender.node, ev.buf);
> + ev.join_result = res;
> + ev.type = EVENT_JOIN_RESPONSE;
> + ev.sender.joined = 1;
> +
> + dprintf("I'm master, push back join event\n");
> + zk_queue_push_back(zhandle, &ev);
> +
> + if (res == CJ_RES_MASTER_TRANSFER) {
> + eprintf("failed to join sheepdog cluster: "
> + "please retry when master is up\n");
> + zk_leave();
> + exit(1);
> + }
> + break;
> + case EVENT_JOIN_RESPONSE:
> + dprintf("JOIN RESPONSE\n");
> +
> + if (is_master(zhandle, &this_node) &&
> + !node_eq(&ev.sender.node, &this_node.node)) {
> + /* wait util the member node has been created */
> + int retry =
> + MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
> +
> + sprintf(path, MEMBER_ZNODE "/%s",
> + node_to_str(&ev.sender.node));
> +
> + while (retry &&
> + zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
> + usleep(MEMBER_CREATE_INTERVAL * 1000);
> retry--;
> }
> if (retry <= 0) {
> @@ -793,7 +805,7 @@ static void zk_handler(int listen_fd, int events, void *data)
> ev.join_result, ev.buf);
> break;
> case EVENT_LEAVE:
> - dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
> + dprintf("LEAVE EVENT\n");
> n = node_btree_find(&zk_node_btroot, &ev.sender);
> if (!n) {
> dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
> @@ -806,21 +818,21 @@ static void zk_handler(int listen_fd, int events, void *data)
> build_node_list(zk_node_btroot);
> sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
> break;
> - case EVENT_NOTIFY:
> - dprintf("NOTIFY, blocked:%d\n", ev.blocked);
> - if (ev.blocked) {
> - if (node_eq(&ev.sender.node, &this_node.node)
> - && !ev.callbacked) {
> - uatomic_inc(&zk_notify_blocked);
> - ev.callbacked = 1;
> - zk_queue_push_back(zhandle, &ev);
> - sd_block_handler();
> - } else
> - zk_queue_push_back(zhandle, NULL);
> -
> - goto out;
> + case EVENT_BLOCK:
> + dprintf("BLOCK\n");
> + if (node_eq(&ev.sender.node, &this_node.node)
> + && !ev.callbacked) {
> + uatomic_inc(&zk_notify_blocked);
> + ev.callbacked = 1;
> + zk_queue_push_back(zhandle, &ev);
> + sd_block_handler();
> + } else {
> + zk_queue_push_back(zhandle, NULL);
> }
>
> + break;
> + case EVENT_NOTIFY:
> + dprintf("NOTIFY\n");
> sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
> break;
> }
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog
--
Yunkai Zhang
Work at Taobao
More information about the sheepdog mailing list