[Sheepdog] [PATCH 1/4] use __thread to simplify code

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Thu Dec 15 23:01:17 CET 2011


This also fixes a problem where get_sheep_fd() was not thread-safe, by making its cached fds and cached epoch thread-local.

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 lib/coroutine.c           |   43 ++++++-------------------------------------
 sheep/cluster/accord.c    |    8 ++++----
 sheep/cluster/corosync.c  |    4 ++--
 sheep/cluster/local.c     |    4 ++--
 sheep/cluster/zookeeper.c |    4 ++--
 sheep/group.c             |    8 ++++----
 sheep/sdnet.c             |   40 +++++++++++++++++-----------------------
 sheep/sheep_priv.h        |    7 +++----
 sheep/store.c             |   36 +++++++++++++++++-------------------
 sheep/vdi.c               |    4 ++--
 sheep/work.c              |   14 +++-----------
 sheep/work.h              |    2 +-
 12 files changed, 63 insertions(+), 111 deletions(-)

diff --git a/lib/coroutine.c b/lib/coroutine.c
index 7ec9bba..5b2ed79 100644
--- a/lib/coroutine.c
+++ b/lib/coroutine.c
@@ -29,7 +29,6 @@
 #include <stdlib.h>
 #include <setjmp.h>
 #include <stdint.h>
-#include <pthread.h>
 #include <ucontext.h>
 #include <errno.h>
 #include <assert.h>
@@ -72,7 +71,7 @@ struct co_ucontext {
 /**
  * Per-thread coroutine bookkeeping
  */
-struct co_thread_state{
+__thread struct co_thread_state{
 	/** Currently executing coroutine */
 	struct coroutine *current;
 
@@ -82,9 +81,7 @@ struct co_thread_state{
 
 	/** The default coroutine */
 	struct co_ucontext leader;
-};
-
-static pthread_key_t thread_state_key;
+} co_ts;
 
 static enum co_action coroutine_switch(struct coroutine *from,
 				       struct coroutine *to,
@@ -102,43 +99,15 @@ union cc_arg {
 
 static struct co_thread_state *coroutine_get_thread_state(void)
 {
-	struct co_thread_state *s = pthread_getspecific(thread_state_key);
+	struct co_thread_state *s = &co_ts;
 
-	if (!s) {
-		s = zalloc(sizeof(*s));
-		if (!s)
-			abort();
+	if (!s->current) {
 		s->current = &s->leader.base;
 		INIT_LIST_HEAD(&s->pool);
-		pthread_setspecific(thread_state_key, s);
 	}
 	return s;
 }
 
-static void coroutine_thread_cleanup(void *opaque)
-{
-	struct co_thread_state *s = opaque;
-	struct coroutine *co;
-	struct coroutine *tmp;
-
-	list_for_each_entry_safe(co, tmp, &s->pool, pool_next) {
-		free(container_of(co, struct co_ucontext, base)->stack);
-		free(co);
-	}
-	free(s);
-}
-
-static void __attribute__((constructor)) coroutine_init(void)
-{
-	int ret;
-
-	ret = pthread_key_create(&thread_state_key, coroutine_thread_cleanup);
-	if (ret != 0) {
-		fprintf(stderr, "unable to create leader key: %m\n");
-		abort();
-	}
-}
-
 static void coroutine_trampoline(int i0, int i1)
 {
 	union cc_arg arg;
@@ -302,9 +271,9 @@ struct coroutine *coroutine_self(void)
 
 int in_coroutine(void)
 {
-	struct co_thread_state *s = pthread_getspecific(thread_state_key);
+	struct co_thread_state *s = &co_ts;
 
-	return s && s->current->caller;
+	return s->current && s->current->caller;
 }
 
 
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index a685f9e..cc762e3 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -391,7 +391,7 @@ struct acrd_leave_info {
 	struct work work;
 };
 
-static void __acrd_leave(struct work *work, int idx)
+static void __acrd_leave(struct work *work)
 {
 	struct acrd_leave_info *info = container_of(work, typeof(*info), work);
 	struct acrd_handle *ah = info->ah;
@@ -425,7 +425,7 @@ static void __acrd_leave(struct work *work, int idx)
 	pthread_mutex_unlock(&queue_lock);
 }
 
-static void __acrd_leave_done(struct work *work, int idx)
+static void __acrd_leave_done(struct work *work)
 {
 	struct acrd_leave_info *info = container_of(work, typeof(*info), work);
 
@@ -536,7 +536,7 @@ static int accord_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
 	return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
 }
 
-static void acrd_block(struct work *work, int idx)
+static void acrd_block(struct work *work)
 {
 	struct acrd_event ev;
 
@@ -552,7 +552,7 @@ static void acrd_block(struct work *work, int idx)
 	pthread_mutex_unlock(&queue_lock);
 }
 
-static void acrd_block_done(struct work *work, int idx)
+static void acrd_block_done(struct work *work)
 {
 }
 
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 4fe7704..a1d528a 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -209,14 +209,14 @@ retry:
 	return 0;
 }
 
-static void corosync_block(struct work *work, int idx)
+static void corosync_block(struct work *work)
 {
 	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
 
 	bm->cb(bm->msg);
 }
 
-static void corosync_block_done(struct work *work, int idx)
+static void corosync_block_done(struct work *work)
 {
 	struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
 
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 84d4bb0..729741e 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -364,7 +364,7 @@ static int local_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
 	return 0;
 }
 
-static void local_block(struct work *work, int idx)
+static void local_block(struct work *work)
 {
 	struct local_event *ev;
 
@@ -381,7 +381,7 @@ static void local_block(struct work *work, int idx)
 	shm_queue_unlock();
 }
 
-static void local_block_done(struct work *work, int idx)
+static void local_block_done(struct work *work)
 {
 }
 
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 7717b6b..32c9fd2 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -441,7 +441,7 @@ static int zk_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
 	return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
 }
 
-static void zk_block(struct work *work, int idx)
+static void zk_block(struct work *work)
 {
 	struct zk_event ev;
 
@@ -453,7 +453,7 @@ static void zk_block(struct work *work, int idx)
 	zk_queue_push_back(zhandle, &ev);
 }
 
-static void zk_block_done(struct work *work, int idx)
+static void zk_block_done(struct work *work)
 {
 }
 
diff --git a/sheep/group.c b/sheep/group.c
index 2c075c5..ae9958e 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -215,7 +215,7 @@ static void do_cluster_op(void *arg)
 	msg->rsp.result = ret;
 }
 
-void do_cluster_request(struct work *work, int idx)
+void do_cluster_request(struct work *work)
 {
 	struct request *req = container_of(work, struct request, work);
 	struct sd_req *hdr = (struct sd_req *)&req->rq;
@@ -886,7 +886,7 @@ static void cpg_event_free(struct cpg_event *cevent)
 
 static struct work cpg_event_work;
 
-static void cpg_event_fn(struct work *work, int idx)
+static void cpg_event_fn(struct work *work)
 {
 	struct cpg_event *cevent = sys->cur_cevent;
 
@@ -913,7 +913,7 @@ static void cpg_event_fn(struct work *work, int idx)
 	}
 }
 
-static void cpg_event_done(struct work *work, int idx)
+static void cpg_event_done(struct work *work)
 {
 	struct cpg_event *cevent;
 
@@ -1132,7 +1132,7 @@ do_retry:
 	while (!list_empty(&failed_req_list)) {
 		struct request *req = list_first_entry(&failed_req_list,
 						       struct request, r_wlist);
-		req->work.done(&req->work, 0);
+		req->work.done(&req->work);
 
 		retry = 1;
 	}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 510fd4e..5c97374 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -76,7 +76,7 @@ static void setup_access_to_local_objects(struct request *req)
 		req->local_oid = hdr->oid;
 }
 
-static void io_op_done(struct work *work, int idx)
+static void io_op_done(struct work *work)
 {
 	struct request *req = container_of(work, struct request, work);
 	struct cpg_event *cevent = &req->cev;
@@ -167,7 +167,7 @@ done:
 		req->done(req);
 }
 
-static void local_op_done(struct work *work, int idx)
+static void local_op_done(struct work *work)
 {
 	struct request *req = container_of(work, struct request, work);
 
@@ -179,19 +179,17 @@ static void local_op_done(struct work *work, int idx)
 	req->done(req);
 }
 
-static void cluster_op_done(struct work *work, int idx)
+static void cluster_op_done(struct work *work)
 {
 	/* request is forwarded to cpg group */
 }
 
-static void do_local_request(struct work *work, int idx)
+static void do_local_request(struct work *work)
 {
 	struct request *req = container_of(work, struct request, work);
 	struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
 	int ret = SD_RES_SUCCESS;
 
-	dprintf("%d\n", idx);
-
 	if (has_process_work(req->op))
 		ret = do_process_work(req->op, &req->rq, &req->rp, req->data);
 
@@ -793,20 +791,18 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
 	return 0;
 }
 
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
-		 uint32_t epoch, int worker_idx)
+int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch)
 {
-	static int cached_fds[NR_GW_WORKER_THREAD][SD_MAX_NODES];
-	static uint32_t cached_epoch = 0;
-	int i, j, fd, ret;
+	static __thread int cached_fds[SD_MAX_NODES];
+	static __thread uint32_t cached_epoch = 0;
+	int i, fd, ret;
 	char name[INET6_ADDRSTRLEN];
 
 	if (cached_epoch == 0) {
 		/* initialize */
-		for (i = 0; i < NR_GW_WORKER_THREAD; i++) {
-			for (j = 0; j < SD_MAX_NODES; j++)
-				cached_fds[i][j] = -1;
-		}
+		for (i = 0; i < SD_MAX_NODES; i++)
+			cached_fds[i] = -1;
+
 		cached_epoch = epoch;
 	}
 
@@ -816,18 +812,16 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
 		return -1;
 	}
 	if (after(epoch, cached_epoch)) {
-		for (i = 0; i < NR_GW_WORKER_THREAD; i++) {
-			for (j = 0; j < SD_MAX_NODES; j++) {
-				if (cached_fds[i][j] >= 0)
-					close(cached_fds[i][j]);
+		for (i = 0; i < SD_MAX_NODES; i++) {
+			if (cached_fds[i] >= 0)
+				close(cached_fds[i]);
 
-				cached_fds[i][j] = -1;
-			}
+			cached_fds[i] = -1;
 		}
 		cached_epoch = epoch;
 	}
 
-	fd = cached_fds[worker_idx][node_idx];
+	fd = cached_fds[node_idx];
 	dprintf("%d, %d\n", epoch, fd);
 
 	if (cached_epoch == epoch && fd >= 0) {
@@ -855,7 +849,7 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
 		return -1;
 	}
 
-	cached_fds[worker_idx][node_idx] = fd;
+	cached_fds[node_idx] = fd;
 
 	return fd;
 }
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 7c61032..556d719 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -215,7 +215,7 @@ int create_cluster(int port, int64_t zone);
 int leave_cluster(void);
 
 void start_cpg_event_work(void);
-void do_io_request(struct work *work, int idx);
+void do_io_request(struct work *work);
 int write_object_local(uint64_t oid, char *data, unsigned int datalen,
 		       uint64_t offset, uint16_t flags, int copies,
 		       uint32_t epoch, int create);
@@ -224,7 +224,7 @@ int read_object_local(uint64_t oid, char *data, unsigned int datalen,
 
 int read_epoch(uint32_t *epoch, uint64_t *ctime,
 	       struct sheepdog_node_list_entry *entries, int *nr_entries);
-void do_cluster_request(struct work *work, int idx);
+void do_cluster_request(struct work *work);
 
 int update_epoch_store(uint32_t epoch);
 int update_epoch_log(int epoch);
@@ -268,8 +268,7 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
 		  int vnodes, int zones, uint32_t node_version,
 		  uint64_t oid, int nr);
 
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
-		 uint32_t epoch, int worker_idx);
+int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
 
 /* Operations */
 
diff --git a/sheep/store.c b/sheep/store.c
index 78277a7..c0d7b62 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -214,7 +214,7 @@ out:
 
 static int do_local_io(struct request *req, uint32_t epoch);
 
-static int forward_read_obj_req(struct request *req, int idx)
+static int forward_read_obj_req(struct request *req)
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen, rlen;
@@ -249,7 +249,7 @@ static int forward_read_obj_req(struct request *req, int idx)
 
 	n = obj_to_sheep(e, nr, oid, 0);
 
-	fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch, idx);
+	fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch);
 	if (fd < 0) {
 		ret = SD_RES_NETWORK_ERROR;
 		goto out;
@@ -270,7 +270,7 @@ out:
 	return ret;
 }
 
-static int forward_write_obj_req(struct request *req, int idx)
+static int forward_write_obj_req(struct request *req)
 {
 	int i, n, nr, fd, ret;
 	unsigned wlen;
@@ -314,7 +314,7 @@ static int forward_write_obj_req(struct request *req, int idx)
 			continue;
 		}
 
-		fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch, idx);
+		fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch);
 		if (fd < 0) {
 			eprintf("failed to connect to %s:%"PRIu32"\n", name, e[n].port);
 			ret = SD_RES_NETWORK_ERROR;
@@ -666,7 +666,7 @@ static int do_local_io(struct request *req, uint32_t epoch)
 	return ret;
 }
 
-static int fix_object_consistency(struct request *req, int idx)
+static int fix_object_consistency(struct request *req)
 {
 	int ret = SD_RES_NO_MEM;
 	unsigned int data_length;
@@ -697,7 +697,7 @@ static int fix_object_consistency(struct request *req, int idx)
 	hdr->opcode = SD_OP_READ_OBJ;
 	hdr->flags = 0;
 	req->op = get_sd_op(SD_OP_READ_OBJ);
-	ret = forward_read_obj_req(req, idx);
+	ret = forward_read_obj_req(req);
 	if (ret != SD_RES_SUCCESS) {
 		eprintf("failed to read object %d\n", ret);
 		goto out;
@@ -707,7 +707,7 @@ static int fix_object_consistency(struct request *req, int idx)
 	hdr->flags = SD_FLAG_CMD_WRITE;
 	hdr->oid = oid;
 	req->op = get_sd_op(SD_OP_WRITE_OBJ);
-	ret = forward_write_obj_req(req, idx);
+	ret = forward_write_obj_req(req);
 	if (ret != SD_RES_SUCCESS) {
 		eprintf("failed to write object %d\n", ret);
 		goto out;
@@ -722,7 +722,7 @@ out:
 	return ret;
 }
 
-void do_io_request(struct work *work, int idx)
+void do_io_request(struct work *work)
 {
 	struct request *req = container_of(work, struct request, work);
 	int ret = SD_RES_SUCCESS;
@@ -732,7 +732,7 @@ void do_io_request(struct work *work, int idx)
 	uint32_t opcode = hdr->opcode;
 	uint32_t epoch = hdr->epoch;
 
-	dprintf("%"PRIu32", %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
+	dprintf("%x, %" PRIx64" , %u\n", opcode, oid, epoch);
 
 	if (hdr->flags & SD_FLAG_CMD_RECOVERY)
 		epoch = hdr->tgt_epoch;
@@ -742,20 +742,20 @@ void do_io_request(struct work *work, int idx)
 	} else {
 		/* fix object consistency when we read the object for the first time */
 		if (req->check_consistency) {
-			ret = fix_object_consistency(req, idx);
+			ret = fix_object_consistency(req);
 			if (ret != SD_RES_SUCCESS)
 				goto out;
 		}
 
 		if (hdr->flags & SD_FLAG_CMD_WRITE)
-			ret = forward_write_obj_req(req, idx);
+			ret = forward_write_obj_req(req);
 		else
-			ret = forward_read_obj_req(req, idx);
+			ret = forward_read_obj_req(req);
 	}
 out:
 	if (ret != SD_RES_SUCCESS)
-		dprintf("failed: %"PRIu32", %x, %" PRIx64" , %u, %"PRIu32"\n",
-			idx, opcode, oid, epoch, ret);
+		dprintf("failed: %x, %" PRIx64" , %u, %"PRIu32"\n",
+			opcode, oid, epoch, ret);
 	rsp->result = ret;
 }
 
@@ -1324,7 +1324,7 @@ err:
 	return -1;
 }
 
-static void recover_one(struct work *work, int idx)
+static void recover_one(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
 	char *buf = NULL;
@@ -1403,8 +1403,6 @@ out:
 
 static struct recovery_work *suspended_recovery_work;
 
-static void __start_recovery(struct work *work, int idx);
-
 static void recover_timer(void *data)
 {
 	struct recovery_work *rw = (struct recovery_work *)data;
@@ -1496,7 +1494,7 @@ int is_recoverying_oid(uint64_t oid)
 	return 0;
 }
 
-static void recover_done(struct work *work, int idx)
+static void recover_done(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
 	uint64_t oid;
@@ -1698,7 +1696,7 @@ fail:
 	return -1;
 }
 
-static void __start_recovery(struct work *work, int idx)
+static void __start_recovery(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
 	uint32_t epoch = rw->epoch;
diff --git a/sheep/vdi.c b/sheep/vdi.c
index f00ce04..a3b49fc 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -430,7 +430,7 @@ struct deletion_work {
 
 static LIST_HEAD(deletion_work_list);
 
-static void delete_one(struct work *work, int idx)
+static void delete_one(struct work *work)
 {
 	struct deletion_work *dw = container_of(work, struct deletion_work, work);
 	uint32_t vdi_id = *(((uint32_t *)dw->buf) + dw->count - dw->done - 1);
@@ -478,7 +478,7 @@ out:
 	free(inode);
 }
 
-static void delete_one_done(struct work *work, int idx)
+static void delete_one_done(struct work *work)
 {
 	struct deletion_work *dw = container_of(work, struct deletion_work, work);
 
diff --git a/sheep/work.c b/sheep/work.c
index f33b914..789272e 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -182,7 +182,7 @@ static void bs_thread_request_done(int fd, int events, void *data)
 			 * save its attr for qork_post_done().
 			 */
 			attr = work->attr;
-			work->done(work, 0);
+			work->done(work);
 			work_post_done(&wi->q, attr);
 		}
 	}
@@ -192,18 +192,10 @@ static void *worker_routine(void *arg)
 {
 	struct worker_info *wi = arg;
 	struct work *work;
-	int i, idx = 0;
 	eventfd_t value = 1;
 
-	for (i = 0; i < wi->nr_threads; i++) {
-		if (wi->worker_thread[i] == pthread_self()) {
-			idx = i;
-			break;
-		}
-	}
-
 	pthread_mutex_lock(&wi->startup_lock);
-	dprintf("started this thread %d\n", idx);
+	/* started this thread */
 	pthread_mutex_unlock(&wi->startup_lock);
 
 	while (!(wi->q.wq_state & WQ_DEAD)) {
@@ -225,7 +217,7 @@ retest:
 		list_del(&work->w_list);
 		pthread_mutex_unlock(&wi->pending_lock);
 
-		work->fn(work, idx);
+		work->fn(work);
 
 		pthread_mutex_lock(&wi->finished_lock);
 		list_add_tail(&work->w_list, &wi->finished_list);
diff --git a/sheep/work.h b/sheep/work.h
index a980600..9ef9936 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -4,7 +4,7 @@
 struct work;
 struct work_queue;
 
-typedef void (*work_func_t)(struct work *, int idx);
+typedef void (*work_func_t)(struct work *);
 
 enum work_attr {
 	WORK_SIMPLE,
-- 
1.7.2.5




More information about the sheepdog mailing list