[sheepdog] [PATCH] zookeeper: do not overload event types

Yunkai Zhang yunkai.me at gmail.com
Wed May 30 15:55:01 CEST 2012


it looks nice to me.

On Tue, May 29, 2012 at 5:37 PM, Christoph Hellwig <hch at infradead.org> wrote:
> Use different types for join requests vs responses, and block vs notify
> events intead of using the blocked field to overload the type.
>
> Signed-off-by: Christoph Hellwig <hch at lst.de>
>
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> index 859f2b0..471c721 100644
> --- a/sheep/cluster/zookeeper.c
> +++ b/sheep/cluster/zookeeper.c
> @@ -42,8 +42,10 @@
>             free(*(strs)->data))
>
>  enum zk_event_type {
> -       EVENT_JOIN = 1,
> +       EVENT_JOIN_REQUEST = 1,
> +       EVENT_JOIN_RESPONSE,
>        EVENT_LEAVE,
> +       EVENT_BLOCK,
>        EVENT_NOTIFY,
>  };
>
> @@ -59,7 +61,6 @@ struct zk_event {
>
>        enum cluster_join_result join_result;
>
> -       int blocked; /* set non-zero when sheep must block this event */
>        int callbacked; /* set non-zero after sd_block_handler() was called */
>
>        size_t buf_len;
> @@ -80,6 +81,11 @@ static struct sd_node sd_nodes[SD_MAX_NODES];
>  static size_t nr_sd_nodes;
>  static size_t nr_zk_nodes;
>
> +static inline int is_blocking_event(struct zk_event *ev)
> +{
> +       return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_RESPONSE;
> +}
> +
>  /* zookeeper API wrapper */
>  static inline ZOOAPI int zk_create(zhandle_t *zh, const char *path,
>                const char *value, int valuelen, const struct ACL_vector *acl,
> @@ -251,7 +257,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
>                rc = zk_get(zh, path, 1, (char *)ev, &len, NULL);
>                if (rc == ZOK &&
>                    node_eq(&ev->sender.node, &lev->sender.node) &&
> -                   ev->blocked) {
> +                   is_blocking_event(ev)) {
>                        dprintf("this queue_pos:%010d have blocked whole cluster, ignore it\n", queue_pos);
>                        queue_pos++;
>
> @@ -294,7 +300,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
>        /* this event will be pushed back to the queue,
>         * we just wait for the arrival of its updated,
>         * not need to watch next data. */
> -       if (ev->blocked)
> +       if (is_blocking_event(ev))
>                goto out;
>
>        /* watch next data */
> @@ -485,7 +491,7 @@ static struct zk_node this_node;
>
>  static int add_event(zhandle_t *zh, enum zk_event_type type,
>                     struct zk_node *znode, void *buf,
> -                    size_t buf_len, int blocked)
> +                    size_t buf_len)
>  {
>        struct zk_event ev;
>
> @@ -493,7 +499,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
>        ev.sender = *znode;
>        ev.buf_len = buf_len;
>        ev.callbacked = 0;
> -       ev.blocked = blocked;
>        if (buf)
>                memcpy(ev.buf, buf, buf_len);
>        zk_queue_push(zh, &ev);
> @@ -511,7 +516,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
>        ev->sender = *znode;
>        ev->buf_len = 0;
>        ev->callbacked = 0;
> -       ev->blocked = 0;
>
>        nr_levents = uatomic_add_return(&nr_zk_levents, 1);
>        dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
> @@ -644,9 +648,8 @@ static int zk_join(struct sd_node *myself,
>
>        dprintf("clientid:%ld\n", cid->client_id);
>
> -       rc = add_event(zhandle, EVENT_JOIN, &this_node, opaque, opaque_len, 1);
> -
> -       return rc;
> +       return add_event(zhandle, EVENT_JOIN_REQUEST, &this_node,
> +                        opaque, opaque_len);
>  }
>
>  static int zk_leave(void)
> @@ -659,12 +662,12 @@ static int zk_leave(void)
>
>  static int zk_notify(void *msg, size_t msg_len)
>  {
> -       return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 0);
> +       return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len);
>  }
>
>  static void zk_block(void)
>  {
> -       add_event(zhandle, EVENT_NOTIFY, &this_node, NULL, 0, 1);
> +       add_event(zhandle, EVENT_BLOCK, &this_node, NULL, 0);
>  }
>
>  static void zk_unblock(void *msg, size_t msg_len)
> @@ -676,7 +679,7 @@ static void zk_unblock(void *msg, size_t msg_len)
>        rc = zk_queue_pop(zhandle, &ev);
>        assert(rc == 0);
>
> -       ev.blocked = 0;
> +       ev.type = EVENT_NOTIFY;
>        ev.buf_len = msg_len;
>        if (msg)
>                memcpy(ev.buf, msg, msg_len);
> @@ -692,7 +695,7 @@ static void zk_unblock(void *msg, size_t msg_len)
>
>  static void zk_handler(int listen_fd, int events, void *data)
>  {
> -       int ret, rc, retry;
> +       int ret, rc;
>        char path[256];
>        eventfd_t value;
>        struct zk_event ev;
> @@ -719,37 +722,46 @@ static void zk_handler(int listen_fd, int events, void *data)
>                goto out;
>
>        switch (ev.type) {
> -       case EVENT_JOIN:
> -               dprintf("JOIN EVENT, blocked:%d\n", ev.blocked);
> -               if (ev.blocked) {
> -                       dprintf("one sheep joined[up], nr_nodes:%ld, sender:%s, joined:%d\n",
> -                                       nr_zk_nodes, node_to_str(&ev.sender.node), ev.sender.joined);
> -                       if (is_master(zhandle, &this_node)) {
> -                               res = sd_check_join_cb(&ev.sender.node, ev.buf);
> -                               ev.join_result = res;
> -                               ev.blocked = 0;
> -                               ev.sender.joined = 1;
> -
> -                               dprintf("I'm master, push back join event\n");
> -                               zk_queue_push_back(zhandle, &ev);
> -
> -                               if (res == CJ_RES_MASTER_TRANSFER) {
> -                                       eprintf("failed to join sheepdog cluster: "
> -                                               "please retry when master is up\n");
> -                                       zk_leave();
> -                                       exit(1);
> -                               }
> -                       } else
> -                               zk_queue_push_back(zhandle, NULL);
> +       case EVENT_JOIN_REQUEST:
> +               dprintf("JOIN REQUEST nr_nodes: %ld, sender: %s, joined: %d\n",
> +                       nr_zk_nodes, node_to_str(&ev.sender.node),
> +                       ev.sender.joined);
> +
> +               if (!is_master(zhandle, &this_node)) {
> +                       zk_queue_push_back(zhandle, NULL);
> +                       break;
> +               }
>
> -                       goto out;
> -               } else if (is_master(zhandle, &this_node)
> -                       && !node_eq(&ev.sender.node, &this_node.node)) {
> -                       /* wait util member have been created */
> -                       sprintf(path, MEMBER_ZNODE "/%s", node_to_str(&ev.sender.node));
> -                       retry = MEMBER_CREATE_TIMEOUT/MEMBER_CREATE_INTERVAL;
> -                       while (retry && zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
> -                               usleep(MEMBER_CREATE_INTERVAL*1000);
> +               res = sd_check_join_cb(&ev.sender.node, ev.buf);
> +               ev.join_result = res;
> +               ev.type = EVENT_JOIN_RESPONSE;
> +               ev.sender.joined = 1;
> +
> +               dprintf("I'm master, push back join event\n");
> +               zk_queue_push_back(zhandle, &ev);
> +
> +               if (res == CJ_RES_MASTER_TRANSFER) {
> +                       eprintf("failed to join sheepdog cluster: "
> +                               "please retry when master is up\n");
> +                       zk_leave();
> +                       exit(1);
> +               }
> +               break;
> +       case EVENT_JOIN_RESPONSE:
> +               dprintf("JOIN RESPONSE\n");
> +
> +               if (is_master(zhandle, &this_node) &&
> +                   !node_eq(&ev.sender.node, &this_node.node)) {
> +                       /* wait util the member node has been created */
> +                       int retry =
> +                               MEMBER_CREATE_TIMEOUT / MEMBER_CREATE_INTERVAL;
> +
> +                       sprintf(path, MEMBER_ZNODE "/%s",
> +                               node_to_str(&ev.sender.node));
> +
> +                       while (retry &&
> +                              zk_exists(zhandle, path, 1, NULL) == ZNONODE) {
> +                               usleep(MEMBER_CREATE_INTERVAL * 1000);
>                                retry--;
>                        }
>                        if (retry <= 0) {
> @@ -793,7 +805,7 @@ static void zk_handler(int listen_fd, int events, void *data)
>                                    ev.join_result, ev.buf);
>                break;
>        case EVENT_LEAVE:
> -               dprintf("LEAVE EVENT, blocked:%d\n", ev.blocked);
> +               dprintf("LEAVE EVENT\n");
>                n = node_btree_find(&zk_node_btroot, &ev.sender);
>                if (!n) {
>                        dprintf("can't find this leave node:%s, ignore it.\n", node_to_str(&ev.sender.node));
> @@ -806,21 +818,21 @@ static void zk_handler(int listen_fd, int events, void *data)
>                build_node_list(zk_node_btroot);
>                sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
>                break;
> -       case EVENT_NOTIFY:
> -               dprintf("NOTIFY, blocked:%d\n", ev.blocked);
> -               if (ev.blocked) {
> -                       if (node_eq(&ev.sender.node, &this_node.node)
> -                                       && !ev.callbacked) {
> -                               uatomic_inc(&zk_notify_blocked);
> -                               ev.callbacked = 1;
> -                               zk_queue_push_back(zhandle, &ev);
> -                               sd_block_handler();
> -                       } else
> -                               zk_queue_push_back(zhandle, NULL);
> -
> -                       goto out;
> +       case EVENT_BLOCK:
> +               dprintf("BLOCK\n");
> +               if (node_eq(&ev.sender.node, &this_node.node)
> +                               && !ev.callbacked) {
> +                       uatomic_inc(&zk_notify_blocked);
> +                       ev.callbacked = 1;
> +                       zk_queue_push_back(zhandle, &ev);
> +                       sd_block_handler();
> +               } else {
> +                       zk_queue_push_back(zhandle, NULL);
>                }
>
> +               break;
> +       case EVENT_NOTIFY:
> +               dprintf("NOTIFY\n");
>                sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
>                break;
>        }
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog



-- 
Yunkai Zhang
Work at Taobao



More information about the sheepdog mailing list