[Sheepdog] [PATCH 1/4] use __thread to simplify code
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Thu Dec 15 23:01:17 CET 2011
This also fixes a thread-safety problem in get_sheep_fd(): its cached fds are now kept per thread.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
lib/coroutine.c | 43 ++++++-------------------------------------
sheep/cluster/accord.c | 8 ++++----
sheep/cluster/corosync.c | 4 ++--
sheep/cluster/local.c | 4 ++--
sheep/cluster/zookeeper.c | 4 ++--
sheep/group.c | 8 ++++----
sheep/sdnet.c | 40 +++++++++++++++++-----------------------
sheep/sheep_priv.h | 7 +++----
sheep/store.c | 36 +++++++++++++++++-------------------
sheep/vdi.c | 4 ++--
sheep/work.c | 14 +++-----------
sheep/work.h | 2 +-
12 files changed, 63 insertions(+), 111 deletions(-)
diff --git a/lib/coroutine.c b/lib/coroutine.c
index 7ec9bba..5b2ed79 100644
--- a/lib/coroutine.c
+++ b/lib/coroutine.c
@@ -29,7 +29,6 @@
#include <stdlib.h>
#include <setjmp.h>
#include <stdint.h>
-#include <pthread.h>
#include <ucontext.h>
#include <errno.h>
#include <assert.h>
@@ -72,7 +71,7 @@ struct co_ucontext {
/**
* Per-thread coroutine bookkeeping
*/
-struct co_thread_state{
+__thread struct co_thread_state{
/** Currently executing coroutine */
struct coroutine *current;
@@ -82,9 +81,7 @@ struct co_thread_state{
/** The default coroutine */
struct co_ucontext leader;
-};
-
-static pthread_key_t thread_state_key;
+} co_ts;
static enum co_action coroutine_switch(struct coroutine *from,
struct coroutine *to,
@@ -102,43 +99,15 @@ union cc_arg {
static struct co_thread_state *coroutine_get_thread_state(void)
{
- struct co_thread_state *s = pthread_getspecific(thread_state_key);
+ struct co_thread_state *s = &co_ts;
- if (!s) {
- s = zalloc(sizeof(*s));
- if (!s)
- abort();
+ if (!s->current) {
s->current = &s->leader.base;
INIT_LIST_HEAD(&s->pool);
- pthread_setspecific(thread_state_key, s);
}
return s;
}
-static void coroutine_thread_cleanup(void *opaque)
-{
- struct co_thread_state *s = opaque;
- struct coroutine *co;
- struct coroutine *tmp;
-
- list_for_each_entry_safe(co, tmp, &s->pool, pool_next) {
- free(container_of(co, struct co_ucontext, base)->stack);
- free(co);
- }
- free(s);
-}
-
-static void __attribute__((constructor)) coroutine_init(void)
-{
- int ret;
-
- ret = pthread_key_create(&thread_state_key, coroutine_thread_cleanup);
- if (ret != 0) {
- fprintf(stderr, "unable to create leader key: %m\n");
- abort();
- }
-}
-
static void coroutine_trampoline(int i0, int i1)
{
union cc_arg arg;
@@ -302,9 +271,9 @@ struct coroutine *coroutine_self(void)
int in_coroutine(void)
{
- struct co_thread_state *s = pthread_getspecific(thread_state_key);
+ struct co_thread_state *s = &co_ts;
- return s && s->current->caller;
+ return s->current && s->current->caller;
}
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index a685f9e..cc762e3 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -391,7 +391,7 @@ struct acrd_leave_info {
struct work work;
};
-static void __acrd_leave(struct work *work, int idx)
+static void __acrd_leave(struct work *work)
{
struct acrd_leave_info *info = container_of(work, typeof(*info), work);
struct acrd_handle *ah = info->ah;
@@ -425,7 +425,7 @@ static void __acrd_leave(struct work *work, int idx)
pthread_mutex_unlock(&queue_lock);
}
-static void __acrd_leave_done(struct work *work, int idx)
+static void __acrd_leave_done(struct work *work)
{
struct acrd_leave_info *info = container_of(work, typeof(*info), work);
@@ -536,7 +536,7 @@ static int accord_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
}
-static void acrd_block(struct work *work, int idx)
+static void acrd_block(struct work *work)
{
struct acrd_event ev;
@@ -552,7 +552,7 @@ static void acrd_block(struct work *work, int idx)
pthread_mutex_unlock(&queue_lock);
}
-static void acrd_block_done(struct work *work, int idx)
+static void acrd_block_done(struct work *work)
{
}
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 4fe7704..a1d528a 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -209,14 +209,14 @@ retry:
return 0;
}
-static void corosync_block(struct work *work, int idx)
+static void corosync_block(struct work *work)
{
struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
bm->cb(bm->msg);
}
-static void corosync_block_done(struct work *work, int idx)
+static void corosync_block_done(struct work *work)
{
struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 84d4bb0..729741e 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -364,7 +364,7 @@ static int local_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
return 0;
}
-static void local_block(struct work *work, int idx)
+static void local_block(struct work *work)
{
struct local_event *ev;
@@ -381,7 +381,7 @@ static void local_block(struct work *work, int idx)
shm_queue_unlock();
}
-static void local_block_done(struct work *work, int idx)
+static void local_block_done(struct work *work)
{
}
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 7717b6b..32c9fd2 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -441,7 +441,7 @@ static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
}
-static void zk_block(struct work *work, int idx)
+static void zk_block(struct work *work)
{
struct zk_event ev;
@@ -453,7 +453,7 @@ static void zk_block(struct work *work, int idx)
zk_queue_push_back(zhandle, &ev);
}
-static void zk_block_done(struct work *work, int idx)
+static void zk_block_done(struct work *work)
{
}
diff --git a/sheep/group.c b/sheep/group.c
index 2c075c5..ae9958e 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -215,7 +215,7 @@ static void do_cluster_op(void *arg)
msg->rsp.result = ret;
}
-void do_cluster_request(struct work *work, int idx)
+void do_cluster_request(struct work *work)
{
struct request *req = container_of(work, struct request, work);
struct sd_req *hdr = (struct sd_req *)&req->rq;
@@ -886,7 +886,7 @@ static void cpg_event_free(struct cpg_event *cevent)
static struct work cpg_event_work;
-static void cpg_event_fn(struct work *work, int idx)
+static void cpg_event_fn(struct work *work)
{
struct cpg_event *cevent = sys->cur_cevent;
@@ -913,7 +913,7 @@ static void cpg_event_fn(struct work *work, int idx)
}
}
-static void cpg_event_done(struct work *work, int idx)
+static void cpg_event_done(struct work *work)
{
struct cpg_event *cevent;
@@ -1132,7 +1132,7 @@ do_retry:
while (!list_empty(&failed_req_list)) {
struct request *req = list_first_entry(&failed_req_list,
struct request, r_wlist);
- req->work.done(&req->work, 0);
+ req->work.done(&req->work);
retry = 1;
}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 510fd4e..5c97374 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -76,7 +76,7 @@ static void setup_access_to_local_objects(struct request *req)
req->local_oid = hdr->oid;
}
-static void io_op_done(struct work *work, int idx)
+static void io_op_done(struct work *work)
{
struct request *req = container_of(work, struct request, work);
struct cpg_event *cevent = &req->cev;
@@ -167,7 +167,7 @@ done:
req->done(req);
}
-static void local_op_done(struct work *work, int idx)
+static void local_op_done(struct work *work)
{
struct request *req = container_of(work, struct request, work);
@@ -179,19 +179,17 @@ static void local_op_done(struct work *work, int idx)
req->done(req);
}
-static void cluster_op_done(struct work *work, int idx)
+static void cluster_op_done(struct work *work)
{
/* request is forwarded to cpg group */
}
-static void do_local_request(struct work *work, int idx)
+static void do_local_request(struct work *work)
{
struct request *req = container_of(work, struct request, work);
struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
int ret = SD_RES_SUCCESS;
- dprintf("%d\n", idx);
-
if (has_process_work(req->op))
ret = do_process_work(req->op, &req->rq, &req->rp, req->data);
@@ -793,20 +791,18 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
return 0;
}
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
- uint32_t epoch, int worker_idx)
+int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch)
{
- static int cached_fds[NR_GW_WORKER_THREAD][SD_MAX_NODES];
- static uint32_t cached_epoch = 0;
- int i, j, fd, ret;
+ static __thread int cached_fds[SD_MAX_NODES];
+ static __thread uint32_t cached_epoch = 0;
+ int i, fd, ret;
char name[INET6_ADDRSTRLEN];
if (cached_epoch == 0) {
/* initialize */
- for (i = 0; i < NR_GW_WORKER_THREAD; i++) {
- for (j = 0; j < SD_MAX_NODES; j++)
- cached_fds[i][j] = -1;
- }
+ for (i = 0; i < SD_MAX_NODES; i++)
+ cached_fds[i] = -1;
+
cached_epoch = epoch;
}
@@ -816,18 +812,16 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
return -1;
}
if (after(epoch, cached_epoch)) {
- for (i = 0; i < NR_GW_WORKER_THREAD; i++) {
- for (j = 0; j < SD_MAX_NODES; j++) {
- if (cached_fds[i][j] >= 0)
- close(cached_fds[i][j]);
+ for (i = 0; i < SD_MAX_NODES; i++) {
+ if (cached_fds[i] >= 0)
+ close(cached_fds[i]);
- cached_fds[i][j] = -1;
- }
+ cached_fds[i] = -1;
}
cached_epoch = epoch;
}
- fd = cached_fds[worker_idx][node_idx];
+ fd = cached_fds[node_idx];
dprintf("%d, %d\n", epoch, fd);
if (cached_epoch == epoch && fd >= 0) {
@@ -855,7 +849,7 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
return -1;
}
- cached_fds[worker_idx][node_idx] = fd;
+ cached_fds[node_idx] = fd;
return fd;
}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 7c61032..556d719 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -215,7 +215,7 @@ int create_cluster(int port, int64_t zone);
int leave_cluster(void);
void start_cpg_event_work(void);
-void do_io_request(struct work *work, int idx);
+void do_io_request(struct work *work);
int write_object_local(uint64_t oid, char *data, unsigned int datalen,
uint64_t offset, uint16_t flags, int copies,
uint32_t epoch, int create);
@@ -224,7 +224,7 @@ int read_object_local(uint64_t oid, char *data, unsigned int datalen,
int read_epoch(uint32_t *epoch, uint64_t *ctime,
struct sheepdog_node_list_entry *entries, int *nr_entries);
-void do_cluster_request(struct work *work, int idx);
+void do_cluster_request(struct work *work);
int update_epoch_store(uint32_t epoch);
int update_epoch_log(int epoch);
@@ -268,8 +268,7 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
int vnodes, int zones, uint32_t node_version,
uint64_t oid, int nr);
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
- uint32_t epoch, int worker_idx);
+int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
/* Operations */
diff --git a/sheep/store.c b/sheep/store.c
index 78277a7..c0d7b62 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -214,7 +214,7 @@ out:
static int do_local_io(struct request *req, uint32_t epoch);
-static int forward_read_obj_req(struct request *req, int idx)
+static int forward_read_obj_req(struct request *req)
{
int i, n, nr, fd, ret;
unsigned wlen, rlen;
@@ -249,7 +249,7 @@ static int forward_read_obj_req(struct request *req, int idx)
n = obj_to_sheep(e, nr, oid, 0);
- fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch, idx);
+ fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch);
if (fd < 0) {
ret = SD_RES_NETWORK_ERROR;
goto out;
@@ -270,7 +270,7 @@ out:
return ret;
}
-static int forward_write_obj_req(struct request *req, int idx)
+static int forward_write_obj_req(struct request *req)
{
int i, n, nr, fd, ret;
unsigned wlen;
@@ -314,7 +314,7 @@ static int forward_write_obj_req(struct request *req, int idx)
continue;
}
- fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch, idx);
+ fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch);
if (fd < 0) {
eprintf("failed to connect to %s:%"PRIu32"\n", name, e[n].port);
ret = SD_RES_NETWORK_ERROR;
@@ -666,7 +666,7 @@ static int do_local_io(struct request *req, uint32_t epoch)
return ret;
}
-static int fix_object_consistency(struct request *req, int idx)
+static int fix_object_consistency(struct request *req)
{
int ret = SD_RES_NO_MEM;
unsigned int data_length;
@@ -697,7 +697,7 @@ static int fix_object_consistency(struct request *req, int idx)
hdr->opcode = SD_OP_READ_OBJ;
hdr->flags = 0;
req->op = get_sd_op(SD_OP_READ_OBJ);
- ret = forward_read_obj_req(req, idx);
+ ret = forward_read_obj_req(req);
if (ret != SD_RES_SUCCESS) {
eprintf("failed to read object %d\n", ret);
goto out;
@@ -707,7 +707,7 @@ static int fix_object_consistency(struct request *req, int idx)
hdr->flags = SD_FLAG_CMD_WRITE;
hdr->oid = oid;
req->op = get_sd_op(SD_OP_WRITE_OBJ);
- ret = forward_write_obj_req(req, idx);
+ ret = forward_write_obj_req(req);
if (ret != SD_RES_SUCCESS) {
eprintf("failed to write object %d\n", ret);
goto out;
@@ -722,7 +722,7 @@ out:
return ret;
}
-void do_io_request(struct work *work, int idx)
+void do_io_request(struct work *work)
{
struct request *req = container_of(work, struct request, work);
int ret = SD_RES_SUCCESS;
@@ -732,7 +732,7 @@ void do_io_request(struct work *work, int idx)
uint32_t opcode = hdr->opcode;
uint32_t epoch = hdr->epoch;
- dprintf("%"PRIu32", %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
+ dprintf("%x, %" PRIx64" , %u\n", opcode, oid, epoch);
if (hdr->flags & SD_FLAG_CMD_RECOVERY)
epoch = hdr->tgt_epoch;
@@ -742,20 +742,20 @@ void do_io_request(struct work *work, int idx)
} else {
/* fix object consistency when we read the object for the first time */
if (req->check_consistency) {
- ret = fix_object_consistency(req, idx);
+ ret = fix_object_consistency(req);
if (ret != SD_RES_SUCCESS)
goto out;
}
if (hdr->flags & SD_FLAG_CMD_WRITE)
- ret = forward_write_obj_req(req, idx);
+ ret = forward_write_obj_req(req);
else
- ret = forward_read_obj_req(req, idx);
+ ret = forward_read_obj_req(req);
}
out:
if (ret != SD_RES_SUCCESS)
- dprintf("failed: %"PRIu32", %x, %" PRIx64" , %u, %"PRIu32"\n",
- idx, opcode, oid, epoch, ret);
+ dprintf("failed: %x, %" PRIx64" , %u, %"PRIu32"\n",
+ opcode, oid, epoch, ret);
rsp->result = ret;
}
@@ -1324,7 +1324,7 @@ err:
return -1;
}
-static void recover_one(struct work *work, int idx)
+static void recover_one(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work, work);
char *buf = NULL;
@@ -1403,8 +1403,6 @@ out:
static struct recovery_work *suspended_recovery_work;
-static void __start_recovery(struct work *work, int idx);
-
static void recover_timer(void *data)
{
struct recovery_work *rw = (struct recovery_work *)data;
@@ -1496,7 +1494,7 @@ int is_recoverying_oid(uint64_t oid)
return 0;
}
-static void recover_done(struct work *work, int idx)
+static void recover_done(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work, work);
uint64_t oid;
@@ -1698,7 +1696,7 @@ fail:
return -1;
}
-static void __start_recovery(struct work *work, int idx)
+static void __start_recovery(struct work *work)
{
struct recovery_work *rw = container_of(work, struct recovery_work, work);
uint32_t epoch = rw->epoch;
diff --git a/sheep/vdi.c b/sheep/vdi.c
index f00ce04..a3b49fc 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -430,7 +430,7 @@ struct deletion_work {
static LIST_HEAD(deletion_work_list);
-static void delete_one(struct work *work, int idx)
+static void delete_one(struct work *work)
{
struct deletion_work *dw = container_of(work, struct deletion_work, work);
uint32_t vdi_id = *(((uint32_t *)dw->buf) + dw->count - dw->done - 1);
@@ -478,7 +478,7 @@ out:
free(inode);
}
-static void delete_one_done(struct work *work, int idx)
+static void delete_one_done(struct work *work)
{
struct deletion_work *dw = container_of(work, struct deletion_work, work);
diff --git a/sheep/work.c b/sheep/work.c
index f33b914..789272e 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -182,7 +182,7 @@ static void bs_thread_request_done(int fd, int events, void *data)
* save its attr for qork_post_done().
*/
attr = work->attr;
- work->done(work, 0);
+ work->done(work);
work_post_done(&wi->q, attr);
}
}
@@ -192,18 +192,10 @@ static void *worker_routine(void *arg)
{
struct worker_info *wi = arg;
struct work *work;
- int i, idx = 0;
eventfd_t value = 1;
- for (i = 0; i < wi->nr_threads; i++) {
- if (wi->worker_thread[i] == pthread_self()) {
- idx = i;
- break;
- }
- }
-
pthread_mutex_lock(&wi->startup_lock);
- dprintf("started this thread %d\n", idx);
+ /* started this thread */
pthread_mutex_unlock(&wi->startup_lock);
while (!(wi->q.wq_state & WQ_DEAD)) {
@@ -225,7 +217,7 @@ retest:
list_del(&work->w_list);
pthread_mutex_unlock(&wi->pending_lock);
- work->fn(work, idx);
+ work->fn(work);
pthread_mutex_lock(&wi->finished_lock);
list_add_tail(&work->w_list, &wi->finished_list);
diff --git a/sheep/work.h b/sheep/work.h
index a980600..9ef9936 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -4,7 +4,7 @@
struct work;
struct work_queue;
-typedef void (*work_func_t)(struct work *, int idx);
+typedef void (*work_func_t)(struct work *);
enum work_attr {
WORK_SIMPLE,
--
1.7.2.5
More information about the sheepdog mailing list