[sheepdog] [PATCH v4 3/4] sheep: add SD_OP_FLUSH_NODES and SD_OP_FLUSH_PEER for writeback cache semantics

Hitoshi Mitake mitake.hitoshi at lab.ntt.co.jp
Fri Sep 7 07:25:53 CEST 2012


This patch adds two new internal sheep operations, SD_OP_FLUSH_NODES and
SD_OP_FLUSH_PEER, for implementing writeback cache semantics in backend stores.

If writeback cache semantics are used in backend stores, explicit flushing
on all sheeps is required when the gateway sheep receives SD_OP_FLUSH_VDI.

After applying this patch, SD_OP_FLUSH_NODES will be queued as a gateway
request when sheep receives SD_OP_FLUSH_VDI. SD_OP_FLUSH_NODES forwards
SD_OP_FLUSH_PEER to all other sheeps. After receiving the
SD_OP_FLUSH_PEER, sheeps flush their cache of backend stores.

This patch also modifies the -w command line option of sheep. Previously, -w
was used for enabling the object cache and specifying its size. After applying
this patch, -w is also used for enabling writeback cache semantics in
backend stores. Examples of the new -w syntax:
-w disk ... enable writeback cache semantics of disks
-w disk,object:size=50 ... enable writeback cache semantics of disks, and
enable object cache with 50MB memory
-w object:size=50 ... enable object cache with 50MB memory
-w object:size=50:directio ... enable object cache with 50MB memory with
O_DIRECT

Cc: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
Cc: Liu Yuan <tailai.ly at taobao.com>
Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
---
v4:
* rename SD_OP_SYNC_VDI -> SD_OP_FLUSH_NODES
* cleaning the members related to cache in struct cluster_info
* change option -w. Using direct IO for object cache can be specified like this
  (size= is still required whenever the object cache is enabled):
  -w object:size=50:directio

v3: move conditional branch on sys->gateway_only from default_flush() to
peer_flush(), based on Liu Yuan's advice

8<---
 include/internal_proto.h |    2 +
 sheep/gateway.c          |    8 +-
 sheep/object_cache.c     |   10 ++--
 sheep/ops.c              |   45 +++++++++++--
 sheep/request.c          |    2 +-
 sheep/sheep.c            |  160 ++++++++++++++++++++++++++++++++++++++--------
 sheep/sheep_priv.h       |   27 ++++++--
 sheep/store.c            |    9 +--
 8 files changed, 210 insertions(+), 53 deletions(-)

diff --git a/include/internal_proto.h b/include/internal_proto.h
index 5288823..ec27cfc 100644
--- a/include/internal_proto.h
+++ b/include/internal_proto.h
@@ -65,6 +65,8 @@
 #define SD_OP_INFO_RECOVER 0xAA
 #define SD_OP_GET_VDI_COPIES 0xAB
 #define SD_OP_COMPLETE_RECOVERY 0xAC
+#define SD_OP_FLUSH_NODES 0xAD
+#define SD_OP_FLUSH_PEER 0xAE
 
 /* internal flags for hdr.flags, must be above 0x80 */
 #define SD_FLAG_CMD_RECOVERY 0x0080
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 19b90b9..3e8e673 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -32,7 +32,7 @@ int gateway_read_obj(struct request *req)
 	uint64_t oid = req->rq.obj.oid;
 	int nr_copies, j;
 
-	if (sys->enable_write_cache && !req->local && !bypass_object_cache(req))
+	if (is_object_cache_enabled() && !req->local && !bypass_object_cache(req))
 		return object_cache_handle_request(req);
 
 	nr_copies = get_req_copy_number(req);
@@ -328,7 +328,7 @@ static int gateway_forward_request(struct request *req, bool all_node)
 
 int gateway_write_obj(struct request *req)
 {
-	if (sys->enable_write_cache && !req->local && !bypass_object_cache(req))
+	if (is_object_cache_enabled() && !req->local && !bypass_object_cache(req))
 		return object_cache_handle_request(req);
 
 	return gateway_forward_request(req, false);
@@ -336,7 +336,7 @@ int gateway_write_obj(struct request *req)
 
 int gateway_create_and_write_obj(struct request *req)
 {
-	if (sys->enable_write_cache && !req->local && !bypass_object_cache(req))
+	if (is_object_cache_enabled() && !req->local && !bypass_object_cache(req))
 		return object_cache_handle_request(req);
 
 	return gateway_forward_request(req, false);
@@ -347,7 +347,7 @@ int gateway_remove_obj(struct request *req)
 	return gateway_forward_request(req, false);
 }
 
-int gateway_sync_vdi(struct request *req)
+int gateway_flush_nodes(struct request *req)
 {
 	return gateway_forward_request(req, true);
 }
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index 89a24c1..959ca15 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -307,7 +307,7 @@ static int read_cache_object_noupdate(uint32_t vid, uint32_t idx, void *buf,
 	strbuf_addstr(&p, cache_dir);
 	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
 
-	if (sys->use_directio && !idx_has_vdi_bit(idx))
+	if (sys->object_cache_directio && !idx_has_vdi_bit(idx))
 		flags |= O_DIRECT;
 
 	fd = open(p.buf, flags, def_fmode);
@@ -344,7 +344,7 @@ static int write_cache_object_noupdate(uint32_t vid, uint32_t idx, void *buf,
 	strbuf_addstr(&p, cache_dir);
 	strbuf_addf(&p, "/%06"PRIx32"/%08"PRIx32, vid, idx);
 
-	if (sys->use_directio && !idx_has_vdi_bit(idx))
+	if (sys->object_cache_directio && !idx_has_vdi_bit(idx))
 		flags |= O_DIRECT;
 
 	fd = open(p.buf, flags, def_fmode);
@@ -538,7 +538,7 @@ static void do_reclaim(struct work *work)
 		unsigned data_length;
 		/* Reclaim cache to 80% of max size */
 		if (uatomic_read(&sys_cache.cache_size) <=
-		    sys->cache_size * 8 / 10)
+			sys->object_cache_size * 8 / 10)
 			break;
 
 		if (do_reclaim_object(entry) < 0)
@@ -619,10 +619,10 @@ void object_cache_try_to_reclaim(void)
 {
 	struct work *work;
 
-	if (!sys->cache_size)
+	if (!sys->object_cache_size)
 		return;
 
-	if (uatomic_read(&sys_cache.cache_size) < sys->cache_size)
+	if (uatomic_read(&sys_cache.cache_size) < sys->object_cache_size)
 		return;
 
 	if (mark_cache_in_reclaim())
diff --git a/sheep/ops.c b/sheep/ops.c
index ba29235..4de4e20 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -189,7 +189,7 @@ static int post_cluster_del_vdi(const struct sd_req *req, struct sd_rsp *rsp,
 	struct cache_deletion_work *dw;
 	int ret = rsp->result;
 
-	if (!sys->enable_write_cache)
+	if (!is_object_cache_enabled())
 		return ret;
 
 	dw = xzalloc(sizeof(*dw));
@@ -649,7 +649,7 @@ static int local_set_cache_size(const struct sd_req *req, struct sd_rsp *rsp,
 {
 	int cache_size = *(int *)data;
 
-	uatomic_set(&sys->cache_size, cache_size);
+	uatomic_set(&sys->object_cache_size, cache_size);
 	dprintf("Max cache size set to %dM\n", cache_size);
 
 	object_cache_try_to_reclaim();
@@ -686,14 +686,27 @@ static int local_get_snap_file(struct request *req)
 
 static int local_flush_vdi(struct request *req)
 {
-	if (!sys->enable_write_cache)
-		return SD_RES_SUCCESS;
-	return object_cache_flush_vdi(req);
+	int ret = SD_RES_SUCCESS;
+
+	if (is_object_cache_enabled()) {
+		ret = object_cache_flush_vdi(req);
+		if (ret != SD_RES_SUCCESS)
+			return ret;
+	}
+
+	if (is_disk_cache_enabled()) {
+		struct sd_req hdr;
+
+		sd_init_req(&hdr, SD_OP_FLUSH_NODES);
+		return exec_local_req(&hdr, NULL);
+	}
+
+	return ret;
 }
 
 static int local_flush_and_del(struct request *req)
 {
-	if (!sys->enable_write_cache)
+	if (!is_object_cache_enabled())
 		return SD_RES_SUCCESS;
 	return object_cache_flush_and_del(req);
 }
@@ -949,6 +962,14 @@ out:
 	return ret;
 }
 
+int peer_flush(struct request *req)
+{
+	if (sys->gateway_only)
+		return SD_RES_SUCCESS;
+
+	return sd_store->flush();
+}
+
 static struct sd_op_template sd_ops[] = {
 
 	/* cluster operations */
@@ -1216,6 +1237,17 @@ static struct sd_op_template sd_ops[] = {
 		.type = SD_OP_TYPE_LOCAL,
 		.process_main = local_info_recover,
 	},
+
+	[SD_OP_FLUSH_PEER] = {
+		.name = "FLUSH_PEER",
+		.type = SD_OP_TYPE_PEER,
+		.process_work = peer_flush,
+	},
+	[SD_OP_FLUSH_NODES] = {
+		.name = "FLUSH_NODES",
+		.type = SD_OP_TYPE_GATEWAY,
+		.process_work = gateway_flush_nodes,
+	},
 };
 
 struct sd_op_template *get_sd_op(uint8_t opcode)
@@ -1301,6 +1333,7 @@ static int map_table[] = {
 	[SD_OP_READ_OBJ] = SD_OP_READ_PEER,
 	[SD_OP_WRITE_OBJ] = SD_OP_WRITE_PEER,
 	[SD_OP_REMOVE_OBJ] = SD_OP_REMOVE_PEER,
+	[SD_OP_FLUSH_NODES] = SD_OP_FLUSH_PEER,
 };
 
 int gateway_to_peer_opcode(int opcode)
diff --git a/sheep/request.c b/sheep/request.c
index 3f9b870..ae12bd3 100644
--- a/sheep/request.c
+++ b/sheep/request.c
@@ -287,7 +287,7 @@ static void queue_gateway_request(struct request *req)
 	 * Even if it doesn't exist in cache, we'll rely on cache layer to pull
 	 * it.
 	 */
-	if (sys->enable_write_cache)
+	if (is_object_cache_enabled())
 		goto queue_work;
 
 	if (req->local_oid)
diff --git a/sheep/sheep.c b/sheep/sheep.c
index e1434cf..a936306 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -40,7 +40,6 @@ static char program_name[] = "sheep";
 static struct option const long_options[] = {
 	{"cluster", required_argument, NULL, 'c'},
 	{"debug", no_argument, NULL, 'd'},
-	{"directio", no_argument, NULL, 'D'},
 	{"foreground", no_argument, NULL, 'f'},
 	{"gateway", no_argument, NULL, 'g'},
 	{"help", no_argument, NULL, 'h'},
@@ -49,9 +48,9 @@ static struct option const long_options[] = {
 	{"stdout", no_argument, NULL, 'o'},
 	{"port", required_argument, NULL, 'p'},
 	{"disk-space", required_argument, NULL, 's'},
-	{"enable-cache", required_argument, NULL, 'w'},
 	{"zone", required_argument, NULL, 'z'},
 	{"pidfile", required_argument, NULL, 'P'},
+	{"write-cache", required_argument, NULL, 'w'},
 	{NULL, 0, NULL, 0},
 };
 
@@ -69,7 +68,6 @@ Usage: %s [OPTION]... [PATH]\n\
 Options:\n\
   -c, --cluster           specify the cluster driver\n\
   -d, --debug             include debug messages in the log\n\
-  -D, --directio          use direct IO when accessing the object from object cache\n\
   -f, --foreground        make the program run in the foreground\n\
   -g, --gateway           make the progam run as a gateway mode\n\
   -h, --help              display this help and exit\n\
@@ -78,9 +76,9 @@ Options:\n\
   -p, --port              specify the TCP port on which to listen\n\
   -P, --pidfile           create a pid file\n\
   -s, --disk-space        specify the free disk space in megabytes\n\
-  -w, --enable-cache      enable object cache and specify the max size (M) and mode\n\
   -y, --myaddr            specify the address advertised to other sheep\n\
   -z, --zone              specify the zone id\n\
+  -w, --write-cache             specify the cache type\n\
 ", PACKAGE_VERSION, program_name);
 	exit(status);
 }
@@ -178,6 +176,132 @@ static int init_signal(void)
 static struct cluster_info __sys;
 struct cluster_info *sys = &__sys;
 
+static void parse_arg(char *arg, const char *delim, void (*fn)(char *))
+{
+	char *savep, *s;
+
+	s = strtok_r(arg, delim, &savep);
+	do {
+		fn(s);
+	} while ((s = strtok_r(NULL, delim, &savep)));
+}
+
+static void object_cache_size_set(char *s)
+{
+	const char *header = "size=";
+	int len = strlen(header);
+	char *size, *p;
+	int64_t cache_size;
+
+	assert(!strncmp(s, header, len));
+
+	size = s + len;
+	cache_size = strtol(size, &p, 10);
+	if (size == p || cache_size < 0 || UINT64_MAX < cache_size)
+		goto err;
+
+	sys->object_cache_size = cache_size * 1024 * 1024;
+	return;
+
+err:
+	fprintf(stderr, "Invalid object cache option '%s': "
+		"size must be an integer between 0 and %lu\n",
+		s, UINT64_MAX);
+	exit(1);
+}
+
+static void object_cache_directio_set(char *s)
+{
+	assert(!strcmp(s, "directio"));
+	sys->object_cache_directio = true;
+}
+
+static void _object_cache_set(char *s)
+{
+	int i;
+	static int first = 1;
+
+	struct object_cache_arg {
+		const char *name;
+		void (*set)(char *);
+	};
+
+	struct object_cache_arg object_cache_args[] = {
+		{ "size=", object_cache_size_set },
+		{ "directio", object_cache_directio_set },
+		{ NULL, NULL },
+	};
+
+	if (first) {
+		assert(!strcmp(s, "object"));
+		first = 0;
+		return;
+	}
+
+	for (i = 0; object_cache_args[i].name; i++) {
+		const char *n = object_cache_args[i].name;
+
+		if (!strncmp(s, n, strlen(n))) {
+			object_cache_args[i].set(s);
+			return;
+		}
+	}
+
+	fprintf(stderr, "invalid object cache arg: %s\n", s);
+	exit(1);
+}
+
+static void object_cache_set(char *s)
+{
+	sys->enabled_cache_type |= CACHE_TYPE_OBJECT;
+	parse_arg(s, ":", _object_cache_set);
+}
+
+static void disk_cache_set(char *s)
+{
+	assert(!strcmp(s, "disk"));
+	sys->enabled_cache_type |= CACHE_TYPE_DISK;
+}
+
+static void do_cache_type(char *s)
+{
+	int i;
+
+	struct cache_type {
+		const char *name;
+		void (*set)(char *);
+	};
+	struct cache_type cache_types[] = {
+		{ "object", object_cache_set },
+		{ "disk", disk_cache_set },
+		{ NULL, NULL },
+	};
+
+	for (i = 0; cache_types[i].name; i++) {
+		const char *n = cache_types[i].name;
+
+		if (!strncmp(s, n, strlen(n))) {
+			cache_types[i].set(s);
+			return;
+		}
+	}
+
+	fprintf(stderr, "invalid cache type: %s\n", s);
+	exit(1);
+}
+
+static void init_cache_type(char *arg)
+{
+	sys->object_cache_size = -1;
+
+	parse_arg(arg, ",", do_cache_type);
+
+	if (is_object_cache_enabled() && sys->object_cache_size == -1) {
+		fprintf(stderr, "object cache size is not set\n");
+		exit(1);
+	}
+}
+
 int main(int argc, char **argv)
 {
 	int ch, longindex;
@@ -188,14 +312,12 @@ int main(int argc, char **argv)
 	int log_level = SDOG_INFO;
 	char path[PATH_MAX];
 	int64_t zone = -1;
-	int64_t cache_size = 0;
 	int64_t free_space = 0;
 	int nr_vnodes = SD_DEFAULT_VNODES;
 	bool explicit_addr = false;
 	int af;
 	char *p;
 	struct cluster_driver *cdrv;
-	int enable_object_cache = 0; /* disabled by default */
 	char *pid_file = NULL;
 
 	signal(SIGPIPE, SIG_IGN);
@@ -242,10 +364,6 @@ int main(int argc, char **argv)
 			/* removed soon. use loglevel instead */
 			log_level = SDOG_DEBUG;
 			break;
-		case 'D':
-			dprintf("direct IO mode\n");
-			sys->use_directio = 1;
-			break;
 		case 'g':
 			/* same as '-v 0' */
 			nr_vnodes = 0;
@@ -263,21 +381,6 @@ int main(int argc, char **argv)
 			}
 			sys->this_node.zone = zone;
 			break;
-		case 'w':
-			enable_object_cache = 1;
-			cache_size = strtol(optarg, &p, 10);
-			if (optarg == p || cache_size < 0 ||
-			    UINT64_MAX < cache_size) {
-				fprintf(stderr, "Invalid cache size '%s': "
-					"must be an integer between 0 and %lu\n",
-					optarg, UINT64_MAX);
-				exit(1);
-			}
-			sys->cache_size = cache_size * 1024 * 1024;
-
-			fprintf(stdout, "enable write cache, "
-				"max cache size %" PRIu64 "M\n", cache_size);
-			break;
 		case 's':
 			free_space = strtoll(optarg, &p, 10);
 			if (optarg == p || free_space <= 0 ||
@@ -303,6 +406,9 @@ int main(int argc, char **argv)
 
 			sys->cdrv_option = get_cdrv_option(sys->cdrv, optarg);
 			break;
+		case 'w':
+			init_cache_type(optarg);
+			break;
 		case 'h':
 			usage(0);
 			break;
@@ -334,7 +440,7 @@ int main(int argc, char **argv)
 	if (ret)
 		exit(1);
 
-	ret = init_store(dir, enable_object_cache);
+	ret = init_store(dir);
 	if (ret)
 		exit(1);
 
@@ -364,7 +470,7 @@ int main(int argc, char **argv)
 	sys->deletion_wqueue = init_work_queue("deletion", true);
 	sys->block_wqueue = init_work_queue("block", true);
 	sys->sockfd_wqueue = init_work_queue("sockfd", true);
-	if (sys->enable_write_cache) {
+	if (is_object_cache_enabled()) {
 		sys->reclaim_wqueue = init_work_queue("reclaim", true);
 		if (!sys->reclaim_wqueue)
 			exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index c095101..b31328a 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -66,8 +66,6 @@ struct cluster_info {
 	struct cluster_driver *cdrv;
 	const char *cdrv_option;
 
-	int enable_write_cache;
-
 	/* set after finishing the JOIN procedure */
 	int join_finished;
 	struct sd_node this_node;
@@ -76,7 +74,6 @@ struct cluster_info {
 	uint32_t status;
 	uint16_t flags;
 
-	uint64_t cache_size;
 	uint64_t disk_space;
 
 	/*
@@ -107,7 +104,6 @@ struct cluster_info {
 
 	uint32_t recovered_epoch;
 
-	int use_directio;
 	uint8_t gateway_only;
 	uint8_t disable_recovery;
 
@@ -119,6 +115,13 @@ struct cluster_info {
 	struct work_queue *block_wqueue;
 	struct work_queue *sockfd_wqueue;
 	struct work_queue *reclaim_wqueue;
+
+#define CACHE_TYPE_OBJECT 0x1
+#define CACHE_TYPE_DISK   0x2
+	int enabled_cache_type;
+
+	uint64_t object_cache_size;
+	bool object_cache_directio;
 };
 
 struct siocb {
@@ -212,7 +215,7 @@ static inline uint32_t sys_epoch(void)
 
 int create_listen_port(int port, void *data);
 
-int init_store(const char *dir, int enable_write_cache);
+int init_store(const char *dir);
 int init_base_path(const char *dir);
 
 int fill_vdi_copy_list(void *data);
@@ -356,12 +359,16 @@ int gateway_read_obj(struct request *req);
 int gateway_write_obj(struct request *req);
 int gateway_create_and_write_obj(struct request *req);
 int gateway_remove_obj(struct request *req);
+int gateway_flush_nodes(struct request *req);
 
 /* backend store */
 int peer_read_obj(struct request *req);
 int peer_write_obj(struct request *req);
 int peer_create_and_write_obj(struct request *req);
 int peer_remove_obj(struct request *req);
+int peer_flush(struct request *req);
+
+int default_flush(void);
 
 /* object_cache */
 
@@ -394,4 +401,14 @@ struct sockfd *sheep_get_sockfd(struct node_id *);
 void sheep_put_sockfd(struct node_id *, struct sockfd *);
 void sheep_del_sockfd(struct node_id *, struct sockfd *);
 
+static inline bool is_object_cache_enabled(void)
+{
+	return !!(sys->enabled_cache_type & CACHE_TYPE_OBJECT);
+}
+
+static inline bool is_disk_cache_enabled(void)
+{
+	return !!(sys->enabled_cache_type & CACHE_TYPE_DISK);
+}
+
 #endif
diff --git a/sheep/store.c b/sheep/store.c
index 573bd52..56a1d5a 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -480,7 +480,7 @@ out:
 	return ret;
 }
 
-int init_store(const char *d, int enable_write_cache)
+int init_store(const char *d)
 {
 	int ret;
 
@@ -514,8 +514,7 @@ int init_store(const char *d, int enable_write_cache)
 			return ret;
 	}
 
-	if (enable_write_cache) {
-		sys->enable_write_cache = 1;
+	if (is_object_cache_enabled()) {
 		ret = object_cache_init(d);
 		if (ret)
 			return 1;
@@ -533,7 +532,7 @@ int write_object(uint64_t oid, char *data, unsigned int datalen,
 	struct sd_req hdr;
 	int ret;
 
-	if (sys->enable_write_cache && object_is_cached(oid)) {
+	if (is_object_cache_enabled() && object_is_cached(oid)) {
 		ret = object_cache_write(oid, data, datalen, offset,
 					 flags, create);
 		if (ret == SD_RES_NO_CACHE)
@@ -593,7 +592,7 @@ int read_object(uint64_t oid, char *data, unsigned int datalen,
 {
 	int ret;
 
-	if (sys->enable_write_cache && object_is_cached(oid)) {
+	if (is_object_cache_enabled() && object_is_cached(oid)) {
 		ret = object_cache_read(oid, data, datalen, offset);
 		if (ret != SD_RES_SUCCESS) {
 			eprintf("try forward read %"PRIx64" %"PRIx32"\n",
-- 
1.7.2.5




More information about the sheepdog mailing list