[Sheepdog] [PATCH] accord: Added acrd_tx_atomic_inc
OZAWA Tsuyoshi
ozawa.tsuyoshi at gmail.com
Wed Nov 9 10:06:32 CET 2011
The combination of cmp() and the other operations can simulate
atomic increment operations; however, doing so can cause performance problems.
The new operation, atomic_inc(), solves this.
Signed-off-by: OZAWA Tsuyoshi <ozawa.tsuyoshi at lab.ntt.co.jp>
---
conductor/acrdops.c | 48
++++++++++++++++++++++++++++++++++++++++++++++++
include/accord.h | 3 +++
include/accord_proto.h | 1 +
libacrd/libacrd.c | 7 +++++++
test/test-txn.c | 39 +++++++++++++++++++++++++++++++++++++++
5 files changed, 98 insertions(+), 0 deletions(-)
diff --git a/conductor/acrdops.c b/conductor/acrdops.c
index 8ec950d..ecfa04e 100644
--- a/conductor/acrdops.c
+++ b/conductor/acrdops.c
@@ -155,6 +155,50 @@ static int exec_del_req(const struct acrd_req *req,
struct acrd_rsp **rsp,
return ret;
}
+/*
+ * Execute ACRD_OP_ATOMIC_INC: add the uint32_t delta carried in argument 1
+ * to the uint32_t value stored at the path in argument 0, then write the sum
+ * back.  A missing path, or a delta or stored value that is not exactly
+ * sizeof(uint32_t) bytes, is reported as ACRD_ERR_UNKNOWN rather than being
+ * silently skipped.  The result is also stored in (*rsp)->result if rsp is set.
+ */
+static int exec_atomic_inc_req(const struct acrd_req *req,
+			       struct acrd_rsp **rsp, struct acrd_txid *txid,
+			       struct client_info *from)
+{
+	int ret = ACRD_ERR_UNKNOWN;
+	void *data;
+	const char *path = get_arg(req, 0)->data;
+	const void *adddata = get_arg(req, 1)->data;
+	uint32_t size = get_arg(req, 1)->size;
+	uint32_t d32, v;
+
+	dprintf("hogehoge %s %d\n", path, size);
+
+	if (!path || size != sizeof(uint32_t))
+		goto out;
+
+	ret = store_read(path, &data, &size, req->offset, txid);
+	if (ret != ACRD_SUCCESS)
+		goto out;
+
+	/* size was handed to store_read() by pointer, so check it again. */
+	if (size != sizeof(uint32_t)) {
+		ret = ACRD_ERR_UNKNOWN;
+		goto out;
+	}
+
+	v = *(const uint32_t *)adddata;
+	d32 = *(uint32_t *)data;
+	d32 += v;
+	ret = store_write(path, &d32, size, req->offset, req->flags, txid);
+
+out:
+	if (rsp)
+		(*rsp)->result = ret;
+	return ret;
+}
+
static int exec_cmp_req(const struct acrd_req *req, struct acrd_rsp **rsp,
struct acrd_txid *txid, struct client_info *from)
{
@@ -536,6 +580,10 @@ static struct acrd_op_tmpl acrd_ops[] = {
.exec_req = exec_copy_req,
.notify_event = notify_copy_event,
}, {
+ .opcode = ACRD_OP_ATOMIC_INC,
+ .need_mcast = 1,
+ .exec_req = exec_atomic_inc_req,
+ }, {
.opcode = ACRD_OP_TX,
.need_mcast = 1,
.exec_req = exec_tx_req,
diff --git a/include/accord.h b/include/accord.h
index 9a040f1..3fff476 100644
--- a/include/accord.h
+++ b/include/accord.h
@@ -375,6 +375,9 @@ int acrd_tx_scmp(struct acrd_tx *tx, const char
*path1, const char *path2,
int acrd_tx_copy(struct acrd_tx *tx, const char *src, const char *dst,
uint32_t flags);
+/**
+ * Add an atomic-increment operation to a transaction
+ *
+ * The conductor adds the value at 'buf' to the value stored at 'path'; it
+ * only performs the addition when both are sizeof(uint32_t) bytes long.
+ *
+ * @param tx     the transaction the operation is added to
+ * @param path   NUL-terminated path of the value to increment
+ * @param buf    points to the uint32_t amount to add
+ * @param count  size of 'buf' in bytes
+ * @param offset passed through to the read and write of the stored value
+ * @param flags  passed through to the write of the stored value
+ *
+ * NOTE(review): the return value is whatever acrd_op() returns; in the test
+ * added by this patch a missing 'path' is only reported by acrd_tx_commit().
+ */
+int acrd_tx_atomic_inc(struct acrd_tx *tx, const char *path, const void *buf,
+ uint32_t count, uint32_t offset, uint32_t flags);
+
/**
* Commit a transaction
*
diff --git a/include/accord_proto.h b/include/accord_proto.h
index 455b2e8..861d623 100644
--- a/include/accord_proto.h
+++ b/include/accord_proto.h
@@ -18,6 +18,7 @@ enum OPERATION {
ACRD_OP_CMP,
ACRD_OP_SCMP,
ACRD_OP_COPY,
+ ACRD_OP_ATOMIC_INC,
ACRD_OP_LIST,
ACRD_OP_ADD_WATCH,
ACRD_OP_RM_WATCH,
diff --git a/libacrd/libacrd.c b/libacrd/libacrd.c
index a99c680..4249a86 100644
--- a/libacrd/libacrd.c
+++ b/libacrd/libacrd.c
@@ -610,6 +610,13 @@ int acrd_tx_copy(struct acrd_tx *tx, const char
*src, const char *dst,
strlen(dst) + 1, 0, 0, flags, NULL, NULL, NULL);
}
+/*
+ * Add an atomic-increment operation to transaction tx.  The path is sent
+ * together with its terminating NUL byte; buf/count, offset and flags are
+ * forwarded to acrd_op() unchanged.
+ */
+int acrd_tx_atomic_inc(struct acrd_tx *tx, const char *path, const void *buf,
+		       uint32_t count, uint32_t offset, uint32_t flags)
+{
+	const size_t path_len = strlen(path) + 1;
+
+	return acrd_op(tx->handle, tx, ACRD_OP_ATOMIC_INC, path, path_len,
+		       buf, count, 0, offset, flags, NULL, NULL, NULL);
+}
+
int acrd_tx_commit(struct acrd_tx *tx, uint32_t flags)
{
struct acrd_aiocb *aiocb;
diff --git a/test/test-txn.c b/test/test-txn.c
index 562befd..57e3bdd 100644
--- a/test/test-txn.c
+++ b/test/test-txn.c
@@ -281,6 +281,43 @@ static void test_txn_increment(struct acrd_fixture
*fixture, gconstpointer p)
acrd_tx_close(tx);
}
+/* atomic_inc: NOTFOUND on a missing node, value + delta on an existing one. */
+static void test_txn_ainc(struct acrd_fixture *fixture, gconstpointer p)
+{
+	struct acrd_handle *h = fixture->handle;
+	uint32_t initial = 5555;
+	const uint32_t delta = 1;
+	uint32_t readback = 0, readback_len = sizeof(readback);
+	struct acrd_tx *tx;
+	int ret;
+
+	/* The node is expected not to exist yet, so the commit must fail. */
+	tx = acrd_tx_init(h);
+	g_assert(tx != NULL);
+	ret = acrd_tx_atomic_inc(tx, "/tmp/0", &delta, sizeof(delta), 0, 0);
+	g_assert(ret == ACRD_SUCCESS);
+	ret = acrd_tx_commit(tx, 0);
+	g_assert(ret == ACRD_ERR_NOTFOUND);
+	acrd_tx_close(tx);
+
+	ret = acrd_write(h, "/tmp/0", &initial, sizeof(initial), 0, ACRD_FLAG_CREATE);
+	g_assert(ret == ACRD_SUCCESS);
+
+	/* With the node in place the same transaction must commit. */
+	tx = acrd_tx_init(h);
+	g_assert(tx != NULL);
+	ret = acrd_tx_atomic_inc(tx, "/tmp/0", &delta, sizeof(delta), 0, 0);
+	g_assert(ret == ACRD_SUCCESS);
+	ret = acrd_tx_commit(tx, 0);
+	g_assert(ret == ACRD_SUCCESS);
+	acrd_tx_close(tx);
+
+	/* Read straight into a uint32_t instead of casting a char buffer. */
+	ret = acrd_read(h, "/tmp/0", &readback, &readback_len, 0, ACRD_FLAG_CREATE);
+	g_assert(ret == ACRD_SUCCESS);
+	g_assert(readback == initial + delta);
+}
+
static void test_txn_merge(struct acrd_fixture *fixture, gconstpointer p)
{
struct acrd_handle *h = fixture->handle;
@@ -406,6 +443,8 @@ int main(int argc, char **argv)
test_txn_setup, test_txn_merge, test_txn_teardown);
g_test_add("/txn/swap", struct acrd_fixture, NULL,
test_txn_setup, test_txn_swap, test_txn_teardown);
+ g_test_add("/txn/ainc", struct acrd_fixture, NULL,
+ test_txn_setup, test_txn_ainc, test_txn_teardown);
return g_test_run();
}
--
1.7.2.5
More information about the sheepdog
mailing list