[sheepdog] [PATCH v3] http: add http transport

Liu Yuan namei.unix at gmail.com
Wed Jul 3 11:34:39 CEST 2013


This implement basic http transport via fastcgi. All the http operation are
left empty and just return 'unplemented' notice back.

Later if we write specific http handling as for swift or S3 service, we need
just implement GET/PUT/POST/HEAD/DELETE handlers respectively.

Signed-off-by: Liu Yuan <namei.unix at gmail.com>
---
 v3
 - remove unnecessary blank line

 configure.ac       |   13 +++
 sheep/Makefile.am  |    3 +
 sheep/http.c       |  318 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/sheep.c      |   10 +-
 sheep/sheep_priv.h |   16 +++
 5 files changed, 359 insertions(+), 1 deletion(-)
 create mode 100644 sheep/http.c

diff --git a/configure.ac b/configure.ac
index 9af3af2..b609fac 100644
--- a/configure.ac
+++ b/configure.ac
@@ -218,6 +218,11 @@ AC_ARG_ENABLE([sheepfs],
 	[ enable_sheepfs=$HAVE_FUSE ],)
 AM_CONDITIONAL(BUILD_SHEEPFS, test x$enable_sheepfs = xyes)
 
+AC_ARG_ENABLE([http],
+	[ --enable-http : enable http request service (default no) ],,
+	[ enable_http="no" ],)
+AM_CONDITIONAL(BUILD_HTTP, test x$enable_http = xyes)
+
 CP=cp
 OS_LDL="-ldl"
 case "$host_os" in
@@ -297,6 +302,14 @@ if test "x${enable_sheepfs}" = xyes; then
 	PACKAGE_FEATURES="$PACKAGE_FEATURES sheepfs"
 fi
 
+if test "x${enable_http}" = xyes; then
+	AC_CHECK_HEADERS([fcgiapp.h],,
+		AC_MSG_ERROR(fcgiapp.h header not found))
+	AC_CHECK_LIB([fcgi], [FCGX_Accept],,
+		AC_MSG_ERROR(libfcgi not found))
+	AC_DEFINE_UNQUOTED(HAVE_HTTP, 1, [have http])
+	PACKAGE_FEATURES="$PACKAGE_FEATURES http"
+fi
 
 # extra warnings
 EXTRA_WARNINGS=""
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 4916409..ebb8587 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -30,6 +30,9 @@ sheep_SOURCES		= sheep.c group.c request.c gateway.c store.c vdi.c \
 			  plain_store.c config.c migrate.c md.c \
 			  cluster/shepherd.c
 
+if BUILD_HTTP
+sheep_SOURCES		+= http.c
+endif
 if BUILD_COROSYNC
 sheep_SOURCES		+= cluster/corosync.c
 endif
diff --git a/sheep/http.c b/sheep/http.c
new file mode 100644
index 0000000..9f17214
--- /dev/null
+++ b/sheep/http.c
@@ -0,0 +1,318 @@
+/*
+ * Copyright (C) 2013 Taobao Inc.
+ *
+ * Liu Yuan <namei.unix at gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/* This files implement RESTful interface to sheepdog storage via fastcig */
+
+#include <string.h>
+#include <fcgiapp.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include "util.h"
+#include "sheep_priv.h"
+
+struct http_request {
+	FCGX_Request fcgx;
+	int opcode;
+	char *data;
+	size_t data_length;
+};
+
+enum http_opcode {
+	HTTP_GET = 1,
+	HTTP_PUT,
+	HTTP_POST,
+	HTTP_DELETE,
+	HTTP_HEAD,
+};
+
+enum http_status {
+	OK = 1,                         /* 200 */
+	CREATED,                        /* 201 */
+	PARTIAL_CONTENT,                /* 206 */
+	BAD_REQUEST,                    /* 400 */
+	NOT_FOUND,                      /* 404 */
+	REQUEST_RANGE_NOT_SATISFIABLE,  /* 416 */
+	INTERNAL_SERVER_ERROR,          /* 500 */
+};
+
+static inline const char *strstatus(int status)
+{
+	static const char *const descs[] = {
+		[OK] = "200 OK",
+		[CREATED] = "201 CREATED",
+		[PARTIAL_CONTENT] = "206 Partial Content",
+		[BAD_REQUEST] = "400 Bad Request",
+		[NOT_FOUND] = "404 Not Found",
+		[REQUEST_RANGE_NOT_SATISFIABLE] =
+			"416 Requested Range Not Satisfiable",
+		[INTERNAL_SERVER_ERROR] = "500 Internal Server Error",
+	};
+
+	if (descs[status] == NULL) {
+		static __thread char msg[32];
+		snprintf(msg, sizeof(msg), "Invalid Status %d", status);
+		return msg;
+	}
+
+	return descs[status];
+}
+
+struct http_work {
+	struct work work;
+	struct http_request *request;
+};
+
+static inline int http_request_error(struct http_request *req)
+{
+	int ret = FCGX_GetError(req->fcgx.out);
+
+	if (ret == 0) {
+		return OK;
+	} else if (ret < 0) {
+		sd_eprintf("failed, FCGI error %d", ret);
+		return INTERNAL_SERVER_ERROR;
+	} else {
+		sd_eprintf("failed, %s", strerror(ret));
+		return INTERNAL_SERVER_ERROR;
+	}
+}
+
+static inline int http_request_write(struct http_request *req,
+				     const char *buf, int len)
+{
+	int ret = FCGX_PutStr(buf, len, req->fcgx.out);
+	if (ret < 0)
+		return http_request_error(req);
+	return OK;
+}
+
+static inline int http_request_read(struct http_request *req,
+				    char *buf, int len)
+{
+	int ret = FCGX_GetStr(buf, len, req->fcgx.in);
+	if (ret < 0)
+		return http_request_error(req);
+	return OK;
+}
+
+static inline int http_request_writes(struct http_request *req, const char *str)
+{
+	int ret = FCGX_PutS(str, req->fcgx.out);
+	if (ret < 0)
+		return http_request_error(req);
+	return OK;
+}
+
+__printf(2, 3)
+static int http_request_writef(struct http_request *req, const char *fmt, ...)
+{
+	va_list ap;
+	int ret;
+
+	va_start(ap, fmt);
+	ret = FCGX_VFPrintF(req->fcgx.out, fmt, ap);
+	va_end(ap);
+	if (ret < 0)
+		return http_request_error(req);
+	return OK;
+}
+
+static int request_init_operation(struct http_request *req)
+{
+	char **env = req->fcgx.envp;
+	char *p;
+
+	p = FCGX_GetParam("REQUEST_METHOD", env);
+	if (!strcmp(p, "PUT")) {
+		req->opcode = HTTP_PUT;
+		p = FCGX_GetParam("CONTENT_LENGTH", env);
+		req->data_length = strtoll(p, NULL, 10);
+		req->data = xmalloc(req->data_length);
+		http_request_read(req, req->data, req->data_length);
+	} else if (!strcmp(p, "GET")) {
+		req->opcode = HTTP_GET;
+	} else if (!strcmp(p, "POST")) {
+		req->opcode = HTTP_POST;
+	} else if (!strcmp(p, "DELETE")) {
+		req->opcode = HTTP_DELETE;
+	} else if (!strcmp(p, "HEAD")) {
+		req->opcode = HTTP_HEAD;
+	} else {
+		return BAD_REQUEST;
+	}
+	return OK;
+}
+
+static int http_init_request(struct http_request *req)
+{
+	char *p;
+	int ret;
+
+	for (int i = 0; (p = req->fcgx.envp[i]); ++i)
+		sd_dprintf("%s", p);
+
+	ret = request_init_operation(req);
+	if (ret != OK)
+		return ret;
+	return OK;
+}
+
+static void http_response_header(struct http_request *req, int status)
+{
+	http_request_writef(req, "Status: %s\n", strstatus(status));
+	http_request_writes(req, "Content-type: text/plain;\r\n\r\n");
+}
+
+static void http_handle_get(struct http_request *req)
+{
+	http_response_header(req, OK);
+	http_request_writes(req, "not implemented\n");
+}
+
+static void http_handle_put(struct http_request *req)
+{
+	http_response_header(req, OK);
+	http_request_writes(req, "not implemented\n");
+}
+
+static void http_handle_post(struct http_request *req)
+{
+	http_response_header(req, OK);
+	http_request_writes(req, "not implemented\n");
+}
+
+static void http_handle_delete(struct http_request *req)
+{
+	http_response_header(req, OK);
+	http_request_writes(req, "not implemented\n");
+}
+
+static void http_handle_head(struct http_request *req)
+{
+	http_response_header(req, OK);
+	http_request_writes(req, "not implemented\n");
+}
+
+static void (*const http_request_handlers[])(struct http_request *req) = {
+	[HTTP_GET] = http_handle_get,
+	[HTTP_PUT] = http_handle_put,
+	[HTTP_POST] = http_handle_post,
+	[HTTP_DELETE] = http_handle_delete,
+	[HTTP_HEAD] = http_handle_head,
+};
+
+static const int http_max_request_handlers = ARRAY_SIZE(http_request_handlers);
+
+static void http_end_request(struct http_request *req)
+{
+	FCGX_Finish_r(&req->fcgx);
+	free(req->data);
+	free(req);
+}
+
+static void http_run_request(struct work *work)
+{
+	struct http_work *hw = container_of(work, struct http_work, work);
+	struct http_request *req = hw->request;
+	int op = req->opcode;
+
+	if (op < http_max_request_handlers && http_request_handlers[op])
+		http_request_handlers[op](req);
+	else
+		panic("unhandled opcode %d", op);
+	http_end_request(req);
+}
+
+static void http_request_done(struct work *work)
+{
+	struct http_work *hw = container_of(work, struct http_work, work);
+	free(hw);
+}
+
+static void http_queue_request(struct http_request *req)
+{
+	struct http_work *hw = xmalloc(sizeof(*hw));
+
+	hw->work.fn = http_run_request;
+	hw->work.done = http_request_done;
+	hw->request = req;
+	queue_work(sys->http_wqueue, &hw->work);
+}
+
+static inline struct http_request *http_new_request(int sockfd)
+{
+	struct http_request *req = xzalloc(sizeof(*req));
+
+	FCGX_InitRequest(&req->fcgx, sockfd, 0);
+	return req;
+}
+
+static int http_sockfd;
+
+static void *http_main_loop(void *ignored)
+{
+	int err;
+
+	for (;;) {
+		struct http_request *req = http_new_request(http_sockfd);
+		int ret;
+
+		ret = FCGX_Accept_r(&req->fcgx);
+		if (ret < 0) {
+			sd_eprintf("accept failed, %d, %d", http_sockfd, ret);
+			goto out;
+		}
+		ret = http_init_request(req);
+		if (ret != OK) {
+			http_response_header(req, ret);
+			http_end_request(req);
+			continue;
+		}
+		http_queue_request(req);
+	}
+out:
+	err = pthread_detach(pthread_self());
+	if (err)
+		sd_eprintf("%s", strerror(err));
+	pthread_exit(NULL);
+}
+
+int http_init(const char *address)
+{
+	pthread_t t;
+	int err;
+
+	if (!address)
+		return 0;
+
+	sys->http_wqueue = create_work_queue("http", WQ_DYNAMIC);
+	if (!sys->http_wqueue)
+		return -1;
+
+	FCGX_Init();
+
+#define LISTEN_QUEUE_DEPTH 1024 /* No rationale */
+	http_sockfd = FCGX_OpenSocket(address, LISTEN_QUEUE_DEPTH);
+	if (http_sockfd < 0) {
+		sd_eprintf("open socket failed, address %s", address);
+		return -1;
+	}
+	sd_iprintf("http service listen at %s", address);
+	err = pthread_create(&t, NULL, http_main_loop, NULL);
+	if (err) {
+		sd_eprintf("%s", strerror(err));
+		return -1;
+	}
+	return 0;
+}
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 84bd269..8b2d522 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -57,6 +57,7 @@ static struct sd_option sheep_options[] = {
 	{'o', "stdout", false, "log to stdout instead of shared logger"},
 	{'p', "port", true, "specify the TCP port on which to listen"},
 	{'P', "pidfile", true, "create a pid file"},
+	{'r', "http", true, "enable http service"},
 	{'u', "upgrade", false, "upgrade to the latest data layout"},
 	{'v', "version", false, "show the version"},
 	{'w', "enable-cache", true, "enable object cache"},
@@ -517,7 +518,7 @@ int main(int argc, char **argv)
 	int64_t zone = -1;
 	struct cluster_driver *cdrv;
 	struct option *long_options;
-	const char *log_format = "default";
+	const char *log_format = "default", *http_address = NULL;
 	static struct logger_user_info sheep_info;
 
 	install_crash_handler(crash_handler);
@@ -540,6 +541,9 @@ int main(int argc, char **argv)
 		case 'P':
 			pid_file = optarg;
 			break;
+		case 'r':
+			http_address = optarg;
+			break;
 		case 'f':
 			is_daemon = false;
 			break;
@@ -774,6 +778,10 @@ int main(int argc, char **argv)
 	if (ret)
 		exit(1);
 
+	ret = http_init(http_address);
+	if (ret)
+		exit(1);
+
 	if (pid_file && (create_pidfile(pid_file) != 0)) {
 		fprintf(stderr, "failed to pid file '%s' - %m\n", pid_file);
 		exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 20b2554..8f05c5d 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -25,6 +25,7 @@
 #include "cluster.h"
 #include "rbtree.h"
 #include "strbuf.h"
+#include "config.h"
 
 struct client_info {
 	struct connection conn;
@@ -100,6 +101,9 @@ struct cluster_info {
 	struct work_queue *oc_reclaim_wqueue;
 	struct work_queue *oc_push_wqueue;
 	struct work_queue *md_wqueue;
+#ifdef HAVE_HTTP
+	struct work_queue *http_wqueue;
+#endif
 
 	bool enable_object_cache;
 
@@ -431,4 +435,16 @@ uint64_t md_get_size(uint64_t *used);
 void kick_node_recover(void);
 void update_node_size(struct sd_node *node);
 
+/* http.c */
+#ifdef HAVE_HTTP
+int http_init(const char *address);
+#else
+static inline int http_init(const char *address)
+{
+	if (address)
+		sd_iprintf("http service is not complied");
+	return 0;
+}
+#endif /* END BUILD_HTTP */
+
 #endif
-- 
1.7.9.5




More information about the sheepdog mailing list