[sheepdog] [PATCH v3 2/2] sheepkeeper: a new cluster manager specialized for sheepdog
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Fri Nov 16 09:25:59 CET 2012
At Wed, 14 Nov 2012 15:44:08 +0900,
Hitoshi Mitake wrote:
>
> From: Hitoshi Mitake <h.mitake at gmail.com>
>
> This patch adds sheepkeeper, a new cluster manager for sheepdog.
>
> sheepkeeper cluster manager is designed as master-slave. So it can be
> SPOF. But I'm planning to let it be redundant by utilizing corosync.
> If it can be improved, this can be an alternative of the group
> management part of ZooKeeper, and it doesn't depend on JVM.
>
> This patch adds the new directory sheepkeeper/ for it, and the
> executable binary will be produced as sheepkeeper/sheepkeeper.
> The cluster driver for sheep is added as sheep/cluster/sheepkeeper.c.
>
> If you have some opinions or feature requests for a cluster manager
> designed and implemented from scratch, I'd like to hear your comments.
>
> Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
> ---
>
> v2
> * lots of cleaning and bug fix
> * sane command line option handling
> * a little bit improvement of leave handling
> * not call sd_join_handler() in sheepkeeper_join()
> * add new option --enable-sheepkeeper to configure script for
> enable/disable building sheepkeeper
>
> v3
> * lots of cleaning and bug fix
> * suppress some compiler warnings
> * handle leave of sheeps in saner way. this also reduces the problem
> of odd handling of return values from xread/xwrite
> * default log of sheepkeeper is /var/log/sheepkeeper.log, and users can
> specify custom location with -l option
I didn't notice that you wrote change logs here, sorry. But it's
better to write them in the cover letter.
> AM_CONDITIONAL(BUILD_ACCORD, test x$enable_accord = xyes)
>
> +AC_ARG_ENABLE([sheepkeeper],
> + [ --enable-sheepkeeper : build sheepkeeper and its cluster driver ],,
> + [ enable_sheepkeeper="no" ],)
> +AM_CONDITIONAL(BUILD_SHEEPKEEPER, test x$enable_sheepkeeper = xyes)
> +
Sheepkeeper requires no external library, does it? If so, I think
we don't need this configure option.
> +
> +struct sk_msg {
> + enum sk_msg_type type;
> + int body_len;
> +};
> +
> +#include "internal_proto.h"
> +
> +struct sk_msg_join {
> + struct sd_node node;
> + bool leader_elected;
> + char opaque[0];
> +};
> +
> +struct sk_msg_join_reply {
> + struct sd_node nodes[SD_MAX_NODES];
> + int nr_nodes;
> + char opaque[0];
> +};
> +
> +struct sk_msg_join_node_finish {
> + struct sd_node new_node;
> +
> + struct sd_node nodes[SD_MAX_NODES];
> + int nr_nodes;
> + char opaque[0];
> +};
> +
> +struct sk_msg_notify_forward {
> + struct sd_node from_node;
> + char notify_msg[0];
> +};
These structures can be shared between different architectures, no? If
yes, please use types defined in stdint.h and align them to 8-byte
offsets.
> +
> +static void read_msg(void)
> +{
> + int i, j, ret;
> + struct sk_msg rcv, snd;
> + struct sd_node sender;
> + struct iovec iov[2];
> + struct sk_msg_join *join;
> + struct sk_msg_notify_forward *notify_forward;
> + struct sk_msg_join_node_finish *join_node_finish;
> + struct join_message *jm;
> +
> +
> + ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
> + if (ret != sizeof(rcv)) {
> + vprintf(SDOG_ERR, "xread() failed\n");
> + exit(1);
> + }
> +
> + switch (rcv.type) {
> + case SK_MSG_NEW_NODE:
> + join = xmalloc(rcv.body_len);
> + ret = xread(sk_comm_fd, join, rcv.body_len);
> + if (ret != rcv.body_len) {
> + vprintf(SDOG_ERR, "xread() failed\n");
> + exit(1);
> + }
> +
> + if (is_leader)
> + sd_check_join_cb(&join->node, join->opaque);
> +
> + memset(&snd, 0, sizeof(snd));
> + snd.type = SK_MSG_NEW_NODE_REPLY;
> + snd.body_len = rcv.body_len;
> +
> + iov[0].iov_base = &snd;
> + iov[0].iov_len = sizeof(snd);
> +
> + iov[1].iov_base = join;
> + iov[1].iov_len = rcv.body_len;
> +
> + ret = writev(sk_comm_fd, iov, 2);
> + if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> + vprintf(SDOG_ERR, "writev() failed\n");
> + exit(1);
> + }
> +
> + break;
> +
> + case SK_MSG_NEW_NODE_FINISH:
> + join_node_finish = xmalloc(rcv.body_len);
> + ret = xread(sk_comm_fd, join_node_finish, rcv.body_len);
> + if (ret != rcv.body_len) {
> + vprintf(SDOG_ERR, "xread() failed\n");
> + exit(1);
> + }
> +
> + jm = (struct join_message *)join_node_finish->opaque;
> + memcpy(nodes, join_node_finish->nodes,
> + join_node_finish->nr_nodes * sizeof(struct sd_node));
> + nr_nodes = join_node_finish->nr_nodes;
> +
> + sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
> + CJ_RES_SUCCESS, jm);
> +
> + break;
> +
> + case SK_MSG_NOTIFY_FORWARD:
> + notify_forward = xmalloc(rcv.body_len);
> + ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
> + if (ret != rcv.body_len) {
> + vprintf(SDOG_ERR, "xread() failed\n");
> + exit(1);
> + }
> +
> + sd_notify_handler(&notify_forward->from_node,
> + notify_forward->notify_msg,
> + rcv.body_len - sizeof(*notify_forward));
> +
> + snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
snd.body_len = 0 is missing?
> + ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
> + if (ret != sizeof(snd)) {
> + vprintf(SDOG_ERR, "xwrite() failed\n");
> + exit(1);
> + }
> +
> + break;
> +
> + case SK_MSG_BLOCK_FORWARD:
> + ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
> + if (ret != sizeof(sender)) {
> + vprintf(SDOG_ERR, "xread() failed\n");
> + exit(1);
> + }
> +
> + snd.type = SK_MSG_BLOCK_FORWARD_REPLY;
> + snd.body_len = 0;
> + ret = xwrite(sk_comm_fd, &snd, sizeof(struct sk_msg));
> + if (ret != sizeof(snd)) {
> + vprintf(SDOG_ERR, "xwrite() failed\n");
> + exit(1);
> + }
> +
> + break;
> +
> + case SK_MSG_LEAVE_FORWARD:
> + ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
> + if (ret != sizeof(sender)) {
> + vprintf(SDOG_ERR, "xread() failed\n");
> + exit(1);
> + }
> +
> + for (i = 0; i < nr_nodes; i++) {
> + if (!memcmp(&sender, &nodes[i], sizeof(sender))) {
> + nr_nodes--;
> +
> + for (j = i; j < nr_nodes; j++)
> + nodes[j] = nodes[j + 1];
> +
> + goto removed;
> + }
> + }
> +
> + vprintf(SDOG_INFO, "leave message from unknown node\n");
> + exit(1);
> +
> +removed:
> + sd_leave_handler(&sender, nodes, nr_nodes);
> + break;
> +
> + case SK_MSG_LEADER_ELECTION:
> + is_leader = true;
> + vprintf(SDOG_INFO, "became new leader\n");
> +
> + snd.type = SK_MSG_LEADER_ELECTION_REPLY;
> + snd.body_len = 0;
> +
> + ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
> + if (-1 == ret) {
> + vprintf(SDOG_ERR, "xwrite() failed: %m\n");
> + exit(1);
> + }
> +
> + break;
> +
> + default:
> + vprintf(SDOG_ERR, "invalid message from sheepkeeper: %d, "
> + "length: %d\n", rcv.type, rcv.body_len);
> + exit(1);
> + break;
> + }
> +}
> +
> +static void read_msg_from_sheepkeeper(void)
> +{
> + switch (state) {
> + case state_pre_join:
> + read_msg_pre_join();
> + break;
> +
> + case state_joined:
> + read_msg();
> + break;
> +
> + default:
> + panic("invalid state of sheepkeeper cluster driver: %d\n",
> + state);
> + break;
> + };
> +}
> +
> +static void sheepkeeper_comm_handler(int fd, int events, void *data)
> +{
> + assert(fd == sk_comm_fd);
> + assert(data == NULL);
> +
> + if (events & EPOLLIN)
> + read_msg_from_sheepkeeper();
> +}
> +
> +static void init_addr_port(const char *s, struct sockaddr_in *addr)
> +{
> + /* format: <address>[:<port>] */
> + char *sep, *copied, *p;
> + uint8_t tmp_addr[16];
> + bool addr_only = false;
> +
> + copied = xcalloc(strlen(s) + 1, sizeof(char));
> + memcpy(copied, s, strlen(s));
copied = strdup(s) looks simpler.
> +
> +static void read_msg_from_sheep(struct sheep *sheep)
> +{
> + int ret, fd = sheep->fd;
> + struct sk_msg rcv, snd;
> + struct sk_msg_join *join;
> + struct iovec iov[2];
> +
> + struct sheep *s, *s2;
> +
> + struct sk_msg_notify_forward *notify_forward;
> +
> + struct sk_msg join_reply;
> + struct sk_msg_join_reply *reply_body;
> +
> + struct sk_msg_join_node_finish *finish;
> +
> + memset(&rcv, 0, sizeof(rcv));
> +
> + ret = xread(fd, &rcv, sizeof(rcv));
> + if (-1 == ret) {
> + vprintf(SDOG_ERR, "xread() failed: %m\n");
> + goto purge_current_sheep;
> + }
> +
> + switch (rcv.type) {
> + case SK_MSG_JOIN:
> + if (state == SK_STATE_JOINING) {
> + memset(&snd, 0, sizeof(snd));
> + snd.type = SK_MSG_JOIN_RETRY;
> +
> + ret = xwrite(fd, &snd, sizeof(snd));
> + if (-1 == ret) {
> + vprintf(SDOG_ERR, "xwrite() failed: %m\n");
> + goto purge_current_sheep;
> + }
> +
> + break;
> + }
> +
> + join = xmalloc(rcv.body_len);
> + ret = xread(fd, join, rcv.body_len);
> + if (-1 == ret) {
> + vprintf(SDOG_ERR, "xread() for reading the body of" \
> + " SK_MSG_JOIN failed: %m\n");
> +
> + free(join);
> + goto purge_current_sheep;
> + }
> +
> + sheep->node = join->node;
> +
> + snd.type = SK_MSG_NEW_NODE;
> + snd.body_len = rcv.body_len;
> +
> + iov[0].iov_base = &snd;
> + iov[0].iov_len = sizeof(struct sk_msg);
> +
> + iov[1].iov_base = join;
> + iov[1].iov_len = rcv.body_len;
> +
> + if (!nr_joined_sheep) {
> + /* this sheep is a new leader */
> + join->leader_elected = true;
> + }
> + ret = writev(!nr_joined_sheep ?
> + fd : leader_sheep->fd, iov, 2);
> + free(join);
> + if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> + vprintf(SDOG_ERR, "writev() for sending" \
> + " SK_MSG_NEW_NODE failed: %m\n");
> + goto purge_current_sheep;
> + }
> +
> + state = SK_STATE_JOINING;
> + break;
> +
> + case SK_MSG_NEW_NODE_REPLY:
> + if (nr_joined_sheep && sheep != leader_sheep) {
> + vprintf(SDOG_ERR, "not leader sheep replied " \
> + "SK_MSG_NEW_NODE_REPLY\n");
> + goto purge_current_sheep;
> + }
> +
> + vprintf(SDOG_INFO, "new node reply from %s\n",
> + node_to_str(&sheep->node));
> +
> + join = xmalloc(rcv.body_len);
> + ret = xread(fd, join, rcv.body_len);
> + if (-1 == ret) {
> + vprintf(SDOG_ERR, "xread() failed: %m\n");
> + free(join);
> +
> + goto purge_current_sheep;
> + }
> +
> + s = find_sheep_by_nid(&join->node.nid);
> + if (!s) {
> + /* leader is broken */
> + panic("invalid nid is required, %s\n",
> + node_to_str(&join->node));
> + }
> +
> + s->kept_join_msg_len = rcv.body_len -
> + sizeof(struct sk_msg_join);
> + s->kept_join_msg = xmalloc(s->kept_join_msg_len);
> + memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
> +
> + memset(&join_reply, 0, sizeof(struct sk_msg));
> + join_reply.type = SK_MSG_JOIN_REPLY;
> + join_reply.body_len = sizeof(struct sk_msg_join_reply)
> + + s->kept_join_msg_len;
> + iov[0].iov_base = &join_reply;
> + iov[0].iov_len = sizeof(struct sk_msg);
> +
> + reply_body = xmalloc(join_reply.body_len);
Initialize with zero or use xzalloc. Otherwise, uninitialized data
will be sent to another machine.
> +
> + reply_body->nr_nodes = build_node_array(reply_body->nodes);
> + memcpy(reply_body->opaque, s->kept_join_msg,
> + s->kept_join_msg_len);
> +
> + iov[1].iov_base = reply_body;
> + iov[1].iov_len = join_reply.body_len;
> +
> + ret = writev(s->fd, iov, 2);
> + free(reply_body);
> + if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> + vprintf(SDOG_ERR, "writev() failed: %m\n");
> + goto purge_current_sheep;
> + }
> +
> + snd.type = SK_MSG_NEW_NODE_FINISH;
> + snd.body_len = sizeof(*finish) + s->kept_join_msg_len;
> +
> + finish = xmalloc(snd.body_len);
Same here.
Anyway, the current sheepkeeper is too unstable. Can you send this
series after you pass the following test?
* setup sheepkeeper
$ valgrind ./sheepkeeper/sheepkeeper -f
* run tests
$ cd tests
$ DRIVER=sheepkeeper:127.0.0.1 ./check -e -valgrind
Thanks,
Kazutaka
More information about the sheepdog
mailing list