This is the skeleton of a writeback cache implementation.  The biggest
missing item is that cache flush requests aren't forwarded to other
nodes yet.  Also, writeback caching is currently enabled unconditionally
instead of being controlled by a flag.

Signed-off-by: Christoph Hellwig <hch at lst.de>

Index: sheepdog/include/sheepdog_proto.h
===================================================================
--- sheepdog.orig/include/sheepdog_proto.h	2011-11-15 22:55:42.339689112 +0100
+++ sheepdog/include/sheepdog_proto.h	2011-11-15 22:56:08.209688958 +0100
@@ -28,6 +28,7 @@
 #define SD_OP_RELEASE_VDI 0x13
 #define SD_OP_GET_VDI_INFO 0x14
 #define SD_OP_READ_VDIS 0x15
+#define SD_OP_FLUSH_VDI 0x16
 
 #define SD_FLAG_CMD_WRITE 0x01
 #define SD_FLAG_CMD_COW 0x02
Index: sheepdog/sheep/ops.c
===================================================================
--- sheepdog.orig/sheep/ops.c	2011-11-15 22:55:42.353022445 +0100
+++ sheepdog/sheep/ops.c	2011-11-15 22:56:08.213022292 +0100
@@ -489,6 +489,10 @@ static struct sd_op_template sd_ops[] =
 	[SD_OP_REMOVE_OBJ] = {
 		.type = SD_OP_TYPE_IO,
 	},
+
+	[SD_OP_FLUSH_VDI] = {
+		.type = SD_OP_TYPE_IO,
+	},
 };
 
 struct sd_op_template *get_sd_op(uint8_t opcode)
Index: sheepdog/sheep/store.c
===================================================================
--- sheepdog.orig/sheep/store.c	2011-11-15 22:55:42.366355778 +0100
+++ sheepdog/sheep/store.c	2011-11-15 23:02:30.179686678 +0100
@@ -22,6 +22,7 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <time.h>
+#include <asm/unistd.h>
 
 #include "sheep_priv.h"
 
@@ -429,12 +430,149 @@ out:
 	return ret;
 }
 
+static int forward_flush_obj_req(struct request *req, int idx)
+{
+	struct sd_obj_req hdr = *(struct sd_obj_req *)&req->rq;
+	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+	int ret;
+
+	/*
+	 * XXX: need to write code to find all nodes that have objects for
+	 * this VDI.
+	 */
+#if 0
+	int i, n, nr, fd;
+	unsigned wlen, rlen;
+	char name[128];
+	struct sheepdog_vnode_list_entry *e;
+	uint64_t oid = hdr.oid;
+	int copies;
+	struct pollfd pfds[SD_MAX_REDUNDANCY];
+	int nr_fds, local = 0;
+
+	dprintf("forward_flush_obj_req %"PRIx64"\n", oid);
+	e = req->entry;
+	nr = req->nr_vnodes;
+
+	copies = hdr.copies;
+
+	/* temporary hack */
+	if (!copies)
+		copies = sys->nr_sobjs;
+	if (copies > req->nr_zones)
+		copies = req->nr_zones;
+
+	nr_fds = 0;
+	memset(pfds, 0, sizeof(pfds));
+	for (i = 0; i < ARRAY_SIZE(pfds); i++)
+		pfds[i].fd = -1;
+
+	hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
+
+	wlen = hdr.data_length;
+	rlen = 0;
+
+	for (i = 0; i < copies; i++) {
+		n = obj_to_sheep(e, nr, oid, i);
+
+		addr_to_str(name, sizeof(name), e[n].addr, 0);
+
+		if (is_myself(e[n].addr, e[n].port)) {
+			local = 1;
+			continue;
+		}
+
+		fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch, idx);
+		if (fd < 0) {
+			eprintf("failed to connect to %s:%"PRIu32"\n", name, e[n].port);
+			ret = SD_RES_NETWORK_ERROR;
+			goto out;
+		}
+
+		ret = send_req(fd, (struct sd_req *)&hdr, req->data, &wlen);
+		if (ret) { /* network errors */
+			ret = SD_RES_NETWORK_ERROR;
+			dprintf("fail %"PRIu32"\n", ret);
+			goto out;
+		}
+
+		pfds[nr_fds].fd = fd;
+		pfds[nr_fds].events = POLLIN;
+		nr_fds++;
+	}
+
+	if (local) {
+#endif
+		ret = store_queue_request_local(req, hdr.epoch);
+		rsp->result = ret;
+
+#if 0
+		if (nr_fds == 0) {
+			eprintf("exit %"PRIu32"\n", ret);
+			goto out;
+		}
+
+		if (rsp->result != SD_RES_SUCCESS) {
+			eprintf("fail (local) %"PRIu32"\n", ret);
+			goto out;
+		}
+	}
+
+	ret = SD_RES_SUCCESS;
+again:
+	if (poll(pfds, nr_fds, -1) < 0) {
+		if (errno == EINTR)
+			goto again;
+
+		ret = SD_RES_EIO;
+	}
+
+	for (i = 0; i < nr_fds; i++) {
+		if (pfds[i].fd < 0)
+			break;
+
+		if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP || pfds[i].revents & POLLNVAL) {
+			ret = SD_RES_NETWORK_ERROR;
+			break;
+		}
+
+		if (!(pfds[i].revents & POLLIN))
+			continue;
+
+		if (do_read(pfds[i].fd, rsp, sizeof(*rsp))) {
+			eprintf("failed to read a response: %m\n");
+			ret = SD_RES_NETWORK_ERROR;
+			break;
+		}
+
+		if (rsp->result != SD_RES_SUCCESS) {
+			eprintf("fail %"PRIu32"\n", rsp->result);
+			ret = rsp->result;
+		}
+
+		break;
+	}
+	if (i < nr_fds) {
+		nr_fds--;
+		memmove(pfds + i, pfds + i + 1, sizeof(*pfds) * (nr_fds - i));
+	}
+
+	dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
+
+	if (nr_fds > 0) {
+		goto again;
+	}
+out:
+#endif
+	return ret;
+}
+
 static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
 {
 	char path[1024];
 	int fd;
 
-	flags |= O_DSYNC | O_RDWR;
+	flags |= O_RDWR;
 	if (sys->use_directio && is_data_obj(oid))
 		flags |= O_DIRECT;
 
@@ -782,6 +920,33 @@ out:
 	return ret;
 }
 
+static int store_flush_cache(struct request *req, uint32_t epoch)
+{
+	int ret = SD_RES_SUCCESS;
+	int fd;
+
+	/*
+	 * XXX: either check during startup that there is only a single
+	 * filesystem used by the sheep daemon, or find a way to iterate
+	 * over all of them.
+	 */
+	fd = open(obj_path, O_DIRECTORY | O_RDONLY);
+	if (fd < 0) {
+		eprintf("failed to open %s: %m\n", obj_path);
+		return SD_RES_EIO;
+	}
+
+	if (syscall(__NR_syncfs, fd) < 0) {
+		eprintf("syncfs failed: %m\n");
+		ret = SD_RES_EIO;
+	} else {
+		dprintf("syncfs done\n");
+	}
+
+	close(fd);
+	return ret;
+}
+
 static int store_queue_request_local(struct request *req, uint32_t epoch)
 {
 	struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
@@ -790,6 +955,9 @@ static int store_queue_request_local(str
 	dprintf("%x, %" PRIx64" , %u\n", hdr->opcode, hdr->oid, epoch);
 
 	switch (hdr->opcode) {
+	case SD_OP_FLUSH_VDI:
+		ret = store_flush_cache(req, epoch);
+		break;
 	case SD_OP_WRITE_OBJ:
 		ret = store_write_obj(req, epoch);
 		break;
@@ -900,7 +1068,9 @@ void store_queue_request(struct work *wo
 		goto out;
 	}
 
-	if (hdr->flags & SD_FLAG_CMD_WRITE)
+	if (hdr->opcode == SD_OP_FLUSH_VDI)
+		ret = forward_flush_obj_req(req, idx);
+	else if (hdr->flags & SD_FLAG_CMD_WRITE)
 		ret = forward_write_obj_req(req, idx);
 	else
 		ret = forward_read_obj_req(req, idx);