From: levin li <xingke.lwp at taobao.com> A newly joined node either has no vdi copy list or has an incomplete one, so we need to fetch the copy list data from other nodes. Signed-off-by: levin li <xingke.lwp at taobao.com> --- collie/common.c | 15 +++++-- sheep/farm/farm.c | 31 +++++++++++++++- sheep/group.c | 106 +++++++++++++++++++++++++++++++++++++++++---------- sheep/recovery.c | 2 + sheep/sheep.c | 2 + sheep/sheep_priv.h | 10 +++++ sheep/vdi.c | 28 ++++++++++++- 7 files changed, 165 insertions(+), 29 deletions(-) diff --git a/collie/common.c b/collie/common.c index f885c8c..364e36c 100644 --- a/collie/common.c +++ b/collie/common.c @@ -131,7 +131,14 @@ int parse_vdi(vdi_parser_func_t func, size_t size, void *data) static struct sheepdog_inode i; struct sd_req req; static DECLARE_BITMAP(vdi_inuse, SD_NR_VDIS); - unsigned int rlen, wlen = 0; + unsigned int wlen = 0, rlen = sizeof(vdi_inuse) * 2; + char *buf; + + buf = zalloc(rlen); + if (!buf) { + fprintf(stderr, "Failed to allocate memory\n"); + return -1; + } fd = connect_to(sdhost, sdport); if (fd < 0) { @@ -140,11 +147,10 @@ int parse_vdi(vdi_parser_func_t func, size_t size, void *data) } sd_init_req(&req, SD_OP_READ_VDIS); - req.data_length = sizeof(vdi_inuse); + req.data_length = rlen; req.epoch = sd_epoch; - rlen = sizeof(vdi_inuse); - ret = exec_req(fd, &req, vdi_inuse, &wlen, &rlen); + ret = exec_req(fd, &req, buf, &wlen, &rlen); if (ret < 0) { fprintf(stderr, "Failed to read VDIs from %s:%d\n", sdhost, sdport); @@ -153,6 +159,7 @@ int parse_vdi(vdi_parser_func_t func, size_t size, void *data) } close(fd); + memcpy(&vdi_inuse, buf, sizeof(vdi_inuse)); for (nr = 0; nr < SD_NR_VDIS; nr++) { uint64_t oid; diff --git a/sheep/farm/farm.c b/sheep/farm/farm.c index 7eeae9a..221a3a7 100644 --- a/sheep/farm/farm.c +++ b/sheep/farm/farm.c @@ -294,10 +294,36 @@ out: return ret; } +static int read_vdi_copies(uint64_t oid) +{ + char path[PATH_MAX]; + int fd, flags = def_open_flags, ret; + struct 
sheepdog_inode inode; + + snprintf(path, sizeof(path), "%s%016" PRIx64, obj_path, oid); + + fd = open(path, flags); + if (fd < 0) { + eprintf("%m\n"); + return SD_RES_EIO; + } + + ret = xpread(fd, (void *)&inode, SD_INODE_HEADER_SIZE, 0); + if (ret != SD_INODE_HEADER_SIZE) { + eprintf("%m\n"); + return SD_RES_EIO; + } + + add_vdi_copies(oid_to_vid(oid), inode.nr_copies); + + return SD_RES_SUCCESS; +} + static int init_sys_vdi_bitmap(char *path) { DIR *dir; struct dirent *dent; + int ret = SD_RES_SUCCESS; dir = opendir(path); if (!dir) { @@ -322,10 +348,13 @@ static int init_sys_vdi_bitmap(char *path) vprintf(SDOG_DEBUG, "found the VDI object %" PRIx64 "\n", oid); set_bit(oid_to_vid(oid), sys->vdi_inuse); + ret = read_vdi_copies(oid); + if (ret != SD_RES_SUCCESS) + break; } closedir(dir); - return 0; + return ret; } static bool is_xattr_enabled(char *path) diff --git a/sheep/group.c b/sheep/group.c index 3f5a8fb..e935218 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -16,6 +16,8 @@ #include <arpa/inet.h> #include <sys/time.h> #include <sys/epoll.h> +#include <sys/eventfd.h> +#include <pthread.h> #include <urcu/uatomic.h> #include <math.h> @@ -32,13 +34,17 @@ struct node { struct list_head list; }; -struct vdi_bitmap_work { +struct get_vdis_work { struct work work; DECLARE_BITMAP(vdi_inuse, SD_NR_VDIS); size_t nr_members; struct sd_node members[]; }; +#define SD_GET_VDIS_NONE 0 +#define SD_GET_VDIS_BEGIN 1 +#define SD_GET_VDIS_WAITING 2 + struct sd_node joining_nodes[SD_MAX_NODES]; size_t nr_joining_nodes; struct sd_node all_nodes[SD_MAX_NODES]; @@ -622,14 +628,24 @@ static int cluster_running_check(struct join_message *jm) return CJ_RES_SUCCESS; } -static int get_vdi_bitmap_from(struct sd_node *node) +static int get_vdis_from(struct sd_node *node) { struct sd_req hdr; struct sd_rsp *rsp = (struct sd_rsp *)&hdr; + struct vdi_copy *vc; static DECLARE_BITMAP(tmp_vdi_inuse, SD_NR_VDIS); int fd, i, ret = SD_RES_SUCCESS; - unsigned int rlen, wlen; + unsigned int rlen = 
SD_NR_VDIS * 3, wlen; char host[128]; + char *buf = NULL; + int count; + + buf = zalloc(rlen); + if (!buf) { + vprintf(SDOG_ERR, "unable to allocate memory\n"); + ret = SD_RES_NO_MEM; + goto out; + } if (is_myself(node->nid.addr, node->nid.port)) goto out; @@ -647,13 +663,10 @@ static int get_vdi_bitmap_from(struct sd_node *node) sd_init_req(&hdr, SD_OP_READ_VDIS); hdr.epoch = sys->epoch; - hdr.data_length = sizeof(tmp_vdi_inuse); - rlen = hdr.data_length; + hdr.data_length = rlen; wlen = 0; - ret = exec_req(fd, &hdr, (char *)tmp_vdi_inuse, - &wlen, &rlen); - + ret = exec_req(fd, &hdr, buf, &wlen, &rlen); close(fd); if (ret || rsp->result != SD_RES_SUCCESS) { @@ -662,24 +675,31 @@ static int get_vdi_bitmap_from(struct sd_node *node) goto out; } + memcpy(tmp_vdi_inuse, buf, sizeof(tmp_vdi_inuse)); for (i = 0; i < ARRAY_SIZE(sys->vdi_inuse); i++) sys->vdi_inuse[i] |= tmp_vdi_inuse[i]; + + count = (rsp->data_length - sizeof(tmp_vdi_inuse)) / sizeof(*vc); + vc = (struct vdi_copy *)(buf + sizeof(tmp_vdi_inuse)); + for (i = 0; i < count; i++, vc++) + add_vdi_copies(vc->vid, vc->nr_copies); out: + free(buf); return ret; } -static void do_get_vdi_bitmap(struct work *work) +static void do_get_vdis(struct work *work) { - struct vdi_bitmap_work *w = - container_of(work, struct vdi_bitmap_work, work); + struct get_vdis_work *w = + container_of(work, struct get_vdis_work, work); int i; for (i = 0; i < w->nr_members; i++) { - /* We should not fetch vdi_bitmap from myself */ + /* We should not fetch vdi_bitmap and copy list from myself */ if (node_eq(&w->members[i], &sys->this_node)) continue; - get_vdi_bitmap_from(&w->members[i]); + get_vdis_from(&w->members[i]); /* * If a new comer try to join the running cluster, it only @@ -690,10 +710,21 @@ static void do_get_vdi_bitmap(struct work *work) } } -static void get_vdi_bitmap_done(struct work *work) +static void get_vdis_done(struct work *work) { - struct vdi_bitmap_work *w = - container_of(work, struct vdi_bitmap_work, work); + 
struct get_vdis_work *w = + container_of(work, struct get_vdis_work, work); + + pthread_mutex_lock(&sys->wait_vdis_lock); + if (sys->wait_vdis_state == SD_GET_VDIS_WAITING) { + sys->wait_vdis_state = SD_GET_VDIS_BEGIN; + pthread_cond_signal(&sys->wait_vdis_cond); + dprintf("wake up the recovery thread\n"); + } else { + sys->wait_vdis_state = SD_GET_VDIS_NONE; + dprintf("no thread waiting for vdis\n"); + } + pthread_mutex_unlock(&sys->wait_vdis_lock); free(w); } @@ -754,20 +785,50 @@ static void finish_join(struct join_message *msg, struct sd_node *joined, sockfd_cache_add_group(nodes, nr_nodes); } -static void get_vdi_bitmap(struct sd_node *nodes, size_t nr_nodes) +static void get_vdis(struct sd_node *nodes, size_t nr_nodes) { int array_len = nr_nodes * sizeof(struct sd_node); - struct vdi_bitmap_work *w; + struct get_vdis_work *w; w = xmalloc(sizeof(*w) + array_len); w->nr_members = nr_nodes; memcpy(w->members, nodes, array_len); - w->work.fn = do_get_vdi_bitmap; - w->work.done = get_vdi_bitmap_done; + pthread_mutex_lock(&sys->wait_vdis_lock); + /* If there's a existing thread waiting for get-vdis + * event to be done, wake it up before setting up a + * new one */ + if (sys->wait_vdis_state == SD_GET_VDIS_WAITING) + pthread_cond_broadcast(&sys->wait_vdis_cond); + + sys->wait_vdis_state = SD_GET_VDIS_BEGIN; + pthread_mutex_unlock(&sys->wait_vdis_lock); + + w->work.fn = do_get_vdis; + w->work.done = get_vdis_done; queue_work(sys->block_wqueue, &w->work); } +void wait_get_vdis_done(void) +{ + dprintf("waiting for vdi list\n"); + pthread_mutex_lock(&sys->wait_vdis_lock); + if (!sys->wait_vdis_state) { + dprintf("vdi list already finished\n"); + goto out; + } + + sys->wait_vdis_state = SD_GET_VDIS_WAITING; + + while (sys->wait_vdis_state == SD_GET_VDIS_WAITING) + pthread_cond_wait(&sys->wait_vdis_cond, &sys->wait_vdis_lock); + + sys->wait_vdis_state = SD_GET_VDIS_NONE; +out: + pthread_mutex_unlock(&sys->wait_vdis_lock); + dprintf("vdi list ready\n"); +} + static void 
prepare_recovery(struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes) { @@ -844,7 +905,7 @@ static void update_cluster_info(struct join_message *msg, set_cluster_ctime(msg->ctime); /*FALLTHROUGH*/ case SD_STATUS_WAIT_FOR_JOIN: - get_vdi_bitmap(nodes, nr_nodes); + get_vdis(nodes, nr_nodes); break; default: break; @@ -1215,6 +1276,9 @@ int create_cluster(int port, int64_t zone, int nr_vnodes, sys->status = SD_STATUS_WAIT_FOR_FORMAT; } + pthread_mutex_init(&sys->wait_vdis_lock, NULL); + pthread_cond_init(&sys->wait_vdis_cond, NULL); + INIT_LIST_HEAD(&sys->pending_list); INIT_LIST_HEAD(&sys->failed_nodes); INIT_LIST_HEAD(&sys->delayed_nodes); diff --git a/sheep/recovery.c b/sheep/recovery.c index 5164aa7..369a534 100644 --- a/sheep/recovery.c +++ b/sheep/recovery.c @@ -555,6 +555,8 @@ static void prepare_object_list(struct work *work) dprintf("%u\n", rw->epoch); + wait_get_vdis_done(); + buf = xmalloc(buf_size); again: /* We need to start at random node for better load balance */ diff --git a/sheep/sheep.c b/sheep/sheep.c index b10197e..31af42c 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -22,6 +22,8 @@ #include <sys/types.h> #include <sys/stat.h> #include <sys/signalfd.h> +#include <pthread.h> +#include <sys/eventfd.h> #include <fcntl.h> #include <errno.h> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 88c0b08..7d2869d 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -98,6 +98,10 @@ struct cluster_info { uint8_t nr_copies; int req_efd; + pthread_mutex_t wait_vdis_lock; + pthread_cond_t wait_vdis_cond; + int wait_vdis_state; + pthread_mutex_t wait_req_lock; struct list_head wait_req_queue; struct list_head wait_rw_queue; @@ -137,6 +141,11 @@ struct vdi_iocb { int nr_copies; }; +struct vdi_copy { + uint32_t vid; + uint32_t nr_copies; +}; + struct store_driver { struct list_head list; const char *name; @@ -231,6 +240,7 @@ void update_vnode_info(struct vnode_info *vnode_info); struct vnode_info *alloc_vnode_info(struct sd_node 
*nodes, size_t nr_nodes); void put_vnode_info(struct vnode_info *vinfo); struct vnode_info *get_vnode_info_epoch(uint32_t epoch); +void wait_get_vdis_done(void); int get_nr_copies(struct vnode_info *vnode_info); diff --git a/sheep/vdi.c b/sheep/vdi.c index 52576a5..2d58ce7 100644 --- a/sheep/vdi.c +++ b/sheep/vdi.c @@ -139,6 +139,26 @@ int add_vdi_copies(uint32_t vid, int nr_copies) return SD_RES_SUCCESS; } +static int fill_vdi_copy_list(void *data) +{ + int nr = 0; + struct rb_node *n; + struct vdi_copy *vc = data; + struct vdi_copy_entry *entry; + + pthread_rwlock_rdlock(&vdi_copy_lock); + for (n = rb_first(&vdi_copy_root); n; n = rb_next(n)) { + entry = rb_entry(n, struct vdi_copy_entry, node); + vc->vid = entry->vid; + vc->nr_copies = entry->nr_copies; + vc++; + nr++; + } + pthread_rwlock_unlock(&vdi_copy_lock); + + return nr * sizeof(*vc); +} + int vdi_exist(uint32_t vid) { struct sheepdog_inode *inode; @@ -488,11 +508,13 @@ out: int read_vdis(char *data, int len, unsigned int *rsp_len) { - if (len != sizeof(sys->vdi_inuse)) - return SD_RES_INVALID_PARMS; + int length; memcpy(data, sys->vdi_inuse, sizeof(sys->vdi_inuse)); - *rsp_len = sizeof(sys->vdi_inuse); + /* put vdi copy list at the end of vdi bitmap */ + length = fill_vdi_copy_list(data + sizeof(sys->vdi_inuse)); + + *rsp_len = sizeof(sys->vdi_inuse) + length; return SD_RES_SUCCESS; } -- 1.7.1 |