On Fri, Sep 30, 2011 at 4:28 AM, MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp<mailto:morita.kazutaka at lab.ntt.co.jp>> wrote: Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp<mailto:morita.kazutaka at lab.ntt.co.jp>> --- sheep/Makefile.am | 2 +- sheep/cluster/corosync.c | 260 ++++++++++++++++++++++++++++++++++++++++++++++ sheep/group.c | 11 ++ 3 files changed, 272 insertions(+), 1 deletions(-) create mode 100644 sheep/cluster/corosync.c diff --git a/sheep/Makefile.am b/sheep/Makefile.am index 2f51702..ad92ea0 100644 --- a/sheep/Makefile.am +++ b/sheep/Makefile.am @@ -23,7 +23,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $ sbin_PROGRAMS = sheep -sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c +sheep_SOURCES = sheep.c group.c sdnet.c store.c vdi.c work.c cluster/corosync.c sheep_LDADD = $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread sheep_DEPENDENCIES = ../lib/libsheepdog.a diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c new file mode 100644 index 0000000..735bee7 --- /dev/null +++ b/sheep/cluster/corosync.c @@ -0,0 +1,260 @@ +/* + * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#include <stdio.h> +#include <unistd.h> +#include <corosync/cpg.h> +#include <corosync/cfg.h> + +#include "cluster.h" + +static cpg_handle_t cpg_handle; +static struct cpg_name cpg_group = { 9, "sheepdog" }; + +static corosync_cfg_handle_t cfg_handle; + +static struct cdrv_handlers corosync_handlers; + +static int nodeid_to_addr(uint32_t nodeid, uint8_t *addr) +{ + int ret, nr; + corosync_cfg_node_address_t caddr; + struct sockaddr_storage *ss = (struct sockaddr_storage *)caddr.address; + struct sockaddr_in *sin = (struct sockaddr_in *)caddr.address; + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)caddr.address; + void *saddr; + + ret = corosync_cfg_get_node_addrs(cfg_handle, nodeid, 1, &nr, &caddr); + if (ret != CS_OK) { + vprintf(SDOG_ERR "failed to get addr %d\n", ret); + return -1; + } + + if (!nr) { + vprintf(SDOG_ERR "we got no address\n"); + return -1; + } + + if (ss->ss_family == AF_INET6) { + saddr = &sin6->sin6_addr; + memcpy(addr, saddr, 16); + } else if (ss->ss_family == AF_INET) { + saddr = &sin->sin_addr; + memset(addr, 0, 16); + memcpy(addr + 12, saddr, 4); + } else { + vprintf(SDOG_ERR "unknown protocol %d\n", ss->ss_family); + return -1; + } + + return 0; +} + +static void cpg_addr_to_sheepid(const struct cpg_address *cpgs, + struct sheepid *sheeps, size_t nr) +{ + int i; + + for (i = 0; i < nr; i++) { + nodeid_to_addr(cpgs[i].nodeid, sheeps[i].addr); + sheeps[i].pid = cpgs[i].pid; + } +} + +static void cdrv_cpg_deliver(cpg_handle_t handle, + const struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, + void *msg, size_t msg_len) +{ + struct sheepid sender; + + nodeid_to_addr(nodeid, sender.addr); + sender.pid = pid; + + corosync_handlers.notify_handler(&sender, msg, msg_len); +} + +static void cdrv_cpg_confchg(cpg_handle_t handle, + const struct cpg_name *group_name, + const struct cpg_address *member_list, + size_t member_list_entries, + const struct cpg_address *left_list, + size_t left_list_entries, + const struct cpg_address *joined_list, + size_t joined_list_entries) +{ + int i; + struct sheepid member_sheeps[SD_MAX_NODES]; + struct sheepid joined_sheeps[SD_MAX_NODES]; + struct sheepid left_sheeps[SD_MAX_NODES]; + + /* convert cpg_address to sheepid*/ + cpg_addr_to_sheepid(member_list, member_sheeps, member_list_entries); + cpg_addr_to_sheepid(left_list, left_sheeps, left_list_entries); + cpg_addr_to_sheepid(joined_list, joined_sheeps, joined_list_entries); + + /* calculate a start member list */ + sheepid_del(member_sheeps, member_list_entries, + joined_sheeps, joined_list_entries); + member_list_entries -= joined_list_entries; + + sheepid_add(member_sheeps, member_list_entries, + left_sheeps, left_list_entries); + member_list_entries += left_list_entries; + + /* dispatch leave_handler */ + for (i = 0; i < left_list_entries; i++) { + sheepid_del(member_sheeps, member_list_entries, + left_sheeps + i, 1); + member_list_entries--; + + corosync_handlers.leave_handler(left_sheeps + i, member_sheeps, + member_list_entries); + } + + /* dispatch join_handler */ + for (i = 0; i < joined_list_entries; i++) { + sheepid_add(member_sheeps, member_list_entries, + joined_sheeps, 1); + member_list_entries++; + + corosync_handlers.join_handler(joined_sheeps + i, member_sheeps, + member_list_entries); + } +} + +static int corosync_init(struct cdrv_handlers *handlers, struct sheepid *myid) +{ + int ret, fd; + uint32_t nodeid; + cpg_callbacks_t cb = { + .cpg_deliver_fn = cdrv_cpg_deliver, + .cpg_confchg_fn = cdrv_cpg_confchg + }; + + corosync_handlers = *handlers; + + ret = cpg_initialize(&cpg_handle, &cb); + if (ret != CPG_OK) { + eprintf("Failed to initialize cpg, %d\n", ret); + eprintf("Is corosync running?\n"); + return -1; + } + + ret = corosync_cfg_initialize(&cfg_handle, NULL); + if (ret != CS_OK) { + vprintf(SDOG_ERR "failed to initiazize cfg %d\n", ret); + return -1; + } + + ret = corosync_cfg_local_get(cfg_handle, &nodeid); + if (ret != CS_OK) { + vprintf(SDOG_ERR "failed to get nodeid %d\n", ret); + return -1; + } + + ret = nodeid_to_addr(nodeid, myid->addr); + if (ret < 0) { + eprintf("failed to get local address\n"); + return -1; + } + + myid->pid = getpid(); + + ret = cpg_fd_get(cpg_handle, &fd); + if (ret != CPG_OK) { + eprintf("Failed to retrieve cpg file descriptor, %d\n", ret); + return 1; here we can't return 1 anymore, the semanteme is changed + } + + return fd; +} + +static int corosync_join(void) +{ + int ret; +retry: + ret = cpg_join(cpg_handle, &cpg_group); + switch (ret) { + case CPG_OK: + break; + case CPG_ERR_TRY_AGAIN: + dprintf("Failed to join the sheepdog group, try again\n"); + sleep(1); + goto retry; + case CPG_ERR_SECURITY: + eprintf("Permission error.\n"); + return -1; + default: + eprintf("Failed to join the sheepdog group, %d\n", ret); + return -1; + } + + return 0; +} + +static int corosync_leave(void) +{ + int ret; + + ret = cpg_leave(cpg_handle, &cpg_group); + if (ret != CPG_OK) { + eprintf("failed to leave the sheepdog group\n, %d", ret); + return -1; + } + + return 0; +} + +static int corosync_notify(void *msg, size_t msg_len) +{ + struct iovec iov; + int ret; + + iov.iov_base = msg; + iov.iov_len = msg_len; +retry: + ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1); + switch (ret) { + case CPG_OK: + break; + case CPG_ERR_TRY_AGAIN: + dprintf("failed to send message. try again\n"); + sleep(1); + goto retry; + default: + eprintf("failed to send message, %d\n", ret); + return -1; + } + return 0; +} + +static int corosync_dispatch(void) +{ + int ret; + + ret = cpg_dispatch(cpg_handle, CPG_DISPATCH_ALL); + if (ret != CPG_OK) + return -1; + + return 0; +} + +struct cluster_driver cdrv_corosync = { + .name = "corosync", + + .init = corosync_init, + .join = corosync_join, + .leave = corosync_leave, + .notify = corosync_notify, + .dispatch = corosync_dispatch, is 'dispatch' operation really essential? +}; + +cdrv_register(cdrv_corosync); diff --git a/sheep/group.c b/sheep/group.c index 95fc799..f6743f5 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -1890,12 +1890,23 @@ oom: int create_cluster(int port, int64_t zone) { int fd, ret; + struct cluster_driver *cdrv; struct cdrv_handlers handlers = { .join_handler = sd_join_handler, .leave_handler = sd_leave_handler, .notify_handler = sd_notify_handler, }; + if (!sys->cdrv) { + FOR_EACH_CLUSTER_DRIVER(cdrv) { + if (strcmp(cdrv->name, "corosync") == 0) { + dprintf("use corosync driver as default\n"); + sys->cdrv = cdrv; i think we'd better do a NULL pointer check for the driver's member. + break; + } + } + } + fd = sys->cdrv->init(&handlers, &sys->this_sheepid); if (fd < 0) return -1; -- 1.7.2.5 -- sheepdog mailing list sheepdog at lists.wpkg.org<mailto:sheepdog at lists.wpkg.org> http://lists.wpkg.org/mailman/listinfo/sheepdog ________________________________ This email (including any attachments) is confidential and may be legally privileged. If you received this email in error, please delete it immediately and do not copy it or use it for any purpose or disclose its contents to any other person. Thank you. 本电邮(包括任何附件)可能含有机密资料并受法律保护。如您不是正确的收件人,请您立即删除本邮件。请不要将本电邮进行复制并用作任何其他用途、或透露本邮件之内容。谢谢。 -------------- next part -------------- An HTML attachment was scrubbed... URL: <http://lists.wpkg.org/pipermail/sheepdog/attachments/20110930/90a9f59c/attachment.html> |