The combination of cmp() and other operations can simulate an atomic increment, but doing so can cause performance problems. The new operation, atomic_inc(), solves this. Signed-off-by: OZAWA Tsuyoshi <ozawa.tsuyoshi at lab.ntt.co.jp> --- conductor/acrdops.c | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ include/accord.h | 3 +++ include/accord_proto.h | 1 + libacrd/libacrd.c | 7 +++++++ test/test-txn.c | 39 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 98 insertions(+), 0 deletions(-) diff --git a/conductor/acrdops.c b/conductor/acrdops.c index 8ec950d..ecfa04e 100644 --- a/conductor/acrdops.c +++ b/conductor/acrdops.c @@ -155,6 +155,50 @@ static int exec_del_req(const struct acrd_req *req, struct acrd_rsp **rsp, return ret; } +static int exec_atomic_inc_req(const struct acrd_req *req, struct acrd_rsp **rsp, + struct acrd_txid *txid, struct client_info *from) +{ + int ret = 0; + void *data; + const void *adddata; + const char *path; + uint32_t size; + uint32_t d32, v; + + path = get_arg(req, 0)->data; + adddata = get_arg(req, 1)->data; + size = get_arg(req, 1)->size; + dprintf("hogehoge %s %d\n", path, size); + + if (size != sizeof(uint32_t)) + goto err; + + if (likely(path)) + ret = store_read(path, &data, &size, req->offset, txid); + else + ret = ACRD_ERR_UNKNOWN; + + if (ret != ACRD_SUCCESS) + goto err; + + if (size != sizeof(uint32_t)) + goto err; + + v = *(uint32_t *) adddata; + d32 = *(uint32_t *)data; + d32 += v; + ret = store_write(path, &d32, size, req->offset, req->flags, txid); + + if (rsp) + (*rsp)->result = ret; + + return ret; +err: + if (rsp) + (*rsp)->result = ret; + return ret; +} + static int exec_cmp_req(const struct acrd_req *req, struct acrd_rsp **rsp, struct acrd_txid *txid, struct client_info *from) { @@ -536,6 +580,10 @@ static struct acrd_op_tmpl acrd_ops[] = { .exec_req = exec_copy_req, .notify_event = notify_copy_event, }, { + .opcode = ACRD_OP_ATOMIC_INC, + .need_mcast = 1, + .exec_req = exec_atomic_inc_req, + 
}, { .opcode = ACRD_OP_TX, .need_mcast = 1, .exec_req = exec_tx_req, diff --git a/include/accord.h b/include/accord.h index 9a040f1..3fff476 100644 --- a/include/accord.h +++ b/include/accord.h @@ -375,6 +375,9 @@ int acrd_tx_scmp(struct acrd_tx *tx, const char *path1, const char *path2, int acrd_tx_copy(struct acrd_tx *tx, const char *src, const char *dst, uint32_t flags); +int acrd_tx_atomic_inc(struct acrd_tx *tx, const char *path, const void *buf, + uint32_t count, uint32_t offset, uint32_t flags); + /** * Commit a transaction * diff --git a/include/accord_proto.h b/include/accord_proto.h index 455b2e8..861d623 100644 --- a/include/accord_proto.h +++ b/include/accord_proto.h @@ -18,6 +18,7 @@ enum OPERATION { ACRD_OP_CMP, ACRD_OP_SCMP, ACRD_OP_COPY, + ACRD_OP_ATOMIC_INC, ACRD_OP_LIST, ACRD_OP_ADD_WATCH, ACRD_OP_RM_WATCH, diff --git a/libacrd/libacrd.c b/libacrd/libacrd.c index a99c680..4249a86 100644 --- a/libacrd/libacrd.c +++ b/libacrd/libacrd.c @@ -610,6 +610,13 @@ int acrd_tx_copy(struct acrd_tx *tx, const char *src, const char *dst, strlen(dst) + 1, 0, 0, flags, NULL, NULL, NULL); } +int acrd_tx_atomic_inc(struct acrd_tx *tx, const char *path, const void *buf, + uint32_t count, uint32_t offset, uint32_t flags) +{ + return acrd_op(tx->handle, tx, ACRD_OP_ATOMIC_INC, path, strlen(path) + 1, buf, + count, 0, offset, flags, NULL, NULL, NULL); +} + int acrd_tx_commit(struct acrd_tx *tx, uint32_t flags) { struct acrd_aiocb *aiocb; diff --git a/test/test-txn.c b/test/test-txn.c index 562befd..57e3bdd 100644 --- a/test/test-txn.c +++ b/test/test-txn.c @@ -281,6 +281,43 @@ static void test_txn_increment(struct acrd_fixture *fixture, gconstpointer p) acrd_tx_close(tx); } +static void test_txn_ainc(struct acrd_fixture *fixture, gconstpointer p) +{ + struct acrd_handle *h = fixture->handle; + uint32_t data = 5555; + uint32_t newdata = 5556; + uint32_t *readdata; + uint32_t delta = 1; + char retdata[32]; + uint32_t retdata_len = sizeof(data); + int ret; + struct 
acrd_tx *tx; + + tx = acrd_tx_init(h); + g_assert(tx != NULL); + ret = acrd_tx_atomic_inc(tx, "/tmp/0", &delta, sizeof(uint32_t), 0, 0); + g_assert(ret == ACRD_SUCCESS); + ret = acrd_tx_commit(tx, 0); + g_assert(ret == ACRD_ERR_NOTFOUND); + acrd_tx_close(tx); + + ret = acrd_write(h, "/tmp/0", &data, sizeof(uint32_t), 0, ACRD_FLAG_CREATE); + g_assert(ret == ACRD_SUCCESS); + + tx = acrd_tx_init(h); + g_assert(tx != NULL); + ret = acrd_tx_atomic_inc(tx, "/tmp/0", &delta, sizeof(uint32_t), 0, 0); + g_assert(ret == ACRD_SUCCESS); + ret = acrd_tx_commit(tx, 0); + g_assert(ret == ACRD_SUCCESS); + acrd_tx_close(tx); + + ret = acrd_read(h, "/tmp/0", &retdata, &retdata_len, 0, ACRD_FLAG_CREATE); + g_assert(ret == ACRD_SUCCESS); + readdata = (uint32_t *)retdata; + g_assert(*readdata == newdata); +} + static void test_txn_merge(struct acrd_fixture *fixture, gconstpointer p) { struct acrd_handle *h = fixture->handle; @@ -406,6 +443,8 @@ int main(int argc, char **argv) test_txn_setup, test_txn_merge, test_txn_teardown); g_test_add("/txn/swap", struct acrd_fixture, NULL, test_txn_setup, test_txn_swap, test_txn_teardown); + g_test_add("/txn/ainc", struct acrd_fixture, NULL, + test_txn_setup, test_txn_ainc, test_txn_teardown); return g_test_run(); } -- 1.7.2.5 |