[sheepdog] [PATCH v2 05/11] sheep: define some thread unsafe variables as thread_unsafe
MORITA Kazutaka
morita.kazutaka at lab.ntt.co.jp
Fri Apr 19 09:14:54 CEST 2013
This declares current_vnode_info, sys->pending_block_list,
sys->pending_notify_list, and recovering_work as thread_unsafe
variables.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
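Note for reviewers (not part of this patch): the thread_unsafe() wrapper
and its thread_unsafe_get()/thread_unsafe_set() accessors are introduced
earlier in this series.  The sketch below shows one way such a wrapper
can be implemented; the is_main_thread() helper and the exact definitions
are illustrative assumptions, not the series' actual code.  The idea is
to hide the value inside an anonymous struct so it cannot be read or
written directly; every access has to go through the accessor macros,
which assert that the caller runs in the main thread.

#include <assert.h>
#include <stdbool.h>
#include <pthread.h>

/* assumed helper: the main thread records its id once at startup */
static pthread_t main_thread_id;

static inline bool is_main_thread(void)
{
	return pthread_equal(pthread_self(), main_thread_id);
}

/* wrap the value so direct access fails to compile */
#define thread_unsafe(type) struct { type __val; }

/* read the value (GCC statement expression), main thread only */
#define thread_unsafe_get(var)		\
({					\
	assert(is_main_thread());	\
	(var).__val;			\
})

/* write the value, main thread only */
#define thread_unsafe_set(var, val)	\
({					\
	assert(is_main_thread());	\
	(var).__val = (val);		\
})

Under these assumptions, "static thread_unsafe(struct vnode_info *)
current_vnode_info;" declares the wrapped pointer used below, and an
accidental access from a worker thread trips the assertion instead of
racing silently.  The pending lists are moved out of struct cluster_info
and behind pointers allocated in create_cluster() so they can be wrapped
the same way.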
sheep/group.c | 92 +++++++++++++++++++++++++++++++++---------------------
sheep/recovery.c | 27 +++++++++-------
sheep/sheep_priv.h | 3 --
3 files changed, 72 insertions(+), 50 deletions(-)
diff --git a/sheep/group.c b/sheep/group.c
index f802756..342579f 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -45,7 +45,9 @@ static pthread_mutex_t wait_vdis_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wait_vdis_cond = PTHREAD_COND_INITIALIZER;
static bool is_vdi_list_ready = true;
-static struct vnode_info *current_vnode_info;
+static thread_unsafe(struct vnode_info *) current_vnode_info;
+static thread_unsafe(struct list_head *) pending_block_list;
+static thread_unsafe(struct list_head *) pending_notify_list;
static size_t get_join_message_size(struct join_message *jm)
{
@@ -87,26 +89,27 @@ static int get_zones_nr_from(const struct sd_node *nodes, int nr_nodes)
bool have_enough_zones(void)
{
int max_copies;
+ struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
if (sys->flags & SD_FLAG_NOHALT)
return true;
- if (!current_vnode_info)
+ if (!cur_vinfo)
return false;
max_copies = get_max_copy_number();
sd_dprintf("flags %d, nr_zones %d, min copies %d",
- sys->flags, current_vnode_info->nr_zones, max_copies);
+ sys->flags, cur_vinfo->nr_zones, max_copies);
- if (!current_vnode_info->nr_zones)
+ if (!cur_vinfo->nr_zones)
return false;
if (sys->flags & SD_FLAG_QUORUM) {
- if (current_vnode_info->nr_zones > (max_copies/2))
+ if (cur_vinfo->nr_zones > (max_copies/2))
return true;
} else {
- if (current_vnode_info->nr_zones >= max_copies)
+ if (cur_vinfo->nr_zones >= max_copies)
return true;
}
return false;
@@ -143,9 +146,11 @@ struct vnode_info *grab_vnode_info(struct vnode_info *vnode_info)
*/
struct vnode_info *get_vnode_info(void)
{
- assert(current_vnode_info);
+ struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
- return grab_vnode_info(current_vnode_info);
+ assert(cur_vinfo);
+
+ return grab_vnode_info(cur_vinfo);
}
/* Release a reference to the current vnode information. */
@@ -200,15 +205,15 @@ int local_get_node_list(const struct sd_req *req, struct sd_rsp *rsp,
{
struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
int nr_nodes;
+ struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
- if (current_vnode_info) {
- nr_nodes = current_vnode_info->nr_nodes;
- memcpy(data, current_vnode_info->nodes,
+ if (cur_vinfo) {
+ nr_nodes = cur_vinfo->nr_nodes;
+ memcpy(data, cur_vinfo->nodes,
sizeof(struct sd_node) * nr_nodes);
node_rsp->data_length = nr_nodes * sizeof(struct sd_node);
node_rsp->nr_nodes = nr_nodes;
- node_rsp->local_idx = get_node_idx(current_vnode_info,
- &sys->this_node);
+ node_rsp->local_idx = get_node_idx(cur_vinfo, &sys->this_node);
} else {
node_rsp->nr_nodes = 0;
node_rsp->local_idx = 0;
@@ -282,7 +287,7 @@ bool sd_block_handler(const struct sd_node *sender)
cluster_op_running = true;
- req = list_first_entry(&sys->pending_block_list,
+ req = list_first_entry(thread_unsafe_get(pending_block_list),
struct request, pending_list);
req->work.fn = do_process_work;
req->work.done = cluster_op_done;
@@ -303,14 +308,16 @@ void queue_cluster_request(struct request *req)
sd_dprintf("%s (%p)", op_name(req->op), req);
if (has_process_work(req->op)) {
- list_add_tail(&req->pending_list, &sys->pending_block_list);
+ list_add_tail(&req->pending_list,
+ thread_unsafe_get(pending_block_list));
sys->cdrv->block();
} else {
struct vdi_op_message *msg;
size_t size;
msg = prepare_cluster_msg(req, &size);
- list_add_tail(&req->pending_list, &sys->pending_notify_list);
+ list_add_tail(&req->pending_list,
+ thread_unsafe_get(pending_notify_list));
msg->rsp.result = SD_RES_SUCCESS;
sys->cdrv->notify(msg, size);
@@ -538,6 +545,7 @@ static int cluster_wait_for_join_check(const struct sd_node *joined,
int nr, nr_local_entries, nr_failed_entries, nr_delayed_nodes;
uint32_t local_epoch = get_latest_epoch();
int ret;
+ struct vnode_info *cur_vinfo;
if (jm->nr_nodes == 0)
return CJ_RES_JOIN_LATER;
@@ -580,10 +588,11 @@ static int cluster_wait_for_join_check(const struct sd_node *joined,
return CJ_RES_FAIL;
}
- if (!current_vnode_info)
+ cur_vinfo = thread_unsafe_get(current_vnode_info);
+ if (!cur_vinfo)
nr = 1;
else
- nr = current_vnode_info->nr_nodes + 1;
+ nr = cur_vinfo->nr_nodes + 1;
nr_delayed_nodes = get_nodes_nr_from(&sys->delayed_nodes);
@@ -704,10 +713,12 @@ static void get_vdis_done(struct work *work)
int log_current_epoch(void)
{
- if (!current_vnode_info)
+ struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
+
+ if (!cur_vinfo)
return update_epoch_log(sys->epoch, NULL, 0);
- return update_epoch_log(sys->epoch, current_vnode_info->nodes,
- current_vnode_info->nr_nodes);
+ return update_epoch_log(sys->epoch, cur_vinfo->nodes,
+ cur_vinfo->nr_nodes);
}
static struct vnode_info *alloc_old_vnode_info(const struct sd_node *joined,
@@ -852,8 +863,9 @@ static void update_cluster_info(const struct join_message *msg,
if (!sys->join_finished)
finish_join(msg, joined, nodes, nr_nodes);
- old_vnode_info = current_vnode_info;
- current_vnode_info = alloc_vnode_info(nodes, nr_nodes);
+ old_vnode_info = thread_unsafe_get(current_vnode_info);
+ thread_unsafe_set(current_vnode_info,
+ alloc_vnode_info(nodes, nr_nodes));
switch (msg->cluster_status) {
case SD_STATUS_OK:
@@ -887,7 +899,8 @@ static void update_cluster_info(const struct join_message *msg,
nodes, nr_nodes);
}
- start_recovery(current_vnode_info, old_vnode_info);
+ start_recovery(thread_unsafe_get(current_vnode_info),
+ old_vnode_info);
}
if (have_enough_zones())
@@ -922,11 +935,13 @@ void sd_notify_handler(const struct sd_node *sender, void *data,
if (node_is_local(sender)) {
if (has_process_work(op))
- req = list_first_entry(&sys->pending_block_list,
- struct request, pending_list);
+ req = list_first_entry(
+ thread_unsafe_get(pending_block_list),
+ struct request, pending_list);
else
- req = list_first_entry(&sys->pending_notify_list,
- struct request, pending_list);
+ req = list_first_entry(
+ thread_unsafe_get(pending_notify_list),
+ struct request, pending_list);
list_del(&req->pending_list);
}
@@ -1135,8 +1150,9 @@ void sd_join_handler(const struct sd_node *joined,
sys->join_finished = true;
sys->epoch = get_latest_epoch();
- put_vnode_info(current_vnode_info);
- current_vnode_info = alloc_vnode_info(&sys->this_node, 1);
+ put_vnode_info(thread_unsafe_get(current_vnode_info));
+ thread_unsafe_set(current_vnode_info,
+ alloc_vnode_info(&sys->this_node, 1));
}
nr_local = get_nodes_nr_epoch(sys->epoch);
@@ -1178,14 +1194,16 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
/* Mark leave node as gateway only node */
sys->this_node.nr_vnodes = 0;
- old_vnode_info = current_vnode_info;
- current_vnode_info = alloc_vnode_info(members, nr_members);
+ old_vnode_info = thread_unsafe_get(current_vnode_info);
+ thread_unsafe_set(current_vnode_info,
+ alloc_vnode_info(members, nr_members));
switch (sys->status) {
case SD_STATUS_HALT:
case SD_STATUS_OK:
uatomic_inc(&sys->epoch);
log_current_epoch();
- start_recovery(current_vnode_info, old_vnode_info);
+ start_recovery(thread_unsafe_get(current_vnode_info),
+ old_vnode_info);
if (!have_enough_zones())
sys->status = SD_STATUS_HALT;
break;
@@ -1249,8 +1267,12 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
sys->status = SD_STATUS_WAIT_FOR_FORMAT;
}
- INIT_LIST_HEAD(&sys->pending_block_list);
- INIT_LIST_HEAD(&sys->pending_notify_list);
+ thread_unsafe_set(pending_block_list,
+ xzalloc(sizeof(struct list_head)));
+ INIT_LIST_HEAD(thread_unsafe_get(pending_block_list));
+ thread_unsafe_set(pending_notify_list,
+ xzalloc(sizeof(struct list_head)));
+ INIT_LIST_HEAD(thread_unsafe_get(pending_notify_list));
INIT_LIST_HEAD(&sys->failed_nodes);
INIT_LIST_HEAD(&sys->delayed_nodes);
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 0be846c..57ea38a 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -46,8 +46,9 @@ struct recovery_work {
struct vnode_info *cur_vinfo;
};
-static struct recovery_work *next_rw;
-static struct recovery_work *recovering_work;
+struct recovery_work *next_rw;
+static thread_unsafe(struct recovery_work *) recovering_work;
+
/* Dynamically grown list buffer default as 4M (2T storage) */
#define DEFAULT_LIST_BUFFER_SIZE (UINT64_C(1) << 22)
static size_t list_buffer_size = DEFAULT_LIST_BUFFER_SIZE;
@@ -215,12 +216,12 @@ static void recover_object_work(struct work *work)
bool node_in_recovery(void)
{
- return !!recovering_work;
+ return thread_unsafe_get(recovering_work) != NULL;
}
static inline void prepare_schedule_oid(uint64_t oid)
{
- struct recovery_work *rw = recovering_work;
+ struct recovery_work *rw = thread_unsafe_get(recovering_work);
int i;
for (i = 0; i < rw->nr_prio_oids; i++)
@@ -253,7 +254,7 @@ static inline void prepare_schedule_oid(uint64_t oid)
bool oid_in_recovery(uint64_t oid)
{
- struct recovery_work *rw = recovering_work;
+ struct recovery_work *rw = thread_unsafe_get(recovering_work);
int i;
if (!node_in_recovery())
@@ -310,7 +311,7 @@ static inline bool run_next_rw(struct recovery_work *rw)
return false;
free_recovery_work(rw);
- recovering_work = nrw;
+ thread_unsafe_set(recovering_work, nrw);
wakeup_all_requests();
queue_work(sys->recovery_wqueue, &nrw->work);
sd_dprintf("recovery work is superseded");
@@ -345,7 +346,7 @@ static void notify_recovery_completion_main(struct work *work)
static inline void finish_recovery(struct recovery_work *rw)
{
uint32_t recovered_epoch = rw->epoch;
- recovering_work = NULL;
+ thread_unsafe_set(recovering_work, NULL);
if (sd_store->end_recover)
sd_store->end_recover(sys->epoch - 1, rw->old_vinfo);
@@ -445,9 +446,11 @@ static void recover_next_object(struct recovery_work *rw)
void resume_suspended_recovery(void)
{
- if (recovering_work && recovering_work->suspended) {
- recovering_work->suspended = false;
- recover_next_object(recovering_work);
+ struct recovery_work *rw = thread_unsafe_get(recovering_work);
+
+ if (rw && rw->suspended) {
+ rw->suspended = false;
+ recover_next_object(rw);
}
}
@@ -647,7 +650,7 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
rw->work.fn = prepare_object_list;
rw->work.done = finish_object_list;
- if (recovering_work != NULL) {
+ if (thread_unsafe_get(recovering_work) != NULL) {
/* skip the previous epoch recovery */
struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
if (nrw)
@@ -660,7 +663,7 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
*/
resume_suspended_recovery();
} else {
- recovering_work = rw;
+ thread_unsafe_set(recovering_work, rw);
queue_work(sys->recovery_wqueue, &rw->work);
}
out:
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 2693c0f..7f3a16e 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -88,9 +88,6 @@ struct cluster_info {
*/
struct list_head delayed_nodes;
- struct list_head pending_block_list;
- struct list_head pending_notify_list;
-
DECLARE_BITMAP(vdi_inuse, SD_NR_VDIS);
uint8_t nr_copies;
--
1.8.1.3.566.gaa39828