[sheepdog] [PATCH v2 05/11] sheep: define some thread unsafe variables as thread_unsafe

MORITA Kazutaka morita.kazutaka at lab.ntt.co.jp
Fri Apr 19 09:14:54 CEST 2013


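This declares current_vnode_info, sys->pending_block_list,
sys->pending_notify_list, and recovering_work as thread_unsafe
variables, and converts the accesses to them to go through
thread_unsafe_get() and thread_unsafe_set().

As part of this change, pending_block_list and pending_notify_list are
removed from struct cluster_info and become file-local, heap-allocated
list heads in sheep/group.c.  The static qualifier is also dropped from
next_rw in sheep/recovery.c.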

Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/group.c      | 92 +++++++++++++++++++++++++++++++++---------------------
 sheep/recovery.c   | 27 +++++++++-------
 sheep/sheep_priv.h |  3 --
 3 files changed, 72 insertions(+), 50 deletions(-)

diff --git a/sheep/group.c b/sheep/group.c
index f802756..342579f 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -45,7 +45,9 @@ static pthread_mutex_t wait_vdis_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t wait_vdis_cond = PTHREAD_COND_INITIALIZER;
 static bool is_vdi_list_ready = true;
 
-static struct vnode_info *current_vnode_info;
+static thread_unsafe(struct vnode_info *) current_vnode_info;
+static thread_unsafe(struct list_head *) pending_block_list;
+static thread_unsafe(struct list_head *) pending_notify_list;
 
 static size_t get_join_message_size(struct join_message *jm)
 {
@@ -87,26 +89,27 @@ static int get_zones_nr_from(const struct sd_node *nodes, int nr_nodes)
 bool have_enough_zones(void)
 {
 	int max_copies;
+	struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
 
 	if (sys->flags & SD_FLAG_NOHALT)
 		return true;
 
-	if (!current_vnode_info)
+	if (!cur_vinfo)
 		return false;
 
 	max_copies = get_max_copy_number();
 
 	sd_dprintf("flags %d, nr_zones %d, min copies %d",
-		   sys->flags, current_vnode_info->nr_zones, max_copies);
+		   sys->flags, cur_vinfo->nr_zones, max_copies);
 
-	if (!current_vnode_info->nr_zones)
+	if (!cur_vinfo->nr_zones)
 		return false;
 
 	if (sys->flags & SD_FLAG_QUORUM) {
-		if (current_vnode_info->nr_zones > (max_copies/2))
+		if (cur_vinfo->nr_zones > (max_copies/2))
 			return true;
 	} else {
-		if (current_vnode_info->nr_zones >= max_copies)
+		if (cur_vinfo->nr_zones >= max_copies)
 			return true;
 	}
 	return false;
@@ -143,9 +146,11 @@ struct vnode_info *grab_vnode_info(struct vnode_info *vnode_info)
  */
 struct vnode_info *get_vnode_info(void)
 {
-	assert(current_vnode_info);
+	struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
 
-	return grab_vnode_info(current_vnode_info);
+	assert(cur_vinfo);
+
+	return grab_vnode_info(cur_vinfo);
 }
 
 /* Release a reference to the current vnode information. */
@@ -200,15 +205,15 @@ int local_get_node_list(const struct sd_req *req, struct sd_rsp *rsp,
 {
 	struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
 	int nr_nodes;
+	struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
 
-	if (current_vnode_info) {
-		nr_nodes = current_vnode_info->nr_nodes;
-		memcpy(data, current_vnode_info->nodes,
+	if (cur_vinfo) {
+		nr_nodes = cur_vinfo->nr_nodes;
+		memcpy(data, cur_vinfo->nodes,
 			sizeof(struct sd_node) * nr_nodes);
 		node_rsp->data_length = nr_nodes * sizeof(struct sd_node);
 		node_rsp->nr_nodes = nr_nodes;
-		node_rsp->local_idx = get_node_idx(current_vnode_info,
-						   &sys->this_node);
+		node_rsp->local_idx = get_node_idx(cur_vinfo, &sys->this_node);
 	} else {
 		node_rsp->nr_nodes = 0;
 		node_rsp->local_idx = 0;
@@ -282,7 +287,7 @@ bool sd_block_handler(const struct sd_node *sender)
 
 	cluster_op_running = true;
 
-	req = list_first_entry(&sys->pending_block_list,
+	req = list_first_entry(thread_unsafe_get(pending_block_list),
 				struct request, pending_list);
 	req->work.fn = do_process_work;
 	req->work.done = cluster_op_done;
@@ -303,14 +308,16 @@ void queue_cluster_request(struct request *req)
 	sd_dprintf("%s (%p)", op_name(req->op), req);
 
 	if (has_process_work(req->op)) {
-		list_add_tail(&req->pending_list, &sys->pending_block_list);
+		list_add_tail(&req->pending_list,
+			      thread_unsafe_get(pending_block_list));
 		sys->cdrv->block();
 	} else {
 		struct vdi_op_message *msg;
 		size_t size;
 
 		msg = prepare_cluster_msg(req, &size);
-		list_add_tail(&req->pending_list, &sys->pending_notify_list);
+		list_add_tail(&req->pending_list,
+			      thread_unsafe_get(pending_notify_list));
 
 		msg->rsp.result = SD_RES_SUCCESS;
 		sys->cdrv->notify(msg, size);
@@ -538,6 +545,7 @@ static int cluster_wait_for_join_check(const struct sd_node *joined,
 	int nr, nr_local_entries, nr_failed_entries, nr_delayed_nodes;
 	uint32_t local_epoch = get_latest_epoch();
 	int ret;
+	struct vnode_info *cur_vinfo;
 
 	if (jm->nr_nodes == 0)
 		return CJ_RES_JOIN_LATER;
@@ -580,10 +588,11 @@ static int cluster_wait_for_join_check(const struct sd_node *joined,
 		return CJ_RES_FAIL;
 	}
 
-	if (!current_vnode_info)
+	cur_vinfo = thread_unsafe_get(current_vnode_info);
+	if (!cur_vinfo)
 		nr = 1;
 	else
-		nr = current_vnode_info->nr_nodes + 1;
+		nr = cur_vinfo->nr_nodes + 1;
 
 	nr_delayed_nodes = get_nodes_nr_from(&sys->delayed_nodes);
 
@@ -704,10 +713,12 @@ static void get_vdis_done(struct work *work)
 
 int log_current_epoch(void)
 {
-	if (!current_vnode_info)
+	struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
+
+	if (!cur_vinfo)
 		return update_epoch_log(sys->epoch, NULL, 0);
-	return update_epoch_log(sys->epoch, current_vnode_info->nodes,
-				current_vnode_info->nr_nodes);
+	return update_epoch_log(sys->epoch, cur_vinfo->nodes,
+				cur_vinfo->nr_nodes);
 }
 
 static struct vnode_info *alloc_old_vnode_info(const struct sd_node *joined,
@@ -852,8 +863,9 @@ static void update_cluster_info(const struct join_message *msg,
 	if (!sys->join_finished)
 		finish_join(msg, joined, nodes, nr_nodes);
 
-	old_vnode_info = current_vnode_info;
-	current_vnode_info = alloc_vnode_info(nodes, nr_nodes);
+	old_vnode_info = thread_unsafe_get(current_vnode_info);
+	thread_unsafe_set(current_vnode_info,
+			  alloc_vnode_info(nodes, nr_nodes));
 
 	switch (msg->cluster_status) {
 	case SD_STATUS_OK:
@@ -887,7 +899,8 @@ static void update_cluster_info(const struct join_message *msg,
 						nodes, nr_nodes);
 			}
 
-			start_recovery(current_vnode_info, old_vnode_info);
+			start_recovery(thread_unsafe_get(current_vnode_info),
+				       old_vnode_info);
 		}
 
 		if (have_enough_zones())
@@ -922,11 +935,13 @@ void sd_notify_handler(const struct sd_node *sender, void *data,
 
 	if (node_is_local(sender)) {
 		if (has_process_work(op))
-			req = list_first_entry(&sys->pending_block_list,
-					       struct request, pending_list);
+			req = list_first_entry(
+				thread_unsafe_get(pending_block_list),
+				struct request, pending_list);
 		else
-			req = list_first_entry(&sys->pending_notify_list,
-					       struct request, pending_list);
+			req = list_first_entry(
+				thread_unsafe_get(pending_notify_list),
+				struct request, pending_list);
 		list_del(&req->pending_list);
 	}
 
@@ -1135,8 +1150,9 @@ void sd_join_handler(const struct sd_node *joined,
 			sys->join_finished = true;
 			sys->epoch = get_latest_epoch();
 
-			put_vnode_info(current_vnode_info);
-			current_vnode_info = alloc_vnode_info(&sys->this_node, 1);
+			put_vnode_info(thread_unsafe_get(current_vnode_info));
+			thread_unsafe_set(current_vnode_info,
+					  alloc_vnode_info(&sys->this_node, 1));
 		}
 
 		nr_local = get_nodes_nr_epoch(sys->epoch);
@@ -1178,14 +1194,16 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
 		/* Mark leave node as gateway only node */
 		sys->this_node.nr_vnodes = 0;
 
-	old_vnode_info = current_vnode_info;
-	current_vnode_info = alloc_vnode_info(members, nr_members);
+	old_vnode_info = thread_unsafe_get(current_vnode_info);
+	thread_unsafe_set(current_vnode_info,
+			  alloc_vnode_info(members, nr_members));
 	switch (sys->status) {
 	case SD_STATUS_HALT:
 	case SD_STATUS_OK:
 		uatomic_inc(&sys->epoch);
 		log_current_epoch();
-		start_recovery(current_vnode_info, old_vnode_info);
+		start_recovery(thread_unsafe_get(current_vnode_info),
+			       old_vnode_info);
 		if (!have_enough_zones())
 			sys->status = SD_STATUS_HALT;
 		break;
@@ -1249,8 +1267,12 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
 		sys->status = SD_STATUS_WAIT_FOR_FORMAT;
 	}
 
-	INIT_LIST_HEAD(&sys->pending_block_list);
-	INIT_LIST_HEAD(&sys->pending_notify_list);
+	thread_unsafe_set(pending_block_list,
+			  xzalloc(sizeof(struct list_head)));
+	INIT_LIST_HEAD(thread_unsafe_get(pending_block_list));
+	thread_unsafe_set(pending_notify_list,
+			  xzalloc(sizeof(struct list_head)));
+	INIT_LIST_HEAD(thread_unsafe_get(pending_notify_list));
 	INIT_LIST_HEAD(&sys->failed_nodes);
 	INIT_LIST_HEAD(&sys->delayed_nodes);
 
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 0be846c..57ea38a 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -46,8 +46,9 @@ struct recovery_work {
 	struct vnode_info *cur_vinfo;
 };
 
-static struct recovery_work *next_rw;
-static struct recovery_work *recovering_work;
+struct recovery_work *next_rw;
+static thread_unsafe(struct recovery_work *) recovering_work;
+
 /* Dynamically grown list buffer default as 4M (2T storage) */
 #define DEFAULT_LIST_BUFFER_SIZE (UINT64_C(1) << 22)
 static size_t list_buffer_size = DEFAULT_LIST_BUFFER_SIZE;
@@ -215,12 +216,12 @@ static void recover_object_work(struct work *work)
 
 bool node_in_recovery(void)
 {
-	return !!recovering_work;
+	return thread_unsafe_get(recovering_work) != NULL;
 }
 
 static inline void prepare_schedule_oid(uint64_t oid)
 {
-	struct recovery_work *rw = recovering_work;
+	struct recovery_work *rw = thread_unsafe_get(recovering_work);
 	int i;
 
 	for (i = 0; i < rw->nr_prio_oids; i++)
@@ -253,7 +254,7 @@ static inline void prepare_schedule_oid(uint64_t oid)
 
 bool oid_in_recovery(uint64_t oid)
 {
-	struct recovery_work *rw = recovering_work;
+	struct recovery_work *rw = thread_unsafe_get(recovering_work);
 	int i;
 
 	if (!node_in_recovery())
@@ -310,7 +311,7 @@ static inline bool run_next_rw(struct recovery_work *rw)
 		return false;
 
 	free_recovery_work(rw);
-	recovering_work = nrw;
+	thread_unsafe_set(recovering_work, nrw);
 	wakeup_all_requests();
 	queue_work(sys->recovery_wqueue, &nrw->work);
 	sd_dprintf("recovery work is superseded");
@@ -345,7 +346,7 @@ static void notify_recovery_completion_main(struct work *work)
 static inline void finish_recovery(struct recovery_work *rw)
 {
 	uint32_t recovered_epoch = rw->epoch;
-	recovering_work = NULL;
+	thread_unsafe_set(recovering_work, NULL);
 
 	if (sd_store->end_recover)
 		sd_store->end_recover(sys->epoch - 1, rw->old_vinfo);
@@ -445,9 +446,11 @@ static void recover_next_object(struct recovery_work *rw)
 
 void resume_suspended_recovery(void)
 {
-	if (recovering_work && recovering_work->suspended) {
-		recovering_work->suspended = false;
-		recover_next_object(recovering_work);
+	struct recovery_work *rw = thread_unsafe_get(recovering_work);
+
+	if (rw && rw->suspended) {
+		rw->suspended = false;
+		recover_next_object(rw);
 	}
 }
 
@@ -647,7 +650,7 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 	rw->work.fn = prepare_object_list;
 	rw->work.done = finish_object_list;
 
-	if (recovering_work != NULL) {
+	if (thread_unsafe_get(recovering_work) != NULL) {
 		/* skip the previous epoch recovery */
 		struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
 		if (nrw)
@@ -660,7 +663,7 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 		 */
 		resume_suspended_recovery();
 	} else {
-		recovering_work = rw;
+		thread_unsafe_set(recovering_work, rw);
 		queue_work(sys->recovery_wqueue, &rw->work);
 	}
 out:
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 2693c0f..7f3a16e 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -88,9 +88,6 @@ struct cluster_info {
 	 */
 	struct list_head delayed_nodes;
 
-	struct list_head pending_block_list;
-	struct list_head pending_notify_list;
-
 	DECLARE_BITMAP(vdi_inuse, SD_NR_VDIS);
 
 	uint8_t nr_copies;
-- 
1.8.1.3.566.gaa39828




More information about the sheepdog mailing list