[Sheepdog] [RFC PATCH 2/2] cluster: add corosync driver

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Fri Sep 30 11:40:39 CEST 2011


Hi,

Thanks for the review!

At Fri, 30 Sep 2011 17:09:48 +0800,
Yibin Shen wrote:
> 
> On Fri, Sep 30, 2011 at 4:28 AM, MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp<mailto:morita.kazutaka at lab.ntt.co.jp>> wrote:
> Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp<mailto:morita.kazutaka at lab.ntt.co.jp>>
> ---
>  sheep/Makefile.am        |    2 +-
>  sheep/cluster/corosync.c |  260 ++++++++++++++++++++++++++++++++++++++++++++++
>  sheep/group.c            |   11 ++
>  3 files changed, 272 insertions(+), 1 deletions(-)
>  create mode 100644 sheep/cluster/corosync.c
> 
> diff --git a/sheep/Makefile.am b/sheep/Makefile.am
> index 2f51702..ad92ea0 100644
> --- a/sheep/Makefile.am
> +++ b/sheep/Makefile.am
> @@ -23,7 +23,7 @@ INCLUDES              = -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
> 
>  sbin_PROGRAMS          = sheep
> 
> -sheep_SOURCES          = sheep.c group.c sdnet.c store.c vdi.c work.c
> +sheep_SOURCES          = sheep.c group.c sdnet.c store.c vdi.c work.c cluster/corosync.c
>  sheep_LDADD            = $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
>  sheep_DEPENDENCIES     = ../lib/libsheepdog.a
> 
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> new file mode 100644
> index 0000000..735bee7
> --- /dev/null
> +++ b/sheep/cluster/corosync.c
> @@ -0,0 +1,260 @@
> +/*
> + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <corosync/cpg.h>
> +#include <corosync/cfg.h>
> +
> +#include "cluster.h"
> +
> +static cpg_handle_t cpg_handle;
> +static struct cpg_name cpg_group = { 9, "sheepdog" };
> +
> +static corosync_cfg_handle_t cfg_handle;
> +
> +static struct cdrv_handlers corosync_handlers;
> +
> +static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr)
> +{
> +       int ret, nr;
> +       corosync_cfg_node_address_t caddr;
> +       struct sockaddr_storage *ss = (struct sockaddr_storage *)caddr.address;
> +       struct sockaddr_in *sin = (struct sockaddr_in *)caddr.address;
> +       struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)caddr.address;
> +       void *saddr;
> +
> +       ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &caddr);
> +       if (ret != CS_OK) {
> +               vprintf(SDOG_ERR "failed to get addr %d\n", ret);
> +               return -1;
> +       }
> +
> +       if (!nr) {
> +               vprintf(SDOG_ERR "we got no address\n");
> +               return -1;
> +       }
> +
> +       if (ss->ss_family == AF_INET6) {
> +               saddr = &sin6->sin6_addr;
> +               memcpy(addr, saddr, 16);
> +       } else if (ss->ss_family == AF_INET) {
> +               saddr = &sin->sin_addr;
> +               memset(addr, 0, 16);
> +               memcpy(addr + 12, saddr, 4);
> +       } else {
> +               vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family);
> +               return -1;
> +       }
> +
> +       return 0;
> +}
> +
> +static void cpg_addr_to_sheepid(const struct cpg_address *cpgs,
> +                               struct sheepid *sheeps, size_t nr)
> +{
> +       int i;
> +
> +       for (i = 0; i < nr; i++) {
> +               nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr);
> +               sheeps[i].pid = cpgs[i].pid;
> +       }
> +}
> +
> +static void cdrv_cpg_deliver(cpg_handle_t handle,
> +                            const struct cpg_name *group_name,
> +                            uint32_t nodeid, uint32_t pid,
> +                            void *msg, size_t msg_len)
> +{
> +       struct sheepid sender;
> +
> +       nodeid_to_addr(nodeid, sender.addr);
> +       sender.pid = pid;
> +
> +       corosync_handlers.notify_handler(&sender, msg, msg_len);
> +}
> +
> +static void cdrv_cpg_confchg(cpg_handle_t handle,
> +                            const struct cpg_name *group_name,
> +                            const struct cpg_address *member_list,
> +                            size_t member_list_entries,
> +                            const struct cpg_address *left_list,
> +                            size_t left_list_entries,
> +                            const struct cpg_address *joined_list,
> +                            size_t joined_list_entries)
> +{
> +       int i;
> +       struct sheepid member_sheeps[SD_MAX_NODES];
> +       struct sheepid joined_sheeps[SD_MAX_NODES];
> +       struct sheepid left_sheeps[SD_MAX_NODES];
> +
> +       /* convert cpg_address to sheepid */
> +       cpg_addr_to_sheepid(member_list, member_sheeps, member_list_entries);
> +       cpg_addr_to_sheepid(left_list, left_sheeps, left_list_entries);
> +       cpg_addr_to_sheepid(joined_list, joined_sheeps, joined_list_entries);
> +
> +       /* calculate a start member list */
> +       sheepid_del(member_sheeps, member_list_entries,
> +                   joined_sheeps, joined_list_entries);
> +       member_list_entries -= joined_list_entries;
> +
> +       sheepid_add(member_sheeps, member_list_entries,
> +                   left_sheeps, left_list_entries);
> +       member_list_entries += left_list_entries;
> +
> +       /* dispatch leave_handler */
> +       for (i = 0; i < left_list_entries; i++) {
> +               sheepid_del(member_sheeps, member_list_entries,
> +                           left_sheeps + i, 1);
> +               member_list_entries--;
> +
> +               corosync_handlers.leave_handler(left_sheeps + i, member_sheeps,
> +                                               member_list_entries);
> +       }
> +
> +       /* dispatch join_handler */
> +       for (i = 0; i < joined_list_entries; i++) {
> +               sheepid_add(member_sheeps, member_list_entries,
> +                           joined_sheeps, 1);
> +               member_list_entries++;
> +
> +               corosync_handlers.join_handler(joined_sheeps + i, member_sheeps,
> +                                              member_list_entries);
> +       }
> +}
> +
> +static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid)
> +{
> +       int ret, fd;
> +       uint32_t nodeid;
> +       cpg_callbacks_t cb = {
> +               .cpg_deliver_fn = cdrv_cpg_deliver,
> +               .cpg_confchg_fn = cdrv_cpg_confchg
> +       };
> +
> +       corosync_handlers = *handlers;
> +
> +       ret = cpg_initialize(&cpg_handle, &cb);
> +       if (ret != CPG_OK) {
> +               eprintf("Failed to initialize cpg, %d\n", ret);
> +               eprintf("Is corosync running?\n");
> +               return -1;
> +       }
> +
> +       ret = corosync_cfg_initialize(&cfg_handle, NULL);
> +       if (ret != CS_OK) {
> +               vprintf(SDOG_ERR "failed to initialize cfg %d\n", ret);
> +               return -1;
> +       }
> +
> +       ret = corosync_cfg_local_get(cfg_handle, &nodeid);
> +       if (ret != CS_OK) {
> +               vprintf(SDOG_ERR "failed to get nodeid %d\n", ret);
> +               return -1;
> +       }
> +
> +       ret = nodeid_to_addr(nodeid, myid->addr);
> +       if (ret < 0) {
> +               eprintf("failed to get local address\n");
> +               return -1;
> +       }
> +
> +       myid->pid = getpid();
> +
> +       ret = cpg_fd_get(cpg_handle, &fd);
> +       if (ret != CPG_OK) {
> +               eprintf("Failed to retrieve cpg file descriptor, %d\n", ret);
> +               return 1;
here we can't return 1 anymore; the semantics have changed

Oops, yes, we must return -1 here.

> +       }
> +
> +       return fd;
> +}
> +
> +static int corosync_join(void)
> +{
> +       int ret;
> +retry:
> +       ret = cpg_join(cpg_handle, &cpg_group);
> +       switch (ret) {
> +       case CPG_OK:
> +               break;
> +       case CPG_ERR_TRY_AGAIN:
> +               dprintf("Failed to join the sheepdog group, try again\n");
> +               sleep(1);
> +               goto retry;
> +       case CPG_ERR_SECURITY:
> +               eprintf("Permission error.\n");
> +               return -1;
> +       default:
> +               eprintf("Failed to join the sheepdog group, %d\n", ret);
> +               return -1;
> +       }
> +
> +       return 0;
> +}
> +
> +static int corosync_leave(void)
> +{
> +       int ret;
> +
> +       ret = cpg_leave(cpg_handle, &cpg_group);
> +       if (ret != CPG_OK) {
> +               eprintf("failed to leave the sheepdog group, %d\n", ret);
> +               return -1;
> +       }
> +
> +       return 0;
> +}
> +
> +static int corosync_notify(void *msg, size_t msg_len)
> +{
> +       struct iovec iov;
> +       int ret;
> +
> +       iov.iov_base = msg;
> +       iov.iov_len = msg_len;
> +retry:
> +       ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
> +       switch (ret) {
> +       case CPG_OK:
> +               break;
> +       case CPG_ERR_TRY_AGAIN:
> +               dprintf("failed to send message. try again\n");
> +               sleep(1);
> +               goto retry;
> +       default:
> +               eprintf("failed to send message, %d\n", ret);
> +               return -1;
> +       }
> +       return 0;
> +}
> +
> +static int corosync_dispatch(void)
> +{
> +       int ret;
> +
> +       ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL);
> +       if (ret != CPG_OK)
> +               return -1;
> +
> +       return 0;
> +}
> +
> +struct cluster_driver cdrv_corosync = {
> +       .name       = "corosync",
> +
> +       .init       = corosync_init,
> +       .join       = corosync_join,
> +       .leave      = corosync_leave,
> +       .notify     = corosync_notify,
> +       .dispatch   = corosync_dispatch,
> is the 'dispatch' operation really essential?

What we want to do are:

 - notify events to the sheep daemon when join/leave/notify happen
 - call registered handlers in the main thread according to the events
 - guarantee a total order of the handler call sequence in the cluster

Is there a better way to satisfy these?  If yes, I'd like to choose it.

> +};
> +
> +cdrv_register(cdrv_corosync);
> diff --git a/sheep/group.c b/sheep/group.c
> index 95fc799..f6743f5 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -1890,12 +1890,23 @@ oom:
>  int create_cluster(int port, int64_t zone)
>  {
>        int fd, ret;
> +       struct cluster_driver *cdrv;
>        struct cdrv_handlers handlers = {
>                .join_handler = sd_join_handler,
>                .leave_handler = sd_leave_handler,
>                .notify_handler = sd_notify_handler,
>        };
> 
> +       if (!sys->cdrv) {
> +               FOR_EACH_CLUSTER_DRIVER(cdrv) {
> +                       if (strcmp(cdrv->name, "corosync") == 0) {
> +                               dprintf("use corosync driver as default\n");
> +                               sys->cdrv = cdrv;
> I think we'd better do a NULL pointer check for the driver's member.

If the driver's member is NULL, it is a bug.  How about checking
it in cdrv_register()?


Thanks,

Kazutaka



More information about the sheepdog mailing list