From: Liu Yuan <tailai.ly at taobao.com> - thus we can make read_object_local() and write_object_local() file-local static functions. Signed-off-by: Liu Yuan <tailai.ly at taobao.com> --- sheep/sdnet.c | 173 ----------------------------- sheep/sheep_priv.h | 5 - sheep/store.c | 309 ++++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 239 insertions(+), 248 deletions(-) diff --git a/sheep/sdnet.c b/sheep/sdnet.c index bdfe003..5ed39eb 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -669,179 +669,6 @@ int create_listen_port(int port, void *data) return create_listen_ports(port, create_listen_port_fn, data); } -int write_object(struct vnode_info *vnodes, uint32_t node_version, - uint64_t oid, char *data, unsigned int datalen, - uint64_t offset, uint16_t flags, int nr_copies, int create) -{ - struct sd_obj_req hdr; - struct sd_vnode *v; - int i, fd, ret; - char name[128]; - - for (i = 0; i < nr_copies; i++) { - unsigned rlen = 0, wlen = datalen; - - v = oid_to_vnode(vnodes, oid, i); - if (vnode_is_local(v)) { - ret = write_object_local(oid, data, datalen, offset, - flags, nr_copies, node_version, - create); - - if (ret != 0) { - eprintf("fail %"PRIx64" %"PRIx32"\n", oid, ret); - return -1; - } - - continue; - } - - addr_to_str(name, sizeof(name), v->addr, 0); - - fd = connect_to(name, v->port); - if (fd < 0) { - eprintf("failed to connect to host %s\n", name); - return -1; - } - - memset(&hdr, 0, sizeof(hdr)); - hdr.epoch = node_version; - if (create) - hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ; - else - hdr.opcode = SD_OP_WRITE_OBJ; - - hdr.oid = oid; - hdr.copies = nr_copies; - - hdr.flags = flags; - hdr.flags |= SD_FLAG_CMD_WRITE | SD_FLAG_CMD_IO_LOCAL; - hdr.data_length = wlen; - hdr.offset = offset; - - ret = exec_req(fd, (struct sd_req *)&hdr, data, &wlen, &rlen); - close(fd); - if (ret) { - eprintf("failed to update host %s\n", name); - return -1; - } - } - - return 0; -} - -int read_object(struct vnode_info *vnodes, uint32_t node_version, - uint64_t oid, char 
*data, unsigned int datalen, - uint64_t offset, int nr_copies) -{ - struct sd_obj_req hdr; - struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; - struct sd_vnode *v; - char name[128]; - int i = 0, fd, ret, last_error = SD_RES_SUCCESS; - - /* search a local object first */ - for (i = 0; i < nr_copies; i++) { - v = oid_to_vnode(vnodes, oid, i); - if (vnode_is_local(v)) { - ret = read_object_local(oid, data, datalen, offset, - nr_copies, node_version); - - if (ret != SD_RES_SUCCESS) { - eprintf("fail %"PRIx64" %"PRId32"\n", oid, ret); - return ret; - } - - return SD_RES_SUCCESS; - } - - } - - for (i = 0; i < nr_copies; i++) { - unsigned wlen = 0, rlen = datalen; - - v = oid_to_vnode(vnodes, oid, i); - - addr_to_str(name, sizeof(name), v->addr, 0); - - fd = connect_to(name, v->port); - if (fd < 0) { - printf("%s(%d): %s, %m\n", __func__, __LINE__, - name); - return SD_RES_EIO; - } - - memset(&hdr, 0, sizeof(hdr)); - hdr.epoch = node_version; - hdr.opcode = SD_OP_READ_OBJ; - hdr.oid = oid; - - hdr.flags = SD_FLAG_CMD_IO_LOCAL; - hdr.data_length = rlen; - hdr.offset = offset; - - ret = exec_req(fd, (struct sd_req *)&hdr, data, &wlen, &rlen); - close(fd); - - if (ret) { - last_error = SD_RES_EIO; - continue; - } - - if (rsp->result == SD_RES_SUCCESS) - return SD_RES_SUCCESS; - - last_error = rsp->result; - } - - return last_error; -} - -int remove_object(struct vnode_info *vnodes, uint32_t node_version, - uint64_t oid, int nr) -{ - char name[128]; - struct sd_obj_req hdr; - struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; - struct sd_vnode *v; - int i = 0, fd, ret, err = 0; - - for (i = 0; i < nr; i++) { - unsigned wlen = 0, rlen = 0; - - v = oid_to_vnode(vnodes, oid, i); - - addr_to_str(name, sizeof(name), v->addr, 0); - - fd = connect_to(name, v->port); - if (fd < 0) { - rsp->result = SD_RES_EIO; - return -1; - } - - memset(&hdr, 0, sizeof(hdr)); - hdr.epoch = node_version; - hdr.opcode = SD_OP_REMOVE_OBJ; - hdr.oid = oid; - - hdr.flags = 0; - hdr.data_length = 
rlen; - - ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen); - close(fd); - - if (ret) - return -1; - - if (rsp->result != SD_RES_SUCCESS) - err = 1; - } - - if (err) - return -1; - - return 0; -} - static __thread int cached_fds[SD_MAX_NODES]; static __thread uint32_t cached_epoch = 0; diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index afd5c1b..d5ddca4 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -250,11 +250,6 @@ int leave_cluster(void); void process_request_event_queues(void); void do_io_request(struct work *work); -int write_object_local(uint64_t oid, char *data, unsigned int datalen, - uint64_t offset, uint16_t flags, int copies, - uint32_t epoch, int create); -int read_object_local(uint64_t oid, char *data, unsigned int datalen, - uint64_t offset, int copies, uint32_t epoch); int forward_write_obj_req(struct request *req); int read_epoch(uint32_t *epoch, uint64_t *ctime, diff --git a/sheep/store.c b/sheep/store.c index 2ce8d50..321a3a6 100644 --- a/sheep/store.c +++ b/sheep/store.c @@ -278,7 +278,15 @@ out: return ret; } -static int do_local_io(struct request *req, uint32_t epoch); +static int do_local_io(struct request *req, uint32_t epoch) +{ + struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; + + hdr->epoch = epoch; + dprintf("%x, %" PRIx64" , %u\n", hdr->opcode, hdr->oid, epoch); + + return do_process_work(req->op, &req->rq, &req->rp, req); +} static int forward_read_obj_req(struct request *req) { @@ -513,65 +521,6 @@ err_open: return -1; } -int write_object_local(uint64_t oid, char *data, unsigned int datalen, - uint64_t offset, uint16_t flags, int copies, - uint32_t epoch, int create) -{ - int ret; - struct request *req; - struct sd_obj_req *hdr; - - req = zalloc(sizeof(*req)); - if (!req) - return SD_RES_NO_MEM; - hdr = (struct sd_obj_req *)&req->rq; - - hdr->oid = oid; - if (create) - hdr->opcode = SD_OP_CREATE_AND_WRITE_OBJ; - else - hdr->opcode = SD_OP_WRITE_OBJ; - hdr->copies = copies; - hdr->flags = flags | 
SD_FLAG_CMD_WRITE; - hdr->offset = offset; - hdr->data_length = datalen; - req->data = data; - req->op = get_sd_op(hdr->opcode); - - ret = do_local_io(req, epoch); - - free(req); - - return ret; -} - -int read_object_local(uint64_t oid, char *data, unsigned int datalen, - uint64_t offset, int copies, uint32_t epoch) -{ - int ret; - struct request *req; - struct sd_obj_req *hdr; - - req = zalloc(sizeof(*req)); - if (!req) - return SD_RES_NO_MEM; - hdr = (struct sd_obj_req *)&req->rq; - - hdr->oid = oid; - hdr->opcode = SD_OP_READ_OBJ; - hdr->copies = copies; - hdr->flags = 0; - hdr->offset = offset; - hdr->data_length = datalen; - req->data = data; - req->op = get_sd_op(hdr->opcode); - - ret = do_local_io(req, epoch); - - free(req); - return ret; -} - int store_remove_obj(const struct sd_req *req, struct sd_rsp *rsp, void *data) { struct sd_obj_req *hdr = (struct sd_obj_req *)req; @@ -738,16 +687,6 @@ out: return ret; } -static int do_local_io(struct request *req, uint32_t epoch) -{ - struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq; - - hdr->epoch = epoch; - dprintf("%x, %" PRIx64" , %u\n", hdr->opcode, hdr->oid, epoch); - - return do_process_work(req->op, &req->rq, &req->rp, req); -} - static int fix_object_consistency(struct request *req) { int ret = SD_RES_NO_MEM; @@ -1379,6 +1318,236 @@ int read_epoch(uint32_t *epoch, uint64_t *ct, return SD_RES_SUCCESS; } +static int write_object_local(uint64_t oid, char *data, unsigned int datalen, + uint64_t offset, uint16_t flags, int copies, + uint32_t epoch, int create) +{ + int ret; + struct request *req; + struct sd_obj_req *hdr; + + req = zalloc(sizeof(*req)); + if (!req) + return SD_RES_NO_MEM; + hdr = (struct sd_obj_req *)&req->rq; + + hdr->oid = oid; + if (create) + hdr->opcode = SD_OP_CREATE_AND_WRITE_OBJ; + else + hdr->opcode = SD_OP_WRITE_OBJ; + hdr->copies = copies; + hdr->flags = flags | SD_FLAG_CMD_WRITE; + hdr->offset = offset; + hdr->data_length = datalen; + req->data = data; + req->op = 
get_sd_op(hdr->opcode); + + ret = do_local_io(req, epoch); + + free(req); + + return ret; +} +int write_object(struct vnode_info *vnodes, uint32_t node_version, + uint64_t oid, char *data, unsigned int datalen, + uint64_t offset, uint16_t flags, int nr_copies, int create) +{ + struct sd_obj_req hdr; + struct sd_vnode *v; + int i, fd, ret; + char name[128]; + + for (i = 0; i < nr_copies; i++) { + unsigned rlen = 0, wlen = datalen; + + v = oid_to_vnode(vnodes, oid, i); + if (vnode_is_local(v)) { + ret = write_object_local(oid, data, datalen, offset, + flags, nr_copies, node_version, + create); + + if (ret != 0) { + eprintf("fail %"PRIx64" %"PRIx32"\n", oid, ret); + return -1; + } + + continue; + } + + addr_to_str(name, sizeof(name), v->addr, 0); + + fd = connect_to(name, v->port); + if (fd < 0) { + eprintf("failed to connect to host %s\n", name); + return -1; + } + + memset(&hdr, 0, sizeof(hdr)); + hdr.epoch = node_version; + if (create) + hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ; + else + hdr.opcode = SD_OP_WRITE_OBJ; + + hdr.oid = oid; + hdr.copies = nr_copies; + + hdr.flags = flags; + hdr.flags |= SD_FLAG_CMD_WRITE | SD_FLAG_CMD_IO_LOCAL; + hdr.data_length = wlen; + hdr.offset = offset; + + ret = exec_req(fd, (struct sd_req *)&hdr, data, &wlen, &rlen); + close(fd); + if (ret) { + eprintf("failed to update host %s\n", name); + return -1; + } + } + + return 0; +} + +static int read_object_local(uint64_t oid, char *data, unsigned int datalen, + uint64_t offset, int copies, uint32_t epoch) +{ + int ret; + struct request *req; + struct sd_obj_req *hdr; + + req = zalloc(sizeof(*req)); + if (!req) + return SD_RES_NO_MEM; + hdr = (struct sd_obj_req *)&req->rq; + + hdr->oid = oid; + hdr->opcode = SD_OP_READ_OBJ; + hdr->copies = copies; + hdr->flags = 0; + hdr->offset = offset; + hdr->data_length = datalen; + req->data = data; + req->op = get_sd_op(hdr->opcode); + + ret = do_local_io(req, epoch); + + free(req); + return ret; +} +int read_object(struct vnode_info *vnodes, 
uint32_t node_version, + uint64_t oid, char *data, unsigned int datalen, + uint64_t offset, int nr_copies) +{ + struct sd_obj_req hdr; + struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; + struct sd_vnode *v; + char name[128]; + int i = 0, fd, ret, last_error = SD_RES_SUCCESS; + + /* search a local object first */ + for (i = 0; i < nr_copies; i++) { + v = oid_to_vnode(vnodes, oid, i); + if (vnode_is_local(v)) { + ret = read_object_local(oid, data, datalen, offset, + nr_copies, node_version); + + if (ret != SD_RES_SUCCESS) { + eprintf("fail %"PRIx64" %"PRId32"\n", oid, ret); + return ret; + } + + return SD_RES_SUCCESS; + } + + } + + for (i = 0; i < nr_copies; i++) { + unsigned wlen = 0, rlen = datalen; + + v = oid_to_vnode(vnodes, oid, i); + + addr_to_str(name, sizeof(name), v->addr, 0); + + fd = connect_to(name, v->port); + if (fd < 0) { + printf("%s(%d): %s, %m\n", __func__, __LINE__, + name); + return SD_RES_EIO; + } + + memset(&hdr, 0, sizeof(hdr)); + hdr.epoch = node_version; + hdr.opcode = SD_OP_READ_OBJ; + hdr.oid = oid; + + hdr.flags = SD_FLAG_CMD_IO_LOCAL; + hdr.data_length = rlen; + hdr.offset = offset; + + ret = exec_req(fd, (struct sd_req *)&hdr, data, &wlen, &rlen); + close(fd); + + if (ret) { + last_error = SD_RES_EIO; + continue; + } + + if (rsp->result == SD_RES_SUCCESS) + return SD_RES_SUCCESS; + + last_error = rsp->result; + } + + return last_error; +} + +int remove_object(struct vnode_info *vnodes, uint32_t node_version, + uint64_t oid, int nr) +{ + char name[128]; + struct sd_obj_req hdr; + struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr; + struct sd_vnode *v; + int i = 0, fd, ret, err = 0; + + for (i = 0; i < nr; i++) { + unsigned wlen = 0, rlen = 0; + + v = oid_to_vnode(vnodes, oid, i); + + addr_to_str(name, sizeof(name), v->addr, 0); + + fd = connect_to(name, v->port); + if (fd < 0) { + rsp->result = SD_RES_EIO; + return -1; + } + + memset(&hdr, 0, sizeof(hdr)); + hdr.epoch = node_version; + hdr.opcode = SD_OP_REMOVE_OBJ; + hdr.oid = oid; 
+ + hdr.flags = 0; + hdr.data_length = rlen; + + ret = exec_req(fd, (struct sd_req *)&hdr, NULL, &wlen, &rlen); + close(fd); + + if (ret) + return -1; + + if (rsp->result != SD_RES_SUCCESS) + err = 1; + } + + if (err) + return -1; + + return 0; +} + int set_cluster_copies(uint8_t copies) { int fd, ret; -- 1.7.8.2 |