The conditions under which sheepdog can start automatically and safely are: - all nodes have the same epochs, which means `shepherd shutdown` was executed before shutting down the sheepdog nodes - newly added machines have no local epochs or objects Otherwise collie returns the SD_RES_INCONSISTENT_EPOCHS error. Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp> --- collie/collie.h | 7 ++ collie/group.c | 209 +++++++++++++++++++++++++++++++++++++++------ collie/net.c | 33 +++++++ collie/store.c | 141 ++++++++++++++++++++++++++++++- include/sheepdog_proto.h | 39 +++++++++ 5 files changed, 397 insertions(+), 32 deletions(-) diff --git a/collie/collie.h b/collie/collie.h index 89eb8f8..5cd2383 100644 --- a/collie/collie.h +++ b/collie/collie.h @@ -62,6 +62,7 @@ struct cluster_info { struct sheepdog_node_list_entry this_node; uint32_t epoch; + uint32_t status; struct list_head cpg_node_list; struct list_head sd_node_list; @@ -96,6 +97,10 @@ void so_queue_request(struct work *work, int idx); void store_queue_request(struct work *work, int idx); +int read_epoch(uint32_t *epoch, uint64_t *ctime, + struct sheepdog_node_list_entry *entries, int *nr_entries); +void epoch_queue_request(struct work *work, int idx); + void cluster_queue_request(struct work *work, int idx); int update_epoch_store(uint32_t epoch); @@ -107,6 +112,8 @@ extern struct work_queue *dobj_queue; int epoch_log_write(uint32_t epoch, char *buf, int len); int epoch_log_read(uint32_t epoch, char *buf, int len); +int get_latest_epoch(void); +int remove_epoch(int epoch); int set_cluster_ctime(uint64_t ctime); uint64_t get_cluster_ctime(void); diff --git a/collie/group.c b/collie/group.c index d89f3ef..a49c1be 100644 --- a/collie/group.c +++ b/collie/group.c @@ -48,9 +48,9 @@ struct join_message { uint32_t nodeid; uint32_t pid; struct sheepdog_node_list_entry master_node; - uint32_t epoch; uint32_t nr_nodes; uint32_t nr_sobjs; + uint32_t cluster_status; uint32_t pad; struct { uint32_t nodeid; @@ -168,6 +168,7 @@ void 
cluster_queue_request(struct work *work, int idx) struct sd_req *hdr = (struct sd_req *)&req->rq; struct sd_rsp *rsp = (struct sd_rsp *)&req->rp; struct vdi_op_message *msg; + struct epoch_log *log; int ret = SD_RES_SUCCESS; eprintf("%p %x\n", req, hdr->opcode); @@ -180,6 +181,24 @@ void cluster_queue_request(struct work *work, int idx) case SD_OP_GET_VM_LIST: get_vm_list(rsp, req->data); break; + case SD_OP_STAT_CLUSTER: + log = (struct epoch_log *)req->data; + + ((struct sd_vdi_rsp *)rsp)->rsvd = sys->status; + log->ctime = get_cluster_ctime(); + log->epoch = get_latest_epoch(); + log->nr_nodes = epoch_log_read(log->epoch, (char *)log->nodes, + sizeof(log->nodes)); + if (log->nr_nodes == -1) { + rsp->data_length = 0; + log->nr_nodes = 0; + } else{ + rsp->data_length = sizeof(*log); + log->nr_nodes /= sizeof(log->nodes[0]); + } + + rsp->result = SD_RES_SUCCESS; + break; default: /* forward request to group */ goto forward; @@ -286,6 +305,96 @@ static int is_master(void) return 0; } +static int get_cluster_status(struct sheepdog_node_list_entry *node) +{ + struct sd_epoch_req hdr; + struct sd_epoch_rsp *rsp = (struct sd_epoch_rsp *)&hdr; + unsigned int rlen, wlen; + int ret, fd, i, j; + char name[128]; + uint64_t ctime; + int nr_entries, nr_local_entries; + struct sheepdog_node_list_entry entries[SD_MAX_NODES]; + struct sheepdog_node_list_entry local_entries[SD_MAX_NODES]; + uint32_t epoch; + + if (sys->status == SD_STATUS_INCONSISTENT_EPOCHS) + return SD_STATUS_INCONSISTENT_EPOCHS; + + if (node->id == sys->this_node.id) { + nr_entries = ARRAY_SIZE(entries); + ret = read_epoch(&epoch, &ctime, entries, &nr_entries); + } else { + addr_to_str(name, sizeof(name), node->addr, 0); + + fd = connect_to(name, node->port); + if (fd < 0) + return SD_STATUS_INCONSISTENT_EPOCHS; + + memset(&hdr, 0, sizeof(hdr)); + hdr.opcode = SD_OP_READ_EPOCH; + hdr.epoch = sys->epoch; + hdr.data_length = sizeof(entries); + rlen = hdr.data_length; + wlen = 0; + + ret = exec_req(fd, (struct 
sd_req *)&hdr, entries, + &wlen, &rlen); + + + nr_entries = rlen / sizeof(*entries); + if (ret != 0) { + eprintf("failed to send request, %x, %s\n", ret, name); + close(fd); + return SD_STATUS_INCONSISTENT_EPOCHS; + } + epoch = rsp->latest_epoch; + ctime = rsp->ctime; + ret = rsp->result; + + close(fd); + } + + if (ret != SD_RES_SUCCESS) { + eprintf("failed to read epoch, %x\n", ret); + return SD_STATUS_STARTUP; + } + + if (epoch != get_latest_epoch()) + return SD_STATUS_INCONSISTENT_EPOCHS; + + if (ctime != get_cluster_ctime()) + return SD_STATUS_INCONSISTENT_EPOCHS; + + nr_local_entries = epoch_log_read(epoch, (char *)local_entries, + sizeof(local_entries)); + nr_local_entries /= sizeof(local_entries[0]); + + if (nr_entries != nr_local_entries) + return SD_STATUS_INCONSISTENT_EPOCHS; + + if (memcmp(entries, local_entries, sizeof(entries[0]) * nr_entries) != 0) + return SD_STATUS_INCONSISTENT_EPOCHS; + + nr_entries = build_node_list(&sys->sd_node_list, entries); + if (nr_entries + 1 != nr_local_entries) + return SD_STATUS_STARTUP; + + for (i = 0; i < nr_local_entries; i++) { + if (local_entries[i].id == node->id) + goto next; + for (j = 0; j < nr_entries; j++) { + if (local_entries[i].id == entries[j].id) + goto next; + } + return SD_STATUS_STARTUP; + next: + ; + } + + return SD_STATUS_OK; +} + static void join(struct join_message *msg) { struct node *node; @@ -299,8 +408,8 @@ static void join(struct join_message *msg) if (msg->nr_sobjs) sys->nr_sobjs = msg->nr_sobjs; - msg->epoch = sys->epoch; msg->nr_sobjs = sys->nr_sobjs; + msg->nr_nodes = 0; list_for_each_entry(node, &sys->cpg_node_list, list) { if (node->nodeid == msg->nodeid && node->pid == msg->pid) continue; @@ -312,6 +421,11 @@ static void join(struct join_message *msg) msg->nodes[msg->nr_nodes].ent = node->ent; msg->nr_nodes++; } + + if (sys->status == SD_STATUS_STARTUP) + msg->cluster_status = get_cluster_status(&msg->header.from); + else + msg->cluster_status = sys->status; } static void 
update_cluster_info(struct join_message *msg) @@ -346,33 +460,45 @@ static void update_cluster_info(struct join_message *msg) &msg->nodes[i].ent); } - sys->epoch = msg->epoch; sys->synchronized = 1; - nr_nodes = build_node_list(&sys->sd_node_list, entry); + eprintf("system status = %d\n", msg->cluster_status); + if (sys->status == SD_STATUS_OK) { + nr_nodes = build_node_list(&sys->sd_node_list, entry); - ret = epoch_log_write(sys->epoch, (char *)entry, - nr_nodes * sizeof(struct sheepdog_node_list_entry)); - if (ret < 0) - eprintf("can't write epoch %u\n", sys->epoch); + dprintf("update epoch, %d, %d\n", sys->epoch, nr_nodes); + ret = epoch_log_write(sys->epoch, (char *)entry, + nr_nodes * sizeof(struct sheepdog_node_list_entry)); + if (ret < 0) + eprintf("can't write epoch %u\n", sys->epoch); + } - /* we are ready for object operations */ - update_epoch_store(sys->epoch); out: add_node(&sys->sd_node_list, msg->nodeid, msg->pid, &msg->header.from); - nr_nodes = build_node_list(&sys->sd_node_list, entry); + if (sys->status == SD_STATUS_OK) { + nr_nodes = build_node_list(&sys->sd_node_list, entry); - ret = epoch_log_write(sys->epoch + 1, (char *)entry, - nr_nodes * sizeof(struct sheepdog_node_list_entry)); - if (ret < 0) - eprintf("can't write epoch %u\n", sys->epoch + 1); + dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr_nodes); + ret = epoch_log_write(sys->epoch + 1, (char *)entry, + nr_nodes * sizeof(struct sheepdog_node_list_entry)); + if (ret < 0) + eprintf("can't write epoch %u\n", sys->epoch + 1); - sys->epoch++; + sys->epoch++; - update_epoch_store(sys->epoch); + update_epoch_store(sys->epoch); + } print_node_list(&sys->sd_node_list); + + if (sys->status == SD_STATUS_STARTUP && msg->cluster_status == SD_STATUS_OK) + sys->epoch = get_latest_epoch(); + + if (sys->status != SD_STATUS_INCONSISTENT_EPOCHS) + sys->status = msg->cluster_status; + + return; } static void vdi_op(struct vdi_op_message *msg) @@ -421,6 +547,12 @@ static void vdi_op_done(struct 
vdi_op_message *msg) struct vm *vm; struct request *req; int ret = msg->rsp.result; + int i, latest_epoch, nr_nodes; + struct sheepdog_node_list_entry entry[SD_MAX_NODES]; + uint64_t ctime; + + if (ret != SD_RES_SUCCESS) + goto out; switch (hdr->opcode) { case SD_OP_NEW_VDI: @@ -456,11 +588,28 @@ static void vdi_op_done(struct vdi_op_message *msg) case SD_OP_GET_VDI_INFO: break; case SD_OP_MAKE_FS: - if (ret == SD_RES_SUCCESS) { - sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies; - eprintf("%d\n", sys->nr_sobjs); - } + sys->nr_sobjs = ((struct sd_so_req *)hdr)->copies; + eprintf("%d\n", sys->nr_sobjs); + + ctime = ((struct sd_so_req *)hdr)->ctime; + set_cluster_ctime(ctime); + latest_epoch = get_latest_epoch(); + for (i = 1; i <= latest_epoch; i++) + remove_epoch(i); + + sys->epoch = 1; + nr_nodes = build_node_list(&sys->sd_node_list, entry); + + dprintf("write epoch log, %d, %d\n", sys->epoch, nr_nodes); + ret = epoch_log_write(sys->epoch, (char *)entry, + nr_nodes * sizeof(struct sheepdog_node_list_entry)); + if (ret < 0) + eprintf("can't write epoch %u\n", sys->epoch); + update_epoch_store(sys->epoch); + + sys->status = SD_STATUS_OK; + break; case SD_OP_SHUTDOWN: sys->status = SD_STATUS_SHUTDOWN; break; @@ -468,7 +617,7 @@ static void vdi_op_done(struct vdi_op_message *msg) eprintf("unknown operation %d\n", hdr->opcode); ret = SD_RES_UNKNOWN; } - +out: if (node_cmp(&sys->this_node, &msg->header.from) != 0) return; @@ -629,13 +778,16 @@ static void __sd_confch(struct work *work, int idx) list_del(&node->list); free(node); - nr = build_node_list(&sys->sd_node_list, e); - epoch_log_write(sys->epoch + 1, (char *)e, - nr * sizeof(struct sheepdog_node_list_entry)); + if (sys->status == SD_STATUS_OK) { + nr = build_node_list(&sys->sd_node_list, e); + dprintf("update epoch, %d, %d\n", sys->epoch + 1, nr); + epoch_log_write(sys->epoch + 1, (char *)e, + nr * sizeof(struct sheepdog_node_list_entry)); - sys->epoch++; + sys->epoch++; - update_epoch_store(sys->epoch); + 
update_epoch_store(sys->epoch); + } } } @@ -703,7 +855,7 @@ static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name, member_list[i].reason); } - if (sys->status & SD_STATUS_SHUTDOWN_MASK || sys->status & SD_STATUS_ERROR_MASK) + if (sys->status == SD_STATUS_SHUTDOWN || sys->status == SD_STATUS_INCONSISTENT_EPOCHS) return; w = zalloc(sizeof(*w)); @@ -866,6 +1018,7 @@ join_retry: sys->this_node.id = hval; sys->synchronized = 0; + sys->status = SD_STATUS_STARTUP; INIT_LIST_HEAD(&sys->sd_node_list); INIT_LIST_HEAD(&sys->cpg_node_list); INIT_LIST_HEAD(&sys->vm_list); diff --git a/collie/net.c b/collie/net.c index 84eecdc..04f9547 100644 --- a/collie/net.c +++ b/collie/net.c @@ -38,6 +38,33 @@ static void __done(struct work *work, int idx) static void queue_request(struct request *req) { struct sd_req *hdr = (struct sd_req *)&req->rq; + struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;; + + if (sys->status == SD_STATUS_SHUTDOWN) { + rsp->result = SD_RES_SHUTDOWN; + req->done(req); + return; + } + + if (sys->status == SD_STATUS_STARTUP || + sys->status == SD_STATUS_INCONSISTENT_EPOCHS) { + /* TODO: cleanup */ + switch (hdr->opcode) { + case SD_OP_STAT_CLUSTER: + case SD_OP_MAKE_FS: + case SD_OP_GET_NODE_LIST: + case SD_OP_SO: + case SD_OP_READ_EPOCH: + break; + default: + if (sys->status == SD_STATUS_STARTUP) + rsp->result = SD_RES_STARTUP; + else + rsp->result = SD_RES_INCONSISTENT_EPOCHS; + req->done(req); + return; + } + } switch (hdr->opcode) { case SD_OP_CREATE_AND_WRITE_OBJ: @@ -49,6 +76,9 @@ static void queue_request(struct request *req) case SD_OP_GET_OBJ_LIST: req->work.fn = store_queue_request; break; + case SD_OP_READ_EPOCH: + req->work.fn = epoch_queue_request; + break; case SD_OP_GET_NODE_LIST: case SD_OP_GET_VM_LIST: case SD_OP_NEW_VDI: @@ -58,6 +88,7 @@ static void queue_request(struct request *req) case SD_OP_GET_VDI_INFO: case SD_OP_MAKE_FS: case SD_OP_SHUTDOWN: + case SD_OP_STAT_CLUSTER: req->work.fn = cluster_queue_request; break; case 
SD_OP_SO: @@ -69,6 +100,8 @@ static void queue_request(struct request *req) break; default: eprintf("unknown operation %d\n", hdr->opcode); + rsp->result = SD_RES_SYSTEM_ERROR; + req->done(req); return; } diff --git a/collie/store.c b/collie/store.c index e46ca46..a36c30f 100644 --- a/collie/store.c +++ b/collie/store.c @@ -24,6 +24,7 @@ #define ANAME_LAST_OID "user.sheepdog.last_oid" #define ANAME_CTIME "user.sheepdog.ctime" #define ANAME_COPIES "user.sheepdog.copies" +#define ANAME_OBJECT_UPDATED "user.sheepdog.object_updated" static char *vdi_path; static char *obj_path; @@ -504,8 +505,9 @@ static int store_queue_request_local(struct request *req, char *buf, uint32_t ep if (ret != hdr->data_length) { ret = SD_RES_EIO; goto out; - } else - ret = SD_RES_SUCCESS; + } + + ret = SD_RES_SUCCESS; break; case SD_OP_SYNC_OBJ: ret = fsync(fd); @@ -843,8 +845,11 @@ void so_queue_request(struct work *work, int idx) ret = fsetxattr(fd, ANAME_COPIES, &hdr->copies, sizeof(hdr->copies), 0); - if (ret) + if (ret) { result = SD_RES_EIO; + goto out; + } + break; case SD_OP_SO_NEW_VDI: fd = open(path, O_RDONLY); @@ -956,8 +961,8 @@ void so_queue_request(struct work *work, int idx) eprintf("%lx\n", last_oid); rsp->oid = last_oid; - break; + break; case SD_OP_SO_LOOKUP_VDI: result = so_lookup_vdi(req); break; @@ -1041,6 +1046,91 @@ int epoch_log_read(uint32_t epoch, char *buf, int len) return len; } +int get_latest_epoch(void) +{ + DIR *dir; + struct dirent *d; + uint32_t e, epoch = 0; + + dir = opendir(epoch_path); + if (!dir) + return -1; + + while ((d = readdir(dir))) { + e = atoi(d->d_name); + if (e > epoch) + epoch = e; + } + closedir(dir); + + return epoch; +} + +/* remove directory recursively */ +static int rmdir_r(char *dir_path) +{ + int ret; + struct stat s; + DIR *dir; + struct dirent *d; + char path[PATH_MAX]; + + dir = opendir(dir_path); + if (!dir) { + if (errno != ENOENT) + eprintf("failed, %s, %d\n", dir_path, errno); + return -errno; + } + + while ((d = 
readdir(dir))) { + if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, "..")) + continue; + + snprintf(path, sizeof(path), "%s/%s", dir_path, d->d_name); + ret = stat(path, &s); + if (ret) { + eprintf("cannot remove directory %s\n", path); + goto out; + } + if (S_ISDIR(s.st_mode)) + ret = rmdir_r(path); + else + ret = unlink(path); + + if (ret != 0) { + eprintf("failed, %s, %d, %d\n", path, S_ISDIR(s.st_mode), errno); + goto out; + } + } + + ret = rmdir(dir_path); +out: + closedir(dir); + return ret; +} + +int remove_epoch(int epoch) +{ + int ret; + char path[PATH_MAX]; + + dprintf("remove epoch %d\n", epoch); + snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); + ret = unlink(path); + if (ret && ret != -ENOENT) { + eprintf("failed to remove %s, %s\n", path, strerror(-ret)); + return SD_RES_EIO; + } + + snprintf(path, sizeof(path), "%s%08u", obj_path, epoch); + ret = rmdir_r(path); + if (ret && ret != -ENOENT) { + eprintf("failed to remove %s, %s\n", path, strerror(-ret)); + return SD_RES_EIO; + } + return 0; +} + int set_cluster_ctime(uint64_t ctime) { int fd, ret; @@ -1574,3 +1664,46 @@ int init_store(char *d) return ret; } + +int read_epoch(uint32_t *epoch, uint64_t *ctime, + struct sheepdog_node_list_entry *entries, int *nr_entries) +{ + int ret; + + *epoch = get_latest_epoch(); + ret = epoch_log_read(*epoch, (char *)entries, + *nr_entries * sizeof(*entries)); + if (ret == -1) { + eprintf("failed to read epoch %d\n", *epoch); + *nr_entries = 0; + return SD_RES_EIO; + } + *nr_entries = ret / sizeof(*entries); + + *ctime = get_cluster_ctime(); + + return SD_RES_SUCCESS; +} + +void epoch_queue_request(struct work *work, int idx) +{ + struct request *req = container_of(work, struct request, work); + int ret = SD_RES_SUCCESS, n; + struct sd_epoch_req *hdr = (struct sd_epoch_req *)&req->rq; + struct sd_epoch_rsp *rsp = (struct sd_epoch_rsp *)&req->rp; + uint32_t opcode = hdr->opcode; + struct sheepdog_node_list_entry *entries; + + switch (opcode) { + case 
SD_OP_READ_EPOCH: + entries = req->data; + n = hdr->data_length / sizeof(*entries); + ret = read_epoch(&rsp->latest_epoch, &rsp->ctime, entries, &n); + rsp->data_length = n * sizeof(*entries); + break; + } + if (ret != SD_RES_SUCCESS) { + dprintf("failed, %d, %x, %x\n", idx, opcode, ret); + rsp->result = ret; + } +} diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h index 4482382..9863aa3 100644 --- a/include/sheepdog_proto.h +++ b/include/sheepdog_proto.h @@ -35,6 +35,7 @@ #define SD_OP_UPDATE_EPOCH 0x22 #define SD_OP_GET_EPOCH 0x23 #define SD_OP_SHUTDOWN 0x24 +#define SD_OP_READ_EPOCH 0x25 #define SD_OP_DEBUG_INC_NVER 0xA0 #define SD_OP_DEBUG_SET_NODE 0xA1 @@ -55,6 +56,7 @@ #define SD_OP_SO_STAT 0x65 #define SD_OP_STAT_SHEEP 0xB0 +#define SD_OP_STAT_CLUSTER 0xB1 #define SD_FLAG_CMD_WRITE 0x01 #define SD_FLAG_CMD_COW 0x02 @@ -62,6 +64,11 @@ #define SD_FLAG_CMD_SNAPSHOT (1U << 8) +#define SD_STATUS_OK 0x00 +#define SD_STATUS_STARTUP 0x01 +#define SD_STATUS_SHUTDOWN 0x02 +#define SD_STATUS_INCONSISTENT_EPOCHS 0x03 + #define SD_RES_SUCCESS 0x00 /* Success */ #define SD_RES_UNKNOWN 0x01 /* Unknown error */ #define SD_RES_NO_OBJ 0x02 /* No object found */ @@ -87,6 +94,8 @@ #define SD_RES_NO_EPOCH 0x16 /* Requested epoch is not found */ #define SD_RES_VDI_NOT_LOCKED 0x17 /* Vdi is not locked */ #define SD_RES_SHUTDOWN 0x18 /* Sheepdog is shutting down */ +#define SD_RES_NO_MEM 0x19 /* Cannot allocate memory */ +#define SD_RES_INCONSISTENT_EPOCHS 0x1A /* There is inconsistency between epochs */ #define SD_VDI_RSP_FLAG_CURRENT 0x01 @@ -111,6 +120,29 @@ struct sd_rsp { uint32_t opcode_specific[7]; }; +struct sd_epoch_req { + uint8_t proto_ver; + uint8_t opcode; + uint16_t flags; + uint32_t epoch; + uint32_t id; + uint32_t data_length; + uint32_t pad[8]; +}; + +struct sd_epoch_rsp { + uint8_t proto_ver; + uint8_t opcode; + uint16_t flags; + uint32_t epoch; + uint32_t id; + uint32_t data_length; + uint32_t result; + uint32_t latest_epoch; + uint64_t ctime; + 
uint32_t pad[4]; +}; + struct sd_so_req { uint8_t proto_ver; uint8_t opcode; @@ -234,6 +266,13 @@ struct sheepdog_node_list_entry { uint16_t pad[3]; }; +struct epoch_log { + uint64_t ctime; + uint32_t epoch; + uint32_t nr_nodes; + struct sheepdog_node_list_entry nodes[SD_MAX_NODES]; +}; + /* * 64 bit FNV-1a non-zero initial basis */ -- 1.5.6.5 |