[sheepdog] [PATCH v3 2/2] sheepkeeper: a new cluster manager specialized for sheepdog
Hitoshi Mitake
h.mitake at gmail.com
Sat Nov 17 13:56:48 CET 2012
At Fri, 16 Nov 2012 17:25:59 +0900,
MORITA Kazutaka wrote:
>
>
> > AM_CONDITIONAL(BUILD_ACCORD, test x$enable_accord = xyes)
> >
> > +AC_ARG_ENABLE([sheepkeeper],
> > + [ --enable-sheepkeeper : build sheepkeeper and its cluster driver ],,
> > + [ enable_sheepkeeper="no" ],)
> > +AM_CONDITIONAL(BUILD_SHEEPKEEPER, test x$enable_sheepkeeper = xyes)
> > +
>
> Sheepkeeper requires no external library, doesn't it? If yes, I think
> we don't need this configure option.
>
Ah, the configuration option has no meaning. I'll remove it in the next version.
>
>
> > +
> > +struct sk_msg {
> > + enum sk_msg_type type;
> > + int body_len;
> > +};
> > +
> > +#include "internal_proto.h"
> > +
> > +struct sk_msg_join {
> > + struct sd_node node;
> > + bool leader_elected;
> > + char opaque[0];
> > +};
> > +
> > +struct sk_msg_join_reply {
> > + struct sd_node nodes[SD_MAX_NODES];
> > + int nr_nodes;
> > + char opaque[0];
> > +};
> > +
> > +struct sk_msg_join_node_finish {
> > + struct sd_node new_node;
> > +
> > + struct sd_node nodes[SD_MAX_NODES];
> > + int nr_nodes;
> > + char opaque[0];
> > +};
> > +
> > +struct sk_msg_notify_forward {
> > + struct sd_node from_node;
> > + char notify_msg[0];
> > +};
>
> These structures can be shared between different architectures, no? If
> yes, please use types defined in stdint.h and align them to 8-byte
> offsets.
>
OK, I'll use types in stdint.h in the next version.
>
> > +
> > +static void read_msg(void)
> > +{
> > + int i, j, ret;
> > + struct sk_msg rcv, snd;
> > + struct sd_node sender;
> > + struct iovec iov[2];
> > + struct sk_msg_join *join;
> > + struct sk_msg_notify_forward *notify_forward;
> > + struct sk_msg_join_node_finish *join_node_finish;
> > + struct join_message *jm;
> > +
> > +
> > + ret = xread(sk_comm_fd, &rcv, sizeof(rcv));
> > + if (ret != sizeof(rcv)) {
> > + vprintf(SDOG_ERR, "xread() failed\n");
> > + exit(1);
> > + }
> > +
> > + switch (rcv.type) {
> > + case SK_MSG_NEW_NODE:
> > + join = xmalloc(rcv.body_len);
> > + ret = xread(sk_comm_fd, join, rcv.body_len);
> > + if (ret != rcv.body_len) {
> > + vprintf(SDOG_ERR, "xread() failed\n");
> > + exit(1);
> > + }
> > +
> > + if (is_leader)
> > + sd_check_join_cb(&join->node, join->opaque);
> > +
> > + memset(&snd, 0, sizeof(snd));
> > + snd.type = SK_MSG_NEW_NODE_REPLY;
> > + snd.body_len = rcv.body_len;
> > +
> > + iov[0].iov_base = &snd;
> > + iov[0].iov_len = sizeof(snd);
> > +
> > + iov[1].iov_base = join;
> > + iov[1].iov_len = rcv.body_len;
> > +
> > + ret = writev(sk_comm_fd, iov, 2);
> > + if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> > + vprintf(SDOG_ERR, "writev() failed\n");
> > + exit(1);
> > + }
> > +
> > + break;
> > +
> > + case SK_MSG_NEW_NODE_FINISH:
> > + join_node_finish = xmalloc(rcv.body_len);
> > + ret = xread(sk_comm_fd, join_node_finish, rcv.body_len);
> > + if (ret != rcv.body_len) {
> > + vprintf(SDOG_ERR, "xread() failed\n");
> > + exit(1);
> > + }
> > +
> > + jm = (struct join_message *)join_node_finish->opaque;
> > + memcpy(nodes, join_node_finish->nodes,
> > + join_node_finish->nr_nodes * sizeof(struct sd_node));
> > + nr_nodes = join_node_finish->nr_nodes;
> > +
> > + sd_join_handler(&join_node_finish->new_node, nodes, nr_nodes,
> > + CJ_RES_SUCCESS, jm);
> > +
> > + break;
> > +
> > + case SK_MSG_NOTIFY_FORWARD:
> > + notify_forward = xmalloc(rcv.body_len);
> > + ret = xread(sk_comm_fd, notify_forward, rcv.body_len);
> > + if (ret != rcv.body_len) {
> > + vprintf(SDOG_ERR, "xread() failed\n");
> > + exit(1);
> > + }
> > +
> > + sd_notify_handler(&notify_forward->from_node,
> > + notify_forward->notify_msg,
> > + rcv.body_len - sizeof(*notify_forward));
> > +
> > + snd.type = SK_MSG_NOTIFY_FORWARD_REPLY;
>
> snd.body_len = 0 is missing?
SK_MSG_NOTIFY_FORWARD_REPLY has no body, so we don't have to
set its length.
>
> > + ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
> > + if (ret != sizeof(snd)) {
> > + vprintf(SDOG_ERR, "xwrite() failed\n");
> > + exit(1);
> > + }
> > +
> > + break;
> > +
> > + case SK_MSG_BLOCK_FORWARD:
> > + ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
> > + if (ret != sizeof(sender)) {
> > + vprintf(SDOG_ERR, "xread() failed\n");
> > + exit(1);
> > + }
> > +
> > + snd.type = SK_MSG_BLOCK_FORWARD_REPLY;
> > + snd.body_len = 0;
> > + ret = xwrite(sk_comm_fd, &snd, sizeof(struct sk_msg));
> > + if (ret != sizeof(snd)) {
> > + vprintf(SDOG_ERR, "xwrite() failed\n");
> > + exit(1);
> > + }
> > +
> > + break;
> > +
> > + case SK_MSG_LEAVE_FORWARD:
> > + ret = xread(sk_comm_fd, &sender, sizeof(struct sd_node));
> > + if (ret != sizeof(sender)) {
> > + vprintf(SDOG_ERR, "xread() failed\n");
> > + exit(1);
> > + }
> > +
> > + for (i = 0; i < nr_nodes; i++) {
> > + if (!memcmp(&sender, &nodes[i], sizeof(sender))) {
> > + nr_nodes--;
> > +
> > + for (j = i; j < nr_nodes; j++)
> > + nodes[j] = nodes[j + 1];
> > +
> > + goto removed;
> > + }
> > + }
> > +
> > + vprintf(SDOG_INFO, "leave message from unknown node\n");
> > + exit(1);
> > +
> > +removed:
> > + sd_leave_handler(&sender, nodes, nr_nodes);
> > + break;
> > +
> > + case SK_MSG_LEADER_ELECTION:
> > + is_leader = true;
> > + vprintf(SDOG_INFO, "became new leader\n");
> > +
> > + snd.type = SK_MSG_LEADER_ELECTION_REPLY;
> > + snd.body_len = 0;
> > +
> > + ret = xwrite(sk_comm_fd, &snd, sizeof(snd));
> > + if (-1 == ret) {
> > + vprintf(SDOG_ERR, "xwrite() failed: %m\n");
> > + exit(1);
> > + }
> > +
> > + break;
> > +
> > + default:
> > + vprintf(SDOG_ERR, "invalid message from sheepkeeper: %d, "
> > + "length: %d\n", rcv.type, rcv.body_len);
> > + exit(1);
> > + break;
> > + }
> > +}
> > +
> > +static void read_msg_from_sheepkeeper(void)
> > +{
> > + switch (state) {
> > + case state_pre_join:
> > + read_msg_pre_join();
> > + break;
> > +
> > + case state_joined:
> > + read_msg();
> > + break;
> > +
> > + default:
> > + panic("invalid state of sheepkeeper cluster driver: %d\n",
> > + state);
> > + break;
> > + };
> > +}
> > +
> > +static void sheepkeeper_comm_handler(int fd, int events, void *data)
> > +{
> > + assert(fd == sk_comm_fd);
> > + assert(data == NULL);
> > +
> > + if (events & EPOLLIN)
> > + read_msg_from_sheepkeeper();
> > +}
> > +
> > +static void init_addr_port(const char *s, struct sockaddr_in *addr)
> > +{
> > + /* format: <address>[:<port>] */
> > + char *sep, *copied, *p;
> > + uint8_t tmp_addr[16];
> > + bool addr_only = false;
> > +
> > + copied = xcalloc(strlen(s) + 1, sizeof(char));
> > + memcpy(copied, s, strlen(s));
>
> copied = strdup(s) looks simpler.
Thanks for your advice; strdup() is more suitable for this
situation. I'll fix this in the next version.
>
>
> > +
> > +static void read_msg_from_sheep(struct sheep *sheep)
> > +{
> > + int ret, fd = sheep->fd;
> > + struct sk_msg rcv, snd;
> > + struct sk_msg_join *join;
> > + struct iovec iov[2];
> > +
> > + struct sheep *s, *s2;
> > +
> > + struct sk_msg_notify_forward *notify_forward;
> > +
> > + struct sk_msg join_reply;
> > + struct sk_msg_join_reply *reply_body;
> > +
> > + struct sk_msg_join_node_finish *finish;
> > +
> > + memset(&rcv, 0, sizeof(rcv));
> > +
> > + ret = xread(fd, &rcv, sizeof(rcv));
> > + if (-1 == ret) {
> > + vprintf(SDOG_ERR, "xread() failed: %m\n");
> > + goto purge_current_sheep;
> > + }
> > +
> > + switch (rcv.type) {
> > + case SK_MSG_JOIN:
> > + if (state == SK_STATE_JOINING) {
> > + memset(&snd, 0, sizeof(snd));
> > + snd.type = SK_MSG_JOIN_RETRY;
> > +
> > + ret = xwrite(fd, &snd, sizeof(snd));
> > + if (-1 == ret) {
> > + vprintf(SDOG_ERR, "xwrite() failed: %m\n");
> > + goto purge_current_sheep;
> > + }
> > +
> > + break;
> > + }
> > +
> > + join = xmalloc(rcv.body_len);
> > + ret = xread(fd, join, rcv.body_len);
> > + if (-1 == ret) {
> > + vprintf(SDOG_ERR, "xread() for reading the body of" \
> > + " SK_MSG_JOIN failed: %m\n");
> > +
> > + free(join);
> > + goto purge_current_sheep;
> > + }
> > +
> > + sheep->node = join->node;
> > +
> > + snd.type = SK_MSG_NEW_NODE;
> > + snd.body_len = rcv.body_len;
> > +
> > + iov[0].iov_base = &snd;
> > + iov[0].iov_len = sizeof(struct sk_msg);
> > +
> > + iov[1].iov_base = join;
> > + iov[1].iov_len = rcv.body_len;
> > +
> > + if (!nr_joined_sheep) {
> > + /* this sheep is a new leader */
> > + join->leader_elected = true;
> > + }
> > + ret = writev(!nr_joined_sheep ?
> > + fd : leader_sheep->fd, iov, 2);
> > + free(join);
> > + if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> > + vprintf(SDOG_ERR, "writev() for sending" \
> > + " SK_MSG_NEW_NODE failed: %m\n");
> > + goto purge_current_sheep;
> > + }
> > +
> > + state = SK_STATE_JOINING;
> > + break;
> > +
> > + case SK_MSG_NEW_NODE_REPLY:
> > + if (nr_joined_sheep && sheep != leader_sheep) {
> > + vprintf(SDOG_ERR, "not leader sheep replied " \
> > + "SK_MSG_NEW_NODE_REPLY\n");
> > + goto purge_current_sheep;
> > + }
> > +
> > + vprintf(SDOG_INFO, "new node reply from %s\n",
> > + node_to_str(&sheep->node));
> > +
> > + join = xmalloc(rcv.body_len);
> > + ret = xread(fd, join, rcv.body_len);
> > + if (-1 == ret) {
> > + vprintf(SDOG_ERR, "xread() failed: %m\n");
> > + free(join);
> > +
> > + goto purge_current_sheep;
> > + }
> > +
> > + s = find_sheep_by_nid(&join->node.nid);
> > + if (!s) {
> > + /* leader is broken */
> > + panic("invalid nid is required, %s\n",
> > + node_to_str(&join->node));
> > + }
> > +
> > + s->kept_join_msg_len = rcv.body_len -
> > + sizeof(struct sk_msg_join);
> > + s->kept_join_msg = xmalloc(s->kept_join_msg_len);
> > + memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
> > +
> > + memset(&join_reply, 0, sizeof(struct sk_msg));
> > + join_reply.type = SK_MSG_JOIN_REPLY;
> > + join_reply.body_len = sizeof(struct sk_msg_join_reply)
> > + + s->kept_join_msg_len;
> > + iov[0].iov_base = &join_reply;
> > + iov[0].iov_len = sizeof(struct sk_msg);
> > +
> > + reply_body = xmalloc(join_reply.body_len);
>
> Initialize with zero or use xzalloc. Otherwise, uninitialized data
> will be sent to another machine.
>
> > +
> > + reply_body->nr_nodes = build_node_array(reply_body->nodes);
> > + memcpy(reply_body->opaque, s->kept_join_msg,
> > + s->kept_join_msg_len);
> > +
> > + iov[1].iov_base = reply_body;
> > + iov[1].iov_len = join_reply.body_len;
> > +
> > + ret = writev(s->fd, iov, 2);
> > + free(reply_body);
> > + if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> > + vprintf(SDOG_ERR, "writev() failed: %m\n");
> > + goto purge_current_sheep;
> > + }
> > +
> > + snd.type = SK_MSG_NEW_NODE_FINISH;
> > + snd.body_len = sizeof(*finish) + s->kept_join_msg_len;
> > +
> > + finish = xmalloc(snd.body_len);
>
> Same here.
OK, I'll use xzalloc() for them in the next version.
>
> Anyway, the current sheepkeeper is too unstable. Can you send this
> series after you pass the following test?
>
> * setup sheepkeeper
> $ valgrind ./sheepkeeper/sheepkeeper -f
>
> * run tests
> $ cd tests
> $ DRIVER=sheepkeeper:127.0.0.1 ./check -e -valgrind
>
Sorry for my buggy implementation. I'll check it with the above tests and
documentation before the next post.
Thanks for your review,
Hitoshi
More information about the sheepdog mailing list