[Sheepdog] [PATCH 1/7] sheep: move node membership management into cluster driver

Yibin Shen zituan at taobao.com
Mon Oct 17 05:00:53 CEST 2011


On Wed, Oct 12, 2011 at 9:22 PM, MORITA Kazutaka
<morita.kazutaka at lab.ntt.co.jp> wrote:
>
> Currently, Sheepdog has two node lists; sd_node_list and
> cpg_node_list.  The former is used for consistent hashing and seen
> from users.  The latter is managed in the cluster driver and notified
> in join_handler/leave_handler.  But this design is too complex.  We
> should move all the cluster management stuff into the cluster driver.
> Main changes of this patch are as follows:
>
>  - make join process one phase
>
>   Node joining was really complex; cpg_confchg() notifies the newly
>   joining node, the node multicasts a SD_MSG_JOIN message, and the
>   master node receives it and multicasts the response.  Moreover, we
>   couldn't allow any I/O events during two multicasting.  This patch
>   moves all of them into the cluster driver.
>
>  - add check_join_handler() to cdrv_handlers
>
>   This new handler is called on one of the Sheepdog nodes (e.g. in
>   the case of the corosync driver, the master server will call this).
>   check_join_handler() checks whether the joining node may join the
>   cluster, and returns the result.
>
>  - use sheepdog_node_list_entry in the arguments of
>   join_handler()/leave_handler()
>
>   We can use the notified node list for consistent hashing now.
>
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
> ---
>  sheep/cluster.h          |   82 +++-----
>  sheep/cluster/corosync.c |  460 ++++++++++++++++++++++++++++++++------------
>  sheep/group.c            |  484 +++++++++++++++++++++++++--------------------
>  sheep/sheep_priv.h       |    1 -
>  4 files changed, 634 insertions(+), 393 deletions(-)
>
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 89d0566..a6689c6 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -21,17 +21,28 @@
>  #include "sheep.h"
>  #include "logger.h"
>
> -struct sheepid {
> -       uint8_t addr[16];
> -       uint64_t pid;
> +enum cluster_join_result {
> +       CJ_RES_SUCCESS, /* Success */
> +       CJ_RES_FAIL, /* Fail to join.  The joining node has an invalidepoch. */
> +       CJ_RES_JOIN_LATER, /* Fail to join.  The joining node should
> +                           * be added after the cluster start working. */
> +       CJ_RES_MASTER_TRANSFER, /* Transfer mastership.  The joining
> +                                * node has a newer epoch, so this node
> +                                * will leave the cluster (restart later). */
>  };
>
>  struct cdrv_handlers {
> -       void (*join_handler)(struct sheepid *joined, struct sheepid *members,
> -                            size_t nr_members);
> -       void (*leave_handler)(struct sheepid *left, struct sheepid *members,
> +       enum cluster_join_result (*check_join_handler)(
> +               struct sheepdog_node_list_entry *joining, void *opaque);
> +       void (*join_handler)(struct sheepdog_node_list_entry *joined,
> +                            struct sheepdog_node_list_entry *members,
> +                            size_t nr_members, enum cluster_join_result result,
> +                            void *opaque);
> +       void (*leave_handler)(struct sheepdog_node_list_entry *left,
> +                             struct sheepdog_node_list_entry *members,
>                              size_t nr_members);
> -       void (*notify_handler)(struct sheepid *sender, void *msg, size_t msg_len);
> +       void (*notify_handler)(struct sheepdog_node_list_entry *sender,
> +                              void *msg, size_t msg_len);
>  };
>
>  struct cluster_driver {
> @@ -44,17 +55,23 @@ struct cluster_driver {
>         * may be used with the poll(2) to monitor cluster events.  On
>         * error, returns -1.
>         */
> -       int (*init)(struct cdrv_handlers *handlers, struct sheepid *myid);
> +       int (*init)(struct cdrv_handlers *handlers, uint8_t *myaddr);
>
>        /*
>         * Join the cluster
>         *
>         * This function is used to join the cluster, and notifies a
> -        * join event to all the nodes.
> +        * join event to all the nodes.  The copy of 'opaque' is
> +        * passed to check_join_handler() and join_handler().
> +        * check_join_handler() is called on only one of the nodes
> +        * which already paticipate in the cluster.  If the content of
> +        * 'opaque' is changed in check_join_handler(), the updated
> +        * 'opaque' must be passed to join_handler().
>         *
>         * Returns zero on success, -1 on error
>         */
> -       int (*join)(void);
> +       int (*join)(struct sheepdog_node_list_entry *myself,
> +                   void *opaque, size_t opaque_len);
>
>        /*
>         * Leave the cluster
> @@ -112,54 +129,15 @@ static void __attribute__((constructor)) regist_ ## driver(void) {        \
>        list_for_each_entry(driver, &cluster_drivers, list)
>
>
> -static inline int sheepid_find(struct sheepid *sheeps, size_t nr_sheeps,
> -                              struct sheepid *key)
> -{
> -       int i;
> -
> -       for (i = 0; i < nr_sheeps; i++) {
> -               if (memcmp(sheeps + i, key, sizeof(*key)) == 0)
> -                       return i;
> -       }
> -       return -1;
> -}
> -
> -static inline void sheepid_add(struct sheepid *sheeps1, size_t nr_sheeps1,
> -                              struct sheepid *sheeps2, size_t nr_sheeps2)
> -{
> -       memcpy(sheeps1 + nr_sheeps1, sheeps2, sizeof(*sheeps2) * nr_sheeps2);
> -}
> -
> -static inline void sheepid_del(struct sheepid *sheeps1, size_t nr_sheeps1,
> -                              struct sheepid *sheeps2, size_t nr_sheeps2)
> -{
> -       int i, idx;
> -
> -       for (i = 0; i < nr_sheeps2; i++) {
> -               idx = sheepid_find(sheeps1, nr_sheeps1, sheeps2 + i);
> -               if (idx < 0)
> -                       panic("internal error: cannot find sheepid\n");
> -
> -               nr_sheeps1--;
> -               memmove(sheeps1 + idx, sheeps1 + idx + 1,
> -                       sizeof(*sheeps1) * nr_sheeps1 - idx);
> -       }
> -}
> -
> -static inline char *sheepid_to_str(struct sheepid *id)
> +static inline char *node_to_str(struct sheepdog_node_list_entry *id)
>  {
>        static char str[256];
>        char name[256];
>
> -       snprintf(str, sizeof(str), "ip: %s, pid: %" PRIu64,
> -                addr_to_str(name, sizeof(name), id->addr, 0), id->pid);
> +       snprintf(str, sizeof(str), "ip: %s, port: %d",
> +                addr_to_str(name, sizeof(name), id->addr, 0), id->port);
>
>        return str;
>  }
>
> -static inline int sheepid_cmp(struct sheepid *id1, struct sheepid *id2)
> -{
> -       return memcmp(id1, id2, sizeof(*id1));
> -}
> -
>  #endif
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index 7aa3d02..99cd69f 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -16,11 +16,17 @@
>  #include "cluster.h"
>  #include "work.h"
>
> +struct cpg_node {
> +       uint32_t nodeid;
> +       uint32_t pid;
> +       struct sheepdog_node_list_entry ent;
> +};
> +
>  static cpg_handle_t cpg_handle;
>  static struct cpg_name cpg_group = { 9, "sheepdog" };
>
>  static corosync_cfg_handle_t cfg_handle;
> -static struct sheepid this_sheepid;
> +static struct cpg_node this_node;
>
>  static struct work_queue *corosync_block_wq;
>
> @@ -29,6 +35,9 @@ static struct cdrv_handlers corosync_handlers;
>  static LIST_HEAD(corosync_event_list);
>  static LIST_HEAD(corosync_block_list);
>
> +static struct cpg_node cpg_nodes[SD_MAX_NODES];
> +static size_t nr_cpg_nodes;
> +
>  /* event types which are dispatched in corosync_dispatch() */
>  enum corosync_event_type {
>        COROSYNC_EVENT_TYPE_JOIN,
> @@ -38,6 +47,8 @@ enum corosync_event_type {
>
>  /* multicast message type */
>  enum corosync_message_type {
> +       COROSYNC_MSG_TYPE_JOIN_REQUEST,
> +       COROSYNC_MSG_TYPE_JOIN_RESPONSE,
>        COROSYNC_MSG_TYPE_NOTIFY,
>        COROSYNC_MSG_TYPE_BLOCK,
>        COROSYNC_MSG_TYPE_UNBLOCK,
> @@ -46,19 +57,31 @@ enum corosync_message_type {
>  struct corosync_event {
>        enum corosync_event_type type;
>
> -       struct sheepid members[SD_MAX_NODES];
> -       size_t nr_members;
> -
> -       struct sheepid sender;
> +       struct cpg_node sender;
>        void *msg;
>        size_t msg_len;
>
> +       enum cluster_join_result result;
> +       uint32_t nr_nodes;
> +       struct cpg_node nodes[SD_MAX_NODES];
> +
>        int blocked;
>        int callbacked;
> +       int first_node;
>
>        struct list_head list;
>  };
>
> +struct corosync_message {
> +       struct cpg_node sender;
> +       enum corosync_message_type type : 4;
> +       enum cluster_join_result result : 4;
> +       uint32_t msg_len;
> +       uint32_t nr_nodes;
> +       struct cpg_node nodes[SD_MAX_NODES];
> +       uint8_t msg[0];
> +};
> +
>  struct corosync_block_msg {
>        void *msg;
>        size_t msg_len;
> @@ -68,6 +91,44 @@ struct corosync_block_msg {
>        struct list_head list;
>  };
>
> +static int cpg_node_equal(struct cpg_node *a, struct cpg_node *b)
> +{
> +       return (a->nodeid == b->nodeid && a->pid == b->pid);
> +}
> +
> +static inline int find_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
> +                               struct cpg_node *key)
> +{
> +       int i;
> +
> +       for (i = 0; i < nr_nodes; i++)
> +               if (cpg_node_equal(nodes + i, key))
> +                       return i;
> +
> +       return -1;
> +}
> +
> +static inline void add_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
> +                               struct cpg_node *added)
> +{
> +       nodes[nr_nodes++] = *added;
> +}
> +
> +static inline void del_cpg_node(struct cpg_node *nodes, size_t nr_nodes,
> +                               struct cpg_node *deled)
> +{
> +       int idx;
> +
> +       idx = find_cpg_node(nodes, nr_nodes, deled);
> +       if (idx < 0) {
> +               dprintf("cannot find node\n");
> +               return;
> +       }
> +
> +       nr_nodes--;
> +       memmove(nodes + idx, nodes + idx + 1, sizeof(*nodes) * nr_nodes - idx);
> +}
> +
>  static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
>  {
>        int ret, nr;
> @@ -103,24 +164,26 @@ static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
>        return 0;
>  }
>
> -static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
> -                               struct sheepid *sheeps, size_t nr)
> -{
> -       int i;
> -
> -       for (i = 0; i < nr; i++) {
> -               nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr);
> -               sheeps[i].pid = cpgs[i].pid;
> -       }
> -}
> -
> -static int send_message(uint64_t type, void *msg, size_t msg_len)
> +static int send_message(enum corosync_message_type type,
> +                       enum cluster_join_result result,
> +                       struct cpg_node *sender, struct cpg_node *nodes,
> +                       size_t nr_nodes, void *msg, size_t msg_len)
>  {
>        struct iovec iov[2];
>        int ret, iov_cnt = 1;
> +       struct corosync_message cmsg = {
> +               .type = type,
> +               .msg_len = msg_len,
> +               .result = result,
> +               .sender = *sender,
> +               .nr_nodes = nr_nodes
> +       };
>
> -       iov[0].iov_base = &type;
> -       iov[0].iov_len = sizeof(type);
> +       if (nodes)
> +               memcpy(cmsg.nodes, nodes, sizeof(*nodes) * nr_nodes);
> +
> +       iov[0].iov_base = &cmsg;
> +       iov[0].iov_len = sizeof(cmsg);
>        if (msg) {
>                iov[1].iov_base = msg;
>                iov[1].iov_len = msg_len;
> @@ -153,13 +216,15 @@ static void corosync_block_done(struct work *work, int idx)
>  {
>        struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
>
> -       send_message(COROSYNC_MSG_TYPE_UNBLOCK, bm->msg, bm->msg_len);
> +       send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
> +                    bm->msg, bm->msg_len);
>
>        free(bm->msg);
>        free(bm);
>  }
>
> -static struct corosync_event *find_block_event(struct sheepid *sender)
> +static struct corosync_event *find_block_event(enum corosync_event_type type,
> +                                              struct cpg_node *sender)
>  {
>        struct corosync_event *cevent;
>
> @@ -167,67 +232,197 @@ static struct corosync_event *find_block_event(struct sheepid *sender)
>                if (!cevent->blocked)
>                        continue;
>
> -               if (cevent->type == COROSYNC_EVENT_TYPE_NOTIFY &&
> -                   sheepid_cmp(&cevent->sender, sender) == 0)
> +               if (cevent->type == type &&
> +                   cpg_node_equal(&cevent->sender, sender))
>                        return cevent;
>        }
>
>        return NULL;
>  }
>
> +static int is_master(void)
> +{
> +       if (nr_cpg_nodes == 0)
> +               /* this node should be the first cpg node */
> +               return 1;
> +
> +       return cpg_node_equal(&cpg_nodes[0], &this_node);
> +}
> +
> +static void build_node_list(struct cpg_node *nodes, size_t nr_nodes,
> +                           struct sheepdog_node_list_entry *entries)
> +{
> +       int i;
> +
> +       for (i = 0; i < nr_nodes; i++)
> +               entries[i] = nodes[i].ent;
> +}
> +
> +/*
> + * Process one dispatch event
> + *
> + * Returns 1 if the event is processed
> + */
> +static int __corosync_dispatch_one(struct corosync_event *cevent)
> +{
> +       struct corosync_block_msg *bm;
> +       enum cluster_join_result res;
> +       struct sheepdog_node_list_entry entries[SD_MAX_NODES];
> +       int idx;
> +
> +       switch (cevent->type) {
> +       case COROSYNC_EVENT_TYPE_JOIN:
> +               if (cevent->blocked) {
> +                       if (!is_master())
> +                               return 0;
> +
> +                       if (!cevent->msg)
> +                               /* we haven't receive JOIN_REQUEST yet */
> +                               return 0;
> +
> +                       if (cevent->callbacked)
> +                               /* check_join() must be called only once */
> +                               return 0;
> +
> +                       build_node_list(cpg_nodes, nr_cpg_nodes, entries);
here entries[] array is unused
> +                       res = corosync_handlers.check_join_handler(&cevent->sender.ent,
> +                                                                  cevent->msg);
> +                       if (res == CJ_RES_MASTER_TRANSFER)
> +                               nr_cpg_nodes = 0;
> +
> +                       send_message(COROSYNC_MSG_TYPE_JOIN_RESPONSE, res,
> +                                    &cevent->sender, cpg_nodes, nr_cpg_nodes,
> +                                    cevent->msg, cevent->msg_len);
> +
> +                       if (res == CJ_RES_MASTER_TRANSFER) {
> +                               eprintf("Restart me later when master is up, please. Bye.\n");
> +                               exit(1);
> +                       }
> +
> +                       cevent->callbacked = 1;
> +                       return 0;
> +               }
> +
> +               switch (cevent->result) {
> +               case CJ_RES_SUCCESS:
> +               case CJ_RES_MASTER_TRANSFER:
> +                       add_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
> +                       nr_cpg_nodes++;
> +                       /* fall through */
> +               case CJ_RES_FAIL:
> +               case CJ_RES_JOIN_LATER:
> +                       build_node_list(cpg_nodes, nr_cpg_nodes, entries);
> +                       corosync_handlers.join_handler(&cevent->sender.ent, entries,
> +                                                      nr_cpg_nodes, cevent->result,
> +                                                      cevent->msg);
> +                       break;
> +               }
> +               break;
> +       case COROSYNC_EVENT_TYPE_LEAVE:
> +               idx = find_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
> +               if (idx < 0)
> +                       break;
> +               cevent->sender.ent = cpg_nodes[idx].ent;
> +
> +               del_cpg_node(cpg_nodes, nr_cpg_nodes, &cevent->sender);
> +               nr_cpg_nodes--;
> +               build_node_list(cpg_nodes, nr_cpg_nodes, entries);
> +               corosync_handlers.leave_handler(&cevent->sender.ent,
> +                                               entries, nr_cpg_nodes);
> +               break;
> +       case COROSYNC_EVENT_TYPE_NOTIFY:
> +               if (cevent->blocked) {
> +                       if (cpg_node_equal(&cevent->sender, &this_node) &&
> +                           !cevent->callbacked) {
> +                               /* call a block callback function from a worker thread */
> +                               if (list_empty(&corosync_block_list))
> +                                       panic("cannot call block callback\n");
> +
> +                               bm = list_first_entry(&corosync_block_list,
> +                                                     typeof(*bm), list);
> +                               list_del(&bm->list);
> +
> +                               bm->work.fn = corosync_block;
> +                               bm->work.done = corosync_block_done;
> +                               queue_work(corosync_block_wq, &bm->work);
> +
> +                               cevent->callbacked = 1;
> +                       }
> +
> +                       /* block the rest messages until unblock message comes */
> +                       return 0;
> +               }
> +
> +               corosync_handlers.notify_handler(&cevent->sender.ent, cevent->msg,
> +                                                cevent->msg_len);
> +               break;
> +       }
> +
> +       return 1;
> +}
> +
>  static void __corosync_dispatch(void)
>  {
>        struct corosync_event *cevent;
> -       struct corosync_block_msg *bm;
> +       static int join_finished;
> +       int done;
>
>        while (!list_empty(&corosync_event_list)) {
>                cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
>
> -               switch (cevent->type) {
> -               case COROSYNC_EVENT_TYPE_JOIN:
> -                       corosync_handlers.join_handler(&cevent->sender,
> -                                                      cevent->members,
> -                                                      cevent->nr_members);
> -                       break;
> -               case COROSYNC_EVENT_TYPE_LEAVE:
> -                       corosync_handlers.leave_handler(&cevent->sender,
> -                                                       cevent->members,
> -                                                       cevent->nr_members);
> -                       break;
> -               case COROSYNC_EVENT_TYPE_NOTIFY:
> -                       if (cevent->blocked) {
> -                               if (sheepid_cmp(&cevent->sender, &this_sheepid) == 0 &&
> -                                   !cevent->callbacked) {
> -                                       /* call a block callback function from a worker thread */
> -                                       if (list_empty(&corosync_block_list))
> -                                               panic("cannot call block callback\n");
> -
> -                                       bm = list_first_entry(&corosync_block_list,
> -                                                             typeof(*bm), list);
> -                                       list_del(&bm->list);
> -
> -                                       bm->work.fn = corosync_block;
> -                                       bm->work.done = corosync_block_done;
> -                                       queue_work(corosync_block_wq, &bm->work);
> -
> -                                       cevent->callbacked = 1;
> +               /* update join status */
> +               if (!join_finished && cevent->type == COROSYNC_EVENT_TYPE_JOIN) {
> +                       if (cevent->first_node) {
> +                               join_finished = 1;
> +                               nr_cpg_nodes = 0;
> +                       }
> +                       if (!cevent->blocked && cpg_node_equal(&cevent->sender, &this_node)) {
> +                               if (cevent->result == CJ_RES_SUCCESS ||
> +                                   cevent->result == CJ_RES_MASTER_TRANSFER) {
> +                                       join_finished = 1;
> +                                       nr_cpg_nodes = cevent->nr_nodes;
> +                                       memcpy(cpg_nodes, cevent->nodes,
> +                                              sizeof(*cevent->nodes) * cevent->nr_nodes);
>                                }
> -
> -                               /* block the rest messages until unblock message comes */
> -                               goto out;
>                        }
> +               }
> +
> +               if (join_finished)
> +                       done = __corosync_dispatch_one(cevent);
> +               else
> +                       done = !cevent->blocked;
>
> -                       corosync_handlers.notify_handler(&cevent->sender,
> -                                                        cevent->msg,
> -                                                        cevent->msg_len);
> +               if (!done)
>                        break;
> -               }
>
>                list_del(&cevent->list);
>                free(cevent);
>        }
> -out:
> -       return;
> +}
> +
> +static struct corosync_event *update_block_event(enum corosync_event_type type,
> +                                                struct cpg_node *sender,
> +                                                void *msg, size_t msg_len)
> +{
> +       struct corosync_event *cevent;
> +
> +       cevent = find_block_event(type, sender);
> +       if (!cevent)
> +               /* block message was casted before this node joins */
> +               return NULL;
> +
> +       cevent->msg_len = msg_len;
> +       if (msg_len) {
> +               cevent->msg = realloc(cevent->msg, msg_len);
> +               if (!cevent->msg)
> +                       panic("oom\n");
> +               memcpy(cevent->msg, msg, msg_len);
> +       } else {
> +               free(cevent->msg);
> +               cevent->msg = NULL;
> +       }
> +
> +       return cevent;
>  }
>
>  static void cdrv_cpg_deliver(cpg_handle_t handle,
> @@ -236,57 +431,67 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
>                             void *msg, size_t msg_len)
>  {
>        struct corosync_event *cevent;
> -       uint64_t type;
> -       struct sheepid sender;
> -
> -       nodeid_to_addr(nodeid, sender.addr);
> -       sender.pid = pid;
> -
> -       memcpy(&type, msg, sizeof(type));
> -       msg = (uint8_t *)msg + sizeof(type);
> -       msg_len -= sizeof(type);
> +       struct corosync_message *cmsg = msg;
>
>        cevent = zalloc(sizeof(*cevent));
>        if (!cevent)
>                panic("oom\n");
>
> -       switch (type) {
> +       switch (cmsg->type) {
> +       case COROSYNC_MSG_TYPE_JOIN_REQUEST:
> +               free(cevent); /* we don't add a new cluster event in this case */
> +
> +               cevent = update_block_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender,
> +                                           cmsg->msg, cmsg->msg_len);
> +               if (!cevent)
> +                       break;
> +
> +               cevent->sender = cmsg->sender;
> +               cevent->msg_len = cmsg->msg_len;
> +               break;
>        case COROSYNC_MSG_TYPE_BLOCK:
>                cevent->blocked = 1;
>                /* fall through */
>        case COROSYNC_MSG_TYPE_NOTIFY:
>                cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
> -               cevent->sender = sender;
> -               cevent->msg_len = msg_len;
> -               if (msg_len) {
> -                       cevent->msg = zalloc(msg_len);
> +
> +               cevent->sender = cmsg->sender;
> +               cevent->msg_len = cmsg->msg_len;
> +               if (cmsg->msg_len) {
> +                       cevent->msg = zalloc(cmsg->msg_len);
>                        if (!cevent->msg)
>                                panic("oom\n");
> -                       memcpy(cevent->msg, msg, msg_len);
> +                       memcpy(cevent->msg, cmsg->msg, cmsg->msg_len);
>                } else
>                        cevent->msg = NULL;
>
>                list_add_tail(&cevent->list, &corosync_event_list);
>                break;
> +       case COROSYNC_MSG_TYPE_JOIN_RESPONSE:
> +               free(cevent); /* we don't add a new cluster event in this case */
> +
> +               cevent = update_block_event(COROSYNC_EVENT_TYPE_JOIN, &cmsg->sender,
> +                                           cmsg->msg, cmsg->msg_len);
> +               if (!cevent)
> +                       break;
> +
> +               cevent->blocked = 0;
> +
> +               cevent->result = cmsg->result;
> +               cevent->nr_nodes = cmsg->nr_nodes;
> +               memcpy(cevent->nodes, cmsg->nodes,
> +                      sizeof(*cmsg->nodes) * cmsg->nr_nodes);
> +
> +               break;
>        case COROSYNC_MSG_TYPE_UNBLOCK:
>                free(cevent); /* we don't add a new cluster event in this case */
>
> -               cevent = find_block_event(&sender);
> +               cevent = update_block_event(COROSYNC_EVENT_TYPE_NOTIFY,
> +                                           &cmsg->sender, cmsg->msg, cmsg->msg_len);
>                if (!cevent)
> -                       /* block message was casted before this node joins */
>                        break;
>
>                cevent->blocked = 0;
> -               cevent->msg_len = msg_len;
> -               if (msg_len) {
> -                       cevent->msg = realloc(cevent->msg, msg_len);
> -                       if (!cevent->msg)
> -                               panic("oom\n");
> -                       memcpy(cevent->msg, msg, msg_len);
> -               } else {
> -                       free(cevent->msg);
> -                       cevent->msg = NULL;
> -               }
>                break;
>        }
>
> @@ -304,27 +509,32 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
>  {
>        struct corosync_event *cevent;
>        int i;
> -       struct sheepid member_sheeps[SD_MAX_NODES];
> -       struct sheepid joined_sheeps[SD_MAX_NODES];
> -       struct sheepid left_sheeps[SD_MAX_NODES];
> -
> -       /* convert cpg_address to sheepid*/
> -       cpg_addr_to_sheepid(member_list, member_sheeps, member_list_entries);
> -       cpg_addr_to_sheepid(left_list, left_sheeps, left_list_entries);
> -       cpg_addr_to_sheepid(joined_list, joined_sheeps, joined_list_entries);
> -
> -       /* calculate a start member list */
> -       sheepid_del(member_sheeps, member_list_entries,
> -                   joined_sheeps, joined_list_entries);
> -       member_list_entries -= joined_list_entries;
> +       struct cpg_node joined_sheeps[SD_MAX_NODES];
> +       struct cpg_node left_sheeps[SD_MAX_NODES];
>
> -       sheepid_add(member_sheeps, member_list_entries,
> -                   left_sheeps, left_list_entries);
> -       member_list_entries += left_list_entries;
> +       /* convert cpg_address to cpg_node */
> +       for (i = 0; i < left_list_entries; i++) {
> +               left_sheeps[i].nodeid = left_list[i].nodeid;
> +               left_sheeps[i].pid = left_list[i].pid;
> +       }
> +       for (i = 0; i < joined_list_entries; i++) {
> +               joined_sheeps[i].nodeid = joined_list[i].nodeid;
> +               joined_sheeps[i].pid = joined_list[i].pid;
> +       }
>
>        /* dispatch leave_handler */
>        for (i = 0; i < left_list_entries; i++) {
> -               cevent = find_block_event(left_sheeps + i);
> +               cevent = find_block_event(COROSYNC_EVENT_TYPE_JOIN,
> +                                         left_sheeps + i);
> +               if (cevent) {
> +                       /* the node left before joining */
> +                       list_del(&cevent->list);
> +                       free(cevent);
> +                       continue;
> +               }
> +
> +               cevent = find_block_event(COROSYNC_EVENT_TYPE_NOTIFY,
> +                                         left_sheeps + i);
>                if (cevent) {
>                        /* the node left before sending UNBLOCK */
>                        list_del(&cevent->list);
> @@ -335,14 +545,8 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
>                if (!cevent)
>                        panic("oom\n");
>
> -               sheepid_del(member_sheeps, member_list_entries,
> -                           left_sheeps + i, 1);
> -               member_list_entries--;
> -
>                cevent->type = COROSYNC_EVENT_TYPE_LEAVE;
>                cevent->sender = left_sheeps[i];
> -               memcpy(cevent->members, member_sheeps, sizeof(member_sheeps));
> -               cevent->nr_members = member_list_entries;
>
>                list_add_tail(&cevent->list, &corosync_event_list);
>        }
> @@ -353,14 +557,12 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
>                if (!cevent)
>                        panic("oom\n");
>
> -               sheepid_add(member_sheeps, member_list_entries,
> -                           joined_sheeps, 1);
> -               member_list_entries++;
> -
>                cevent->type = COROSYNC_EVENT_TYPE_JOIN;
>                cevent->sender = joined_sheeps[i];
> -               memcpy(cevent->members, member_sheeps, sizeof(member_sheeps));
> -               cevent->nr_members = member_list_entries;
> +               cevent->blocked = 1; /* FIXME: add explanation */
> +               if (member_list_entries == joined_list_entries - left_list_entries &&
> +                   cpg_node_equal(&joined_sheeps[0], &this_node))
> +                       cevent->first_node = 1;
>
>                list_add_tail(&cevent->list, &corosync_event_list);
>        }
> @@ -368,7 +570,7 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
>        __corosync_dispatch();
>  }
>
> -static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
> +static int corosync_init(struct cdrv_handlers *handlers, uint8_t *myaddr)
>  {
>        int ret, fd;
>        uint32_t nodeid;
> @@ -398,14 +600,14 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
>                return -1;
>        }
>
> -       ret = nodeid_to_addr(nodeid, myid->addr);
> +       ret = nodeid_to_addr(nodeid, myaddr);
>        if (ret < 0) {
>                eprintf("failed to get local address\n");
>                return -1;
>        }
>
> -       myid->pid = getpid();
> -       this_sheepid = *myid;
> +       this_node.nodeid = nodeid;
> +       this_node.pid = getpid();
>
>        ret = cpg_fd_get(cpg_handle, &fd);
>        if (ret != CPG_OK) {
> @@ -418,7 +620,8 @@ static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
>        return fd;
>  }
>
> -static int corosync_join(void)
> +static int corosync_join(struct sheepdog_node_list_entry *myself,
> +                        void *opaque, size_t opaque_len)
>  {
>        int ret;
>  retry:
> @@ -438,7 +641,12 @@ retry:
>                return -1;
>        }
>
> -       return 0;
> +       this_node.ent = *myself;
> +
> +       ret = send_message(COROSYNC_MSG_TYPE_JOIN_REQUEST, 0, &this_node,
> +                          NULL, 0, opaque, opaque_len);
> +
> +       return ret;
>  }
>
>  static int corosync_leave(void)
> @@ -472,9 +680,11 @@ static int corosync_notify(void *msg, size_t msg_len, void (*block_cb)(void *))
>                bm->cb = block_cb;
>                list_add_tail(&bm->list, &corosync_block_list);
>
> -               ret = send_message(COROSYNC_MSG_TYPE_BLOCK, NULL, 0);
> +               ret = send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node,
> +                                  NULL, 0, NULL, 0);
>        } else
> -               ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, msg, msg_len);
> +               ret = send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
> +                                  NULL, 0, msg, msg_len);
>
>        return ret;
>  }
> diff --git a/sheep/group.c b/sheep/group.c
> index 0b18fd2..42c8d71 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -25,7 +25,6 @@
>  #include "cluster.h"
>
>  struct node {
> -       struct sheepid sheepid;
>        struct sheepdog_node_list_entry ent;
>        struct list_head list;
>  };
> @@ -42,7 +41,6 @@ struct message_header {
>        uint8_t op;
>        uint8_t state;
>        uint32_t msg_length;
> -       struct sheepid sheepid;
>        struct sheepdog_node_list_entry from;
>  };
>
> @@ -57,12 +55,10 @@ struct join_message {
>        uint8_t inc_epoch; /* set non-zero when we increment epoch of all nodes */
>        uint8_t pad[3];
>        struct {
> -               struct sheepid sheepid;
>                struct sheepdog_node_list_entry ent;
>        } nodes[SD_MAX_NODES];
>        uint32_t nr_leave_nodes;
>        struct {
> -               struct sheepid sheepid;
>                struct sheepdog_node_list_entry ent;
>        } leave_nodes[SD_MAX_NODES];
>  };
> @@ -93,17 +89,19 @@ struct work_notify {
>  struct work_join {
>        struct cpg_event cev;
>
> -       struct sheepid *member_list;
> +       struct sheepdog_node_list_entry *member_list;
>        size_t member_list_entries;
> -       struct sheepid joined;
> +       struct sheepdog_node_list_entry joined;
> +
> +       struct join_message jm;
>  };
>
>  struct work_leave {
>        struct cpg_event cev;
>
> -       struct sheepid *member_list;
> +       struct sheepdog_node_list_entry *member_list;
>        size_t member_list_entries;
> -       struct sheepid left;
> +       struct sheepdog_node_list_entry left;
>  };
>
>  #define print_node_list(node_list)                             \
> @@ -111,11 +109,11 @@ struct work_leave {
>        struct node *__node;                                    \
>        char __name[128];                                       \
>        list_for_each_entry(__node, node_list, list) {          \
> -               dprintf("%c pid: %ld, ip: %s\n",                \
> +               dprintf("%c ip: %s, port: %d\n",                \
>                        is_myself(__node->ent.addr, __node->ent.port) ? 'l' : ' ',      \
> -                       __node->sheepid.pid,                    \
>                        addr_to_str(__name, sizeof(__name),     \
> -                       __node->ent.addr, __node->ent.port));   \
> +                                   __node->ent.addr, __node->ent.port), \
> +                       __node->ent.port);                      \
>        }                                                       \
>  })
>
> @@ -389,12 +387,13 @@ out:
>        exit(1);
>  }
>
> -static struct node *find_node(struct list_head *node_list, struct sheepid *id)
> +static struct node *find_node(struct list_head *node_list,
> +                             struct sheepdog_node_list_entry *ent)
>  {
>        struct node *node;
>
>        list_for_each_entry(node, node_list, list) {
> -               if (sheepid_cmp(&node->sheepid, id) == 0)
> +               if (node_cmp(&node->ent, ent) == 0)
>                        return node;
>        }
>
> @@ -483,7 +482,6 @@ static int add_node_to_leave_list(struct message_header *msg)
>                        goto ret;
>                }
>
> -               n->sheepid = msg->sheepid;
>                n->ent = msg->from;
>
>                list_add_tail(&n->list, &sys->leave_list);
> @@ -504,7 +502,6 @@ static int add_node_to_leave_list(struct message_header *msg)
>                                continue;
>                        }
>
> -                       n->sheepid = jm->leave_nodes[i].sheepid;
>                        n->ent = jm->leave_nodes[i].ent;
>
>                        list_add_tail(&n->list, &tmp_list);
> @@ -647,7 +644,6 @@ static int get_cluster_status(struct sheepdog_node_list_entry *from,
>
>  static void join(struct join_message *msg)
>  {
> -       struct node *node;
>        struct sheepdog_node_list_entry entry[SD_MAX_NODES];
>        int i;
>
> @@ -666,12 +662,6 @@ static void join(struct join_message *msg)
>                                         &msg->inc_epoch);
>        msg->nr_sobjs = sys->nr_sobjs;
>        msg->ctime = get_cluster_ctime();
> -       msg->nr_nodes = 0;
> -       list_for_each_entry(node, &sys->sd_node_list, list) {
> -               msg->nodes[msg->nr_nodes].sheepid = node->sheepid;
> -               msg->nodes[msg->nr_nodes].ent = node->ent;
> -               msg->nr_nodes++;
> -       }
>  }
>
>  static int get_vdi_bitmap_from(struct sheepdog_node_list_entry *node)
> @@ -737,18 +727,16 @@ static void get_vdi_bitmap_from_sd_list(void)
>                get_vdi_bitmap_from(&nodes[i]);
>  }
>
> -static int move_node_to_sd_list(struct sheepid *id,
> -                               struct sheepdog_node_list_entry ent)
> +static int move_node_to_sd_list(struct sheepdog_node_list_entry ent)
>  {
>        struct node *node;
>
> -       node = find_node(&sys->cpg_node_list, id);
> +       node = zalloc(sizeof(*node));
>        if (!node)
> -               return 1;
> +               panic("failed to alloc memory for a new node\n");
>
>        node->ent = ent;
>
> -       list_del(&node->list);
>        list_add_tail(&node->list, &sys->sd_node_list);
>        sys->nr_vnodes = 0;
>
> @@ -771,22 +759,14 @@ static int update_epoch_log(int epoch)
>        return ret;
>  }
>
> -static void update_cluster_info(struct join_message *msg)
> +static void update_cluster_info(struct join_message *msg,
> +                               struct sheepdog_node_list_entry *nodes,
> +                               size_t nr_nodes)
>  {
>        int i;
> -       int ret, nr_nodes = msg->nr_nodes;
> +       int ret;
>
>        eprintf("status = %d, epoch = %d, %d, %d\n", msg->cluster_status, msg->epoch, msg->result, sys->join_finished);
> -       if (msg->result != SD_RES_SUCCESS) {
> -               if (is_myself(msg->header.from.addr, msg->header.from.port)) {
> -                       eprintf("failed to join sheepdog, %d\n", msg->result);
> -                       leave_cluster();
> -                       eprintf("Restart me later when master is up, please.Bye.\n");
> -                       exit(1);
> -                       /* sys->status = SD_STATUS_JOIN_FAILED; */
> -               }
> -               return;
> -       }
>
>        if (sys->status == SD_STATUS_JOIN_FAILED)
>                return;
> @@ -799,15 +779,16 @@ static void update_cluster_info(struct join_message *msg)
>
>        sys->epoch = msg->epoch;
>        for (i = 0; i < nr_nodes; i++) {
> -               ret = move_node_to_sd_list(&msg->nodes[i].sheepid,
> -                                          msg->nodes[i].ent);
> +               if (node_cmp(nodes + i, &msg->header.from) == 0)
> +                       continue;
> +               ret = move_node_to_sd_list(nodes[i]);
>                /*
>                 * the node belonged to sheepdog when the master build
>                 * the JOIN response however it has gone.
>                 */
>                if (ret)
>                        vprintf(SDOG_INFO "%s has gone\n",
> -                               sheepid_to_str(&msg->nodes[i].sheepid));
> +                               node_to_str(&nodes[i]));
>        }
>
>        if (msg->cluster_status != SD_STATUS_OK)
> @@ -819,14 +800,14 @@ static void update_cluster_info(struct join_message *msg)
>                update_epoch_log(sys->epoch);
>
>  join_finished:
> -       ret = move_node_to_sd_list(&msg->header.sheepid, msg->header.from);
> +       ret = move_node_to_sd_list(msg->header.from);
>        /*
>         * this should not happen since __sd_deliver() checks if the
>         * host from msg on cpg_node_list.
>         */
>        if (ret)
>                vprintf(SDOG_ERR "%s has gone\n",
> -                       sheepid_to_str(&msg->header.sheepid));
> +                       node_to_str(&msg->header.from));
>
>        if (msg->cluster_status == SD_STATUS_OK) {
>                if (msg->inc_epoch) {
> @@ -995,24 +976,24 @@ static void __sd_notify(struct cpg_event *cevent)
>        char name[128];
>        struct node *node;
>
> -       dprintf("op: %d, state: %u, size: %d, from: %s, pid: %ld\n",
> +       dprintf("op: %d, state: %u, size: %d, from: %s, port: %d\n",
>                m->op, m->state, m->msg_length,
>                addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
> -               m->sheepid.pid);
> +               m->from.port);
>
>        /*
>         * we don't want to perform any deliver events except mastership_tx event
>         * until we join; we wait for our JOIN message.
>         */
>        if (!sys->join_finished && !master_tx_message(m)) {
> -               if (sheepid_cmp(&m->sheepid, &sys->this_sheepid) != 0) {
> +               if (node_cmp(&m->from, &sys->this_node) != 0) {
>                        cevent->skip = 1;
>                        return;
>                }
>        }
>
>        if (join_message(m)) {
> -               node = find_node(&sys->cpg_node_list, &m->sheepid);
> +               node = find_node(&sys->cpg_node_list, &m->from);
>                if (!node) {
>                        dprintf("the node was left before join operation is finished\n");
>                        return;
> @@ -1020,70 +1001,6 @@ static void __sd_notify(struct cpg_event *cevent)
>
>                node->ent = m->from;
>        }
> -
> -       if (m->state == DM_FIN) {
> -               switch (m->op) {
> -               case SD_MSG_JOIN:
> -                       if (((struct join_message *)m)->cluster_status == SD_STATUS_OK)
> -                               if (sys->status != SD_STATUS_OK) {
> -                                       struct join_message *msg = (struct join_message *)m;
> -                                       int i;
> -
> -                                       get_vdi_bitmap_from_sd_list();
> -                                       get_vdi_bitmap_from(&m->from);
> -                                       for (i = 0; i < msg->nr_nodes;i++)
> -                                               get_vdi_bitmap_from(&msg->nodes[i].ent);
> -                       }
> -                       break;
> -               }
> -       }
> -
> -}
> -
> -static int tx_mastership(void)
> -{
> -       struct mastership_tx_message msg;
> -       memset(&msg, 0, sizeof(msg));
> -       msg.header.proto_ver = SD_SHEEP_PROTO_VER;
> -       msg.header.op = SD_MSG_MASTER_TRANSFER;
> -       msg.header.state = DM_FIN;
> -       msg.header.msg_length = sizeof(msg);
> -       msg.header.from = sys->this_node;
> -       msg.header.sheepid = sys->this_sheepid;
> -
> -       return sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
> -}
> -
> -static void send_join_response(struct work_notify *w)
> -{
> -       struct message_header *m;
> -       struct join_message *jm;
> -       struct node *node;
> -
> -       m = w->msg;
> -       jm = (struct join_message *)m;
> -       join(jm);
> -       m->state = DM_FIN;
> -
> -       dprintf("%d, %d\n", jm->result, jm->cluster_status);
> -       if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
> -               jm->nr_leave_nodes = 0;
> -               list_for_each_entry(node, &sys->leave_list, list) {
> -                       jm->leave_nodes[jm->nr_leave_nodes].sheepid = node->sheepid;
> -                       jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
> -                       jm->nr_leave_nodes++;
> -               }
> -               print_node_list(&sys->leave_list);
> -       } else if (jm->result != SD_RES_SUCCESS &&
> -                       jm->epoch > sys->epoch &&
> -                       jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) {
> -               eprintf("Transfer mastership.\n");
> -               tx_mastership();
> -               eprintf("Restart me later when master is up, please.Bye.\n");
> -               exit(1);
> -       }
> -       jm->epoch = sys->epoch;
> -       sys->cdrv->notify(m, m->msg_length, NULL);
>  }
>
>  static void __sd_notify_done(struct cpg_event *cevent)
> @@ -1100,10 +1017,9 @@ static void __sd_notify_done(struct cpg_event *cevent)
>        if (m->state == DM_FIN) {
>                switch (m->op) {
>                case SD_MSG_JOIN:
> -                       update_cluster_info((struct join_message *)m);
>                        break;
>                case SD_MSG_LEAVE:
> -                       node = find_node(&sys->sd_node_list, &m->sheepid);
> +                       node = find_node(&sys->sd_node_list, &m->from);
>                        if (node) {
>                                sys->nr_vnodes = 0;
>
> @@ -1125,7 +1041,7 @@ static void __sd_notify_done(struct cpg_event *cevent)
>                                 */
>                                if (!sys->join_finished) {
>                                        sys->join_finished = 1;
> -                                       move_node_to_sd_list(&sys->this_sheepid, sys->this_node);
> +                                       move_node_to_sd_list(sys->this_node);
>                                        sys->epoch = get_latest_epoch();
>                                }
>
> @@ -1158,7 +1074,6 @@ static void __sd_notify_done(struct cpg_event *cevent)
>        if (m->state == DM_INIT && is_master()) {
>                switch (m->op) {
>                case SD_MSG_JOIN:
> -                       send_join_response(w);
>                        break;
>                default:
>                        eprintf("unknown message %d\n", m->op);
> @@ -1174,17 +1089,18 @@ static void __sd_notify_done(struct cpg_event *cevent)
>        }
>  }
>
> -static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
> +static void sd_notify_handler(struct sheepdog_node_list_entry *sender,
> +                             void *msg, size_t msg_len)
>  {
>        struct cpg_event *cevent;
>        struct work_notify *w;
>        struct message_header *m = msg;
>        char name[128];
>
> -       dprintf("op: %d, state: %u, size: %d, from: %s, pid: %lu\n",
> +       dprintf("op: %d, state: %u, size: %d, from: %s, pid: %u\n",
>                m->op, m->state, m->msg_length,
>                addr_to_str(name, sizeof(name), m->from.addr, m->from.port),
> -               sender->pid);
> +               sender->port);
>
>        w = zalloc(sizeof(*w));
>        if (!w)
> @@ -1212,7 +1128,7 @@ static void sd_notify_handler(struct sheepid *sender, void *msg, size_t msg_len)
>        start_cpg_event_work();
>  }
>
> -static void add_node(struct sheepid *id)
> +static void add_node(struct sheepdog_node_list_entry *ent)
>  {
>        struct node *node;
>
> @@ -1220,16 +1136,16 @@ static void add_node(struct sheepid *id)
>        if (!node)
>                panic("failed to alloc memory for a new node\n");
>
> -       node->sheepid = *id;
> +       node->ent = *ent;
>
>        list_add_tail(&node->list, &sys->cpg_node_list);
>  }
>
> -static int del_node(struct sheepid *id)
> +static int del_node(struct sheepdog_node_list_entry *ent)
>  {
>        struct node *node;
>
> -       node = find_node(&sys->sd_node_list, id);
> +       node = find_node(&sys->sd_node_list, ent);
>        if (node) {
>                int nr;
>                struct sheepdog_node_list_entry e[SD_MAX_NODES];
> @@ -1252,7 +1168,7 @@ static int del_node(struct sheepid *id)
>                return 1;
>        }
>
> -       node = find_node(&sys->cpg_node_list, id);
> +       node = find_node(&sys->cpg_node_list, ent);
>        if (node) {
>                list_del(&node->list);
>                free(node);
> @@ -1264,7 +1180,7 @@ static int del_node(struct sheepid *id)
>  /*
>  * Check whether the majority of Sheepdog nodes are still alive or not
>  */
> -static int check_majority(struct sheepid *left)
> +static int check_majority(struct sheepdog_node_list_entry *left)
>  {
>        int nr_nodes = 0, nr_majority, nr_reachable = 0, fd;
>        struct node *node;
> @@ -1279,7 +1195,7 @@ static int check_majority(struct sheepid *left)
>                return 1;
>
>        list_for_each_entry(node, &sys->sd_node_list, list) {
> -               if (sheepid_cmp(&node->sheepid, left) == 0)
> +               if (node_cmp(&node->ent, left) == 0)
>                        continue;
>
>                addr_to_str(name, sizeof(name), node->ent.addr, 0);
> @@ -1299,6 +1215,23 @@ static int check_majority(struct sheepid *left)
>        return 0;
>  }
>
> +static void __sd_join(struct cpg_event *cevent)
> +{
> +       struct work_join *w = container_of(cevent, struct work_join, cev);
> +       struct join_message *msg = &w->jm;
> +       int i;
> +
> +       if (msg->cluster_status != SD_STATUS_OK)
> +               return;
> +
> +       if (sys->status == SD_STATUS_OK)
> +               return;
> +
> +       get_vdi_bitmap_from_sd_list();
> +       for (i = 0; i < w->member_list_entries; i++)
> +               get_vdi_bitmap_from(w->member_list + i);
> +}
> +
>  static void __sd_leave(struct cpg_event *cevent)
>  {
>        struct work_leave *w = container_of(cevent, struct work_leave, cev);
> @@ -1309,7 +1242,7 @@ static void __sd_leave(struct cpg_event *cevent)
>        }
>  }
>
> -static void send_join_request(struct sheepid *id)
> +static int send_join_request(struct sheepdog_node_list_entry *ent)
>  {
>        struct join_message msg;
>        struct sheepdog_node_list_entry entries[SD_MAX_NODES];
> @@ -1320,8 +1253,7 @@ static void send_join_request(struct sheepid *id)
>        msg.header.op = SD_MSG_JOIN;
>        msg.header.state = DM_INIT;
>        msg.header.msg_length = sizeof(msg);
> -       msg.header.from = sys->this_node;
> -       msg.header.sheepid = sys->this_sheepid;
> +       msg.header.from = *ent;
>
>        get_global_nr_copies(&msg.nr_sobjs);
>
> @@ -1333,22 +1265,24 @@ static void send_join_request(struct sheepid *id)
>                        msg.nodes[i].ent = entries[i];
>        }
>
> -       sys->cdrv->notify(&msg, msg.header.msg_length, NULL);
> +       ret = sys->cdrv->join(ent, &msg, msg.header.msg_length);
> +
> +       vprintf(SDOG_INFO "%s\n", node_to_str(&sys->this_node));
>
> -       vprintf(SDOG_INFO "%s\n", sheepid_to_str(&sys->this_sheepid));
> +       return ret;
>  }
>
>  static void __sd_join_done(struct cpg_event *cevent)
>  {
>        struct work_join *w = container_of(cevent, struct work_join, cev);
> -       int ret, i;
> -       int first_cpg_node = 0;
> +       struct join_message *jm = &w->jm;
> +       struct node *node, *t;
> +       int i;
>
>        if (w->member_list_entries == 1 &&
> -           sheepid_cmp(&w->joined, &sys->this_sheepid) == 0) {
> +           node_cmp(&w->joined, &sys->this_node) == 0) {
>                sys->join_finished = 1;
>                get_global_nr_copies(&sys->nr_sobjs);
> -               first_cpg_node = 1;
>        }
>
>        if (list_empty(&sys->cpg_node_list)) {
> @@ -1357,47 +1291,16 @@ static void __sd_join_done(struct cpg_event *cevent)
>        } else
>                add_node(&w->joined);
>
> -       if (first_cpg_node) {
> -               struct join_message msg;
> -               struct sheepdog_node_list_entry entries[SD_MAX_NODES];
> -               int nr_entries;
> -               uint64_t ctime;
> -               uint32_t epoch;
> -
> -               /*
> -                * If I'm the first sheep joins in colosync, I
> -                * becomes the master without sending JOIN.
> -                */
> -
> -               vprintf(SDOG_DEBUG "%s\n", sheepid_to_str(&sys->this_sheepid));
> -
> -               memset(&msg, 0, sizeof(msg));
> -
> -               msg.header.from = sys->this_node;
> -               msg.header.sheepid = sys->this_sheepid;
> -
> -               nr_entries = ARRAY_SIZE(entries);
> -               ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
> -               if (ret == SD_RES_SUCCESS) {
> -                       sys->epoch = epoch;
> -                       msg.ctime = ctime;
> -                       get_cluster_status(&msg.header.from, entries, nr_entries,
> -                                          ctime, epoch, &msg.cluster_status, NULL);
> -               } else
> -                       msg.cluster_status = SD_STATUS_WAIT_FOR_FORMAT;
> -
> -               update_cluster_info(&msg);
> +       print_node_list(&sys->sd_node_list);
>
> -               if (sys->status == SD_STATUS_OK) /* sheepdog starts with one node */
> -                       start_recovery(sys->epoch);
> +       update_cluster_info(jm, w->member_list, w->member_list_entries);
>
> -               return;
> +       if (sys->status == SD_STATUS_OK) {
> +               list_for_each_entry_safe(node, t, &sys->leave_list, list) {
> +                       list_del(&node->list);
> +               }
> +               start_recovery(sys->epoch);
>        }
> -
> -       print_node_list(&sys->sd_node_list);
> -
> -       if (sheepid_cmp(&w->joined, &sys->this_sheepid) == 0)
> -               send_join_request(&w->joined);
>  }
>
>  static void __sd_leave_done(struct cpg_event *cevent)
> @@ -1455,6 +1358,7 @@ static void cpg_event_fn(struct work *work, int idx)
>
>        switch (cevent->ctype) {
>        case CPG_EVENT_JOIN:
> +               __sd_join(cevent);
>                break;
>        case CPG_EVENT_LEAVE:
>                __sd_leave(cevent);
> @@ -1746,62 +1650,214 @@ do_retry:
>        queue_work(sys->cpg_wqueue, &cpg_event_work);
>  }
>
> -static void sd_join_handler(struct sheepid *joined, struct sheepid *members,
> -                           size_t nr_members)
> +static enum cluster_join_result sd_check_join_handler(struct sheepdog_node_list_entry *joining,
> +                                                     void *opaque)
> +{
> +       struct message_header *m = opaque;
> +       struct join_message *jm;
> +       struct node *node;
> +
> +       jm = (struct join_message *)m;
> +
> +       if (node_cmp(joining, &sys->this_node) == 0) {
> +               struct sheepdog_node_list_entry entries[SD_MAX_NODES];
> +               int nr_entries;
> +               uint64_t ctime;
> +               uint32_t epoch;
> +               int ret;
> +
> +               /*
> +                * If I'm the first sheep joins in colosync, I
> +                * becomes the master without sending JOIN.
> +                */
> +
> +               vprintf(SDOG_DEBUG "%s\n", node_to_str(&sys->this_node));
> +
> +               jm->header.from = sys->this_node;
> +
> +               nr_entries = ARRAY_SIZE(entries);
> +               ret = read_epoch(&epoch, &ctime, entries, &nr_entries);
> +               if (ret == SD_RES_SUCCESS) {
> +                       sys->epoch = epoch;
> +                       jm->ctime = ctime;
> +                       get_cluster_status(&jm->header.from, entries, nr_entries,
> +                                          ctime, epoch, &jm->cluster_status, NULL);
> +               } else
> +                       jm->cluster_status = SD_STATUS_WAIT_FOR_FORMAT;
> +
> +               return CJ_RES_SUCCESS;
> +       }
> +
> +       join(jm);
> +       m->state = DM_FIN;
> +
> +       dprintf("%d, %d\n", jm->result, jm->cluster_status);
> +       if (jm->result == SD_RES_SUCCESS && jm->cluster_status != SD_STATUS_OK) {
> +               jm->nr_leave_nodes = 0;
> +               list_for_each_entry(node, &sys->leave_list, list) {
> +                       jm->leave_nodes[jm->nr_leave_nodes].ent = node->ent;
> +                       jm->nr_leave_nodes++;
> +               }
> +               print_node_list(&sys->leave_list);
> +       } else if (jm->result != SD_RES_SUCCESS &&
> +                       jm->epoch > sys->epoch &&
> +                       jm->cluster_status == SD_STATUS_WAIT_FOR_JOIN) {
> +               eprintf("Transfer mastership. %d, %d\n", jm->epoch, sys->epoch);
> +               return CJ_RES_MASTER_TRANSFER;
> +       }
> +       jm->epoch = sys->epoch;
> +
> +       if (jm->result == SD_RES_SUCCESS)
> +               return CJ_RES_SUCCESS;
> +       else if (jm->result == SD_RES_OLD_NODE_VER &&
> +                jm->result == SD_RES_NEW_NODE_VER)
> +               return CJ_RES_JOIN_LATER;
> +       else
> +               return CJ_RES_FAIL;
> +}
> +
> +static void sd_join_handler(struct sheepdog_node_list_entry *joined,
> +                           struct sheepdog_node_list_entry *members,
> +                           size_t nr_members, enum cluster_join_result result,
> +                           void *opaque)
>  {
>        struct cpg_event *cevent;
>        struct work_join *w = NULL;
>        int i, size;
> +       int nr, nr_local, nr_leave;
> +       struct node *n;
> +       struct join_message *jm;
> +       int le = get_latest_epoch();
>
> -       dprintf("join %s\n", sheepid_to_str(joined));
> -       for (i = 0; i < nr_members; i++)
> -               dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
> +       if (node_cmp(joined, &sys->this_node) == 0) {
> +               if (result == CJ_RES_FAIL) {
> +                       eprintf("failed to join sheepdog\n");
> +                       sys->cdrv->leave();
> +                       exit(1);
> +               } else if (result == CJ_RES_JOIN_LATER) {
> +                       eprintf("Restart me later when master is up, please .Bye.\n");
> +                       sys->cdrv->leave();
> +                       exit(1);
> +               }
> +       }
>
> -       if (sys->status == SD_STATUS_SHUTDOWN)
> -               return;
> +       switch (result) {
> +       case CJ_RES_SUCCESS:
> +               dprintf("join %s\n", node_to_str(joined));
> +               for (i = 0; i < nr_members; i++)
> +                       dprintf("[%x] %s\n", i, node_to_str(members + i));
>
> -       w = zalloc(sizeof(*w));
> -       if (!w)
> -               goto oom;
> +               if (sys->status == SD_STATUS_SHUTDOWN)
> +                       break;
>
> -       cevent = &w->cev;
> -       cevent->ctype = CPG_EVENT_JOIN;
> +               w = zalloc(sizeof(*w));
> +               if (!w)
> +                       panic("oom");
>
> +               cevent = &w->cev;
> +               cevent->ctype = CPG_EVENT_JOIN;
>
> -       vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
>
> -       size = sizeof(struct sheepid) * nr_members;
> -       w->member_list = zalloc(size);
> -       if (!w->member_list)
> -               goto oom;
> -       memcpy(w->member_list, members, size);
> -       w->member_list_entries = nr_members;
> +               vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
>
> -       w->joined = *joined;
> +               size = sizeof(struct sheepdog_node_list_entry) * nr_members;
> +               w->member_list = zalloc(size);
> +               if (!w->member_list)
> +                       panic("oom");
>
> -       list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
> -       start_cpg_event_work();
> +               memcpy(w->member_list, members, size);
> +               w->member_list_entries = nr_members;
>
> -       return;
> -oom:
> -       if (w) {
> -               if (w->member_list)
> -                       free(w->member_list);
> -               free(w);
> +               w->joined = *joined;
> +
> +               memcpy(&w->jm, opaque, sizeof(w->jm));
> +
> +               list_add_tail(&cevent->cpg_event_list, &sys->cpg_event_siblings);
> +               start_cpg_event_work();
> +               break;
> +       case CJ_RES_FAIL:
> +       case CJ_RES_JOIN_LATER:
> +               if (sys->status != SD_STATUS_WAIT_FOR_JOIN)
> +                       break;
> +
> +               n = zalloc(sizeof(*n));
> +               if (!n)
> +                       panic("oom\n");
> +
> +               if (find_entry_list(joined, &sys->leave_list)
> +                   || !find_entry_epoch(joined, le)) {
> +                       free(n);
> +                       break;
> +               }
> +
> +               n->ent = *joined;
> +
> +               list_add_tail(&n->list, &sys->leave_list);
> +
> +               nr_local = get_nodes_nr_epoch(sys->epoch);
> +               nr = nr_members;
> +               nr_leave = get_nodes_nr_from(&sys->leave_list);
> +
> +               dprintf("%d == %d + %d \n", nr_local, nr, nr_leave);
> +               if (nr_local == nr + nr_leave) {
> +                       sys->status = SD_STATUS_OK;
> +                       update_epoch_log(sys->epoch);
> +                       update_epoch_store(sys->epoch);
> +               }
> +               break;
> +       case CJ_RES_MASTER_TRANSFER:
> +               jm = (struct join_message *)opaque;
> +               nr = jm->nr_leave_nodes;
> +               for (i = 0; i < nr; i++) {
> +                       n = zalloc(sizeof(*n));
> +                       if (!n)
> +                               panic("oom\n");
> +
> +                       if (find_entry_list(&jm->leave_nodes[i].ent, &sys->leave_list)
> +                           || !find_entry_epoch(&jm->leave_nodes[i].ent, le)) {
> +                               free(n);
> +                               continue;
> +                       }
> +
> +                       n->ent = jm->leave_nodes[i].ent;
> +
> +                       list_add_tail(&n->list, &sys->leave_list);
> +               }
> +
> +               /* Sheep needs this to identify itself as master.
> +                * Now mastership transfer is done.
> +                */
> +               if (!sys->join_finished) {
> +                       sys->join_finished = 1;
> +                       move_node_to_sd_list(sys->this_node);
> +                       sys->epoch = get_latest_epoch();
> +               }
> +
> +               nr_local = get_nodes_nr_epoch(sys->epoch);
> +               nr = nr_members;
> +               nr_leave = get_nodes_nr_from(&sys->leave_list);
> +
> +               dprintf("%d == %d + %d \n", nr_local, nr, nr_leave);
> +               if (nr_local == nr + nr_leave) {
> +                       sys->status = SD_STATUS_OK;
> +                       update_epoch_log(sys->epoch);
> +                       update_epoch_store(sys->epoch);
> +               }
> +               break;
>        }
> -       panic("failed to allocate memory for a confchg event\n");
>  }
>
> -static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
> +static void sd_leave_handler(struct sheepdog_node_list_entry *left,
> +                            struct sheepdog_node_list_entry *members,
>                             size_t nr_members)
>  {
>        struct cpg_event *cevent;
>        struct work_leave *w = NULL;
>        int i, size;
>
> -       dprintf("leave %s\n", sheepid_to_str(left));
> +       dprintf("leave %s\n", node_to_str(left));
>        for (i = 0; i < nr_members; i++)
> -               dprintf("[%x] %s\n", i, sheepid_to_str(members + i));
> +               dprintf("[%x] %s\n", i, node_to_str(members + i));
>
>        if (sys->status == SD_STATUS_SHUTDOWN)
>                return;
> @@ -1816,7 +1872,7 @@ static void sd_leave_handler(struct sheepid *left, struct sheepid *members,
>
>        vprintf(SDOG_DEBUG "allow new confchg, %p\n", cevent);
>
> -       size = sizeof(struct sheepid) * nr_members;
> +       size = sizeof(struct sheepdog_node_list_entry) * nr_members;
>        w->member_list = zalloc(size);
>        if (!w->member_list)
>                goto oom;
> @@ -1843,6 +1899,7 @@ int create_cluster(int port, int64_t zone)
>        int fd, ret;
>        struct cluster_driver *cdrv;
>        struct cdrv_handlers handlers = {
> +               .check_join_handler = sd_check_join_handler,
>                .join_handler = sd_join_handler,
>                .leave_handler = sd_leave_handler,
>                .notify_handler = sd_notify_handler,
> @@ -1858,26 +1915,24 @@ int create_cluster(int port, int64_t zone)
>                }
>        }
>
> -       fd = sys->cdrv->init(&handlers, &sys->this_sheepid);
> +       fd = sys->cdrv->init(&handlers, sys->this_node.addr);
>        if (fd < 0)
>                return -1;
>
> -       ret = sys->cdrv->join();
> -       if (ret != 0)
> -               return -1;
> -
> -       memcpy(sys->this_node.addr, sys->this_sheepid.addr,
> -              sizeof(sys->this_node.addr));
>        sys->this_node.port = port;
>        sys->this_node.nr_vnodes = SD_DEFAULT_VNODES;
>        if (zone == -1) {
>                /* use last 4 bytes as zone id */
> -               uint8_t *b = sys->this_sheepid.addr + 12;
> +               uint8_t *b = sys->this_node.addr + 12;
>                sys->this_node.zone = b[0] | b[1] << 8 | b[2] << 16 | b[3] << 24;
>        } else
>                sys->this_node.zone = zone;
>        dprintf("zone id = %u\n", sys->this_node.zone);
>
> +       ret = send_join_request(&sys->this_node);
> +       if (ret != 0)
> +               return -1;
> +
>        if (get_latest_epoch() == 0)
>                sys->status = SD_STATUS_WAIT_FOR_FORMAT;
>        else
> @@ -1912,7 +1967,6 @@ int leave_cluster(void)
>        msg.header.state = DM_FIN;
>        msg.header.msg_length = sizeof(msg);
>        msg.header.from = sys->this_node;
> -       msg.header.sheepid = sys->this_sheepid;
>        msg.epoch = get_latest_epoch();
>
>        dprintf("%d\n", msg.epoch);
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index b292548..5613f12 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -110,7 +110,6 @@ struct cluster_info {
>
>        /* set after finishing the JOIN procedure */
>        int join_finished;
> -       struct sheepid this_sheepid;
>        struct sheepdog_node_list_entry this_node;
>
>        uint32_t epoch;
> --
> 1.7.2.5
>
> --
> sheepdog mailing list
> sheepdog at lists.wpkg.org
> http://lists.wpkg.org/mailman/listinfo/sheepdog

________________________________

This email (including any attachments) is confidential and may be legally privileged. If you received this email in error, please delete it immediately and do not copy it or use it for any purpose or disclose its contents to any other person. Thank you.

本电邮(包括任何附件)可能含有机密资料并受法律保护。如您不是正确的收件人,请您立即删除本邮件。请不要将本电邮进行复制并用作任何其他用途、或透露本邮件之内容。谢谢。



More information about the sheepdog mailing list