[sheepdog] [PATCH 6/8] shared lib: add the low level request handling framework

Liu Yuan namei.unix at gmail.com
Fri Apr 3 05:20:52 CEST 2015


From: Liu Yuan <liuyuan at cmss.chinamobile.com>

The core idea is the same as sbd's framework.

         User request
             |
             | aio control block (struct aiocb)
             V
        +----+----+
        |    |    |
       r1   r2    r3
        |    |    |
        V    V    V
       obj  obj  obj

User request is spit into several sheep requests which are performed on the
individual sheepdog object. All the sheep requests are queued and sent in a
async mode concurently. When all the sheep requests are done, aiocb will try
to user of the completion of the user request.

For future possible other administritive opcodes, we provide a sync function,
which can run the opcode and get the response in a sync mode.

Signed-off-by: Liu Yuan <liuyuan at cmss.chinamobile.com>
---
 lib/shared/sheep.c    | 641 ++++++++++++++++++++++++++++++++++++++++++++++++++
 lib/shared/sheepdog.h |  68 ++++++
 2 files changed, 709 insertions(+)
 create mode 100644 lib/shared/sheep.c
 create mode 100644 lib/shared/sheepdog.h

diff --git a/lib/shared/sheep.c b/lib/shared/sheep.c
new file mode 100644
index 0000000..3afe6f5
--- /dev/null
+++ b/lib/shared/sheep.c
@@ -0,0 +1,641 @@
+/*
+ * Copyright (C) 2015 China Mobile Inc.
+ *
+ * Liu Yuan <liuyuan at cmss.chinamobile.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sheepdog.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/tcp.h>
+#include <pthread.h>
+
+struct sheep_aiocb {
+	struct sd_request *request;
+	off_t offset;
+	size_t length;
+	int ret;
+	uint32_t nr_requests;
+	char *buf;
+	int buf_iter;
+	void (*aio_done_func)(struct sheep_aiocb *);
+};
+
+enum sheep_request_type {
+	VDI_READ,
+	VDI_WRITE,
+	VDI_CREATE,
+};
+
+struct sheep_request {
+	struct list_node list;
+	struct sheep_aiocb *aiocb;
+	uint64_t oid;
+	uint64_t cow_oid;
+	uint32_t seq_num;
+	int type;
+	uint32_t offset;
+	uint32_t length;
+	char *buf;
+};
+
+static int sheep_submit_sdreq(struct sd_cluster *c, struct sd_req *hdr,
+			      void *data, uint32_t wlen)
+{
+	int ret;
+
+	sd_mutex_lock(&c->submit_mutex);
+	ret = xwrite(c->sockfd, hdr, sizeof(*hdr));
+	if (ret < 0)
+		goto out;
+
+	if (wlen)
+		ret = xwrite(c->sockfd, data, wlen);
+out:
+	sd_mutex_unlock(&c->submit_mutex);
+	return ret;
+}
+
+/* Run the request synchronously */
+int sd_run_sdreq(struct sd_cluster *c, struct sd_req *hdr, void *data)
+{
+	struct sd_rsp *rsp = (struct sd_rsp *)hdr;
+	unsigned int wlen, rlen;
+	int ret;
+
+	if (hdr->flags & SD_FLAG_CMD_WRITE) {
+		wlen = hdr->data_length;
+		rlen = 0;
+	} else {
+		wlen = 0;
+		rlen = hdr->data_length;
+	}
+
+	ret = sheep_submit_sdreq(c, hdr, data, wlen);
+	if (ret < 0)
+		return ret;
+
+	ret = xread(c->sockfd, rsp, sizeof(*rsp));
+	if (ret < 0)
+		return ret;
+
+	if (rlen > rsp->data_length)
+		rlen = rsp->data_length;
+
+	if (rlen) {
+		ret = xread(c->sockfd, data, rlen);
+		if (ret < 0)
+			return ret;
+	}
+
+	switch (rsp->result) {
+	case SD_RES_SUCCESS:
+		break;
+	case SD_RES_NO_OBJ:
+	case SD_RES_NO_VDI:
+		return -ENOENT;
+	default:
+		return -EIO;
+	}
+
+	return 0;
+}
+
+static void aio_end_request(struct sd_request *req, int ret)
+{
+
+}
+
+static void aio_rw_done(struct sheep_aiocb *aiocb)
+{
+	aio_end_request(aiocb->request, aiocb->ret);
+	free(aiocb);
+}
+
+static struct sheep_aiocb *sheep_aiocb_setup(struct sd_request *req)
+{
+	struct sheep_aiocb *aiocb = xmalloc(sizeof(*aiocb));
+
+	aiocb->offset = req->offset;
+	aiocb->length = req->length;
+	aiocb->ret = 0;
+	aiocb->buf_iter = 0;
+	aiocb->request = req;
+	aiocb->buf = req->data;
+	aiocb->aio_done_func = aio_rw_done;
+	uatomic_set(&aiocb->nr_requests, 0);
+
+	return aiocb;
+}
+
+static struct sheep_request *alloc_sheep_request(struct sheep_aiocb *aiocb,
+						 uint64_t oid, uint64_t cow_oid,
+						 int len, int offset)
+{
+	struct sheep_request *req = xzalloc(sizeof(*req));
+	struct sd_cluster *c = aiocb->request->vdi->cluster;
+
+	req->offset = offset;
+	req->length = len;
+	req->oid = oid;
+	req->cow_oid = cow_oid;
+	req->aiocb = aiocb;
+	req->buf = aiocb->buf + aiocb->buf_iter;
+	req->seq_num = uatomic_add_return(&c->seq_num, 1);
+	INIT_LIST_NODE(&req->list);
+	if (aiocb->request->write)
+		req->type = VDI_WRITE;
+	else
+		req->type = VDI_READ;
+
+	aiocb->buf_iter += len;
+	uatomic_inc(&aiocb->nr_requests);
+
+	return req;
+}
+
+static void end_sheep_request(struct sheep_request *req)
+{
+	struct sheep_aiocb *aiocb = req->aiocb;
+
+	if (uatomic_sub_return(&aiocb->nr_requests, 1) <= 0)
+		aiocb->aio_done_func(aiocb);
+	free(req);
+}
+
+/* FIXME: handle submit failure */
+static int submit_sheep_request(struct sheep_request *req)
+{
+	struct sd_req hdr = {};
+	struct sd_cluster *c = req->aiocb->request->vdi->cluster;
+	int ret = 0;
+
+	hdr.id = req->seq_num;
+	hdr.data_length = req->length;
+	hdr.obj.oid = req->oid;
+	hdr.obj.cow_oid = req->cow_oid;
+	hdr.obj.offset = req->offset;
+
+	sd_write_lock(&c->inflight_lock);
+	list_add_tail(&req->list, &c->inflight_list);
+	sd_rw_unlock(&c->inflight_lock);
+
+	switch (req->type) {
+	case VDI_CREATE:
+	case VDI_WRITE:
+		if (req->type == VDI_CREATE)
+			hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+		else
+			hdr.opcode = SD_OP_WRITE_OBJ;
+		hdr.flags = SD_FLAG_CMD_WRITE | SD_FLAG_CMD_DIRECT;
+		if (req->cow_oid)
+			hdr.flags |= SD_FLAG_CMD_COW;
+		ret = sheep_submit_sdreq(c, &hdr, req->buf, req->length);
+		if (ret < 0)
+			goto err;
+		break;
+	case VDI_READ:
+		hdr.opcode = SD_OP_READ_OBJ;
+		ret = sheep_submit_sdreq(c, &hdr, NULL, 0);
+		if (ret < 0)
+			goto err;
+		break;
+	}
+err:
+	eventfd_xwrite(c->reply_fd, 1);
+	return ret;
+}
+
+static uint32_t sheep_inode_get_vid(struct sd_request *req, uint32_t idx)
+{
+	uint32_t vid;
+
+	sd_read_lock(&req->vdi->lock);
+	vid = req->vdi->inode->data_vdi_id[idx];
+	sd_rw_unlock(&req->vdi->lock);
+
+	return vid;
+}
+
+
+static struct sheep_request *find_inflight_request_oid(struct sd_cluster *c,
+						       uint64_t oid)
+{
+	struct sheep_request *req;
+
+	sd_read_lock(&c->inflight_lock);
+	list_for_each_entry(req, &c->inflight_list, list) {
+		if (req->oid == oid) {
+			sd_rw_unlock(&c->inflight_lock);
+			return req;
+		}
+	}
+	sd_rw_unlock(&c->inflight_lock);
+	return NULL;
+}
+
+static int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
+{
+	struct sd_request *request = aiocb->request;
+	uint64_t offset = aiocb->offset;
+	uint64_t total = aiocb->length;
+	int start = offset % SD_DATA_OBJ_SIZE;
+	uint32_t idx = offset / SD_DATA_OBJ_SIZE;
+	int len = SD_DATA_OBJ_SIZE - start;
+	struct sd_cluster *c = request->vdi->cluster;
+
+	if (total < len)
+		len = total;
+
+	/*
+	 * Make sure we don't free the aiocb before we are done with all
+	 * requests.This additional reference is dropped at the end of this
+	 * function.
+	 */
+	uatomic_inc(&aiocb->nr_requests);
+
+	do {
+		struct sheep_request *req;
+		uint64_t oid = vid_to_data_oid(request->vdi->vid, idx),
+			 cow_oid = 0;
+		uint32_t vid = sheep_inode_get_vid(request, idx);
+
+		/*
+		 * For read, either read cow object or end the request.
+		 * For write, copy-on-write cow object
+		 */
+		if (vid && vid != request->vdi->vid) {
+			if (request->write)
+				cow_oid = vid_to_data_oid(vid, idx);
+			else
+				oid = vid_to_data_oid(vid, idx);
+		}
+
+		req = alloc_sheep_request(aiocb, oid, cow_oid, len, start);
+		if (IS_ERR(req))
+			return PTR_ERR(req);
+
+		if (vid && !cow_oid)
+			goto submit;
+
+		switch (req->type) {
+		case VDI_WRITE:
+			/*
+			 * Sheepdog can't handle concurrent creation on the same
+			 * object. We send one create req first and then send
+			 * write reqs in next.
+			 */
+			if (find_inflight_request_oid(c, oid)) {
+				uint32_t tmp_vid;
+
+				sd_write_lock(&c->blocking_lock);
+				/*
+				 * There are slim chance object was created
+				 * before we grab blocking_lock
+				 */
+				tmp_vid = sheep_inode_get_vid(request, idx);
+				if (tmp_vid && tmp_vid == request->vdi->vid) {
+					sd_rw_unlock(&c->blocking_lock);
+					goto submit;
+				}
+				list_add_tail(&req->list, &c->blocking_list);
+				sd_rw_unlock(&c->blocking_lock);
+				goto done;
+			}
+			req->type = VDI_CREATE;
+			break;
+		case VDI_READ:
+			end_sheep_request(req);
+			goto done;
+		}
+submit:
+		submit_sheep_request(req);
+done:
+		idx++;
+		total -= len;
+		start = (start + len) % SD_DATA_OBJ_SIZE;
+		len = total > SD_DATA_OBJ_SIZE ? SD_DATA_OBJ_SIZE : total;
+	} while (total > 0);
+
+	if (uatomic_sub_return(&aiocb->nr_requests, 1) <= 0)
+		aiocb->aio_done_func(aiocb);
+
+	return 0;
+}
+
+static int submit_request(struct sd_request *req)
+{
+	struct sheep_aiocb *aiocb = sheep_aiocb_setup(req);
+
+	if (IS_ERR(aiocb))
+		return PTR_ERR(aiocb);
+
+	return sheep_aiocb_submit(aiocb);
+}
+
+static void *request_handler(void *data)
+{
+	struct sd_request *req;
+	struct sd_cluster *c = data;
+
+	while (!uatomic_is_true(&c->stop_request_handler) ||
+	       !list_empty(&c->request_list)) {
+
+		eventfd_xread(c->request_fd);
+		sd_write_lock(&c->request_lock);
+		if (list_empty(&c->request_list)) {
+			sd_rw_unlock(&c->request_lock);
+			continue;
+		}
+		req = list_first_entry(&c->request_list, struct sd_request,
+				       list);
+		list_del(&req->list);
+		sd_rw_unlock(&c->request_lock);
+		submit_request(req);
+	}
+	pthread_detach(pthread_self());
+	pthread_exit(NULL);
+}
+
+static struct sheep_request *fetch_first_inflight_request(struct sd_cluster *c)
+{
+	struct sheep_request *req;
+
+	sd_write_lock(&c->inflight_lock);
+	if (!list_empty(&c->inflight_list)) {
+		req = list_first_entry(&c->inflight_list, struct sheep_request,
+				       list);
+		list_del(&req->list);
+	} else {
+		req = NULL;
+	}
+	sd_rw_unlock(&c->inflight_lock);
+	return req;
+}
+
+static struct sheep_request *fetch_inflight_request(struct sd_cluster *c,
+						    uint32_t seq_num)
+{
+	struct sheep_request *req;
+
+	sd_write_lock(&c->inflight_lock);
+	list_for_each_entry(req, &c->inflight_list, list) {
+		if (req->seq_num == seq_num) {
+			list_del(&req->list);
+			goto out;
+		}
+	}
+	req = NULL;
+out:
+	sd_rw_unlock(&c->inflight_lock);
+	return req;
+}
+
+static void submit_blocking_sheep_request(struct sd_cluster *c, uint64_t oid)
+{
+	struct sheep_request *req;
+
+	sd_write_lock(&c->blocking_lock);
+	list_for_each_entry(req, &c->blocking_list, list) {
+		if (req->oid != oid)
+			continue;
+		list_del(&req->list);
+		submit_sheep_request(req);
+	}
+	sd_rw_unlock(&c->blocking_lock);
+}
+
+/* FIXME: add auto-reconnect support */
+static int sheep_handle_reply(struct sd_cluster *c)
+{
+	struct sd_rsp rsp = {};
+	struct sheep_request *req, *new;
+	struct sd_vdi *vdi;
+	uint32_t vid, idx;
+	uint64_t oid;
+	int ret;
+
+	ret = xread(c->sockfd, (char *)&rsp, sizeof(rsp));
+	if (ret < 0) {
+		req = fetch_first_inflight_request(c);
+		if (req != NULL) {
+			req->aiocb->ret = EIO;
+			goto end_request;
+		}
+		goto err;
+	}
+
+	req = fetch_inflight_request(c, rsp.id);
+	if (!req)
+		return 0;
+	if (rsp.data_length > 0) {
+		ret = xread(c->sockfd, req->buf, req->length);
+		if (ret < 0) {
+			req->aiocb->ret = EIO;
+			goto end_request;
+		}
+	}
+
+	vdi = req->aiocb->request->vdi;
+	switch (req->type) {
+	case VDI_CREATE:
+		/* We need to update inode for create */
+		new = xmalloc(sizeof(*new));
+		vid = vdi->vid;
+		oid = vid_to_vdi_oid(vid);
+		idx = data_oid_to_idx(req->oid);
+		new->offset = SD_INODE_HEADER_SIZE + sizeof(vid) * idx;
+		new->length = sizeof(vid);
+		new->oid = oid;
+		new->cow_oid = 0;
+		new->aiocb = req->aiocb;
+		new->buf = (char *)&vid;
+		new->seq_num = uatomic_add_return(&c->seq_num, 1);
+		new->type = VDI_WRITE;
+		uatomic_inc(&req->aiocb->nr_requests);
+		INIT_LIST_NODE(&new->list);
+
+		/* Make sure no request is queued while we update inode */
+		sd_write_lock(&vdi->lock);
+		vdi->inode->data_vdi_id[idx] = vid;
+		sd_rw_unlock(&vdi->lock);
+
+		submit_sheep_request(new);
+		submit_blocking_sheep_request(c, req->oid);
+		/* fall thru */
+	case VDI_WRITE:
+	case VDI_READ:
+		break;
+	}
+end_request:
+	end_sheep_request(req);
+err:
+	return ret;
+}
+
+static void *reply_handler(void *data)
+{
+	struct sd_cluster *c = data;
+
+	while (!uatomic_is_true(&c->stop_request_handler) ||
+	       !list_empty(&c->inflight_list)) {
+		bool empty;
+
+		eventfd_xread(c->reply_fd);
+		sd_read_lock(&c->inflight_lock);
+		empty = list_empty(&c->inflight_list);
+		sd_rw_unlock(&c->inflight_lock);
+
+		if (empty)
+			continue;
+
+		sheep_handle_reply(c);
+
+	}
+	pthread_detach(pthread_self());
+	pthread_exit(NULL);
+}
+
+static int init_cluster_handlers(struct sd_cluster *c)
+{
+	pthread_t thread;
+	int ret;
+
+	c->request_fd = eventfd(0, 0);
+	if (c->request_fd < 0)
+		return -errno;
+
+	c->reply_fd = eventfd(0, 0);
+	if (c->reply_fd < 0) {
+		close(c->request_fd);
+		return -errno;
+	}
+
+	ret = pthread_create(&thread, NULL, request_handler, c);
+	if (ret < 0) {
+		close(c->request_fd);
+		close(c->reply_fd);
+		return ret;
+	}
+	c->request_thread = thread;
+	ret = pthread_create(&thread, NULL, reply_handler, c);
+	if (ret < 0) {
+		close(c->reply_fd);
+		uatomic_set_true(&c->stop_request_handler);
+		eventfd_xwrite(c->request_fd, 1);
+		pthread_join(c->request_thread, NULL);
+		return ret;
+	}
+	c->reply_thread = thread;
+
+	return 0;
+}
+
+struct sd_cluster *sd_connect(char *host)
+{
+	char *ip, *pt, *h = xstrdup(host);
+	unsigned port;
+	struct sockaddr_in addr;
+	struct linger linger_opt = {1, 0};
+	int fd, ret, value = 1;
+	struct sd_cluster *c;
+
+	ip = strtok(h, ":");
+	if (!ip) {
+		errno = EINVAL;
+		goto err;
+	}
+
+	pt = strtok(NULL, ":");
+	if (!pt) {
+		errno = EINVAL;
+		goto err;
+	}
+
+	if (sscanf(pt, "%u", &port) != 1) {
+		errno = EINVAL;
+		goto err;
+	}
+
+	fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+	if (fd < 0)
+		goto err;
+
+	ret = setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger_opt,
+			 sizeof(linger_opt));
+	if (ret < 0)
+		goto err_close;
+
+	ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));
+	if (ret < 0)
+		goto err_close;
+
+	addr.sin_family = AF_INET;
+	addr.sin_port = htons(port);
+	ret = inet_pton(AF_INET, ip, &addr.sin_addr);
+	switch (ret) {
+	case 1:
+		break;
+	default:
+		errno = EINVAL;
+		goto err_close;
+	}
+
+	ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
+	if (ret < 0)
+		goto err_close;
+
+	c = xzalloc(sizeof(*c));
+	c->sockfd = fd;
+	c->port = port;
+	memcpy(c->addr, &addr.sin_addr, INET_ADDRSTRLEN);
+	ret = init_cluster_handlers(c);
+	if (ret < 0) {
+		free(c);
+		errno = -ret;
+		goto err_close;
+	};
+	INIT_LIST_HEAD(&c->request_list);
+	INIT_LIST_HEAD(&c->inflight_list);
+	INIT_LIST_HEAD(&c->blocking_list);
+	sd_init_rw_lock(&c->request_lock);
+	sd_init_rw_lock(&c->inflight_lock);
+	sd_init_rw_lock(&c->blocking_lock);
+	sd_init_mutex(&c->submit_mutex);
+
+	free(h);
+	return c;
+err_close:
+	close(fd);
+err:
+	free(h);
+	return NULL;
+}
+
+int sd_disconnect(struct sd_cluster *c)
+{
+	uatomic_set_true(&c->stop_request_handler);
+	uatomic_set_true(&c->stop_reply_handler);
+	eventfd_xwrite(c->request_fd, 1);
+	eventfd_xwrite(c->reply_fd, 1);
+	pthread_join(c->request_thread, NULL);
+	pthread_join(c->reply_thread, NULL);
+	sd_destroy_rw_lock(&c->request_lock);
+	sd_destroy_rw_lock(&c->inflight_lock);
+	sd_destroy_rw_lock(&c->blocking_lock);
+	sd_destroy_mutex(&c->submit_mutex);
+	close(c->request_fd);
+	close(c->reply_fd);
+	close(c->sockfd);
+	free(c);
+	return 0;
+}
diff --git a/lib/shared/sheepdog.h b/lib/shared/sheepdog.h
new file mode 100644
index 0000000..c2c072c
--- /dev/null
+++ b/lib/shared/sheepdog.h
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2015 China Mobile Inc.
+ *
+ * Liu Yuan <liuyuan at cmss.chinamobile.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef SHEEPDOG_H_
+#define SHEEPDOG_H_
+
+#ifndef NO_SHEEPDOG_LOGGER
+# define NO_SHEEPDOG_LOGGER
+#endif
+
+#include "sheepdog_proto.h"
+#include "list.h"
+#include "util.h"
+
+#include <arpa/inet.h>
+
+struct sd_cluster {
+	int sockfd;
+	uint8_t addr[INET_ADDRSTRLEN];
+	unsigned int port;
+	uint32_t seq_num;
+	pthread_t request_thread;
+	pthread_t reply_thread;
+	int request_fd, reply_fd;
+	struct list_head request_list;
+	struct list_head inflight_list;
+	struct list_head blocking_list;
+	uatomic_bool stop_request_handler;
+	uatomic_bool stop_reply_handler;
+	struct sd_rw_lock request_lock;
+	struct sd_rw_lock inflight_lock;
+	struct sd_rw_lock blocking_lock;
+	struct sd_mutex submit_mutex;
+};
+
+struct sd_request {
+	struct list_node list;
+	struct sd_vdi *vdi;
+	void *data;
+	size_t length;
+	off_t offset;
+	bool write;
+};
+
+struct sd_vdi {
+	struct sd_cluster *cluster;
+	struct sd_inode *inode;
+	uint32_t vid;
+	struct sd_rw_lock lock;
+};
+
+int sd_init(void);
+void sd_free(void);
+struct sd_cluster *sd_connect(char *host);
+int sd_disconnect(struct sd_cluster *sd);
+int sd_run_sdreq(struct sd_cluster *c, struct sd_req *hdr, void *data);
+
+#endif
-- 
1.9.1




More information about the sheepdog mailing list