[sheepdog] [PATCH] sheepkeeper: a new cluster manager specialized for sheepdog
MORITA Kazutaka
morita.kazutaka at gmail.com
Sun Nov 11 20:57:17 CET 2012
> + switch (rcv.type) {
> + case SK_MSG_NEW_NODE:
> + join = xmalloc(rcv.body_len);
> + ret = xread(sk_comm_fd, join, rcv.body_len);
> + if (ret != rcv.body_len) {
> + vprintf(SDOG_ERR, "xread() failed\n");
> + exit(1);
> + }
> +
> + if (is_master)
> + sd_check_join_cb(&join->node, join->opaque);
> +
> + bzero(&snd, sizeof(snd));
bzero is deprecated. Please use memset.
> +static int sheepkeeper_init(const char *option)
> +{
> + struct sockaddr_in addr;
> +
> + sk_comm_fd = socket(AF_INET, SOCK_STREAM, 0);
> + if (sk_comm_fd < 0) {
> + vprintf(SDOG_INFO, "error at socket()\n");
> + return -1;
> + }
> +
> + bzero(&addr, sizeof(struct sockaddr_in));
Use memset.
> + addr.sin_family = AF_INET;
> + addr.sin_port = SHEEPKEEPER_PORT;
The address and port of sheepkeeper need to be configurable.
> +
> + if (connect(sk_comm_fd, &addr, sizeof(struct sockaddr_in))) {
> + vprintf(SDOG_ERR, "cannot connect to sheepkeeper,"
> + " is sheepkeeper running?\n");
> + vprintf(SDOG_ERR, "errno: %s\n", strerror(errno));
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +static int sheepkeeper_join(struct sd_node *myself,
Should be "const struct sd_node *myself".
> + void *opaque, size_t opaque_len)
> +{
> + int ret, msg_join_len;
> + struct iovec iov[2];
> + struct sk_msg msg, rcv;
> + struct sk_msg_join *msg_join;
> + struct sk_msg_join_reply *join_reply;
> +
> + msg_join_len = sizeof(struct sk_msg_join) + opaque_len;
> +
> + bzero(&msg, sizeof(struct sk_msg));
Use memset.
> + msg.type = SK_MSG_JOIN;
> + msg.body_len = msg_join_len;
> +
> + iov[0].iov_base = &msg;
> + iov[0].iov_len = sizeof(struct sk_msg);
> +
> + msg_join = calloc(opaque_len, sizeof(char));
The correct size looks to be msg_join_len (sizeof(struct sk_msg_join) + opaque_len), not opaque_len.
> + this_node = *myself;
> + msg_join->node = this_node;
> + memcpy(msg_join->opaque, opaque, opaque_len);
> +
> + iov[1].iov_base = msg_join;
> + iov[1].iov_len = msg_join_len;
> +
> +retry:
> + ret = writev(sk_comm_fd, iov, 2);
> + if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> + vprintf(SDOG_ERR, "sheepkeeper_join() failed, %s\n",
> + strerror(errno));
> + return -1;
> + }
> +
> + ret = xread(sk_comm_fd, &rcv, sizeof(struct sk_msg));
> + if (rcv.type == SK_MSG_JOIN_RETRY) {
> + vprintf(SDOG_INFO, "join request is rejected, retrying\n");
> + goto retry;
> + } else if (rcv.type == SK_MSG_NEW_NODE) {
> + struct sk_msg_join *join;
> + int join_len;
> +
> + join_len = msg.body_len;
> + join = xcalloc(join_len, sizeof(char));
> + ret = xread(sk_comm_fd, join, join_len);
> +
> + ret = sd_check_join_cb(&join->node, join->opaque);
> +
> + if (join->master_elected) {
> + vprintf(SDOG_INFO, "elected as master\n");
> + is_master = true;
> + }
> +
> + msg.type = SK_MSG_NEW_NODE_REPLY;
> + msg.body_len = join_len;
> +
> + iov[0].iov_base = &msg;
> + iov[0].iov_len = sizeof(struct sk_msg);
> +
> + iov[1].iov_base = join;
> + iov[1].iov_len = join_len;
> +
> + ret = writev(sk_comm_fd, iov, 2);
> + if (ret != (iov[0].iov_len + iov[1].iov_len)) {
> + vprintf(SDOG_ERR, "writev() failed\n");
> + exit(1);
> + }
> +
> + ret = xread(sk_comm_fd, &rcv, sizeof(struct sk_msg));
> + if (ret != sizeof(rcv)) {
> + vprintf(SDOG_ERR, "invalid ret: %d\n", ret);
> + exit(1);
> + }
> + }
> +
> + if (rcv.type != SK_MSG_JOIN_REPLY) {
> + vprintf(SDOG_INFO, "invalid message from sheepkeeper, "
> + "received message: %d\n", rcv.type);
> + exit(1);
> + }
> +
> + join_reply = xmalloc(rcv.body_len);
> + ret = xread(sk_comm_fd, join_reply, rcv.body_len);
> + if (ret != rcv.body_len) {
> + vprintf(SDOG_ERR, "xread() failed\n");
> + exit(1);
> + }
> +
> + vprintf(SDOG_INFO, "join reply arrived, nr_nodes: %d\n",
> + join_reply->nr_nodes);
> + sd_join_handler(&this_node, join_reply->nodes, join_reply->nr_nodes,
> + CJ_RES_SUCCESS, join_reply->opaque);
The current code doesn't expect a cluster driver to call
sd_join_handler from inside its join handler. To fix the problem, you
need to initialize the work queues before calling create_cluster.
However, can we move all the xread calls on sk_comm_fd into
read_msg_from_sheepkeeper? Then we could remove the sd_join_handler
call from here, I think.
> diff --git a/sheepkeeper/sheepkeeper.c b/sheepkeeper/sheepkeeper.c
> new file mode 100644
> index 0000000..3249958
> --- /dev/null
> +++ b/sheepkeeper/sheepkeeper.c
> @@ -0,0 +1,530 @@
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <string.h>
> +#include <stdbool.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include <assert.h>
> +#include <getopt.h>
> +
> +#include <unistd.h>
> +#include <sys/socket.h>
> +#include <sys/types.h>
> +#include <sys/epoll.h>
> +
> +#include <sys/un.h>
> +#include <netinet/in.h>
> +
> +#include "net.h"
> +#include "event.h"
> +#include "list.h"
> +#include "internal_proto.h"
> +#include "sheepkeeper.h"
> +
> +static bool daemonized;
> +
> +static inline char *node_to_str(struct sd_node *id)
> +{
> + static char str[256];
> + char name[256];
> + int af = AF_INET6;
> + uint8_t *addr = id->nid.addr;
> +
> + /* Find address family type */
> + if (addr[12]) {
> + int oct_no = 0;
> + while (!addr[oct_no] && oct_no++ < 12)
> + ;
> + if (oct_no == 12)
> + af = AF_INET;
> + }
> +
> + snprintf(str, sizeof(str), "%s ip:%s port:%d",
> + (af == AF_INET) ? "IPv4" : "IPv6",
> + addr_to_str(name, sizeof(name), id->nid.addr, 0), id->nid.port);
> +
> + return str;
> +}
How about moving node_to_str from sheep/cluster.h to
include/internal_proto.h rather than redefining it?
> +
> +#define die(fmt, ...) do { \
> + if (!daemonized) { \
> + fprintf(stderr, "%s, %d (errno: %s): " fmt, \
> + __FILE__, __LINE__, strerror(errno), \
> + ## __VA_ARGS__); \
> + } else { \
> + syslog(LOG_ERR, "%s, %d (errno: %s): " fmt, \
> + __FILE__, __LINE__, strerror(errno), \
> + ## __VA_ARGS__); \
> + } \
> + \
> + exit(1); \
> + } while (0)
Can we use panic()?
> +
> +#define log(fmt, ...) do { \
> + if (!daemonized) { \
> + fprintf(stdout, "%s, %d: " fmt, \
> + __FILE__, __LINE__, \
> + ## __VA_ARGS__); \
> + } else { \
> + syslog(LOG_INFO, "%s, %d: " fmt, \
> + __FILE__, __LINE__, \
> + ## __VA_ARGS__); \
> + } \
> + } while (0)
Can we use eprintf?
> +
> +#define debug_printf(fmt, ...) do { \
> + if (!daemonized) { \
> + fprintf(stdout, "%s, %d (errno: %s): " fmt, \
> + __FILE__, __LINE__, strerror(errno), \
> + ## __VA_ARGS__); \
> + } else { \
> + syslog(LOG_DEBUG, "%s, %d (errno: %s): " fmt, \
> + __FILE__, __LINE__, strerror(errno), \
> + ## __VA_ARGS__); \
> + } \
> + } while (0)
Can we use dprintf?
> +
> +static void *_xcalloc(size_t nmemb, size_t size)
> +{
> + void *ret;
> +
> + ret = calloc(nmemb, size);
> + if (!ret)
> + die("error at calloc()\n");
> +
> + return ret;
> +}
Can we use xcalloc in util.c?
> + case SK_MSG_NEW_NODE_REPLY:
> + log("new node reply from %s\n", node_to_str(&sheep->node));
> + join = _xcalloc(rcv.body_len, sizeof(char));
> + ret = xread(fd, join, rcv.body_len);
> + if (ret != rcv.body_len)
> + die("xread() failed, ret: %d\n", ret);
> +
> + s = find_sheep_by_nid(&join->node.nid);
> + if (!s) {
> + die("invalid nid is required, %s\n",
> + node_to_str(&join->node));
> + }
> +
> + if (!sheep->master)
> + die("no master replied new node reply\n");
> +
> + s->kept_join_msg_len = rcv.body_len -
> + sizeof(struct sk_msg_join);
> + s->kept_join_msg = _xcalloc(s->kept_join_msg_len, sizeof(char));
> + memcpy(s->kept_join_msg, join->opaque, s->kept_join_msg_len);
> +
> + log("reserved opaque, length: %d\n", s->kept_join_msg_len);
> +
> + bzero(&join_reply, sizeof(struct sk_msg));
Use memset.
> +
> +static void sheep_comm_handler(int fd, int events, void *data)
> +{
> + if (events & EPOLLIN)
> + read_msg_from_sheep(data);
> +}
Should we handle EPOLLERR and EPOLLHUP, too?
> +static struct option const long_options[] = {
> + { "port", required_argument, NULL, 'p' },
> + { "address", required_argument, NULL, 'a' },
> + { "not-daemonize", no_argument, NULL, 'n' },
Use "foreground" and 'f' like sheep.
The last element of long_options has to be filled with zeros.
> +
> + /* setup inet socket for communication with sheeps */
> + sheep_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
> + if (sheep_listen_fd < 0)
> + die("socket() failed\n");
Turn on SO_REUSEADDR to allow us to reuse the port immediately.
Thanks,
Kazutaka
More information about the sheepdog
mailing list