Accord focuses on write-intensive workloads. One such application is a distributed, fully-replicated queue. This patch provides a simple queue implementation built on Accord. Signed-off-by: OZAWA Tsuyoshi <ozawa.tsuyoshi at lab.ntt.co.jp> --- .gitignore | 1 + Makefile | 6 + apps/Makefile | 12 ++ apps/queue/Makefile | 29 ++++ apps/queue/README.md | 36 +++++ apps/queue/adler32.c | 47 +++++++ apps/queue/queue.c | 373 ++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 504 insertions(+), 0 deletions(-) create mode 100644 apps/Makefile create mode 100644 apps/queue/Makefile create mode 100644 apps/queue/README.md create mode 100644 apps/queue/adler32.c create mode 100644 apps/queue/queue.c diff --git a/.gitignore b/.gitignore index 9be066b..ce92825 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ test/watch_files valgrind.log test/test-report.*ml libacrd/libacrd.pc +apps/queue/qbench diff --git a/Makefile b/Makefile index a027d6e..79c6796 100644 --- a/Makefile +++ b/Makefile @@ -9,12 +9,14 @@ all: $(MAKE) -C libacrd $(MAKE) -C conductor $(MAKE) -C test + $(MAKE) -C apps .PHONY:clean clean: $(MAKE) -C libacrd clean $(MAKE) -C conductor clean $(MAKE) -C test clean + $(MAKE) -C apps clean $(MAKE) -C lib clean .PHONY:install @@ -23,6 +25,10 @@ install: $(MAKE) -C conductor install $(MAKE) -C include install +.PHONY:apps +apps: + $(MAKE) -C apps + .PHONY:test test: $(MAKE) -C libacrd diff --git a/apps/Makefile b/apps/Makefile new file mode 100644 index 0000000..c7f3f91 --- /dev/null +++ b/apps/Makefile @@ -0,0 +1,12 @@ +.PHONY:all +all: + $(MAKE) -C queue + +.PHONY:clean +clean: + $(MAKE) -C queue clean + +.PHONY:queue +queue: + $(MAKE) -C queue + diff --git a/apps/queue/Makefile b/apps/queue/Makefile new file mode 100644 index 0000000..6f750a5 --- /dev/null +++ b/apps/queue/Makefile @@ -0,0 +1,29 @@ +sbindir ?= $(PREFIX)/sbin + +CFLAGS += -g -O3 -Wall -Wstrict-prototypes -I../../include +CFLAGS += -D_GNU_SOURCE -DNDEBUG +LIBS += -lpthread
-lacrd + +PROGRAMS = qbench +QUEUE_OBJS = queue.o +QUEUE_DEP = $(ACCORD_OBJS:.o=.d) + +.PHONY:all +all: $(PROGRAMS) + +qbench: $(QUEUE_OBJS) + $(CC) $^ -o $@ $(LIBS) + +-include $(ACCORD_DEP) + +%.o: %.c + $(CC) -c $(CFLAGS) $*.c -o $*.o + @$(CC) -MM $(CFLAGS) -MF $*.d -MT $*.o $*.c + +.PHONY:clean +clean: + rm -f *.[od] $(PROGRAMS) + +# support for GNU Flymake +check-syntax: + $(CC) $(CFLAGS) -fsyntax-only $(CHK_SOURCES) diff --git a/apps/queue/README.md b/apps/queue/README.md new file mode 100644 index 0000000..74e195d --- /dev/null +++ b/apps/queue/README.md @@ -0,0 +1,36 @@ +# How to run the queue + +1. Please install accord. +2. Run as follows: + +``` + $ make apps + $ ./apps/qbench localhost 9090 200 10000 + 2000000 requests in 298.100236 sec. (6709.15 throughput) +``` + +## Benchmark result + +* The benchmark environment + * Xeon 2.1 GHz 4 Core + * 7200 rpm HDD + * Run Accord in disk persistence mode with a single node + +The result of the benchmark (50% push and 50% pop) is as follows: + +``` + 2000000 requests in 298.100236 sec. (6709.15 throughput) +``` + + +Additionally, the other benchmark (100% push) achieves a better result, as follows: + +``` + 2000000 requests in 167.953943 sec. (11908.03 throughput) +``` + +To benchmark only push(), please comment out the queue_pop() call in the run() function. + +## TODO + +* Run the benchmark with 1KB messages diff --git a/apps/queue/adler32.c b/apps/queue/adler32.c new file mode 100644 index 0000000..4340e7d --- /dev/null +++ b/apps/queue/adler32.c @@ -0,0 +1,47 @@ +/* + * This is a modified version based on adler32.c from gst-ffmpeg based on + * adler32.c from the zlib library. + * + * Copyright (C) 1995 Mark Adler + * + * This software is provided 'as-is', without any express or implied + * warranty. In no event will the authors be held liable for any damages + * arising from the use of this software.
+ * + * Permission is granted to anyone to use this software for any purpose, + * including commercial applications, and to alter it and redistribute it + * freely, subject to the following restrictions: + * + * 1. The origin of this software must not be misrepresented; you must not + * claim that you wrote the original software. If you use this software + * in a product, an acknowledgment in the product documentation would be + * appreciated but is not required. + * 2. Altered source versions must be plainly marked as such, and must not be + * misrepresented as being the original software. + * 3. This notice may not be removed or altered from any source distribution. + * + */ +#include <stdint.h> + +#define ADLER32_BASE 65521L /* largest prime smaller than 65536 */ + +#define ADLER32_DO1(buf) {s1 += *buf++; s2 += s1;} +#define ADLER32_DO4(buf) ADLER32_DO1(buf); ADLER32_DO1(buf); ADLER32_DO1(buf); ADLER32_DO1(buf); +#define ADLER32_DO16(buf) ADLER32_DO4(buf); ADLER32_DO4(buf); ADLER32_DO4(buf); ADLER32_DO4(buf); + +static uint32_t adler32(uint32_t adler, const void* ptr, uint32_t len) +{ + const uint8_t* buf = (const uint8_t*)ptr; + uint32_t s1 = adler & 0xffff; + uint32_t s2 = (adler >> 16) & 0xffff; + + while (len > 0) { + while(len > 16 && s2 < (1U<<31)) { + ADLER32_DO16(buf); len-=16; + } + ADLER32_DO1(buf); len--; + s1 %= ADLER32_BASE; + s2 %= ADLER32_BASE; + } + return (s2 << 16) | s1; +} diff --git a/apps/queue/queue.c b/apps/queue/queue.c new file mode 100644 index 0000000..7e92b25 --- /dev/null +++ b/apps/queue/queue.c @@ -0,0 +1,373 @@ +#include <string.h> +#include <assert.h> +#include <pthread.h> +#include <sys/time.h> + +#include <accord.h> +#include "util.h" +#include "coroutine.h" +#include "adler32.c" + +#define NR_THREADS 100 + +static char *hostname; +static int port; + +struct acrd_path_list_entry { + char *path; + + struct list_head list; +}; + +struct queue_handle { + struct acrd_handle *ah; + char name[128]; +}; + +struct queue_msg { + 
uint32_t len; + uint32_t checksum; + char data[0]; +}; + +struct ack_info { + char *invisible_path; +}; + +static void test_concurrent_list_cb(struct acrd_handle *h, const char *path, void *arg) +{ + struct acrd_path_list_entry *entry = malloc(sizeof(*entry)); + struct list_head *head = arg; + + entry->path = strdup(path); + list_add_tail(&entry->list, head); +} + +static int create_first_nodes(struct acrd_handle *h) +{ + int max = 1; + int ret; + struct acrd_tx *tx; + + tx = acrd_tx_init(h); + +retry: + acrd_tx_write(tx, "/tmp/queue/min", &max, sizeof(max), 0, + ACRD_FLAG_CREATE | ACRD_FLAG_EXCL); + acrd_tx_write(tx, "/tmp/queue/max", &max, sizeof(max), 0, + ACRD_FLAG_CREATE | ACRD_FLAG_EXCL); + ret = acrd_tx_commit(tx, 0); + + if (ret != ACRD_SUCCESS && ret != ACRD_ERR_EXIST) { + if (ret == ACRD_ERR_AGAIN) { + goto retry; + } else + printf("%d:unknown err %d.\n", __LINE__, ret); + } + acrd_tx_close(tx); + return 0; +} + +struct queue_handle *queue_init(const char *hostname, int port, const char *name) +{ + struct queue_handle *qh; + struct acrd_handle *ah; + + ah = acrd_init(hostname, port, NULL, NULL, NULL); + qh = zalloc(sizeof(struct queue_handle)); + + if (!qh || !ah) + return NULL; + + qh->ah = ah; + create_first_nodes(ah); + return qh; +} + +void queue_close(struct queue_handle *qh) +{ + acrd_close(qh->ah); + free(qh); + + return; +} + +inline uint32_t calc_checksum(void *data, uint32_t len) +{ + const uint32_t adler = 1; + return adler32(adler, data, len);; +} + +inline uint32_t get_msghdr_size(void) +{ + return sizeof(struct queue_msg); +} + +int queue_push(struct queue_handle *qh, void *data, uint32_t len) +{ + struct acrd_tx *tx; + char retdata[32]; + uint32_t size, max; + int ret; + char path[256]; + struct acrd_handle *h = qh->ah; + uint32_t delta = 1; + struct queue_msg *qe; + + qe = zalloc(sizeof(struct queue_msg) + len); + memcpy(qe->data, data, len); + qe->len = len; + qe->checksum = calc_checksum(qe->data, qe->len); + //printf("len %d checksum %d 
data %s \n", qe->len, qe->checksum, qe->data); + + size = sizeof(retdata); +retry1: + assert(max > 0); + + tx = acrd_tx_init(h); + acrd_tx_atomic_inc(tx, "/tmp/queue/max", &delta, + sizeof(uint32_t), 0, 0); + acrd_tx_read(tx, "/tmp/queue/max", &retdata, &size, 0, 0); + ret = acrd_tx_commit(tx, 0); + if (ret != ACRD_SUCCESS) { + if (ret == ACRD_ERR_AGAIN) + goto retry1; + else + printf("%d:unknown err %d.\n", __LINE__, ret); + } + acrd_tx_close(tx); + memcpy(&max, retdata, sizeof(max)); + sprintf(path, "/tmp/queue/%d", max - 1); + //printf("max %d\n", max); + //printf("wrote data path %s len %d checksum %d data %s \n", path, qe->len, qe->checksum, qe->data); + +retry2: + ret = acrd_write(h, path, qe, get_msghdr_size() + len, 0, + ACRD_FLAG_CREATE | ACRD_FLAG_EXCL); + if (ret != ACRD_SUCCESS) { + if (ret == ACRD_ERR_AGAIN) + goto retry2; + else { + printf("write err?\n"); + free(qe); + return -1; + } + } + return 0; +} + +int queue_pop(struct queue_handle *qh, struct queue_msg **retqe) +{ + struct acrd_handle *h = qh->ah; + struct acrd_tx *tx; + int delta = 1, min = 0, ret; + char path[256]; + char min_buf[32]; + uint32_t size = sizeof(uint32_t); + uint32_t min_size = sizeof(uint32_t); + uint32_t checksum; + uint32_t qe_size; + uint64_t offset; + struct queue_msg *qe; + + *retqe = NULL; + qe = zalloc(sizeof(struct queue_msg)); + qe_size = sizeof(*qe); + + if (qe == NULL) { + printf("oom\n"); + return -1; + } +retry1: + tx = acrd_tx_init(h); + acrd_tx_read(tx, "/tmp/queue/min", min_buf, &min_size, 0, 0); + acrd_tx_atomic_inc(tx, "/tmp/queue/min", &delta, + sizeof(uint32_t), 0, 0); + ret = acrd_tx_commit(tx, 0); + acrd_tx_close(tx); + switch (ret) { + case ACRD_SUCCESS: + break; + case ACRD_ERR_AGAIN: + goto retry1; + case ACRD_ERR_NOTFOUND: + default: + printf("the node min is not found\n"); + free(qe); + return -1; + } + + if (size != sizeof(min)) { + printf("the read min size error\n"); + return -1; + } + + memcpy(&min, min_buf, sizeof(min)); + //printf("min %d max 
%d\n", min, max); + if (min < 0) { + printf("min value error\n"); + return -1; + } + + sprintf(path, "/tmp/queue/%d", min); + //printf("path %s min %d max %d\n", path, min, max); + +retry2: + ret = acrd_read(h, path, qe, &qe_size, 0, 0); + if (ret != ACRD_SUCCESS) { + if (ret == ACRD_ERR_AGAIN) + goto retry2; + else { + printf("%d:unknown err %d.\n", __LINE__, ret); + return -1; + } + } + + /* read body */ + qe = realloc(qe, get_msghdr_size() + qe->len); + qe_size = qe->len; + offset = get_msghdr_size(); +retry3: + ret = acrd_read(h, path, qe->data, &qe_size, offset, 0); + if (ret != ACRD_SUCCESS) { + if (ret == ACRD_ERR_AGAIN) + goto retry3; + else { + printf("%d:unknown err %d.\n", __LINE__, ret); + return -1; + } + } + + checksum = calc_checksum(qe->data, qe->len); + //printf("len %d checksum %d data %s\n", qe->len, qe->checksum, qe->data); + + if (qe->checksum != checksum) { + printf("Read corrupt data\n"); + printf("path %s data %s len %d checksum %d, calc value : %d\n", + path, qe->data, qe->len, + qe->checksum, checksum); + return -1; + } + *retqe = qe; + + /* FIXME: it is better to call queue_ack() */ + ret = acrd_del(h, path, 0); + + return 0; +} + +int queue_ack(struct queue_handle *qh, struct ack_info *info) +{ + int ret; + struct acrd_handle *h = qh->ah; + char *inv_path = info->invisible_path; + + ret = acrd_del(h, inv_path, 0); + if (ret != ACRD_SUCCESS) + return -1; + + free(info->invisible_path); + free(info); + return 0; +} + +void queue_msg_close(struct queue_msg *msg) +{ + free(msg); +} + +static void *run(void *arg) +{ + struct queue_handle *h; + const char *qname = "hoge"; + char data[128] = "the contents of data"; + uint32_t size = strlen(data) + 1; + struct queue_msg *msg; + int reqs, i; + + reqs = *(int *)arg; + h = queue_init(hostname, port, qname); + if (h == NULL) { + printf("failed to exit..."); + goto exit; + } + + /* very simple test case */ + for (i = 0; i < reqs; i++) { + queue_push(h, data, size); + if (queue_pop(h, &msg) == 0) { + 
if (msg) + queue_msg_close(msg); + } + } + printf("exit.\n"); + + /* cleanup */ + queue_close(h); + +exit: + pthread_exit(NULL); +} + +int main(int argc, char *argv[]) { + int ret, i, nr_threads, nr_requests; + struct acrd_handle *h; + LIST_HEAD(path_list); + struct acrd_listcb listcb = { + .cb = test_concurrent_list_cb, + .arg = &path_list, + }; + struct acrd_path_list_entry *entry, *n; + struct timeval start, end, total; + double throughput; + + pthread_t *th; + + if (argc < 5) { + printf("usage: ./qbench [hostname] [port] [nr_threads] [nr_requests]\n"); + exit(1); + } + + hostname = argv[1]; + port = atoi(argv[2]); + nr_threads = atoi(argv[3]); + nr_requests = atoi(argv[4]); + th = malloc(sizeof(pthread_t)*nr_threads); + if (!th) { + printf("oom\n"); + exit(1); + } + + gettimeofday(&start, NULL); + for (i = 0; i < nr_threads; i++) { + ret = pthread_create(&th[i], NULL, run, &nr_requests); + if (ret < 0) { + printf("failed to init threads.\n"); + exit(1); + } + } + + for (i = 0; i < nr_threads; i++) + pthread_join(th[i], NULL); + + gettimeofday(&end, NULL); + timersub(&end, &start, &total); + throughput = (nr_requests * nr_threads) / + (total.tv_sec + ((double)total.tv_usec)/1000000.0); + + printf("\n%d requests in %d.%06d sec. (%.2f throughput)\n", + nr_requests * nr_threads, (int)total.tv_sec, (int)total.tv_usec, + throughput); + + /* cleanup data */ + h = acrd_init(hostname, port, NULL, NULL, NULL); + acrd_list(h, "/tmp/", 0, &listcb); + list_for_each_entry_safe(entry, n, &path_list, list) { + acrd_del(h, entry->path, 0); + free(entry->path); + list_del(&entry->list); + free(entry); + } + acrd_close(h); + return 0; +} -- 1.7.2.5 |