[sheepdog] [PATCH v6 4/4] shepherd: a new cluster manager specialized for sheepdog

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Thu Jan 17 03:25:13 CET 2013


From: Hitoshi Mitake <h.mitake at gmail.com>

This patch adds shepherd, a new cluster manager for sheepdog. shepherd
is specialized for sheepdog, so it can be optimized only for sheepdog.

shepherd is designed as a master-slave cluster manager, so it can be a
single point of failure (SPOF). I'm planning to make it redundant by
utilizing corosync. With that improvement, it can be an alternative to the
group management part of ZooKeeper, and it doesn't depend on a JVM.

This patch adds the new directory shepherd/ for it, and the
executable binary will be produced as shepherd/shepherd.
The cluster driver for sheep is added as sheep/cluster/shepherd.c.

If you have some opinions or feature requests for a cluster manager
designed and implemented from scratch, I'd like to hear your comments.

Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
v2
 * lots of cleaning and bug fix
 * sane command line option handling
 * slightly improved leave handling
 * do not call sd_join_handler() in sheepkeeper_join()
 * add new option --enable-sheepkeeper to configure script for
   enable/disable building sheepkeeper

v3
 * lots of cleaning and bug fix
 * suppress some compiler warnings
 * handle leave of sheeps in saner way. this also reduces the problem
   of odd handling of return values from xread/xwrite
 * default log of sheepkeeper is /var/log/sheepkeeper.c, and users can
   specify custom location with -l option

v4
 * tons of cleaning and bug fix
 * documentation: https://github.com/mitake/sheepdog/wiki/sheepkeeper-design-note
 * stabilizing. The unpassed tests are: 008 (long), 015, 043, and 044 (long).

v5
 * rename sheepkeeper -> shepherd
 * a little bit cleaning
 * new document: https://github.com/mitake/sheepdog/wiki/shepherd-design-note

v6
 * fix coding style problems pointed out by Liu Yuan
8<---
 .gitignore               |    1 +
 Makefile.am              |    2 +-
 configure.ac             |    3 +-
 include/Makefile.am      |    3 +-
 include/shepherd.h       |  110 ++++++
 sheep/Makefile.am        |    3 +-
 sheep/cluster/shepherd.c |  704 ++++++++++++++++++++++++++++++++++++
 shepherd/Makefile.am     |   44 +++
 shepherd/shepherd.c      |  883 ++++++++++++++++++++++++++++++++++++++++++++++
 9 files changed, 1749 insertions(+), 4 deletions(-)
 create mode 100644 include/shepherd.h
 create mode 100644 sheep/cluster/shepherd.c
 create mode 100644 shepherd/Makefile.am
 create mode 100644 shepherd/shepherd.c

diff --git a/.gitignore b/.gitignore
index dbdbd55..60448d4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,6 +31,7 @@ GSYMS
 collie/collie
 sheep/sheep
 sheepfs/sheepfs
+shepherd/shepherd
 
 # directories
 .deps
diff --git a/Makefile.am b/Makefile.am
index 53d18b9..e3e4d9c 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -17,7 +17,7 @@ sheepdogsysconfdir	= ${SHEEPDOGCONFDIR}
 
 sheepdogsysconf_DATA	= 
 
-SUBDIRS			= lib collie sheep include script man
+SUBDIRS			= lib collie sheep include script man shepherd
 
 if BUILD_SHEEPFS
 SUBDIRS			+= sheepfs
diff --git a/configure.ac b/configure.ac
index 9e03e23..c891616 100644
--- a/configure.ac
+++ b/configure.ac
@@ -133,7 +133,8 @@ AC_CONFIG_FILES([Makefile
 		include/Makefile
 		script/Makefile
 		lib/Makefile
-		man/Makefile])
+		man/Makefile
+		shepherd/Makefile])
 
 ### Local business
 
diff --git a/include/Makefile.am b/include/Makefile.am
index 6413047..20e614a 100644
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -2,4 +2,5 @@ MAINTAINERCLEANFILES    = Makefile.in config.h.in
 
 noinst_HEADERS          = bitops.h event.h logger.h sheepdog_proto.h util.h \
 			  list.h net.h sheep.h exits.h strbuf.h rbtree.h \
-			  sha1.h option.h internal_proto.h
+			  sha1.h option.h internal_proto.h shepherd.h
+
diff --git a/include/shepherd.h b/include/shepherd.h
new file mode 100644
index 0000000..e3b7e65
--- /dev/null
+++ b/include/shepherd.h
@@ -0,0 +1,110 @@
+#ifndef SHEPHERD_H
+#define SHEPHERD_H
+
+enum sph_msg_type {
+	_SPH_MSG_PADDING = 0,	/* for making first member 1 */
+	SPH_MSG_JOIN,
+	SPH_MSG_JOIN_REPLY,
+	SPH_MSG_JOIN_RETRY,
+
+	SPH_MSG_NEW_NODE,
+	SPH_MSG_NEW_NODE_REPLY,
+	SPH_MSG_NEW_NODE_FINISH,
+
+	SPH_MSG_NOTIFY,
+	SPH_MSG_NOTIFY_FORWARD,
+
+	SPH_MSG_BLOCK,
+	SPH_MSG_BLOCK_FORWARD,
+
+	SPH_MSG_LEAVE,
+	SPH_MSG_LEAVE_FORWARD,
+
+	SPH_MSG_MASTER_ELECTION,
+
+	NR_SPH_MSG,		/* this must be last one */
+};
+
+struct sph_msg {
+	uint32_t type; /* original type: enum sph_msg_type */
+	uint32_t body_len;
+};
+
+#include "internal_proto.h"
+
+struct sph_msg_join {
+	uint32_t res;		/* original type: enum cluster_join_result */
+	struct sd_node node;
+	uint8_t master_elected;
+	uint8_t opaque[0];
+};
+
+struct sph_msg_join_reply {
+	uint32_t res;		/* original type: enum cluster_join_result */
+	struct sd_node nodes[SD_MAX_NODES];
+	uint32_t nr_nodes;
+	uint8_t opaque[0];
+};
+
+struct sph_msg_join_node_finish {
+	uint32_t res;		/* original type: enum cluster_join_result */
+	struct sd_node new_node;
+
+	struct sd_node nodes[SD_MAX_NODES];
+	uint32_t nr_nodes;
+	uint8_t opaque[0];
+};
+
+struct sph_msg_notify {
+	uint8_t unblock;
+	uint8_t notify_msg[0];
+};
+
+struct sph_msg_notify_forward {
+	struct sd_node from_node;
+	uint8_t unblock;
+	uint8_t notify_msg[0];
+};
+
+#define SHEPHERD_PORT 2501
+
+static inline const char *sph_msg_to_str(enum sph_msg_type msg)
+/* CAUTION: non reentrant */
+{
+	int i;
+	static char unknown[64];
+
+	static const struct {
+		int msg;
+		const char *desc;
+	} msgs[] = {
+		{ SPH_MSG_JOIN, "SPH_MSG_JOIN" },
+		{ SPH_MSG_JOIN_REPLY, "SPH_MSG_JOIN_REPLY" },
+		{ SPH_MSG_JOIN_RETRY, "SPH_MSG_JOIN_RETRY" },
+		{ SPH_MSG_NEW_NODE, "SPH_MSG_NEW_NODE" },
+		{ SPH_MSG_NEW_NODE_REPLY, "SPH_MSG_NEW_NODE_REPLY" },
+		{ SPH_MSG_NEW_NODE_FINISH, "SPH_MSG_NEW_NODE_FINISH" },
+
+		{ SPH_MSG_NOTIFY, "SPH_MSG_NOTIFY" },
+		{ SPH_MSG_NOTIFY_FORWARD, "SPH_MSG_NOTIFY_FORWARD" },
+
+		{ SPH_MSG_BLOCK, "SPH_MSG_BLOCK" },
+		{ SPH_MSG_BLOCK_FORWARD, "SPH_MSG_BLOCK_FORWARD" },
+
+		{ SPH_MSG_LEAVE, "SPH_MSG_LEAVE" },
+		{ SPH_MSG_LEAVE_FORWARD, "SPH_MSG_LEAVE_FORWARD" },
+
+		{ SPH_MSG_MASTER_ELECTION, "SPH_MSG_MASTER_ELECTION" },
+	};
+
+	for (i = 0; i < ARRAY_SIZE(msgs); i++) {
+		if (msgs[i].msg == msg)
+			return msgs[i].desc;
+	}
+
+	memset(unknown, 0, 64);
+	snprintf(unknown, 64, "<unknown shepherd message: %d>", msg);
+	return unknown;
+}
+
+#endif	/* SHEPHERD_H */
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 0ae19de..b6131b0 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -27,7 +27,8 @@ sbin_PROGRAMS		= sheep
 sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c work.c \
 			  journal.c ops.c recovery.c cluster/local.c \
 			  object_cache.c object_list_cache.c sockfd_cache.c \
-			  plain_store.c config.c migrate.c journal_file.c
+			  plain_store.c config.c migrate.c journal_file.c \
+			  cluster/shepherd.c
 
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
diff --git a/sheep/cluster/shepherd.c b/sheep/cluster/shepherd.c
new file mode 100644
index 0000000..21dc381
--- /dev/null
+++ b/sheep/cluster/shepherd.c
@@ -0,0 +1,704 @@
+/*
+ * Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+
+#include "cluster.h"
+#include "event.h"
+#include "shepherd.h"
+#include "internal_proto.h"
+
+static int sph_comm_fd;
+
+static struct sd_node this_node;
+static bool is_master;
+
+static int nr_nodes;
+static struct sd_node nodes[SD_MAX_NODES];
+
+enum sph_driver_state {
+	STATE_PRE_JOIN,
+	STATE_JOINED,
+};
+
+static enum sph_driver_state state = STATE_PRE_JOIN;
+
+static char *kept_opaque;
+static size_t kept_opaque_len;
+
+static int do_shepherd_join(void)
+{
+	int ret, msg_join_len;
+	struct iovec iov[2];
+	struct sph_msg msg;
+	struct sph_msg_join *msg_join;
+
+	msg_join_len = sizeof(struct sph_msg_join) + kept_opaque_len;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.type = SPH_MSG_JOIN;
+	msg.body_len = msg_join_len;
+
+	iov[0].iov_base = &msg;
+	iov[0].iov_len = sizeof(msg);
+
+	msg_join = xzalloc(msg_join_len);
+	msg_join->node = this_node;
+	memcpy(msg_join->opaque, kept_opaque, kept_opaque_len);
+
+	iov[1].iov_base = msg_join;
+	iov[1].iov_len = msg_join_len;
+
+	ret = writev(sph_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "do_shepherd_join() failed, %s\n",
+			strerror(errno));
+		free(msg_join);
+
+		return -1;
+	}
+
+	free(msg_join);
+	return 0;
+}
+
+static void read_msg_pre_join(void)
+{
+	int ret;
+	struct sph_msg snd, rcv;
+	struct sph_msg_join_reply *join_reply;
+	struct iovec iov[2];
+	enum cluster_join_result res;
+
+	static int first = 1;
+
+retry:
+	if (!first)
+		/* FIXME */
+		do_shepherd_join();
+
+	ret = xread(sph_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_INFO, "reading message from shepherd failed:" \
+			" %m\n");
+		exit(1);
+	}
+
+	if (rcv.type == SPH_MSG_JOIN_RETRY) {
+		vprintf(SDOG_INFO, "join request is rejected, retrying\n");
+		first = 0;
+
+		goto retry;
+	} else if (rcv.type == SPH_MSG_NEW_NODE) {
+		struct sph_msg_join *join;
+		int join_len;
+
+		join_len = rcv.body_len;
+		join = xzalloc(join_len);
+		ret = xread(sph_comm_fd, join, join_len);
+		if (ret != join_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		res = sd_check_join_cb(&join->node, join->opaque);
+		if (res == CJ_RES_FAIL) {
+			vprintf(SDOG_ERR, "sd_check_join_cb() failed\n");
+			exit(1);
+		}
+		assert(res == CJ_RES_SUCCESS);
+
+		/* FIXME: join->master_elected is needed? */
+		assert(join->master_elected);
+		is_master = true;
+
+		snd.type = SPH_MSG_NEW_NODE_REPLY;
+		snd.body_len = join_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = join_len;
+
+		ret = writev(sph_comm_fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			exit(1);
+		}
+
+		free(join);
+
+		ret = xread(sph_comm_fd, &rcv, sizeof(rcv));
+		if (ret != sizeof(rcv)) {
+			vprintf(SDOG_ERR, "invalid ret: %d\n", ret);
+			exit(1);
+		}
+	}
+
+	if (rcv.type != SPH_MSG_JOIN_REPLY) {
+		vprintf(SDOG_ERR, "unexpected message from shepherd, " \
+			"received message: %s\n", sph_msg_to_str(rcv.type));
+
+		/*
+		 * In this case, the state of this sheep in shepherd must be
+		 * SHEEP_STATE_CONNECTED. Messages other than SPH_MSG_JOIN_REPLY
+		 * mean bugs of shepherd.
+		 */
+		exit(1);
+	}
+
+	join_reply = xzalloc(rcv.body_len);
+	ret = xread(sph_comm_fd, join_reply, rcv.body_len);
+	if (ret != rcv.body_len) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		exit(1);
+	}
+
+	vprintf(SDOG_INFO, "join reply arrived, nr_nodes: %d\n",
+		join_reply->nr_nodes);
+
+	if (join_reply->res == CJ_RES_MASTER_TRANSFER) {
+		is_master = true;
+
+		nr_nodes = 1;
+		nodes[0] = this_node;
+	} else {
+		memcpy(nodes, join_reply->nodes,
+			join_reply->nr_nodes * sizeof(struct sd_node));
+
+		nr_nodes = join_reply->nr_nodes;
+	}
+
+	sd_join_handler(&this_node, nodes, nr_nodes,
+			join_reply->res, join_reply->opaque);
+
+	free(join_reply);
+
+	vprintf(SDOG_INFO, "shepherd_join() succeed\n");
+	state = STATE_JOINED;
+}
+
+struct sph_event {
+	struct sd_node sender;
+
+	void *msg;
+	int msg_len;
+
+	bool callbacked, removed;
+
+	struct list_head event_list;
+};
+
+static LIST_HEAD(nonblocked_event_list);
+static LIST_HEAD(blocked_event_list);
+
+static int sph_event_fd;
+
+static bool sph_process_event(void)
+{
+	struct sph_event *ev;
+	bool nonblock;
+
+	if (!list_empty(&nonblocked_event_list)) {
+		ev = list_first_entry(&nonblocked_event_list,
+				struct sph_event, event_list);
+		nonblock = true;
+	} else if (!list_empty(&blocked_event_list)) {
+		ev = list_first_entry(&blocked_event_list,
+				struct sph_event, event_list);
+		nonblock = false;
+	} else
+		return false;
+
+	if (ev->removed)
+		goto remove;
+
+	if (ev->callbacked)
+		return false;
+
+	if (nonblock) {
+		vprintf(SDOG_DEBUG, "processing nonblock event\n");
+
+		sd_notify_handler(&ev->sender, ev->msg, ev->msg_len);
+	} else {
+		vprintf(SDOG_DEBUG, "processing block event\n");
+
+		ev->callbacked = sd_block_handler(&ev->sender);
+		return false;
+	}
+
+remove:
+	list_del(&ev->event_list);
+	free(ev->msg);
+	free(ev);
+
+	return true;
+}
+
+/*
+ * Queue an event and wake sph_event_handler() through sph_event_fd; msg is
+ * copied when msg_len is non-zero.  Events go to the tail because
+ * sph_process_event() consumes the head, which keeps arrival (FIFO) order.
+ */
+static void push_sph_event(bool nonblock, struct sd_node *sender,
+			void *msg, int msg_len)
+{
+	struct sph_event *ev;
+
+	vprintf(SDOG_DEBUG, "push_sph_event() called, pushing %sblocking"\
+		" event\n", nonblock ? "non" : "");
+
+	ev = xzalloc(sizeof(*ev));
+	ev->sender = *sender;
+	if (msg_len) {
+		ev->msg = xzalloc(msg_len);
+		memcpy(ev->msg, msg, msg_len);
+		ev->msg_len = msg_len;
+	}
+	ev->removed = false;
+	ev->callbacked = false;
+	INIT_LIST_HEAD(&ev->event_list);
+
+	if (nonblock)
+		list_add_tail(&ev->event_list, &nonblocked_event_list);
+	else
+		list_add_tail(&ev->event_list, &blocked_event_list);
+
+	if (eventfd_write(sph_event_fd, 1)) {
+		vprintf(SDOG_ERR, "eventfd_write() failed: %m\n");
+		exit(1);
+	}
+}
+
+/* Mark the first blocked event as removed and wake sph_event_handler(). */
+static void remove_one_block_event(void)
+{
+	struct sph_event *ev;
+
+	if (list_empty(&blocked_event_list))
+		return; /* FIXME: should I treat this case as an error? */
+
+	ev = list_first_entry(&blocked_event_list,
+			struct sph_event, event_list);
+	ev->removed = true;
+	if (eventfd_write(sph_event_fd, 1) < 0)
+		panic("eventfd_write() failed: %m\n");
+	vprintf(SDOG_DEBUG, "unblock a blocking event\n");
+}
+
+static void sph_event_handler(int fd, int events, void *data)
+{
+	int ret;
+	eventfd_t val;
+
+	ret = eventfd_read(fd, &val);
+	if (ret < 0)
+		panic("eventfd_read() failed: %m\n");
+
+	while (sph_process_event())
+		;
+}
+
+static void elected_as_master(void)
+{
+	vprintf(SDOG_DEBUG, "elected_as_master() called\n");
+
+	/* assert(!is_master); */
+	is_master = true;
+	vprintf(SDOG_INFO, "became new master\n");
+}
+
+/* Read and dispatch one message from shepherd while in STATE_JOINED. */
+static void read_msg(void)
+{
+	int i, j, ret;
+	struct sph_msg snd, rcv;
+	struct sd_node sender;
+	struct iovec iov[2];
+	struct sph_msg_join *join;
+	struct sph_msg_notify_forward *notify_forward;
+	struct sph_msg_join_node_finish *join_node_finish;
+	struct join_message *jm;
+	enum cluster_join_result res;
+
+	vprintf(SDOG_INFO, "read_msg() called\n");
+
+	ret = xread(sph_comm_fd, &rcv, sizeof(rcv));
+	if (ret != sizeof(rcv)) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		exit(1);
+	}
+
+	switch (rcv.type) {
+	case SPH_MSG_NEW_NODE:
+		if (!is_master) {
+			vprintf(SDOG_EMERG, "I am not a master but received" \
+				" SPH_MSG_NEW_NODE, shepherd is buggy\n");
+			exit(1);
+		}
+
+		join = xzalloc(rcv.body_len);
+		ret = xread(sph_comm_fd, join, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		res = sd_check_join_cb(&join->node, join->opaque);
+		/* echo the body back with the check result filled in */
+		join->res = res;
+
+		memset(&snd, 0, sizeof(snd));
+		snd.type = SPH_MSG_NEW_NODE_REPLY;
+		snd.body_len = rcv.body_len;
+
+		iov[0].iov_base = &snd;
+		iov[0].iov_len = sizeof(snd);
+
+		iov[1].iov_base = join;
+		iov[1].iov_len = rcv.body_len;
+
+		ret = writev(sph_comm_fd, iov, 2);
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			exit(1);
+		}
+		free(join);
+
+		if (res == CJ_RES_MASTER_TRANSFER) {
+			vprintf(SDOG_ERR, "failed to join sheepdog cluster: "
+				"please retry when master is up\n");
+			exit(1);
+		}
+
+		break;
+	case SPH_MSG_NEW_NODE_FINISH:
+		join_node_finish = xzalloc(rcv.body_len);
+		ret = xread(sph_comm_fd, join_node_finish, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		jm = (struct join_message *)join_node_finish->opaque;
+		memcpy(nodes, join_node_finish->nodes,
+			join_node_finish->nr_nodes * sizeof(struct sd_node));
+		nr_nodes = join_node_finish->nr_nodes;
+
+		vprintf(SDOG_INFO, "new node: %s\n",
+			node_to_str(&join_node_finish->new_node));
+		sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
+				join_node_finish->res, jm);
+
+		free(join_node_finish);
+
+		break;
+	case SPH_MSG_NOTIFY_FORWARD:
+		notify_forward = xzalloc(rcv.body_len);
+		ret = xread(sph_comm_fd, notify_forward, rcv.body_len);
+		if (ret != rcv.body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		if (notify_forward->unblock)
+			remove_one_block_event();
+
+		push_sph_event(true, &notify_forward->from_node,
+			notify_forward->notify_msg,
+			rcv.body_len - sizeof(*notify_forward));
+
+		free(notify_forward);
+		break;
+	case SPH_MSG_BLOCK_FORWARD:
+		ret = xread(sph_comm_fd, &sender, sizeof(sender));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		push_sph_event(false, &sender, NULL, 0);
+
+		break;
+	case SPH_MSG_LEAVE_FORWARD:
+		ret = xread(sph_comm_fd, &sender, sizeof(sender));
+		if (ret != sizeof(sender)) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			exit(1);
+		}
+
+		vprintf(SDOG_INFO, "leaving node: %s\n", node_to_str(&sender));
+
+		for (i = 0; i < nr_nodes; i++) {
+			if (node_eq(&sender, &nodes[i])) {
+				/* j + 1 must stay below nr_nodes */
+				for (j = i; j < nr_nodes - 1; j++)
+					nodes[j] = nodes[j + 1];
+				nr_nodes--;
+
+				goto removed;
+			}
+		}
+
+		vprintf(SDOG_INFO, "leave message from unknown node: %s\n",
+			node_to_str(&sender));
+		break;
+removed:
+		vprintf(SDOG_DEBUG, "calling sd_leave_handler(), sender: %s\n",
+			node_to_str(&sender));
+		sd_leave_handler(&sender, nodes, nr_nodes);
+		break;
+	case SPH_MSG_MASTER_ELECTION:
+		elected_as_master();
+		break;
+	default:
+		vprintf(SDOG_ERR, "invalid message from shepherd: %s, "
+			"length: %d\n", sph_msg_to_str(rcv.type), rcv.body_len);
+		exit(1);
+		break;
+	}
+}
+
+static void read_msg_from_shepherd(void)
+{
+	switch (state) {
+	case STATE_PRE_JOIN:
+		read_msg_pre_join();
+		break;
+	case STATE_JOINED:
+		read_msg();
+		break;
+	default:
+		panic("invalid state of shepherd cluster driver: %d\n",
+			state);
+		break;
+	};
+}
+
+static void shepherd_comm_handler(int fd, int events, void *data)
+{
+	assert(fd == sph_comm_fd);
+	assert(data == NULL);
+
+	if (events & EPOLLIN)
+		read_msg_from_shepherd();
+	else if (events & EPOLLHUP || events & EPOLLERR) {
+		vprintf(SDOG_ERR, "connection to shepherd"
+			" caused an error: %m");
+		exit(1);
+	}
+}
+
+/*
+ * Parse "<address>[:<port>]" from s into addr; the port defaults to
+ * SHEPHERD_PORT.  Malformed input is logged and terminates the process.
+ * NOTE(review): sin_port is stored without htons(); confirm the listening
+ * side of shepherd treats its port the same way before changing this.
+ */
+static void init_addr_port(const char *s, struct sockaddr_in *addr)
+{
+	char *sep, *copied, *p;
+	uint8_t tmp_addr[16];
+	long port = SHEPHERD_PORT;
+
+	copied = strdup(s);
+	if (!copied)
+		panic("strdup() failed: %m");
+
+	sep = strchr(copied, ':');
+	if (sep) {
+		/* reject ":<port>" and "<address>:" */
+		if (sep == copied || *(sep + 1) == '\0')
+			goto invalid_format;
+		*sep = '\0';
+	}
+
+	memset(addr, 0, sizeof(*addr));
+	addr->sin_family = AF_INET;
+
+	if (!str_to_addr(copied, tmp_addr))
+		goto invalid_format;
+	/* presumably an IPv4 address sits in the last 4 of the 16 bytes */
+	memcpy(&addr->sin_addr, &tmp_addr[12], 4);
+
+	if (sep) {
+		errno = 0;
+		port = strtol(sep + 1, &p, 10);
+		if (sep + 1 == p || *p != '\0')
+			goto invalid_format;
+		/* sin_port is 16 bits wide; do not let strtol() wrap into it */
+		if (errno || port <= 0 || port > UINT16_MAX)
+			goto invalid_format;
+	}
+
+	addr->sin_port = port;
+
+	free(copied);
+	return;
+
+invalid_format:
+	vprintf(SDOG_ERR, "invalid option for host and port: %s\n", s);
+	free(copied);
+	exit(1);
+}
+
+/* Connect to shepherd at option, "<address>[:<port>]"; returns 0 or -1. */
+static int shepherd_init(const char *option)
+{
+	struct sockaddr_in addr;
+
+	if (!option) {
+		/* without an address, addr would be used uninitialized */
+		vprintf(SDOG_ERR, "shepherd requires <address>[:<port>]\n");
+		return -1;
+	}
+	init_addr_port(option, &addr);
+	sph_comm_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sph_comm_fd < 0) {
+		vprintf(SDOG_INFO, "error at socket()\n");
+		return -1;
+	}
+	if (connect(sph_comm_fd, (struct sockaddr *)&addr, sizeof(addr))) {
+		vprintf(SDOG_ERR, "cannot connect to shepherd,"
+			" is shepherd running?\n");
+		vprintf(SDOG_ERR, "errno: %s\n", strerror(errno));
+		return -1;
+	}
+
+	sph_event_fd = eventfd(0, EFD_NONBLOCK);
+	if (register_event(sph_event_fd, sph_event_handler, NULL)) {
+		vprintf(SDOG_ERR, "register_event() failed: %m\n");
+		exit(1);
+	}
+	return 0;
+}
+
+static int shepherd_join(const struct sd_node *myself,
+		      void *opaque, size_t opaque_len)
+{
+	int ret;
+	/* keep opaque for retrying */
+	kept_opaque = xzalloc(opaque_len);
+	memcpy(kept_opaque, opaque, opaque_len);
+	kept_opaque_len = opaque_len;
+	this_node = *myself;
+
+	vprintf(SDOG_INFO, "shepherd_join() called, myself is %s\n",
+		node_to_str(myself));
+
+	ret = do_shepherd_join();
+	register_event(sph_comm_fd, shepherd_comm_handler, NULL);
+
+	return ret;
+}
+
+static int shepherd_leave(void)
+{
+	int ret;
+	struct sph_msg msg;
+
+	vprintf(SDOG_DEBUG, "shepherd_leave() called\n");
+	msg.type = SPH_MSG_LEAVE;
+	msg.body_len = 0;
+
+	ret = xwrite(sph_comm_fd, &msg, sizeof(msg));
+	if (ret != sizeof(msg)) {
+		vprintf(SDOG_INFO, "xwrite() failed: %m\n");
+		exit(1);
+	}
+
+	state = STATE_PRE_JOIN;
+	is_master = false;
+
+	return 0;
+}
+
+static int do_shepherd_notify(bool unblock, void *msg, size_t msg_len)
+{
+	int ret;
+	struct sph_msg snd;
+	struct iovec iov[2];
+
+	struct sph_msg_notify *notify;
+
+	vprintf(SDOG_INFO, "do_shepherd_notify() called\n");
+
+	snd.type = SPH_MSG_NOTIFY;
+	snd.body_len = msg_len + sizeof(*notify);
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	notify = xzalloc(snd.body_len);
+	notify->unblock = unblock;
+	memcpy(notify->notify_msg, msg, msg_len);
+
+	iov[1].iov_base = notify;
+	iov[1].iov_len = snd.body_len;
+
+	ret = writev(sph_comm_fd, iov, 2);
+	if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "writev() failed: %m\n");
+		exit(1);
+	}
+	free(notify);
+
+	return 0;
+}
+
+static int shepherd_notify(void *msg, size_t msg_len)
+{
+	return do_shepherd_notify(false, msg, msg_len);
+}
+
+static void shepherd_block(void)
+{
+	int ret;
+	struct sph_msg msg;
+
+	msg.type = SPH_MSG_BLOCK;
+	msg.body_len = 0;
+
+	ret = xwrite(sph_comm_fd, &msg, sizeof(msg));
+	if (ret != sizeof(msg)) {
+		vprintf(SDOG_ERR, "xwrite() failed: %m\n");
+		exit(1);
+	}
+}
+
+static void shepherd_unblock(void *msg, size_t msg_len)
+{
+	do_shepherd_notify(true, msg, msg_len);
+}
+
+static struct cluster_driver cdrv_shepherd = {
+	.name		= "shepherd",
+
+	.init		= shepherd_init,
+	.join		= shepherd_join,
+	.leave		= shepherd_leave,
+	.notify		= shepherd_notify,
+	.block		= shepherd_block,
+	.unblock	= shepherd_unblock,
+};
+
+cdrv_register(cdrv_shepherd);
diff --git a/shepherd/Makefile.am b/shepherd/Makefile.am
new file mode 100644
index 0000000..f7fd998
--- /dev/null
+++ b/shepherd/Makefile.am
@@ -0,0 +1,44 @@
+#
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; see the file COPYING.  If not, write to
+# the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+
+MAINTAINERCLEANFILES	= Makefile.in
+
+AM_CFLAGS		=
+
+INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include
+
+sbin_PROGRAMS		= shepherd
+
+shepherd_SOURCES		= shepherd.c
+
+shepherd_LDADD	  	= ../lib/libsheepdog.a
+shepherd_DEPENDENCIES	= ../lib/libsheepdog.a
+
+EXTRA_DIST		=
+
+lint:
+	-splint $(INCLUDES) $(LINT_FLAGS) $(CFLAGS) *.c
+
+all-local:
+	@echo Built shepherd
+
+clean-local:
+	rm -f shepherd *.o gmon.out *.da *.bb *.bbg
+
+# support for GNU Flymake
+check-syntax:
+	$(COMPILE) -fsyntax-only $(CHK_SOURCES)
diff --git a/shepherd/shepherd.c b/shepherd/shepherd.c
new file mode 100644
index 0000000..794c761
--- /dev/null
+++ b/shepherd/shepherd.c
@@ -0,0 +1,883 @@
+/*
+ * Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <errno.h>
+#include <assert.h>
+#include <getopt.h>
+
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+
+#include <sys/un.h>
+#include <netinet/in.h>
+
+#include "net.h"
+#include "event.h"
+#include "list.h"
+#include "internal_proto.h"
+#include "sheep.h"
+#include "util.h"
+#include "shepherd.h"
+
+#define EPOLL_SIZE 4096
+
+enum shepherd_state {
+	SPH_STATE_DEFAULT,
+	SPH_STATE_JOINING,
+};
+
+static enum shepherd_state state = SPH_STATE_DEFAULT;
+
+enum sheep_state {
+	SHEEP_STATE_CONNECTED,	/* accept()ed */
+	SHEEP_STATE_JOINED,
+	SHEEP_STATE_LEAVING,
+};
+
+struct sheep {
+	int fd;
+	struct sd_node node;
+	struct sockaddr_in addr;
+
+	enum sheep_state state;
+
+	struct list_head sheep_list;
+	struct list_head join_wait_list;
+};
+
+static LIST_HEAD(sheep_list_head);
+/*
+ * nr_joined_sheep is the number of sheep in state SHEEP_STATE_JOINED,
+ * not the length of sheep_list_head
+ */
+static int nr_joined_sheep;
+
+/*
+ * important invariant of shepherd: nr_joined_sheep ? !!master_sheep : true
+ *
+ * if there is at least one sheep which is in state of SHEEP_STATE_JOINED,
+ * master sheep must be elected
+ */
+static struct sheep *master_sheep;
+
+static bool running;
+
+static int port = SHEPHERD_PORT;
+static uint8_t addr[16];
+static int sheep_listen_fd;
+
+static const char *progname;
+
+static bool is_sd_node_zero(struct sd_node *node)
+{
+	static struct sd_node zero_node;
+	return !memcmp(node, &zero_node, sizeof(*node));
+}
+
+static int build_node_array(struct sd_node *nodes)
+{
+	int i;
+	struct sheep *s;
+
+	i = 0;
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		nodes[i++] = s->node;
+	}
+
+	return i;
+}
+
+static struct sheep *find_sheep_by_nid(struct node_id *id)
+{
+	struct sheep *s;
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (!node_id_cmp(&s->node.nid, id))
+			return s;
+	}
+
+	return NULL;
+}
+
+static int remove_efd;
+
+static inline void remove_sheep(struct sheep *sheep)
+{
+	int ret;
+
+	vprintf(SDOG_DEBUG, "remove_sheep() called, removing %s\n",
+		node_to_str(&sheep->node));
+
+	if (sheep->state == SHEEP_STATE_JOINED)
+		nr_joined_sheep--;
+
+	sheep->state = SHEEP_STATE_LEAVING;
+	ret = eventfd_write(remove_efd, 1);
+	if (ret < 0)
+		panic("eventfd_write() failed: %m\n");
+
+	unregister_event(sheep->fd);
+	close(sheep->fd);
+}
+
+static int master_election(void)
+{
+	int ret, nr_failed = 0;
+	struct sheep *s;
+	struct sph_msg msg;
+
+	assert(!master_sheep);
+
+	if (!nr_joined_sheep)
+		return 0;
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		msg.type = SPH_MSG_MASTER_ELECTION;
+		msg.body_len = 0;
+
+		ret = xwrite(s->fd, &msg, sizeof(msg));
+		if (sizeof(msg) != ret) {
+			vprintf(SDOG_ERR, "xwrite() for failed: %m\n");
+			goto election_failed;
+		}
+
+		master_sheep = s;
+		break;
+election_failed:
+		remove_sheep(s);
+		nr_failed++;
+	}
+
+	if (master_sheep) {
+		vprintf(SDOG_INFO, "new master elected: %s\n",
+			node_to_str(&master_sheep->node));
+	}
+
+	return nr_failed;
+}
+
+static int do_remove_sheep(struct sheep *leaving)
+{
+	int ret, failed = 0;
+	struct sheep *s;
+	struct sph_msg snd;
+	struct iovec iov[2];
+
+	snd.type = SPH_MSG_LEAVE_FORWARD;
+	snd.body_len = sizeof(struct sd_node);
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	iov[1].iov_base = &leaving->node;
+	iov[1].iov_len = sizeof(struct sd_node);
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		ret = writev(s->fd, iov, 2);
+
+		if (ret != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+
+			remove_sheep(s);
+			failed++;
+		}
+	}
+
+	return failed;
+}
+
+static void remove_handler(int fd, int events, void *data)
+{
+	struct sheep *s;
+	int ret, failed = 0;
+	eventfd_t val;
+	bool election = false;
+
+	vprintf(SDOG_DEBUG, "remove_handler() called\n");
+
+	ret = eventfd_read(remove_efd, &val);
+	if (ret < 0)
+		panic("eventfd_read() failed: %m\n");
+
+	vprintf(SDOG_DEBUG, "removed sheeps: %lu\n", val);
+	assert(0 < val);
+
+
+remove:
+	/* FIXME */
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_LEAVING)
+			continue;
+
+		vprintf(SDOG_DEBUG, "removing the node: %s\n",
+			node_to_str(&s->node));
+
+		if (s == master_sheep) {
+			vprintf(SDOG_DEBUG, "removing the master\n");
+
+			master_sheep = NULL;
+			election = true;
+		}
+
+		if (!is_sd_node_zero(&s->node))
+			/*
+			 * This condition can be false when the sheep had
+			 * transited from CONNECTED to LEAVING directly.
+			 * (sd_node of sheep in CONNECTED state doesn't have
+			 * any information, because the member is initialized
+			 * when SPH_MSG_NEW_NODE from master sheep is accepted.)
+			 *
+			 * sheep in CONNECTED state doesn't have to be removed
+			 * with do_remove_sheep(), because other sheeps don't
+			 * know its existence.
+			 */
+			do_remove_sheep(s);
+
+		goto del;
+	}
+
+	goto end;
+
+del:
+	vprintf(SDOG_DEBUG, "removed node: %s\n", node_to_str(&s->node));
+	list_del(&s->sheep_list);
+	list_del(&s->join_wait_list);
+	free(s);
+
+	if (--val)
+		goto remove;
+
+end:
+	if (election) {
+		vprintf(SDOG_DEBUG, "master is removed, electing new master\n");
+		failed = master_election();
+
+		vprintf(SDOG_DEBUG, "election was done");
+		assert(nr_joined_sheep ? !!master_sheep : true);
+	}
+
+	vprintf(SDOG_DEBUG,
+		"nodes failed during remove_handler(): %d\n", failed);
+}
+
+static LIST_HEAD(join_wait_queue);
+
+static int release_joining_sheep(void)
+{
+	ssize_t wbytes;
+	struct sheep *waiting;
+	struct sph_msg snd;
+	int nr_failed = 0;
+
+retry:
+	if (list_empty(&join_wait_queue))
+		return nr_failed;
+
+	waiting = list_first_entry(&join_wait_queue,
+				struct sheep, join_wait_list);
+	list_del(&waiting->join_wait_list);
+	INIT_LIST_HEAD(&waiting->join_wait_list);
+
+	memset(&snd, 0, sizeof(snd));
+	snd.type = SPH_MSG_JOIN_RETRY;
+
+	wbytes = xwrite(waiting->fd, &snd, sizeof(snd));
+	if (sizeof(snd) != wbytes) {
+		vprintf(SDOG_ERR, "xwrite() failed: %m\n");
+		remove_sheep(waiting);
+
+		vprintf(SDOG_DEBUG, "node %s is failed to join\n",
+			node_to_str(&waiting->node));
+		nr_failed++;
+
+		goto retry;
+	}
+
+	return nr_failed;
+}
+
+/*
+ * Handle SPH_MSG_JOIN: forward it as SPH_MSG_NEW_NODE to the master (or to the
+ * sender when no sheep has joined); park the sender if a join is in flight.
+ */
+static void msg_join(struct sph_msg *msg, struct sheep *sheep)
+{
+	int fd = sheep->fd;
+	ssize_t rbytes, wbytes;
+	struct iovec iov[2];
+	struct sph_msg snd;
+	struct sph_msg_join *join;
+
+	vprintf(SDOG_DEBUG, "msg_join() called\n");
+
+	if (state == SPH_STATE_JOINING) {
+		/* we have to trash opaque from the sheep */
+		char *buf = xzalloc(msg->body_len);
+		rbytes = xread(fd, buf, msg->body_len);
+		if (rbytes != msg->body_len) {
+			vprintf(SDOG_ERR, "xread() failed: %m\n");
+			free(buf);	/* was leaked on this path */
+			goto purge_current_sheep;
+		}
+		free(buf);
+		list_add(&sheep->join_wait_list, &join_wait_queue);
+		vprintf(SDOG_DEBUG, "there is already a joining sheep\n");
+		return;
+	}
+
+	join = xzalloc(msg->body_len);
+	rbytes = xread(fd, join, msg->body_len);
+	if (msg->body_len != rbytes) {
+		vprintf(SDOG_ERR, "xread() for reading the body of" \
+			" SPH_MSG_JOIN failed: %m\n");
+		free(join);
+		goto purge_current_sheep;
+	}
+
+	sheep->node = join->node;
+
+	snd.type = SPH_MSG_NEW_NODE;
+	snd.body_len = msg->body_len;
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	iov[1].iov_base = join;
+	iov[1].iov_len = msg->body_len;
+
+	if (!nr_joined_sheep) {
+		/* this sheep is a new master */
+		/* FIXME: is this master_elected need? */
+		join->master_elected = true;
+	}
+
+	assert(nr_joined_sheep ? !!master_sheep : true);
+
+	wbytes = writev(!nr_joined_sheep ? fd : master_sheep->fd, iov, 2);
+	free(join);
+
+	if (wbytes != (iov[0].iov_len + iov[1].iov_len)) {
+		vprintf(SDOG_ERR, "writev() for sending "	\
+			"SPH_MSG_NEW_NODE failed: %m\n");
+
+		if (nr_joined_sheep)
+			remove_sheep(master_sheep);
+
+		goto purge_current_sheep;
+	}
+
+	state = SPH_STATE_JOINING;
+	return;
+
+purge_current_sheep:
+	remove_sheep(sheep);
+}
+
+/*
+ * Handle SPH_MSG_NEW_NODE_REPLY, the answer to a forwarded join request.
+ *
+ * When sheep have already joined, only the master may send this reply.
+ * The reply body is a struct sph_msg_join followed by opaque data. The
+ * handler
+ *  1. looks up the joining sheep by the node id in the reply,
+ *  2. sends it SPH_MSG_JOIN_REPLY (node array, opaque data, join result),
+ *  3. sends SPH_MSG_NEW_NODE_FINISH to every other joined sheep,
+ *  4. marks the joining sheep as joined (it becomes master if it is the
+ *     first one), resets the global state to SPH_STATE_DEFAULT and
+ *     releases one sheep waiting on join_wait_queue.
+ *
+ * On failure the global state is reset to SPH_STATE_DEFAULT and the
+ * replying sheep is removed.
+ */
+static void msg_new_node_reply(struct sph_msg *msg, struct sheep *sheep)
+{
+	int fd = sheep->fd;
+	ssize_t rbytes, wbytes;
+	struct iovec iov[2];
+
+	char *opaque;
+	int opaque_len;
+
+	struct sph_msg_join *join;
+	struct sheep *s, *joining_sheep;
+	struct sph_msg snd;
+	struct sph_msg_join_reply *join_reply_body;
+	struct sph_msg_join_node_finish *join_node_finish;
+
+	enum cluster_join_result join_result;
+
+	if (nr_joined_sheep && sheep != master_sheep) {
+		vprintf(SDOG_ERR, "sheep which is not a master replied " \
+			"SPH_MSG_NEW_NODE_REPLY\n");
+		goto purge_current_sheep;
+	}
+
+	vprintf(SDOG_DEBUG, "new node reply from %s\n",
+		node_to_str(&sheep->node));
+
+	/*
+	 * The fixed part of the body is dereferenced below and its size is
+	 * subtracted to get opaque_len, so a shorter body must be rejected.
+	 */
+	if (msg->body_len < sizeof(struct sph_msg_join)) {
+		vprintf(SDOG_ERR, "too short body of "
+			"SPH_MSG_NEW_NODE_REPLY: %d\n", (int)msg->body_len);
+		goto purge_current_sheep;
+	}
+
+	join = xzalloc(msg->body_len);
+	rbytes = xread(fd, join, msg->body_len);
+	if (msg->body_len != rbytes) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		free(join);
+
+		goto purge_current_sheep;
+	}
+
+	join_result = join->res;
+
+	vprintf(SDOG_DEBUG, "joining node is %s\n", node_to_str(&join->node));
+
+	joining_sheep = find_sheep_by_nid(&join->node.nid);
+	if (!joining_sheep) {
+		/* master is broken */
+		vprintf(SDOG_ERR, "invalid nid is required, %s\n",
+			node_to_str(&join->node));
+		free(join);
+
+		/* master_sheep is NULL while no sheep has joined yet */
+		if (master_sheep) {
+			vprintf(SDOG_ERR,
+				"purging master sheep: %s and joining one\n",
+				node_to_str(&master_sheep->node));
+			remove_sheep(master_sheep);
+		}
+		goto purge_current_sheep;
+	}
+
+	/* keep a private copy: join is freed before the second broadcast */
+	opaque_len = msg->body_len - sizeof(struct sph_msg_join);
+	opaque = xzalloc(opaque_len);
+	memcpy(opaque, join->opaque, opaque_len);
+
+	vprintf(SDOG_DEBUG, "length of opaque: %d\n", opaque_len);
+	memset(&snd, 0, sizeof(snd));
+	snd.type = SPH_MSG_JOIN_REPLY;
+	snd.body_len = sizeof(struct sph_msg_join_reply) + opaque_len;
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	join_reply_body = xzalloc(snd.body_len);
+
+	join_reply_body->nr_nodes = build_node_array(join_reply_body->nodes);
+	memcpy(join_reply_body->opaque, opaque, opaque_len);
+	join_reply_body->res = join_result;
+
+	iov[1].iov_base = join_reply_body;
+	iov[1].iov_len = snd.body_len;
+
+	wbytes = writev(joining_sheep->fd, iov, 2);
+	free(join_reply_body);
+	free(join);
+
+	if (wbytes != (iov[0].iov_len + iov[1].iov_len)) {
+		/*
+		 * NOTE(review): the failed write targeted joining_sheep, yet
+		 * the message and the removal below refer to the master;
+		 * confirm which sheep should be purged here.
+		 */
+		vprintf(SDOG_ERR, "writev() to master failed: %m\n");
+		free(opaque);
+
+		if (master_sheep)
+			remove_sheep(master_sheep);
+		goto purge_current_sheep;
+	}
+
+	snd.type = SPH_MSG_NEW_NODE_FINISH;
+	snd.body_len = sizeof(*join_node_finish) + opaque_len;
+
+	join_node_finish = xzalloc(snd.body_len);
+	join_node_finish->new_node = joining_sheep->node;
+	memcpy(join_node_finish->opaque, opaque, opaque_len);
+	join_node_finish->nr_nodes = build_node_array(join_node_finish->nodes);
+	join_node_finish->res = join_result;
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	iov[1].iov_base = join_node_finish;
+	iov[1].iov_len = snd.body_len;
+
+	/* notify every already joined sheep except the newcomer */
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		if (s == joining_sheep)
+			continue;
+
+		wbytes = writev(s->fd, iov, 2);
+
+		if (wbytes != (iov[0].iov_len + iov[1].iov_len)) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			remove_sheep(s);
+		}
+	}
+
+	free(join_node_finish);
+	free(opaque);
+
+	joining_sheep->state = SHEEP_STATE_JOINED;
+	nr_joined_sheep++;
+
+	if (nr_joined_sheep == 1) {
+		/* the very first joined sheep becomes the master */
+		assert(!master_sheep);
+		assert(joining_sheep == sheep);
+
+		master_sheep = sheep;
+
+		vprintf(SDOG_INFO, "new master elected: %s\n",
+			node_to_str(&sheep->node));
+	}
+	state = SPH_STATE_DEFAULT;
+
+	release_joining_sheep();
+	return;
+
+purge_current_sheep:
+	state = SPH_STATE_DEFAULT;
+
+	remove_sheep(sheep);
+}
+
+/*
+ * Handle SPH_MSG_NOTIFY: read the notification from the sending sheep and
+ * forward it, together with the sender's node, as SPH_MSG_NOTIFY_FORWARD
+ * to every joined sheep.
+ *
+ * A joined sheep that cannot be written to is removed. If the body is
+ * malformed or cannot be read, the sending sheep is removed.
+ */
+static void msg_notify(struct sph_msg *msg, struct sheep *sheep)
+{
+	ssize_t rbytes, wbytes;
+	int fd = sheep->fd;
+	struct iovec iov[2];
+
+	struct sph_msg snd;
+	struct sph_msg_notify *notify;
+	int notify_msg_len;
+	struct sph_msg_notify_forward *notify_forward;
+	struct sheep *s;
+
+	/* the payload length is body_len minus the fixed part, so check it */
+	if (msg->body_len < sizeof(*notify)) {
+		vprintf(SDOG_ERR, "too short body of SPH_MSG_NOTIFY: %d\n",
+			(int)msg->body_len);
+		goto purge_current_sheep;
+	}
+
+	notify = xzalloc(msg->body_len);
+	rbytes = xread(fd, notify, msg->body_len);
+	if (rbytes != msg->body_len) {
+		vprintf(SDOG_ERR, "xread() failed: %m\n");
+		free(notify);
+		goto purge_current_sheep;
+	}
+
+	notify_msg_len = msg->body_len - sizeof(*notify);
+
+	memset(&snd, 0, sizeof(snd));
+	snd.type = SPH_MSG_NOTIFY_FORWARD;
+	snd.body_len = notify_msg_len + sizeof(*notify_forward);
+
+	/* exactly snd.body_len bytes of this buffer are sent below */
+	notify_forward = xzalloc(snd.body_len);
+	memcpy(notify_forward->notify_msg, notify->notify_msg, notify_msg_len);
+	notify_forward->unblock = notify->unblock;
+	notify_forward->from_node = sheep->node;
+	free(notify);
+
+	iov[0].iov_base = &snd;
+	iov[0].iov_len = sizeof(snd);
+
+	iov[1].iov_base = notify_forward;
+	iov[1].iov_len = snd.body_len;
+
+	list_for_each_entry(s, &sheep_list_head, sheep_list) {
+		if (s->state != SHEEP_STATE_JOINED)
+			continue;
+
+		wbytes = writev(s->fd, iov, 2);
+		if ((iov[0].iov_len + iov[1].iov_len) != wbytes) {
+			vprintf(SDOG_ERR, "writev() failed: %m\n");
+			remove_sheep(s);
+		}
+	}
+
+	free(notify_forward);
+	return;
+
+purge_current_sheep:
+	remove_sheep(sheep);
+}
+
+/*
+ * Handle SPH_MSG_BLOCK: send SPH_MSG_BLOCK_FORWARD, carrying the node of
+ * the requesting sheep as its body, to every joined sheep. A sheep that
+ * cannot be written to is removed.
+ */
+static void msg_block(struct sph_msg *msg, struct sheep *sheep)
+{
+	struct sph_msg hdr;
+	struct iovec vec[2];
+	struct sheep *peer;
+
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.type = SPH_MSG_BLOCK_FORWARD;
+	hdr.body_len = sizeof(struct sd_node);
+
+	vec[0].iov_base = &hdr;
+	vec[0].iov_len = sizeof(hdr);
+
+	vec[1].iov_base = &sheep->node;
+	vec[1].iov_len = sizeof(struct sd_node);
+
+	list_for_each_entry(peer, &sheep_list_head, sheep_list) {
+		ssize_t sent;
+
+		if (peer->state != SHEEP_STATE_JOINED)
+			continue;
+
+		sent = writev(peer->fd, vec, 2);
+		if (sent == (vec[0].iov_len + vec[1].iov_len))
+			continue;
+
+		vprintf(SDOG_ERR, "writev() failed: %m\n");
+
+		/* FIXME: is this correct behaviour? */
+		remove_sheep(peer);
+	}
+}
+
+/* Handle SPH_MSG_LEAVE: log the departure and remove the sending sheep. */
+static void msg_leave(struct sph_msg *msg, struct sheep *sheep)
+{
+	const char *leaver = node_to_str(&sheep->node);
+
+	vprintf(SDOG_INFO, "%s is leaving\n", leaver);
+	remove_sheep(sheep);
+}
+
+/*
+ * Fallback handler for message types without a dedicated handler: log the
+ * message type together with the sender's node and socket address, then
+ * remove the sender.
+ */
+static void msg_invalid(struct sph_msg *msg, struct sheep *sheep)
+{
+	vprintf(SDOG_ERR, "msg_invalid() is called\n");
+	vprintf(SDOG_ERR,
+		"received invalid message with type: %s from sheep: %s"
+		" (sockaddr: %s)\n",
+		sph_msg_to_str(msg->type),
+		node_to_str(&sheep->node),
+		sockaddr_in_to_str(&sheep->addr));
+
+	remove_sheep(sheep);
+}
+
+/*
+ * Dispatch table indexed by message type; read_msg_from_sheep() calls the
+ * entry that matches the received header. Slots not listed here start out
+ * NULL and are set to msg_invalid by init_msg_handlers().
+ */
+static void (*msg_handlers[])(struct sph_msg*, struct sheep *) = {
+	[SPH_MSG_JOIN] = msg_join,
+	[SPH_MSG_NEW_NODE_REPLY] = msg_new_node_reply,
+	[SPH_MSG_NOTIFY] = msg_notify,
+	[SPH_MSG_BLOCK] = msg_block,
+	[SPH_MSG_LEAVE] = msg_leave,
+};
+
+/* Point every empty slot of msg_handlers at msg_invalid. */
+static void init_msg_handlers(void)
+{
+	int slot;
+
+	for (slot = 0; slot < ARRAY_SIZE(msg_handlers); slot++) {
+		if (!msg_handlers[slot])
+			msg_handlers[slot] = msg_invalid;
+	}
+}
+
+/*
+ * Read one message header from the sheep and dispatch it through
+ * msg_handlers. The handler is responsible for reading the body.
+ *
+ * The sheep is removed when the header cannot be read completely or when
+ * its type is outside the handler table.
+ */
+static void read_msg_from_sheep(struct sheep *sheep)
+{
+	int ret;
+	struct sph_msg rcv;
+
+	memset(&rcv, 0, sizeof(rcv));
+	ret = xread(sheep->fd, &rcv, sizeof(rcv));
+
+	if (ret != sizeof(rcv)) {
+		/* no newline: the "removing node" line below continues it */
+		vprintf(SDOG_ERR, "xread() failed: %m, ");
+		goto remove;
+	}
+
+	if (!(0 <= rcv.type && rcv.type < ARRAY_SIZE(msg_handlers))) {
+		vprintf(SDOG_ERR, "invalid message type: %d, ", rcv.type);
+		vprintf(SDOG_ERR, "from node: %s\n",
+			node_to_str(&sheep->node));
+		vprintf(SDOG_ERR, "from node (sockaddr): %s\n",
+			sockaddr_in_to_str(&sheep->addr));
+		vprintf(SDOG_ERR, "read bytes: %d, body length: %d\n",
+			ret, rcv.body_len);
+		goto remove;
+	}
+
+	vprintf(SDOG_INFO, "received op: %s\n", sph_msg_to_str(rcv.type));
+
+	/*
+	 * ISO C does not allow "return expr;" in a function returning void,
+	 * so call the handler and return separately.
+	 */
+	msg_handlers[rcv.type](&rcv, sheep);
+	return;
+
+remove:
+	vprintf(SDOG_ERR, "removing node: %s\n", node_to_str(&sheep->node));
+	remove_sheep(sheep);
+}
+
+/*
+ * Event callback for a sheep connection; data is the struct sheep that was
+ * registered with the fd. Readable input is handed to
+ * read_msg_from_sheep(); otherwise a hang-up or error event removes the
+ * sheep.
+ */
+static void sheep_comm_handler(int fd, int events, void *data)
+{
+	struct sheep *sheep = data;
+
+	vprintf(SDOG_DEBUG, "sheep_comm_handler() called\n");
+
+	if (events & EPOLLIN) {
+		read_msg_from_sheep(sheep);
+		return;
+	}
+
+	if (events & (EPOLLHUP | EPOLLERR)) {
+		vprintf(SDOG_INFO, "epoll() error: %s\n",
+			node_to_str(&sheep->node));
+		remove_sheep(sheep);
+	}
+}
+
+/*
+ * Event callback for the listening socket: accept one connection, enable
+ * keepalive on it, register sheep_comm_handler for it and append the new
+ * sheep to sheep_list_head in state SHEEP_STATE_CONNECTED.
+ *
+ * On failure the partially set up sheep is released, including the
+ * accepted socket if there is one.
+ */
+static void sheep_accept_handler(int fd, int events, void *data)
+{
+	int ret;
+	struct sheep *new_sheep;
+	socklen_t len;
+
+	new_sheep = xzalloc(sizeof(struct sheep));
+	INIT_LIST_HEAD(&new_sheep->sheep_list);
+	INIT_LIST_HEAD(&new_sheep->join_wait_list);
+
+	len = sizeof(struct sockaddr_in);
+	new_sheep->fd = accept(fd, (struct sockaddr *)&new_sheep->addr, &len);
+	if (new_sheep->fd < 0) {
+		vprintf(SDOG_ERR, "accept() failed: %m\n");
+		goto free_sheep;
+	}
+
+	if (-1 == set_keepalive(new_sheep->fd)) {
+		vprintf(SDOG_ERR, "set_keepalive() failed: %m\n");
+		goto close_fd;
+	}
+
+	ret = register_event(new_sheep->fd, sheep_comm_handler, new_sheep);
+	if (ret < 0) {
+		vprintf(SDOG_ERR, "register_event() failed: %m\n");
+		goto close_fd;
+	}
+
+	list_add_tail(&new_sheep->sheep_list, &sheep_list_head);
+	new_sheep->state = SHEEP_STATE_CONNECTED;
+
+	vprintf(SDOG_INFO, "accepted new sheep connection\n");
+	return;
+
+close_fd:
+	/* do not leak the accepted socket when later setup steps fail */
+	close(new_sheep->fd);
+free_sheep:
+	free(new_sheep);
+}
+
+/*
+ * Command line options. Each long option must take an argument exactly
+ * when its short counterpart in short_options is followed by ':'.
+ */
+static struct option const long_options[] = {
+	{ "port", required_argument, NULL, 'p' },
+	{ "address", required_argument, NULL, 'a' },
+	{ "foreground", no_argument, NULL, 'f' },
+	{ "debug", no_argument, NULL, 'd' },
+	/* takes the log file path, like -l */
+	{ "log-file", required_argument, NULL, 'l' },
+
+	{ NULL, 0, NULL, 0 },
+};
+
+static const char *short_options = "p:a:fdl:";
+
+/* Registered with atexit() in main(); logs that the process is exiting. */
+static void exit_handler(void)
+{
+	vprintf(SDOG_INFO, "exiting...\n");
+}
+
+/*
+ * Entry point of shepherd.
+ *
+ * Parses the command line (-p port, -a address, -f foreground, -d debug
+ * logging, -l log file), daemonizes unless -f is given, initializes the
+ * logger and the event subsystem, registers the eventfd used for sheep
+ * removal, opens the listening TCP socket for sheep connections and then
+ * runs the event loop while `running` is true.
+ */
+int main(int argc, char **argv)
+{
+	int ch, ret, longindex, opt;
+	long port_arg;
+	char *p;
+	bool daemonize = true;
+	struct sockaddr_in listen_addr;
+	int log_level = SDOG_INFO;
+	const char *log_file = "/var/log/shepherd.log";
+
+	progname = argv[0];
+
+	while ((ch = getopt_long(argc, argv, short_options, long_options,
+				 &longindex)) >= 0) {
+		switch (ch) {
+		case 'p':
+			/* reject trailing garbage and out-of-range values */
+			port_arg = strtol(optarg, &p, 10);
+			if (p == optarg || *p != '\0' ||
+			    port_arg < 1 || port_arg > 65535) {
+				fprintf(stderr, "invalid port: %s\n", optarg);
+				exit(1);
+			}
+			port = port_arg;
+			break;
+		case 'a':
+			if (!str_to_addr((const char *)optarg, addr)) {
+				fprintf(stderr, "invalid address: %s\n",
+					optarg);
+				exit(1);
+			}
+			break;
+		case 'f':
+			daemonize = false;
+			break;
+		case 'd':
+			log_level = SDOG_DEBUG;
+			break;
+		case 'l':
+			log_file = optarg;
+			break;
+		default:
+			fprintf(stderr, "unknown option\n");
+			exit(1);
+			break;
+		}
+	}
+
+	if (daemonize) {
+		ret = daemon(0, 0);
+
+		if (-1 == ret) {
+			fprintf(stderr, "daemon() failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+
+	ret = log_init(progname, LOG_SPACE_SIZE, !daemonize, log_level,
+		(char *)log_file);
+	if (ret)
+		panic("initialize logger failed: %m\n");
+
+	atexit(exit_handler);
+	init_event(EPOLL_SIZE);
+
+	/* sheep removal is driven by events on this eventfd */
+	remove_efd = eventfd(0, EFD_NONBLOCK);
+	if (remove_efd < 0)
+		panic("eventfd() failed: %m\n");
+
+	ret = register_event(remove_efd, remove_handler, NULL);
+	if (ret)
+		panic("register_event() failed: %m\n");
+
+	/* setup inet socket for communication with sheeps */
+	sheep_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sheep_listen_fd < 0)
+		panic("socket() failed: %m\n");
+
+	init_msg_handlers();
+
+	opt = 1;
+	ret = setsockopt(sheep_listen_fd, SOL_SOCKET, SO_REUSEADDR,
+			&opt, sizeof(opt));
+	if (ret == -1)
+		panic("setsockopt() for SO_REUSEADDR failed: %m\n");
+
+	memset(&listen_addr, 0, sizeof(listen_addr));
+	listen_addr.sin_family = AF_INET;
+	/*
+	 * NOTE(review): sin_port is expected in network byte order, i.e.
+	 * htons(port). It is left as is here because the connecting side
+	 * must agree on the value; confirm and convert both sides together.
+	 */
+	listen_addr.sin_port = port;
+	/* copies sizeof(sin_addr) bytes starting at offset 12 of addr */
+	memcpy(&listen_addr.sin_addr, addr + 12, sizeof(listen_addr.sin_addr));
+
+	ret = bind(sheep_listen_fd, (struct sockaddr *)&listen_addr,
+		   sizeof(struct sockaddr_in));
+	if (ret == -1)
+		panic("bind() failed: %m\n");
+
+	ret = listen(sheep_listen_fd, 1);
+	if (ret == -1)
+		panic("listen() failed: %m\n");
+
+	ret = register_event(sheep_listen_fd, sheep_accept_handler, NULL);
+	if (ret)
+		panic("register_event() failed: %m\n");
+
+	running = true;
+
+	while (running)
+		event_loop(-1);
+
+	return 0;
+}
-- 
1.7.2.5




More information about the sheepdog mailing list