[sheepdog] [PATCH] sheep: remove block/unblock callbacks
Liu Yuan
namei.unix at gmail.com
Wed Jun 6 11:33:21 CEST 2012
From: Liu Yuan <tailai.ly at taobao.com>
update: clean up struct cluster_driver too
------------------------------------------- >8
This patch tries to completely remove block/unblock as well as the
sd_block_handler() code from both the cluster drivers and the core sheep code,
simplifying the code a lot and also boosting performance a lot. We actually have
a very nice construct to do blocking request handling: our top/bottom style work
queue. So the requests can be nicely ordered by a block_wqueue.
Signed-off-by: Liu Yuan <tailai.ly at taobao.com>
---
sheep/cluster.h | 14 ---------
sheep/cluster/accord.c | 39 -------------------------
sheep/cluster/corosync.c | 59 ++++----------------------------------
sheep/cluster/local.c | 40 --------------------------
sheep/cluster/zookeeper.c | 50 +-------------------------------
sheep/group.c | 69 +++++++++++----------------------------------
6 files changed, 24 insertions(+), 247 deletions(-)
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 07e5f7b..970e6ac 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -83,20 +83,6 @@ struct cluster_driver {
*/
int (*notify)(void *msg, size_t msg_len);
- /*
- * Send a message to all nodes to block further events.
- *
- * Once the cluster driver has ensured that events are blocked on all
- * nodes it needs to call sd_block_handler() on the node where ->block
- * was called.
- */
- void (*block)(void);
-
- /*
- * Unblock events on all nodes, and send a a message to all nodes.
- */
- void (*unblock)(void *msg, size_t msg_len);
-
struct list_head list;
};
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index c3e0320..27ddb99 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -30,7 +30,6 @@ enum acrd_event_type {
EVENT_JOIN_REQUEST = 1,
EVENT_JOIN_RESPONSE,
EVENT_LEAVE,
- EVENT_BLOCK,
EVENT_NOTIFY,
};
@@ -46,8 +45,6 @@ struct acrd_event {
uint64_t ids[SD_MAX_NODES];
enum cluster_join_result join_result;
-
- int callbacked; /* set non-zero after sd_block_handler() was called */
};
static struct sd_node this_node;
@@ -279,7 +276,6 @@ static int add_event(struct acrd_handle *ah, enum acrd_event_type type,
memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx));
break;
case EVENT_NOTIFY:
- case EVENT_BLOCK:
break;
case EVENT_JOIN_RESPONSE:
abort();
@@ -427,29 +423,6 @@ static int accord_notify(void *msg, size_t msg_len)
return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len);
}
-static void accord_block(void)
-{
- add_event(ahandle, EVENT_BLOCK, &this_node, NULL, 0);
-}
-
-static void accord_unblock(void *msg, size_t msg_len)
-{
- struct acrd_event ev;
-
- pthread_mutex_lock(&queue_lock);
-
- acrd_queue_pop(ahandle, &ev);
-
- ev.type = EVENT_NOTIFY;
- ev.buf_len = msg_len;
- if (msg)
- memcpy(ev.buf, msg, msg_len);
-
- acrd_queue_push_back(ahandle, &ev);
-
- pthread_mutex_unlock(&queue_lock);
-}
-
static void acrd_handler(int listen_fd, int events, void *data)
{
int ret;
@@ -510,16 +483,6 @@ static void acrd_handler(int listen_fd, int events, void *data)
case EVENT_LEAVE:
sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
break;
- case EVENT_BLOCK:
- if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
- ev.callbacked = 1;
-
- acrd_queue_push_back(ahandle, &ev);
- sd_block_handler();
- } else {
- acrd_queue_push_back(ahandle, NULL);
- }
- break;
case EVENT_NOTIFY:
sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
break;
@@ -592,8 +555,6 @@ struct cluster_driver cdrv_accord = {
.join = accord_join,
.leave = accord_leave,
.notify = accord_notify,
- .block = accord_block,
- .unblock = accord_unblock,
};
cdrv_register(cdrv_accord);
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index e6655a0..6ecbf27 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -43,7 +43,6 @@ enum corosync_event_type {
COROSYNC_EVENT_TYPE_JOIN_REQUEST,
COROSYNC_EVENT_TYPE_JOIN_RESPONSE,
COROSYNC_EVENT_TYPE_LEAVE,
- COROSYNC_EVENT_TYPE_BLOCK,
COROSYNC_EVENT_TYPE_NOTIFY,
};
@@ -53,8 +52,6 @@ enum corosync_message_type {
COROSYNC_MSG_TYPE_JOIN_RESPONSE,
COROSYNC_MSG_TYPE_LEAVE,
COROSYNC_MSG_TYPE_NOTIFY,
- COROSYNC_MSG_TYPE_BLOCK,
- COROSYNC_MSG_TYPE_UNBLOCK,
};
struct corosync_event {
@@ -291,8 +288,8 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
case CJ_RES_JOIN_LATER:
build_node_list(cpg_nodes, nr_cpg_nodes, entries);
sd_join_handler(&cevent->sender.ent, entries,
- nr_cpg_nodes, cevent->result,
- cevent->msg);
+ nr_cpg_nodes, cevent->result,
+ cevent->msg);
break;
}
break;
@@ -307,21 +304,11 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
build_node_list(cpg_nodes, nr_cpg_nodes, entries);
sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
break;
- case COROSYNC_EVENT_TYPE_BLOCK:
- if (cpg_node_equal(&cevent->sender, &this_node) &&
- !cevent->callbacked) {
- sd_block_handler();
- cevent->callbacked = 1;
- }
-
- /* block the rest messages until unblock message comes */
- return 0;
case COROSYNC_EVENT_TYPE_NOTIFY:
sd_notify_handler(&cevent->sender.ent, cevent->msg,
- cevent->msg_len);
+ cevent->msg_len);
break;
}
-
return 1;
}
@@ -330,7 +317,8 @@ static void __corosync_dispatch(void)
struct corosync_event *cevent;
while (!list_empty(&corosync_event_list)) {
- cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
+ cevent = list_first_entry(&corosync_event_list, typeof(*cevent),
+ list);
/* update join status */
if (!join_finished) {
@@ -362,7 +350,6 @@ static void __corosync_dispatch(void)
} else {
switch (cevent->type) {
case COROSYNC_MSG_TYPE_JOIN_REQUEST:
- case COROSYNC_MSG_TYPE_BLOCK:
return;
default:
break;
@@ -420,16 +407,12 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
cevent->sender = cmsg->sender;
cevent->msg_len = cmsg->msg_len;
break;
- case COROSYNC_MSG_TYPE_BLOCK:
case COROSYNC_MSG_TYPE_NOTIFY:
cevent = zalloc(sizeof(*cevent));
if (!cevent)
panic("failed to allocate memory\n");
- if (cmsg->type == COROSYNC_MSG_TYPE_BLOCK)
- cevent->type = COROSYNC_EVENT_TYPE_BLOCK;
- else
- cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
+ cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
cevent->sender = cmsg->sender;
cevent->msg_len = cmsg->msg_len;
@@ -480,14 +463,6 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
sizeof(*cmsg->nodes) * cmsg->nr_nodes);
break;
- case COROSYNC_MSG_TYPE_UNBLOCK:
- cevent = update_event(COROSYNC_EVENT_TYPE_BLOCK, &cmsg->sender,
- cmsg->msg, cmsg->msg_len);
- if (!cevent)
- break;
-
- cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
- break;
}
__corosync_dispatch();
@@ -542,14 +517,6 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
continue;
}
- cevent = find_event(COROSYNC_EVENT_TYPE_BLOCK, left_sheep + i);
- if (cevent) {
- /* the node left before sending UNBLOCK */
- list_del(&cevent->list);
- free(cevent->msg);
- free(cevent);
- }
-
cevent = zalloc(sizeof(*cevent));
if (!cevent)
panic("failed to allocate memory\n");
@@ -640,18 +607,6 @@ static int corosync_leave(void)
NULL, 0);
}
-static void corosync_block(void)
-{
- send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0,
- NULL, 0);
-}
-
-static void corosync_unblock(void *msg, size_t msg_len)
-{
- send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
- msg, msg_len);
-}
-
static int corosync_notify(void *msg, size_t msg_len)
{
return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
@@ -739,8 +694,6 @@ struct cluster_driver cdrv_corosync = {
.join = corosync_join,
.leave = corosync_leave,
.notify = corosync_notify,
- .block = corosync_block,
- .unblock = corosync_unblock,
};
cdrv_register(cdrv_corosync);
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 4f2d8e8..a1f0cc6 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -36,7 +36,6 @@ enum local_event_type {
EVENT_JOIN_REQUEST = 1,
EVENT_JOIN_RESPONSE,
EVENT_LEAVE,
- EVENT_BLOCK,
EVENT_NOTIFY,
};
@@ -52,8 +51,6 @@ struct local_event {
pid_t pids[SD_MAX_NODES];
enum cluster_join_result join_result;
-
- int callbacked; /* set non-zero after sd_block_handler() was called */
};
@@ -244,7 +241,6 @@ static void add_event(enum local_event_type type, struct sd_node *node,
memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
break;
case EVENT_NOTIFY:
- case EVENT_BLOCK:
break;
case EVENT_JOIN_RESPONSE:
abort();
@@ -314,34 +310,6 @@ static int local_notify(void *msg, size_t msg_len)
return 0;
}
-static void local_block(void)
-{
- shm_queue_lock();
-
- add_event(EVENT_BLOCK, &this_node, NULL, 0);
-
- shm_queue_unlock();
-}
-
-static void local_unblock(void *msg, size_t msg_len)
-{
- struct local_event *ev;
-
- shm_queue_lock();
-
- ev = shm_queue_peek();
-
- ev->type = EVENT_NOTIFY;
- ev->buf_len = msg_len;
- if (msg)
- memcpy(ev->buf, msg, msg_len);
- msync(ev, sizeof(*ev), MS_SYNC);
-
- shm_queue_notify();
-
- shm_queue_unlock();
-}
-
static void local_handler(int listen_fd, int events, void *data)
{
struct signalfd_siginfo siginfo;
@@ -404,12 +372,6 @@ static void local_handler(int listen_fd, int events, void *data)
sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
shm_queue_pop();
break;
- case EVENT_BLOCK:
- if (node_eq(&ev->sender, &this_node) && !ev->callbacked) {
- sd_block_handler();
- ev->callbacked = 1;
- }
- break;
case EVENT_NOTIFY:
sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
shm_queue_pop();
@@ -467,8 +429,6 @@ struct cluster_driver cdrv_local = {
.join = local_join,
.leave = local_leave,
.notify = local_notify,
- .block = local_block,
- .unblock = local_unblock,
};
cdrv_register(cdrv_local);
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8499cb4..7924920 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -44,7 +44,6 @@ enum zk_event_type {
EVENT_JOIN_REQUEST = 1,
EVENT_JOIN_RESPONSE,
EVENT_LEAVE,
- EVENT_BLOCK,
EVENT_NOTIFY,
};
@@ -60,8 +59,6 @@ struct zk_event {
enum cluster_join_result join_result;
- int callbacked; /* set non-zero after sd_block_handler() was called */
-
size_t buf_len;
uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
};
@@ -82,7 +79,7 @@ static size_t nr_zk_nodes;
static inline int is_blocking_event(struct zk_event *ev)
{
- return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_REQUEST;
+ return ev->type == EVENT_JOIN_REQUEST;
}
/* zookeeper API wrapper */
@@ -497,7 +494,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
ev.type = type;
ev.sender = *znode;
ev.buf_len = buf_len;
- ev.callbacked = 0;
if (buf)
memcpy(ev.buf, buf, buf_len);
zk_queue_push(zh, &ev);
@@ -514,7 +510,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
ev->type = EVENT_LEAVE;
ev->sender = *znode;
ev->buf_len = 0;
- ev->callbacked = 0;
nr_levents = uatomic_add_return(&nr_zk_levents, 1);
dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
@@ -611,34 +606,6 @@ static int zk_notify(void *msg, size_t msg_len)
return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len);
}
-static void zk_block(void)
-{
- add_event(zhandle, EVENT_BLOCK, &this_node, NULL, 0);
-}
-
-static void zk_unblock(void *msg, size_t msg_len)
-{
- int rc;
- struct zk_event ev;
- eventfd_t value = 1;
-
- rc = zk_queue_pop(zhandle, &ev);
- assert(rc == 0);
-
- ev.type = EVENT_NOTIFY;
- ev.buf_len = msg_len;
- if (msg)
- memcpy(ev.buf, msg, msg_len);
-
- zk_queue_push_back(zhandle, &ev);
-
- uatomic_dec(&zk_notify_blocked);
-
- /* this notify is necessary */
- dprintf("write event to efd:%d\n", efd);
- eventfd_write(efd, value);
-}
-
static void zk_handler(int listen_fd, int events, void *data)
{
int ret, rc;
@@ -764,19 +731,6 @@ static void zk_handler(int listen_fd, int events, void *data)
build_node_list(zk_node_btroot);
sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
break;
- case EVENT_BLOCK:
- dprintf("BLOCK\n");
- if (node_eq(&ev.sender.node, &this_node.node)
- && !ev.callbacked) {
- uatomic_inc(&zk_notify_blocked);
- ev.callbacked = 1;
- zk_queue_push_back(zhandle, &ev);
- sd_block_handler();
- } else {
- zk_queue_push_back(zhandle, NULL);
- }
-
- break;
case EVENT_NOTIFY:
dprintf("NOTIFY\n");
sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
@@ -835,8 +789,6 @@ struct cluster_driver cdrv_zookeeper = {
.join = zk_join,
.leave = zk_leave,
.notify = zk_notify,
- .block = zk_block,
- .unblock = zk_unblock,
};
cdrv_register(cdrv_zookeeper);
diff --git a/sheep/group.c b/sheep/group.c
index c2679f2..638200f 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -252,7 +252,7 @@ int get_nr_copies(struct vnode_info *vnode_info)
}
static struct vdi_op_message *prepare_cluster_msg(struct request *req,
- size_t *sizep)
+ size_t *sizep)
{
struct vdi_op_message *msg;
size_t size;
@@ -264,11 +264,7 @@ static struct vdi_op_message *prepare_cluster_msg(struct request *req,
assert(size <= SD_MAX_EVENT_BUF_SIZE);
- msg = zalloc(size);
- if (!msg) {
- eprintf("failed to allocate memory\n");
- return NULL;
- }
+ msg = xzalloc(size);
memcpy(&msg->req, &req->rq, sizeof(struct sd_req));
memcpy(&msg->rsp, &req->rp, sizeof(struct sd_rsp));
@@ -283,9 +279,11 @@ static struct vdi_op_message *prepare_cluster_msg(struct request *req,
static void do_cluster_request(struct work *work)
{
struct request *req = container_of(work, struct request, work);
- int ret;
+ int ret = SD_RES_SUCCESS;
+
+ if (has_process_work(req->op))
+ ret = do_process_work(req);
- ret = do_process_work(req);
req->rp.result = ret;
}
@@ -296,32 +294,14 @@ static void cluster_op_done(struct work *work)
size_t size;
msg = prepare_cluster_msg(req, &size);
- if (!msg)
- panic();
- sys->cdrv->unblock(msg, size);
+ /* Kick off the cluster to process_main() */
+ sys->cdrv->notify(msg, size);
free(msg);
}
/*
- * Perform a blocked cluster operation.
- *
- * Must run in the main thread as it access unlocked state like
- * sys->pending_list.
- */
-void sd_block_handler(void)
-{
- struct request *req = list_first_entry(&sys->pending_list,
- struct request, pending_list);
-
- req->work.fn = do_cluster_request;
- req->work.done = cluster_op_done;
-
- queue_work(sys->block_wqueue, &req->work);
-}
-
-/*
* Execute a cluster operation by letting the cluster driver send it to all
* nodes in the cluster.
*
@@ -332,24 +312,10 @@ void queue_cluster_request(struct request *req)
{
eprintf("%p %x\n", req, req->rq.opcode);
- if (has_process_work(req->op)) {
- list_add_tail(&req->pending_list, &sys->pending_list);
- sys->cdrv->block();
- } else {
- struct vdi_op_message *msg;
- size_t size;
-
- msg = prepare_cluster_msg(req, &size);
- if (!msg)
- return;
-
- list_add_tail(&req->pending_list, &sys->pending_list);
-
- msg->rsp.result = SD_RES_SUCCESS;
- sys->cdrv->notify(msg, size);
-
- free(msg);
- }
+ list_add_tail(&req->pending_list, &sys->pending_list);
+ req->work.fn = do_cluster_request;
+ req->work.done = cluster_op_done;
+ queue_work(sys->block_wqueue, &req->work);
}
static inline int get_nodes_nr_from(struct list_head *l)
@@ -748,20 +714,19 @@ void sd_notify_handler(struct sd_node *sender, void *data, size_t data_len)
struct vdi_op_message *msg = data;
struct sd_op_template *op = get_sd_op(msg->req.opcode);
int ret = msg->rsp.result;
- struct request *req = NULL;
dprintf("size: %zd, from: %s\n", data_len, node_to_str(sender));
+ if (ret == SD_RES_SUCCESS && has_process_main(op))
+ ret = do_process_main(op, &msg->req, &msg->rsp, msg->data);
+
if (is_myself(sender->addr, sender->port)) {
+ struct request *req;
+
req = list_first_entry(&sys->pending_list, struct request,
pending_list);
list_del(&req->pending_list);
- }
-
- if (ret == SD_RES_SUCCESS && has_process_main(op))
- ret = do_process_main(op, &msg->req, &msg->rsp, msg->data);
- if (req) {
msg->rsp.result = ret;
if (has_process_main(req->op))
memcpy(req->data, msg->data, msg->rsp.data_length);
--
1.7.10.2
More information about the sheepdog
mailing list