[Sheepdog] [PATCH] accord: added apps directory and queue code
OZAWA Tsuyoshi
ozawa.tsuyoshi at gmail.com
Thu Nov 10 18:11:05 CET 2011
Accord focuses on write-intensive workloads.
One of the applications is the distributed and fully-replicated
queue.
This patch provides a simple queue implementation built on Accord.
Signed-off-by: OZAWA Tsuyoshi <ozawa.tsuyoshi at lab.ntt.co.jp>
---
.gitignore | 1 +
Makefile | 6 +
apps/Makefile | 12 ++
apps/queue/Makefile | 29 ++++
apps/queue/README.md | 36 +++++
apps/queue/adler32.c | 47 +++++++
apps/queue/queue.c | 373 ++++++++++++++++++++++++++++++++++++++++++++++++++
7 files changed, 504 insertions(+), 0 deletions(-)
create mode 100644 apps/Makefile
create mode 100644 apps/queue/Makefile
create mode 100644 apps/queue/README.md
create mode 100644 apps/queue/adler32.c
create mode 100644 apps/queue/queue.c
diff --git a/.gitignore b/.gitignore
index 9be066b..ce92825 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,4 @@ test/watch_files
valgrind.log
test/test-report.*ml
libacrd/libacrd.pc
+apps/queue/qbench
diff --git a/Makefile b/Makefile
index a027d6e..79c6796 100644
--- a/Makefile
+++ b/Makefile
@@ -9,12 +9,14 @@ all:
$(MAKE) -C libacrd
$(MAKE) -C conductor
$(MAKE) -C test
+ $(MAKE) -C apps
.PHONY:clean
clean:
$(MAKE) -C libacrd clean
$(MAKE) -C conductor clean
$(MAKE) -C test clean
+ $(MAKE) -C apps clean
$(MAKE) -C lib clean
.PHONY:install
@@ -23,6 +25,10 @@ install:
$(MAKE) -C conductor install
$(MAKE) -C include install
+.PHONY:apps
+apps:
+ $(MAKE) -C apps
+
.PHONY:test
test:
$(MAKE) -C libacrd
diff --git a/apps/Makefile b/apps/Makefile
new file mode 100644
index 0000000..c7f3f91
--- /dev/null
+++ b/apps/Makefile
@@ -0,0 +1,12 @@
+# Build every example application under apps/ by recursing into its directory.
+SUBDIRS = queue
+
+.PHONY: all clean $(SUBDIRS)
+
+all: $(SUBDIRS)
+
+$(SUBDIRS):
+	$(MAKE) -C $@
+
+clean:
+	for dir in $(SUBDIRS); do $(MAKE) -C $$dir clean || exit 1; done
+
diff --git a/apps/queue/Makefile b/apps/queue/Makefile
new file mode 100644
index 0000000..6f750a5
--- /dev/null
+++ b/apps/queue/Makefile
@@ -0,0 +1,29 @@
+sbindir ?= $(PREFIX)/sbin
+
+CFLAGS += -g -O3 -Wall -Wstrict-prototypes -I../../include
+CFLAGS += -D_GNU_SOURCE -DNDEBUG
+LIBS += -lpthread -lacrd
+
+PROGRAMS = qbench
+QUEUE_OBJS = queue.o
+# .d files written by the %.o rule below (-MM -MF $*.d). queue.c #includes
+# adler32.c, so these are what make a change there trigger a rebuild.
+QUEUE_DEP = $(QUEUE_OBJS:.o=.d)
+
+.PHONY:all
+all: $(PROGRAMS)
+
+qbench: $(QUEUE_OBJS)
+	$(CC) $^ -o $@ $(LIBS)
+
+-include $(QUEUE_DEP)
+
+%.o: %.c
+	$(CC) -c $(CFLAGS) $*.c -o $*.o
+	@$(CC) -MM $(CFLAGS) -MF $*.d -MT $*.o $*.c
+
+.PHONY:clean
+clean:
+	rm -f *.[od] $(PROGRAMS)
+
+# support for GNU Flymake
+.PHONY:check-syntax
+check-syntax:
+	$(CC) $(CFLAGS) -fsyntax-only $(CHK_SOURCES)
diff --git a/apps/queue/README.md b/apps/queue/README.md
new file mode 100644
index 0000000..74e195d
--- /dev/null
+++ b/apps/queue/README.md
@@ -0,0 +1,36 @@
+# How to run the queue benchmark
+
+1. Install Accord.
+2. Build and run the benchmark as follows:
+
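+```
+ $ make apps
+ $ ./apps/queue/qbench localhost 9090 200 10000
+ 2000000 requests in 298.100236 sec. (6709.15 throughput)
+```
+
+The arguments are the Accord server host, its port, the number of client
+threads, and the number of push/pop iterations performed by each thread.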
+
+## Benchmark results
+
+* Benchmark environment:
+  * Xeon 2.1 GHz, 4 cores
+  * 7200 rpm HDD
+  * Accord running in disk-persistence mode on a single node
+
+The result of the mixed benchmark (50% push, 50% pop) is as follows:
+
+```
+ 2000000 requests in 298.100236 sec. (6709.15 throughput)
+```
+
+
+A push-only benchmark (100% push) achieves higher throughput:
+
+```
+ 2000000 requests in 167.953943 sec. (11908.03 throughput)
+```
+
+To benchmark push only, comment out the queue_pop() call in run() (apps/queue/queue.c).
+
+## TODO
+
+* Run the benchmark with 1 KB messages.
diff --git a/apps/queue/adler32.c b/apps/queue/adler32.c
new file mode 100644
index 0000000..4340e7d
--- /dev/null
+++ b/apps/queue/adler32.c
@@ -0,0 +1,47 @@
+/*
+ * This is a modified version based on adler32.c from gst-ffmpeg based on
+ * adler32.c from the zlib library.
+ *
+ * Copyright (C) 1995 Mark Adler
+ *
+ * This software is provided 'as-is', without any express or implied
+ * warranty. In no event will the authors be held liable for any damages
+ * arising from the use of this software.
+ *
+ * Permission is granted to anyone to use this software for any purpose,
+ * including commercial applications, and to alter it and redistribute it
+ * freely, subject to the following restrictions:
+ *
+ * 1. The origin of this software must not be misrepresented; you must not
+ * claim that you wrote the original software. If you use this software
+ * in a product, an acknowledgment in the product documentation would be
+ * appreciated but is not required.
+ * 2. Altered source versions must be plainly marked as such, and must not be
+ * misrepresented as being the original software.
+ * 3. This notice may not be removed or altered from any source distribution.
+ *
+ */
+#include <stdint.h>
+
+/* Adler-32 modulus: the largest prime smaller than 65536. */
+#define ADLER32_BASE 65521L
+
+/*
+ * Fold @len bytes at @ptr into the running Adler-32 value @adler and return
+ * the updated value (start a fresh checksum with adler = 1).
+ *
+ * Both sums are reduced modulo ADLER32_BASE after every single-byte step.
+ * Between reductions, 16-byte chunks are accumulated unreduced while more
+ * than 16 bytes remain and the high sum is still below 2^31; that bound is
+ * what keeps the 32-bit sums from wrapping between reductions.
+ */
+static uint32_t adler32(uint32_t adler, const void* ptr, uint32_t len)
+{
+	const uint8_t *p = ptr;
+	uint32_t lo = adler & 0xffff;
+	uint32_t hi = (adler >> 16) & 0xffff;
+	int k;
+
+	for (; len > 0; len--) {
+		/* unreduced fast path over 16-byte chunks */
+		while (len > 16 && hi < (1U << 31)) {
+			for (k = 0; k < 16; k++) {
+				lo += *p++;
+				hi += lo;
+			}
+			len -= 16;
+		}
+		/* one byte, then reduce both sums */
+		lo += *p++;
+		hi += lo;
+		lo %= ADLER32_BASE;
+		hi %= ADLER32_BASE;
+	}
+	return (hi << 16) | lo;
+}
diff --git a/apps/queue/queue.c b/apps/queue/queue.c
new file mode 100644
index 0000000..7e92b25
--- /dev/null
+++ b/apps/queue/queue.c
@@ -0,0 +1,373 @@
+#include <assert.h>
+#include <errno.h>
+#include <limits.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+
+#include <accord.h>
+#include "util.h"
+#include "coroutine.h"
+#include "adler32.c"
+
+#define NR_THREADS 100	/* unused: main() takes the thread count from argv[3] */
+
+/* Accord server address shared by all benchmark threads; set once in main(). */
+static char *hostname;
+static int port;
+
+/* One path collected by test_concurrent_list_cb() for main()'s cleanup pass. */
+struct acrd_path_list_entry {
+	char *path;		/* strdup()ed copy; freed by whoever drains the list */
+
+	struct list_head list;
+};
+
+/* Per-client queue state returned by queue_init(). */
+struct queue_handle {
+	struct acrd_handle *ah;
+	char name[128];
+};
+
+/*
+ * Record stored at /tmp/queue/<n>: this fixed header immediately followed by
+ * len payload bytes. The struct is written to Accord verbatim, so the field
+ * order and sizes form the stored format and must not change.
+ */
+struct queue_msg {
+	uint32_t len;		/* payload length in bytes */
+	uint32_t checksum;	/* Adler-32 of data[0..len) */
+	char data[];		/* C99 flexible array member holding the payload */
+};
+
+struct ack_info {
+	char *invisible_path;	/* heap-allocated; queue_ack() frees it with the struct */
+};
+
+/*
+ * acrd_list() callback: append a heap copy of each listed @path to the
+ * list_head passed through @arg. The callback returns void and cannot report
+ * failure, so an entry that cannot be allocated is skipped; the cleanup in
+ * main() is therefore best-effort.
+ */
+static void test_concurrent_list_cb(struct acrd_handle *h, const char *path, void *arg)
+{
+	struct list_head *head = arg;
+	struct acrd_path_list_entry *entry;
+
+	entry = malloc(sizeof(*entry));
+	if (!entry)
+		return;
+
+	entry->path = strdup(path);
+	if (!entry->path) {
+		free(entry);
+		return;
+	}
+
+	list_add_tail(&entry->list, head);
+}
+
+/*
+ * Create the /tmp/queue/min and /tmp/queue/max counters, both starting at 1,
+ * in one transaction. ACRD_FLAG_EXCL makes this a no-op when they already
+ * exist; ACRD_ERR_EXIST is treated as success.
+ *
+ * Returns 0 on success, -1 on any other error.
+ */
+static int create_first_nodes(struct acrd_handle *h)
+{
+	uint32_t initial = 1;
+	struct acrd_tx *tx;
+	int ret;
+
+	do {
+		/*
+		 * Build a fresh transaction for every attempt instead of
+		 * appending the same writes again to an already-committed tx.
+		 */
+		tx = acrd_tx_init(h);
+		if (!tx)
+			return -1;
+		acrd_tx_write(tx, "/tmp/queue/min", &initial, sizeof(initial), 0,
+			      ACRD_FLAG_CREATE | ACRD_FLAG_EXCL);
+		acrd_tx_write(tx, "/tmp/queue/max", &initial, sizeof(initial), 0,
+			      ACRD_FLAG_CREATE | ACRD_FLAG_EXCL);
+		ret = acrd_tx_commit(tx, 0);
+		acrd_tx_close(tx);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	if (ret != ACRD_SUCCESS && ret != ACRD_ERR_EXIST) {
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Connect to the Accord server at hostname:port and make sure the queue's
+ * min/max counters exist. @name (may be NULL) is recorded in the handle,
+ * truncated to fit. Returns NULL if the connection or allocation fails;
+ * nothing is leaked in that case.
+ */
+struct queue_handle *queue_init(const char *hostname, int port, const char *name)
+{
+	struct queue_handle *qh;
+	struct acrd_handle *ah;
+
+	ah = acrd_init(hostname, port, NULL, NULL, NULL);
+	if (!ah)
+		return NULL;
+
+	qh = zalloc(sizeof(*qh));
+	if (!qh) {
+		/* don't leak the connection when the handle can't be allocated */
+		acrd_close(ah);
+		return NULL;
+	}
+
+	qh->ah = ah;
+	if (name)
+		snprintf(qh->name, sizeof(qh->name), "%s", name);
+
+	/* best-effort: create_first_nodes() prints unexpected errors itself */
+	create_first_nodes(ah);
+	return qh;
+}
+
+/*
+ * Disconnect from Accord and free a handle obtained from queue_init().
+ * Passing NULL is a no-op, mirroring free().
+ */
+void queue_close(struct queue_handle *qh)
+{
+	if (!qh)
+		return;
+
+	acrd_close(qh->ah);
+	free(qh);
+}
+
+/*
+ * Checksum stored in struct queue_msg: Adler-32 of @len bytes at @data,
+ * seeded with 1.
+ *
+ * Declared static: under C99/C11 inline semantics a plain 'inline'
+ * definition provides no external symbol, so any call the compiler does not
+ * inline (e.g. at -O0) fails to link.
+ */
+static inline uint32_t calc_checksum(const void *data, uint32_t len)
+{
+	return adler32(1, data, len);
+}
+
+/*
+ * Size of the fixed struct queue_msg header that precedes every stored
+ * payload. static for the same C99 'inline' linkage reason as
+ * calc_checksum().
+ */
+static inline uint32_t get_msghdr_size(void)
+{
+	return (uint32_t)sizeof(struct queue_msg);
+}
+
+/*
+ * Append @len bytes from @data to the queue.
+ *
+ * A slot is reserved by atomically incrementing /tmp/queue/max and reading
+ * the new value back in the same transaction. The message (header + payload)
+ * is then created exclusively at /tmp/queue/<max - 1>.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+int queue_push(struct queue_handle *qh, void *data, uint32_t len)
+{
+	struct acrd_handle *h = qh->ah;
+	struct acrd_tx *tx;
+	struct queue_msg *qe;
+	char retdata[32];
+	char path[256];
+	uint32_t size, max, msg_size, delta = 1;
+	int ret, rc = -1;
+
+	/* header + payload must fit in the uint32_t size given to acrd_write() */
+	if (len > UINT32_MAX - get_msghdr_size())
+		return -1;
+	msg_size = get_msghdr_size() + len;
+
+	qe = zalloc(msg_size);
+	if (!qe) {
+		printf("oom\n");
+		return -1;
+	}
+	memcpy(qe->data, data, len);
+	qe->len = len;
+	qe->checksum = calc_checksum(qe->data, qe->len);
+
+	/* reserve a slot; a new tx per attempt so none is leaked on retry */
+	do {
+		tx = acrd_tx_init(h);
+		if (!tx)
+			goto out;
+		size = sizeof(retdata);
+		acrd_tx_atomic_inc(tx, "/tmp/queue/max", &delta,
+				   sizeof(uint32_t), 0, 0);
+		acrd_tx_read(tx, "/tmp/queue/max", retdata, &size, 0, 0);
+		ret = acrd_tx_commit(tx, 0);
+		acrd_tx_close(tx);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	if (ret != ACRD_SUCCESS) {
+		/* retdata cannot be trusted; don't write to a garbage path */
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		goto out;
+	}
+	if (size < sizeof(max)) {
+		printf("the read max size error\n");
+		goto out;
+	}
+	memcpy(&max, retdata, sizeof(max));
+	assert(max > 0);	/* only meaningful once max has been read */
+	snprintf(path, sizeof(path), "/tmp/queue/%u", (unsigned int)(max - 1));
+
+	do {
+		ret = acrd_write(h, path, qe, msg_size, 0,
+				 ACRD_FLAG_CREATE | ACRD_FLAG_EXCL);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	if (ret != ACRD_SUCCESS) {
+		printf("write err?\n");
+		goto out;
+	}
+	rc = 0;
+out:
+	/* qe is only a staging buffer for acrd_write(); release it on every path */
+	free(qe);
+	return rc;
+}
+
+/*
+ * Remove the oldest message from the queue.
+ *
+ * The head index is read from /tmp/queue/min and incremented in one
+ * transaction. /tmp/queue/<min> is then read (header first, then payload),
+ * verified against its Adler-32 checksum and deleted.
+ *
+ * On success returns 0 and stores a heap-allocated message in *retqe; release
+ * it with queue_msg_close(). On failure returns -1, *retqe is NULL and
+ * nothing is leaked.
+ *
+ * NOTE(review): min is advanced before the entry is known to exist. If the
+ * queue is empty, or a concurrent push has reserved a slot but not written it
+ * yet, the read below fails and that slot is never popped. Fixing this needs
+ * a protocol change (e.g. checking min against max inside the transaction).
+ */
+int queue_pop(struct queue_handle *qh, struct queue_msg **retqe)
+{
+	struct acrd_handle *h = qh->ah;
+	struct acrd_tx *tx;
+	struct queue_msg *qe, *tmp;
+	uint32_t delta = 1;
+	int min = 0, ret;
+	char path[256];
+	char min_buf[32];
+	uint32_t min_size;
+	uint32_t checksum;
+	uint32_t qe_size;
+	uint64_t offset;
+
+	*retqe = NULL;
+	qe = zalloc(sizeof(*qe));
+	if (qe == NULL) {
+		printf("oom\n");
+		return -1;
+	}
+
+	/* claim the head slot: read min and bump it in one transaction */
+	do {
+		tx = acrd_tx_init(h);
+		if (!tx)
+			goto err;
+		min_size = sizeof(uint32_t);
+		acrd_tx_read(tx, "/tmp/queue/min", min_buf, &min_size, 0, 0);
+		acrd_tx_atomic_inc(tx, "/tmp/queue/min", &delta,
+				   sizeof(uint32_t), 0, 0);
+		ret = acrd_tx_commit(tx, 0);
+		acrd_tx_close(tx);
+	} while (ret == ACRD_ERR_AGAIN);
+
+	switch (ret) {
+	case ACRD_SUCCESS:
+		break;
+	case ACRD_ERR_NOTFOUND:
+		printf("the node min is not found\n");
+		goto err;
+	default:
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		goto err;
+	}
+
+	/* validate the length the read reported, not an untouched local */
+	if (min_size != sizeof(min)) {
+		printf("the read min size error\n");
+		goto err;
+	}
+
+	memcpy(&min, min_buf, sizeof(min));
+	if (min < 0) {
+		printf("min value error\n");
+		goto err;
+	}
+
+	snprintf(path, sizeof(path), "/tmp/queue/%d", min);
+
+	/* read the fixed-size header to learn the payload length */
+	do {
+		qe_size = get_msghdr_size();
+		ret = acrd_read(h, path, qe, &qe_size, 0, 0);
+	} while (ret == ACRD_ERR_AGAIN);
+	if (ret != ACRD_SUCCESS) {
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		goto err;
+	}
+	/* len comes from the store: bound it before sizing an allocation */
+	if (qe_size < get_msghdr_size() ||
+	    qe->len > UINT32_MAX - get_msghdr_size()) {
+		printf("Read corrupt data\n");
+		goto err;
+	}
+
+	/* grow the buffer (keeping the old one on failure) and read the payload */
+	tmp = realloc(qe, get_msghdr_size() + qe->len);
+	if (!tmp) {
+		printf("oom\n");
+		goto err;
+	}
+	qe = tmp;
+	offset = get_msghdr_size();
+	do {
+		qe_size = qe->len;
+		ret = acrd_read(h, path, qe->data, &qe_size, offset, 0);
+	} while (ret == ACRD_ERR_AGAIN);
+	if (ret != ACRD_SUCCESS) {
+		printf("%d:unknown err %d.\n", __LINE__, ret);
+		goto err;
+	}
+
+	checksum = calc_checksum(qe->data, qe->len);
+	if (qe->checksum != checksum) {
+		printf("Read corrupt data\n");
+		/* the payload need not be NUL-terminated: bound %s by len */
+		printf("path %s data %.*s len %u checksum %u, calc value : %u\n",
+		       path, (int)qe->len, qe->data, qe->len,
+		       qe->checksum, checksum);
+		goto err;
+	}
+	*retqe = qe;
+
+	/* FIXME: it is better to call queue_ack() */
+	acrd_del(h, path, 0);
+
+	return 0;
+err:
+	free(qe);
+	return -1;
+}
+
+/*
+ * Delete the entry named by info->invisible_path and release @info.
+ * Returns 0 on success. On failure returns -1 and frees nothing, so the
+ * caller still owns @info.
+ */
+int queue_ack(struct queue_handle *qh, struct ack_info *info)
+{
+	char *target = info->invisible_path;
+
+	if (acrd_del(qh->ah, target, 0) != ACRD_SUCCESS)
+		return -1;
+
+	free(target);
+	free(info);
+	return 0;
+}
+
+/* Release a message handed out by queue_pop(); NULL is accepted. */
+void queue_msg_close(struct queue_msg *msg)
+{
+	free(msg);
+}
+
+/*
+ * Benchmark worker thread: open a queue handle and perform *(int *)arg
+ * push/pop iterations. main() passes the same read-only request count to
+ * every thread.
+ */
+static void *run(void *arg)
+{
+	struct queue_handle *h;
+	const char *qname = "hoge";
+	char data[128] = "the contents of data";
+	uint32_t size = strlen(data) + 1;	/* include the terminating NUL */
+	struct queue_msg *msg;
+	int reqs = *(int *)arg;
+	int i;
+
+	h = queue_init(hostname, port, qname);
+	if (h == NULL) {
+		printf("failed to init queue.\n");
+		goto out;
+	}
+
+	/* very simple test case: every push is followed by a pop */
+	for (i = 0; i < reqs; i++) {
+		queue_push(h, data, size);
+		if (queue_pop(h, &msg) == 0)
+			queue_msg_close(msg);
+	}
+	printf("exit.\n");
+
+	/* cleanup */
+	queue_close(h);
+out:
+	pthread_exit(NULL);
+}
+
+/*
+ * Parse a strictly positive decimal int from @s into *out.
+ * Returns 0 on success, -1 on malformed, trailing-garbage, non-positive or
+ * out-of-range input (atoi() would silently yield 0 for all of these).
+ */
+static int parse_positive_int(const char *s, int *out)
+{
+	char *end;
+	long v;
+
+	errno = 0;
+	v = strtol(s, &end, 10);
+	if (end == s || *end != '\0' || errno == ERANGE || v <= 0 || v > INT_MAX)
+		return -1;
+	*out = (int)v;
+	return 0;
+}
+
+/*
+ * qbench: start nr_threads workers that each run nr_requests push/pop
+ * iterations against the Accord server, and print the aggregate throughput.
+ * Afterwards, every node listed under /tmp/ is deleted.
+ */
+int main(int argc, char *argv[])
+{
+	int ret, i, nr_threads, nr_requests;
+	long long total_requests;
+	struct acrd_handle *h;
+	LIST_HEAD(path_list);
+	struct acrd_listcb listcb = {
+		.cb = test_concurrent_list_cb,
+		.arg = &path_list,
+	};
+	struct acrd_path_list_entry *entry, *n;
+	struct timeval start, end, total;
+	double elapsed, throughput;
+	pthread_t *th;
+
+	if (argc < 5) {
+		printf("usage: ./qbench [hostname] [port] [nr_threads] [nr_requests]\n");
+		exit(1);
+	}
+
+	hostname = argv[1];
+	if (parse_positive_int(argv[2], &port) ||
+	    parse_positive_int(argv[3], &nr_threads) ||
+	    parse_positive_int(argv[4], &nr_requests)) {
+		printf("port, nr_threads and nr_requests must be positive integers\n");
+		exit(1);
+	}
+
+	th = calloc(nr_threads, sizeof(*th));
+	if (!th) {
+		printf("oom\n");
+		exit(1);
+	}
+
+	gettimeofday(&start, NULL);
+	for (i = 0; i < nr_threads; i++) {
+		/* pthread_create() returns an error number (> 0), never a negative value */
+		ret = pthread_create(&th[i], NULL, run, &nr_requests);
+		if (ret != 0) {
+			printf("failed to init threads.\n");
+			exit(1);
+		}
+	}
+
+	for (i = 0; i < nr_threads; i++)
+		pthread_join(th[i], NULL);
+
+	gettimeofday(&end, NULL);
+	free(th);
+
+	timersub(&end, &start, &total);
+	/* 64-bit product: nr_requests * nr_threads can overflow int */
+	total_requests = (long long)nr_requests * nr_threads;
+	elapsed = total.tv_sec + ((double)total.tv_usec) / 1000000.0;
+	throughput = elapsed > 0 ? total_requests / elapsed : 0.0;
+
+	printf("\n%lld requests in %d.%06d sec. (%.2f throughput)\n",
+	       total_requests, (int)total.tv_sec, (int)total.tv_usec,
+	       throughput);
+
+	/* cleanup data: delete every node listed under /tmp/ */
+	h = acrd_init(hostname, port, NULL, NULL, NULL);
+	if (!h) {
+		printf("failed to connect for cleanup\n");
+		return 1;
+	}
+	acrd_list(h, "/tmp/", 0, &listcb);
+	list_for_each_entry_safe(entry, n, &path_list, list) {
+		acrd_del(h, entry->path, 0);
+		free(entry->path);
+		list_del(&entry->list);
+		free(entry);
+	}
+	acrd_close(h);
+	return 0;
+}
--
1.7.2.5
More information about the sheepdog
mailing list