[Sheepdog] [PATCH] accord: added apps directory and queue code

OZAWA Tsuyoshi ozawa.tsuyoshi at gmail.com
Thu Nov 10 18:11:05 CET 2011


Accord focuses on write-intensive workloads.
One such application is a distributed, fully replicated
queue.

This patch provides a simple queue implementation built on accord.

Signed-off-by: OZAWA Tsuyoshi <ozawa.tsuyoshi at lab.ntt.co.jp>
---
 .gitignore           |    1 +
 Makefile             |    6 +
 apps/Makefile        |   12 ++
 apps/queue/Makefile  |   29 ++++
 apps/queue/README.md |   36 +++++
 apps/queue/adler32.c |   47 +++++++
 apps/queue/queue.c   |  373 ++++++++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 504 insertions(+), 0 deletions(-)
 create mode 100644 apps/Makefile
 create mode 100644 apps/queue/Makefile
 create mode 100644 apps/queue/README.md
 create mode 100644 apps/queue/adler32.c
 create mode 100644 apps/queue/queue.c

diff --git a/.gitignore b/.gitignore
index 9be066b..ce92825 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,4 @@ test/watch_files
 valgrind.log
 test/test-report.*ml
 libacrd/libacrd.pc
+apps/queue/qbench
diff --git a/Makefile b/Makefile
index a027d6e..79c6796 100644
--- a/Makefile
+++ b/Makefile
@@ -9,12 +9,14 @@ all:
 	$(MAKE) -C libacrd
 	$(MAKE) -C conductor
 	$(MAKE) -C test
+	$(MAKE) -C apps
 
 .PHONY:clean
 clean:
 	$(MAKE) -C libacrd clean
 	$(MAKE) -C conductor clean
 	$(MAKE) -C test clean
+	$(MAKE) -C apps clean
 	$(MAKE) -C lib clean
 
 .PHONY:install
@@ -23,6 +25,10 @@ install:
 	$(MAKE) -C conductor install
 	$(MAKE) -C include install
 
+.PHONY:apps
+apps:
+	$(MAKE) -C apps
+
 .PHONY:test
 test:
 	$(MAKE) -C libacrd
diff --git a/apps/Makefile b/apps/Makefile
new file mode 100644
index 0000000..c7f3f91
--- /dev/null
+++ b/apps/Makefile
@@ -0,0 +1,12 @@
+# Build driver for the example applications; every target simply delegates
+# to the Makefile of the corresponding application directory.
+
+# Default target: build all applications (currently only the queue).
+.PHONY:all
+all:
+	$(MAKE) -C queue
+
+# Remove the build products of all applications.
+.PHONY:clean
+clean:
+	$(MAKE) -C queue clean
+
+# Build only the queue application.
+.PHONY:queue
+queue:
+	$(MAKE) -C queue
+
diff --git a/apps/queue/Makefile b/apps/queue/Makefile
new file mode 100644
index 0000000..6f750a5
--- /dev/null
+++ b/apps/queue/Makefile
@@ -0,0 +1,29 @@
+sbindir ?= $(PREFIX)/sbin
+
+CFLAGS += -g -O3 -Wall -Wstrict-prototypes -I../../include
+CFLAGS += -D_GNU_SOURCE -DNDEBUG
+LIBS += -lpthread -lacrd
+
+PROGRAMS = qbench
+QUEUE_OBJS = queue.o
+# Dependency files written by the %.o rule below, one .d per object.
+QUEUE_DEP = $(QUEUE_OBJS:.o=.d)
+
+.PHONY:all
+all: $(PROGRAMS)
+
+qbench: $(QUEUE_OBJS)
+	$(CC) $^ -o $@ $(LIBS)
+
+# Pull in the header dependencies recorded by a previous build so that a
+# change to an included file (e.g. adler32.c) rebuilds the object.  The
+# leading '-' keeps make quiet while the .d files do not exist yet.
+-include $(QUEUE_DEP)
+
+# Compile an object and record its header dependencies next to it.
+%.o: %.c
+	$(CC) -c $(CFLAGS) $*.c -o $*.o
+	@$(CC) -MM $(CFLAGS) -MF $*.d -MT $*.o $*.c
+
+.PHONY:clean
+clean:
+	rm -f *.[od] $(PROGRAMS)
+
+# support for GNU Flymake
+check-syntax:
+	$(CC) $(CFLAGS) -fsyntax-only $(CHK_SOURCES)
diff --git a/apps/queue/README.md b/apps/queue/README.md
new file mode 100644
index 0000000..74e195d
--- /dev/null
+++ b/apps/queue/README.md
@@ -0,0 +1,36 @@
+# How to run the queue benchmark
+
+1. Install accord.
+2. Build and run the benchmark as follows:
+
+```
+	$ make apps
+	$ ./apps/queue/qbench localhost 9090 200 10000
+	2000000 requests in 298.100236 sec. (6709.15 throughput)
+```
+
+## Benchmark result
+
+* The benchmark environment
+ * Xeon 2.1 GHz 4 Core
+ * 7200 rpm HDD
+ * Accord running in disk-persistence mode on a single node
+
+The result of the benchmark (50% push and 50% pop) is as follows:
+
+```
+	2000000 requests in 298.100236 sec. (6709.15 throughput)
+```
+
+
+Additionally, a push-only benchmark (100% push) achieves a better result:
+
+```
+	2000000 requests in 167.953943 sec. (11908.03 throughput)
+```
+
+To benchmark only push(), comment out the queue_pop() call in the run() function.
+
+## TODO
+
+* run benchmark with 1KB message
diff --git a/apps/queue/adler32.c b/apps/queue/adler32.c
new file mode 100644
index 0000000..4340e7d
--- /dev/null
+++ b/apps/queue/adler32.c
@@ -0,0 +1,47 @@
+/*
+ * This is a modified version based on adler32.c from gst-ffmpeg based on
+ * adler32.c from the zlib library.
+ *
+ * Copyright (C) 1995 Mark Adler
+ *
+ * This software is provided 'as-is', without any express or implied
+ * warranty.  In no event will the authors be held liable for any damages
+ * arising from the use of this software.
+ *
+ * Permission is granted to anyone to use this software for any purpose,
+ * including commercial applications, and to alter it and redistribute it
+ * freely, subject to the following restrictions:
+ *
+ * 1. The origin of this software must not be misrepresented; you must not
+ *    claim that you wrote the original software. If you use this software
+ *    in a product, an acknowledgment in the product documentation would be
+ *    appreciated but is not required.
+ * 2. Altered source versions must be plainly marked as such, and must not be
+ *    misrepresented as being the original software.
+ * 3. This notice may not be removed or altered from any source distribution.
+ *
+ */
+#include <stdint.h>
+
+#define ADLER32_BASE 65521L /* largest prime smaller than 65536 */
+
+/*
+ * Unrolled accumulation steps.  They expand inside a scope that provides
+ * the running sums `s1` and `s2`: ADLER32_DO1 adds the byte at `buf` to s1,
+ * adds the new s1 to s2 and advances `buf`; ADLER32_DO4 and ADLER32_DO16
+ * repeat that step 4 and 16 times.  No modulo reduction happens here.
+ */
+#define ADLER32_DO1(buf)  {s1 += *buf++; s2 += s1;}
+#define ADLER32_DO4(buf)  ADLER32_DO1(buf); ADLER32_DO1(buf); ADLER32_DO1(buf); ADLER32_DO1(buf);
+#define ADLER32_DO16(buf) ADLER32_DO4(buf); ADLER32_DO4(buf); ADLER32_DO4(buf); ADLER32_DO4(buf);
+
+/*
+ * Continue an Adler-32 checksum over `len` bytes starting at `ptr`.
+ *
+ * `adler` is the checksum computed so far: its low 16 bits seed the byte
+ * sum and its high 16 bits seed the sum of sums.  The return value packs
+ * the two updated sums the same way.
+ */
+static uint32_t adler32(uint32_t adler, const void* ptr, uint32_t len)
+{
+  const uint8_t *cursor = ptr;
+  uint32_t s1 = adler & 0xffff;          /* running sum of the bytes */
+  uint32_t s2 = (adler >> 16) & 0xffff;  /* running sum of s1 */
+
+  for (; len != 0; len--) {
+    /* Take 16 bytes at a time, unreduced, while s2 stays below 2^31. */
+    for (; len > 16 && s2 < (1U << 31); len -= 16) {
+      ADLER32_DO16(cursor);
+    }
+
+    /* One more byte, then fold both sums back below the modulus. */
+    ADLER32_DO1(cursor);
+    s1 %= ADLER32_BASE;
+    s2 %= ADLER32_BASE;
+  }
+
+  return (s2 << 16) | s1;
+}
diff --git a/apps/queue/queue.c b/apps/queue/queue.c
new file mode 100644
index 0000000..7e92b25
--- /dev/null
+++ b/apps/queue/queue.c
@@ -0,0 +1,373 @@
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+#include <sys/time.h>
+
+#include <accord.h>
+#include "util.h"
+#include "coroutine.h"
+#include "adler32.c"
+
+#define NR_THREADS 100
+
+static char *hostname;
+static int port;
+
+/* One path reported to the acrd_list() callback, kept for later processing. */
+struct acrd_path_list_entry {
+	char *path;	/* strdup()'d copy of the listed path; freed by main() */
+
+	struct list_head list;	/* links the entry into the collecting list */
+};
+
+/* Client-side queue handle created by queue_init(), released by queue_close(). */
+struct queue_handle {
+	struct acrd_handle *ah;	/* accord connection obtained from acrd_init() */
+	char name[128];	/* room for a queue name */
+};
+
+/*
+ * Stored form of one queued message: a fixed-size header immediately
+ * followed by the payload bytes.  Allocate sizeof(struct queue_msg) + len.
+ */
+struct queue_msg {
+	uint32_t len;		/* number of payload bytes in data[] */
+	uint32_t checksum;	/* Adler-32 of the payload, see calc_checksum() */
+	char data[];		/* payload (C99 flexible array member) */
+};
+
+/* Token passed to queue_ack() to acknowledge a message. */
+struct ack_info {
+	char *invisible_path;	/* heap-allocated path; queue_ack() deletes the node and frees the string */
+};
+
+/*
+ * acrd_list() callback: remember each listed path so that the caller can
+ * process it once the listing is done.
+ *
+ * @h:    accord handle the listing runs on (unused)
+ * @path: listed path; a strdup()'d copy is stored
+ * @arg:  head of the list that collects struct acrd_path_list_entry items
+ *
+ * A path whose entry cannot be allocated is reported and skipped.
+ */
+static void test_concurrent_list_cb(struct acrd_handle *h, const char *path, void *arg)
+{
+	struct list_head *head = arg;
+	struct acrd_path_list_entry *entry;
+
+	entry = malloc(sizeof(*entry));
+	if (!entry) {
+		printf("oom\n");
+		return;
+	}
+
+	entry->path = strdup(path);
+	if (!entry->path) {
+		printf("oom\n");
+		free(entry);
+		return;
+	}
+
+	list_add_tail(&entry->list, head);
+}
+
+/*
+ * Create the two counter nodes of the queue, /tmp/queue/min and
+ * /tmp/queue/max, both starting at 1, in a single transaction.
+ *
+ * The nodes are created exclusively, so ACRD_ERR_EXIST (somebody else
+ * already created them) counts as success.  ACRD_ERR_AGAIN is retried
+ * with a fresh transaction each time, so the two writes are never queued
+ * twice on the same transaction.
+ *
+ * Returns 0 on success and -1 on any other error.
+ */
+static int create_first_nodes(struct acrd_handle *h)
+{
+	int first = 1;
+	int ret;
+	struct acrd_tx *tx;
+
+	do {
+		tx = acrd_tx_init(h);
+		if (!tx) {
+			printf("%d:failed to init tx.\n", __LINE__);
+			return -1;
+		}
+
+		acrd_tx_write(tx, "/tmp/queue/min", &first, sizeof(first), 0,
+				ACRD_FLAG_CREATE | ACRD_FLAG_EXCL);
+		acrd_tx_write(tx, "/tmp/queue/max", &first, sizeof(first), 0,
+				ACRD_FLAG_CREATE | ACRD_FLAG_EXCL);
+		ret = acrd_tx_commit(tx, 0);
+		acrd_tx_close(tx);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	if (ret != ACRD_SUCCESS && ret != ACRD_ERR_EXIST) {
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Connect to accord and prepare the queue.
+ *
+ * @hostname: accord server to connect to
+ * @port:     port of the accord server
+ * @name:     queue name (currently unused)
+ *
+ * Returns a handle to release with queue_close(), or NULL when the
+ * connection, the allocation or the creation of the counter nodes fails.
+ * Nothing is leaked on failure.
+ */
+struct queue_handle *queue_init(const char *hostname, int port, const char *name)
+{
+	struct queue_handle *qh;
+	struct acrd_handle *ah;
+
+	ah = acrd_init(hostname, port, NULL, NULL, NULL);
+	if (!ah)
+		return NULL;
+
+	qh = zalloc(sizeof(*qh));
+	if (!qh) {
+		acrd_close(ah);
+		return NULL;
+	}
+	qh->ah = ah;
+
+	if (create_first_nodes(ah) != 0) {
+		acrd_close(ah);
+		free(qh);
+		return NULL;
+	}
+	return qh;
+}
+
+/*
+ * Close the accord connection of a queue handle and free the handle.
+ * Passing NULL is allowed and does nothing.
+ */
+void queue_close(struct queue_handle *qh)
+{
+	if (!qh)
+		return;
+
+	acrd_close(qh->ah);
+	free(qh);
+}
+
+/*
+ * Compute the Adler-32 checksum of `len` bytes at `data`.
+ *
+ * Declared static: a plain `inline` definition provides no external
+ * definition under C99 rules, so a call the compiler chooses not to inline
+ * would fail to link.
+ */
+static inline uint32_t calc_checksum(const void *data, uint32_t len)
+{
+	/* Seed 1 starts adler32() with a byte sum of 1 and a sum of sums of 0. */
+	return adler32(1, data, len);
+}
+
+/*
+ * Size in bytes of the fixed header that precedes a message payload.
+ *
+ * Declared static for the same reason as any file-local inline helper: a
+ * plain `inline` definition has no external definition under C99 rules.
+ */
+static inline uint32_t get_msghdr_size(void)
+{
+	return sizeof(struct queue_msg);
+}
+
+/*
+ * Append a message to the queue.
+ *
+ * @qh:   queue handle from queue_init()
+ * @data: payload to store; copied, the caller keeps ownership
+ * @len:  number of payload bytes
+ *
+ * The message is stored in two steps: one transaction increments
+ * /tmp/queue/max and reads back the new value, then the message (header
+ * plus payload) is written exclusively to /tmp/queue/<max - 1>.
+ * ACRD_ERR_AGAIN is retried in both steps.
+ *
+ * Returns 0 on success and -1 on failure.  The temporary message buffer
+ * is released on every path.
+ */
+int queue_push(struct queue_handle *qh, void *data, uint32_t len)
+{
+	struct acrd_handle *h = qh->ah;
+	struct acrd_tx *tx;
+	struct queue_msg *qe;
+	char retdata[32];
+	char path[256];
+	uint32_t size, max;
+	uint32_t delta = 1;
+	int ret;
+
+	/* header + payload must still fit the 32-bit length given to acrd_write() */
+	if (len > UINT32_MAX - get_msghdr_size()) {
+		printf("message too large\n");
+		return -1;
+	}
+
+	qe = zalloc(sizeof(*qe) + len);
+	if (!qe) {
+		printf("oom\n");
+		return -1;
+	}
+	memcpy(qe->data, data, len);
+	qe->len = len;
+	qe->checksum = calc_checksum(qe->data, qe->len);
+
+	/* Reserve a slot: bump the max counter and read its new value. */
+	do {
+		size = sizeof(retdata);	/* in/out argument, reset per attempt */
+		tx = acrd_tx_init(h);
+		if (!tx) {
+			printf("%d:failed to init tx.\n", __LINE__);
+			free(qe);
+			return -1;
+		}
+		acrd_tx_atomic_inc(tx, "/tmp/queue/max", &delta,
+				sizeof(uint32_t), 0, 0);
+		acrd_tx_read(tx, "/tmp/queue/max", retdata, &size, 0, 0);
+		ret = acrd_tx_commit(tx, 0);
+		acrd_tx_close(tx);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	if (ret != ACRD_SUCCESS) {
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		free(qe);
+		return -1;
+	}
+	if (size < sizeof(max)) {
+		printf("the read max size error\n");
+		free(qe);
+		return -1;
+	}
+
+	memcpy(&max, retdata, sizeof(max));
+	/* "/tmp/queue/" plus at most ten digits always fits in path[] */
+	sprintf(path, "/tmp/queue/%u", max - 1);
+
+	do {
+		ret = acrd_write(h, path, qe, get_msghdr_size() + len, 0,
+				     ACRD_FLAG_CREATE | ACRD_FLAG_EXCL);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	free(qe);
+	if (ret != ACRD_SUCCESS) {
+		printf("write err?\n");
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Remove the message at the head of the queue.
+ *
+ * @qh:    queue handle from queue_init()
+ * @retqe: receives the message on success and NULL on failure; release
+ *         it with queue_msg_close()
+ *
+ * One transaction reads /tmp/queue/min and increments it, which claims the
+ * node /tmp/queue/<min>.  The header of that node is read first to learn
+ * the payload length, then the payload is read and verified against the
+ * stored checksum, and finally the node is deleted.  ACRD_ERR_AGAIN is
+ * retried at every step.
+ *
+ * Returns 0 on success and -1 on failure.  The message buffer is freed on
+ * every failure path.
+ */
+int queue_pop(struct queue_handle *qh, struct queue_msg **retqe)
+{
+	struct acrd_handle *h = qh->ah;
+	struct acrd_tx *tx;
+	struct queue_msg *qe, *grown;
+	int delta = 1, min = 0, ret;
+	char path[256];
+	char min_buf[32];
+	uint32_t min_size;
+	uint32_t checksum;
+	uint32_t qe_size;
+
+	*retqe = NULL;
+
+	/* Allocate before touching the counter so that OOM loses no message. */
+	qe = zalloc(sizeof(*qe));
+	if (qe == NULL) {
+		printf("oom\n");
+		return -1;
+	}
+
+	/* Claim the head slot: read min and bump it in one transaction. */
+	do {
+		min_size = sizeof(uint32_t);	/* in/out argument, reset per attempt */
+		tx = acrd_tx_init(h);
+		if (!tx) {
+			printf("%d:failed to init tx.\n", __LINE__);
+			goto err_free;
+		}
+		acrd_tx_read(tx, "/tmp/queue/min", min_buf, &min_size, 0, 0);
+		acrd_tx_atomic_inc(tx, "/tmp/queue/min", &delta,
+			sizeof(uint32_t), 0, 0);
+		ret = acrd_tx_commit(tx, 0);
+		acrd_tx_close(tx);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	if (ret != ACRD_SUCCESS) {
+		printf("the node min is not found\n");
+		goto err_free;
+	}
+
+	if (min_size != sizeof(min)) {
+		printf("the read min size error\n");
+		goto err_free;
+	}
+
+	memcpy(&min, min_buf, sizeof(min));
+	if (min < 0) {
+		printf("min value error\n");
+		goto err_free;
+	}
+
+	/* "/tmp/queue/" plus at most ten digits always fits in path[] */
+	sprintf(path, "/tmp/queue/%d", min);
+
+	/* read header */
+	do {
+		qe_size = sizeof(*qe);
+		ret = acrd_read(h, path, qe, &qe_size, 0, 0);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	if (ret != ACRD_SUCCESS) {
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		goto err_free;
+	}
+	if (qe_size < sizeof(*qe) ||
+	    qe->len > UINT32_MAX - get_msghdr_size()) {
+		printf("the read header size error\n");
+		goto err_free;
+	}
+
+	/* read body */
+	grown = realloc(qe, get_msghdr_size() + qe->len);
+	if (grown == NULL) {
+		printf("oom\n");
+		goto err_free;
+	}
+	qe = grown;
+
+	do {
+		qe_size = qe->len;
+		ret = acrd_read(h, path, qe->data, &qe_size,
+				get_msghdr_size(), 0);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	if (ret != ACRD_SUCCESS) {
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		goto err_free;
+	}
+
+	checksum = calc_checksum(qe->data, qe->len);
+	if (qe->checksum != checksum) {
+		printf("Read corrupt data\n");
+		/* the payload need not be NUL-terminated: bound the print */
+		printf("path %s data %.*s len %u checksum %u, calc value : %u\n",
+		       path, (int)qe->len, qe->data, qe->len,
+		       qe->checksum, checksum);
+		goto err_free;
+	}
+
+	/* FIXME: it is better to call queue_ack() */
+	do {
+		ret = acrd_del(h, path, 0);
+	} while (ret == ACRD_ERR_AGAIN);
+	if (ret != ACRD_SUCCESS)
+		/* the message was read successfully; report and carry on */
+		printf("%d:failed to delete %s: err %d.\n", __LINE__, path, ret);
+
+	*retqe = qe;
+	return 0;
+
+err_free:
+	free(qe);
+	return -1;
+}
+
+/*
+ * Acknowledge a message by deleting the node named in `info`.
+ *
+ * On success the path string and `info` itself are freed and 0 is
+ * returned.  If the delete fails, -1 is returned and `info` is left
+ * untouched.
+ */
+int queue_ack(struct queue_handle *qh, struct ack_info *info)
+{
+	if (acrd_del(qh->ah, info->invisible_path, 0) != ACRD_SUCCESS)
+		return -1;
+
+	free(info->invisible_path);
+	free(info);
+	return 0;
+}
+
+/* Release a message returned through queue_pop(); a NULL msg is a no-op. */
+void queue_msg_close(struct queue_msg *msg)
+{
+	free(msg);
+}
+
+/*
+ * Benchmark worker thread.
+ *
+ * @arg: pointer to an int holding the number of iterations to run
+ *
+ * Opens its own queue handle to the server given by the file-level
+ * `hostname` and `port`, then performs one push followed by one pop per
+ * iteration.  The thread always ends through pthread_exit(NULL).
+ */
+static void *run(void *arg)
+{
+	struct queue_handle *h;
+	const char *qname = "hoge";
+	char data[128] = "the contents of data";
+	uint32_t size = strlen(data) + 1;	/* include the terminating NUL */
+	struct queue_msg *msg = NULL;
+	int reqs, i;
+
+	reqs = *(int *)arg;
+	h  = queue_init(hostname, port, qname);
+	if (h == NULL) {
+		printf("failed to init queue.\n");
+		goto exit;
+	}
+
+	/* very simple test case */
+	for (i = 0; i < reqs; i++) {
+		/*
+		 * queue_push() reports its own errors; the benchmark keeps
+		 * going and still issues the matching pop.
+		 */
+		queue_push(h, data, size);
+		if (queue_pop(h, &msg) == 0 && msg)
+			queue_msg_close(msg);
+	}
+	printf("exit.\n");
+
+	/* cleanup */
+	queue_close(h);
+
+exit:
+	pthread_exit(NULL);
+}
+
+/*
+ * qbench: start nr_threads worker threads that each perform nr_requests
+ * push/pop iterations, report the elapsed time and throughput, then delete
+ * every path listed under "/tmp/".
+ *
+ * usage: ./qbench [hostname] [port] [nr_threads] [nr_requests]
+ *
+ * Returns 0 on success; exits with status 1 on bad arguments or when a
+ * resource cannot be set up.
+ */
+int main(int argc, char *argv[])
+{
+	int ret, i, nr_threads, nr_requests;
+	long long total_requests;
+	struct acrd_handle *h;
+	LIST_HEAD(path_list);
+	struct acrd_listcb listcb = {
+		.cb = test_concurrent_list_cb,
+		.arg = &path_list,
+	};
+	struct acrd_path_list_entry *entry, *n;
+	struct timeval start, end, total;
+	double throughput;
+
+	pthread_t *th;
+
+	if (argc < 5) {
+		printf("usage: ./qbench [hostname] [port] [nr_threads] [nr_requests]\n");
+		exit(1);
+	}
+
+	hostname = argv[1];
+	port = atoi(argv[2]);
+	nr_threads = atoi(argv[3]);
+	nr_requests = atoi(argv[4]);
+	if (nr_threads <= 0 || nr_requests < 0) {
+		printf("nr_threads must be positive and nr_requests must not be negative\n");
+		exit(1);
+	}
+
+	th = malloc(sizeof(pthread_t)*nr_threads);
+	if (!th) {
+		printf("oom\n");
+		exit(1);
+	}
+
+	gettimeofday(&start, NULL);
+	for (i = 0; i < nr_threads; i++) {
+		/* pthread_create() returns an error number, never -1 */
+		ret = pthread_create(&th[i], NULL, run, &nr_requests);
+		if (ret != 0) {
+			printf("failed to init threads.\n");
+			exit(1);
+		}
+	}
+
+	for (i = 0; i < nr_threads; i++)
+		pthread_join(th[i], NULL);
+
+	gettimeofday(&end, NULL);
+	timersub(&end, &start, &total);
+
+	/* widen before multiplying so that large runs do not overflow int */
+	total_requests = (long long)nr_requests * nr_threads;
+	throughput = (double)total_requests /
+		(total.tv_sec + ((double)total.tv_usec)/1000000.0);
+
+	printf("\n%lld requests in %d.%06d sec. (%.2f throughput)\n",
+		total_requests, (int)total.tv_sec, (int)total.tv_usec,
+		throughput);
+
+	free(th);
+
+	/* cleanup data */
+	h = acrd_init(hostname, port, NULL, NULL, NULL);
+	if (!h) {
+		printf("failed to connect for cleanup.\n");
+		exit(1);
+	}
+	acrd_list(h, "/tmp/", 0, &listcb);
+	list_for_each_entry_safe(entry, n, &path_list, list) {
+		acrd_del(h, entry->path, 0);
+		free(entry->path);
+		list_del(&entry->list);
+		free(entry);
+	}
+	acrd_close(h);
+	return 0;
+}
-- 
1.7.2.5




More information about the sheepdog mailing list