[Sheepdog] [PATCH] accord: Added acrd_tx_atomic_inc

OZAWA Tsuyoshi ozawa.tsuyoshi at gmail.com
Wed Nov 9 10:06:32 CET 2011


An atomic increment can be simulated by combining cmp() with the other
operations, but that approach can cause performance problems.
The new operation, atomic_inc(), solves this.

Signed-off-by: OZAWA Tsuyoshi <ozawa.tsuyoshi at lab.ntt.co.jp>
---
 conductor/acrdops.c    |   48 ++++++++++++++++++++++++++++++++++++++++++++++++
 include/accord.h       |    3 +++
 include/accord_proto.h |    1 +
 libacrd/libacrd.c      |    7 +++++++
 test/test-txn.c        |   39 +++++++++++++++++++++++++++++++++++++++
 5 files changed, 98 insertions(+), 0 deletions(-)

diff --git a/conductor/acrdops.c b/conductor/acrdops.c
index 8ec950d..ecfa04e 100644
--- a/conductor/acrdops.c
+++ b/conductor/acrdops.c
@@ -155,6 +155,50 @@ static int exec_del_req(const struct acrd_req *req, struct acrd_rsp **rsp,
 	return ret;
 }

+static int exec_atomic_inc_req(const struct acrd_req *req, struct
acrd_rsp **rsp,
+			struct acrd_txid *txid, struct client_info *from)
+{ /* ACRD_OP_ATOMIC_INC handler: adds the 32-bit delta in request arg 1 to the 32-bit value stored at the path in arg 0; 'from' is not used here */
+	int ret = 0; /* NOTE(review): stays 0 if the first size check fails, so that exit returns 0 without writing -- confirm whether 0 is ACRD_SUCCESS */
+	void *data; /* filled in by store_read(); NOTE(review): never released in this function -- confirm who owns the buffer */
+	const void *adddata; /* delta payload taken from request arg 1 */
+	const char *path; /* target path taken from request arg 0 */
+	uint32_t size; /* first the delta length, then reused as the length reported by store_read() */
+	uint32_t d32, v; /* d32: stored value / result, v: delta */
+
+	path = get_arg(req, 0)->data;
+	adddata = get_arg(req, 1)->data;
+	size = get_arg(req, 1)->size;
+	dprintf("hogehoge %s %d\n", path, size); /* NOTE(review): looks like a leftover debug trace; path is formatted with %s before the NULL check below */
+
+	if (size != sizeof(uint32_t)) /* only a 4-byte delta is accepted */
+		goto err;
+
+	if (likely(path))
+		ret = store_read(path, &data, &size, req->offset, txid); /* fetch the current value; size is passed by address, presumably updated to the stored length -- confirm */
+	else
+		ret = ACRD_ERR_UNKNOWN; /* no path in the request */
+
+	if (ret != ACRD_SUCCESS)
+		goto err;
+
+	if (size != sizeof(uint32_t)) /* NOTE(review): ret == ACRD_SUCCESS here, so a stored value of another length is reported as success without being incremented */
+		goto err;
+
+	v = *(uint32_t *) adddata; /* assumes the payload is suitably aligned for uint32_t -- TODO confirm */
+	d32 = *(uint32_t *)data;
+	d32 += v; /* unsigned addition: wraps modulo 2^32 */
+	ret = store_write(path, &d32, size, req->offset, req->flags, txid); /* write the sum back at the same offset with the request's flags */
+
+	if (rsp) /* rsp may be NULL; when present the result code is mirrored into the response */
+		(*rsp)->result = ret;
+
+	return ret;
+err: /* early-exit path: performs the same response update and return as the normal path */
+	if (rsp)
+		(*rsp)->result = ret;
+	return ret;
+}
+
 static int exec_cmp_req(const struct acrd_req *req, struct acrd_rsp **rsp,
 			struct acrd_txid *txid, struct client_info *from)
 {
@@ -536,6 +580,10 @@ static struct acrd_op_tmpl acrd_ops[] = {
 		.exec_req = exec_copy_req,
 		.notify_event = notify_copy_event,
 	}, {
+		.opcode = ACRD_OP_ATOMIC_INC,
+		.need_mcast = 1,
+		.exec_req = exec_atomic_inc_req,
+	}, {
 		.opcode = ACRD_OP_TX,
 		.need_mcast = 1,
 		.exec_req = exec_tx_req,
diff --git a/include/accord.h b/include/accord.h
index 9a040f1..3fff476 100644
--- a/include/accord.h
+++ b/include/accord.h
@@ -375,6 +375,9 @@ int acrd_tx_scmp(struct acrd_tx *tx, const char *path1, const char *path2,
 int acrd_tx_copy(struct acrd_tx *tx, const char *src, const char *dst,
 		uint32_t flags);

+int acrd_tx_atomic_inc(struct acrd_tx *tx, const char *path, const void
*buf,
+	       uint32_t count, uint32_t offset, uint32_t flags); /* issues an ACRD_OP_ATOMIC_INC for path on tx; buf/count carry the delta (the handler added by this patch accepts only count == sizeof(uint32_t)) */
+
 /**
  * Commit a transaction
  *
diff --git a/include/accord_proto.h b/include/accord_proto.h
index 455b2e8..861d623 100644
--- a/include/accord_proto.h
+++ b/include/accord_proto.h
@@ -18,6 +18,7 @@ enum OPERATION {
 	ACRD_OP_CMP,
 	ACRD_OP_SCMP,
 	ACRD_OP_COPY,
+	ACRD_OP_ATOMIC_INC,
 	ACRD_OP_LIST,
 	ACRD_OP_ADD_WATCH,
 	ACRD_OP_RM_WATCH,
diff --git a/libacrd/libacrd.c b/libacrd/libacrd.c
index a99c680..4249a86 100644
--- a/libacrd/libacrd.c
+++ b/libacrd/libacrd.c
@@ -610,6 +610,13 @@ int acrd_tx_copy(struct acrd_tx *tx, const char *src, const char *dst,
 		      strlen(dst) + 1, 0, 0, flags, NULL, NULL, NULL);
 }

+int acrd_tx_atomic_inc(struct acrd_tx *tx, const char *path, const void
*buf,
+	       uint32_t count, uint32_t offset, uint32_t flags)
+{ /* thin wrapper: forwards an ACRD_OP_ATOMIC_INC request to acrd_op() and returns its result; tx and path are dereferenced without NULL checks */
+	return acrd_op(tx->handle, tx, ACRD_OP_ATOMIC_INC, path, strlen(path)
+ 1, buf, /* path length includes the terminating NUL; buf is the delta payload */
+		      count, 0, offset, flags, NULL, NULL, NULL); /* count is the delta size in bytes; the three trailing arguments are NULL, as in acrd_tx_copy() */
+}
+
 int acrd_tx_commit(struct acrd_tx *tx, uint32_t flags)
 {
 	struct acrd_aiocb *aiocb;
diff --git a/test/test-txn.c b/test/test-txn.c
index 562befd..57e3bdd 100644
--- a/test/test-txn.c
+++ b/test/test-txn.c
@@ -281,6 +281,43 @@ static void test_txn_increment(struct acrd_fixture *fixture, gconstpointer p)
 	acrd_tx_close(tx);
 }

+static void test_txn_ainc(struct acrd_fixture *fixture, gconstpointer p)
+{ /* checks that atomic_inc on a missing path fails at commit with ACRD_ERR_NOTFOUND, and that on an existing 32-bit value it stores value + delta */
+	struct acrd_handle *h = fixture->handle;
+	uint32_t data = 5555; /* initial value written to /tmp/0 */
+	uint32_t newdata = 5556; /* expected value after the increment (data + delta) */
+	uint32_t *readdata;
+	uint32_t delta = 1;
+	char retdata[32]; /* receive buffer for acrd_read(); NOTE(review): later read through a uint32_t * cast, alignment of a char array is not guaranteed */
+	uint32_t retdata_len = sizeof(data); /* passed by address to acrd_read(), initialised to 4 bytes */
+	int ret;
+	struct acrd_tx *tx;
+
+	tx = acrd_tx_init(h);
+	g_assert(tx != NULL);
+	ret = acrd_tx_atomic_inc(tx, "/tmp/0", &delta, sizeof(uint32_t), 0, 0); /* issued before /tmp/0 has been written by this test */
+	g_assert(ret == ACRD_SUCCESS); /* adding the operation succeeds; the failure is expected at commit */
+	ret = acrd_tx_commit(tx, 0);
+	g_assert(ret == ACRD_ERR_NOTFOUND);
+	acrd_tx_close(tx);
+
+	ret = acrd_write(h, "/tmp/0", &data, sizeof(uint32_t), 0,
ACRD_FLAG_CREATE); /* create the node holding the initial value */
+	g_assert(ret == ACRD_SUCCESS);
+
+	tx = acrd_tx_init(h);
+	g_assert(tx != NULL);
+	ret = acrd_tx_atomic_inc(tx, "/tmp/0", &delta, sizeof(uint32_t), 0, 0); /* same increment, now against the existing node */
+	g_assert(ret == ACRD_SUCCESS);
+	ret = acrd_tx_commit(tx, 0);
+	g_assert(ret == ACRD_SUCCESS);
+	acrd_tx_close(tx);
+
+	ret = acrd_read(h, "/tmp/0", &retdata, &retdata_len, 0, ACRD_FLAG_CREATE); /* NOTE(review): ACRD_FLAG_CREATE on a read looks unintended -- confirm */
+	g_assert(ret == ACRD_SUCCESS);
+	readdata = (uint32_t *)retdata;
+	g_assert(*readdata == newdata); /* stored value must now be 5556 */
+}
+
 static void test_txn_merge(struct acrd_fixture *fixture, gconstpointer p)
 {
 	struct acrd_handle *h = fixture->handle;
@@ -406,6 +443,8 @@ int main(int argc, char **argv)
 		   test_txn_setup, test_txn_merge, test_txn_teardown);
 	g_test_add("/txn/swap", struct acrd_fixture, NULL,
 		   test_txn_setup, test_txn_swap, test_txn_teardown);
+	g_test_add("/txn/ainc", struct acrd_fixture, NULL,
+		   test_txn_setup, test_txn_ainc, test_txn_teardown);

 	return g_test_run();
 }
-- 
1.7.2.5




More information about the sheepdog mailing list