From: Yunkai Zhang <qiushu.zyk at taobao.com> We have discussed this issue on the mailing list: http://lists.wpkg.org/pipermail/sheepdog/2012-May/003315.html This patch depends on a third-party userspace RCU library, which is available here: http://lttng.org/urcu. Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com> --- configure.ac | 3 ++- sheep/Makefile.am | 7 +++++-- sheep/group.c | 46 +++++++++++++++++++++++++++++++++------------- sheep/sdnet.c | 5 +++-- sheep/sheep_priv.h | 4 +--- sheep/work.c | 3 +++ 6 files changed, 47 insertions(+), 21 deletions(-) diff --git a/configure.ac b/configure.ac index 7a5eedf..b7af48f 100644 --- a/configure.ac +++ b/configure.ac @@ -77,6 +77,7 @@ AM_MISSING_PROG(AUTOM4TE, autom4te, $missing_dir) # Checks for libraries. AC_CHECK_LIB([socket], [socket]) +PKG_CHECK_MODULES([liburcu],[liburcu]) # Checks for header files. AC_FUNC_ALLOCA @@ -86,7 +87,7 @@ AC_HEADER_SYS_WAIT AC_CHECK_HEADERS([arpa/inet.h fcntl.h limits.h netdb.h netinet/in.h stdint.h \ stdlib.h string.h sys/ioctl.h sys/param.h sys/socket.h \ sys/time.h syslog.h unistd.h sys/types.h getopt.h malloc.h \ - sys/sockio.h utmpx.h]) + sys/sockio.h utmpx.h urcu.h]) # Checks for typedefs, structures, and compiler characteristics. 
AC_C_CONST diff --git a/sheep/Makefile.am b/sheep/Makefile.am index 7448ae1..bca365c 100644 --- a/sheep/Makefile.am +++ b/sheep/Makefile.am @@ -20,7 +20,8 @@ MAINTAINERCLEANFILES = Makefile.in AM_CFLAGS = INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include \ - $(libcpg_CFLAGS) $(libcfg_CFLAGS) $(libacrd_CFLAGS) + $(libcpg_CFLAGS) $(libcfg_CFLAGS) $(libacrd_CFLAGS) \ + $(liburcu_CFLAGS) sbin_PROGRAMS = sheep @@ -47,7 +48,9 @@ sheep_SOURCES += trace/trace.c trace/mcount.S trace/stabs.c trace/graph.c endif sheep_LDADD = ../lib/libsheepdog.a -lpthread \ - $(libcpg_LIBS) $(libcfg_LIBS) $(libacrd_LIBS) $(LIBS) + $(libcpg_LIBS) $(libcfg_LIBS) $(libacrd_LIBS) $(LIBS) \ + $(liburcu_LIBS) + sheep_DEPENDENCIES = ../lib/libsheepdog.a diff --git a/sheep/group.c b/sheep/group.c index c7fd387..cd95ba7 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -13,6 +13,7 @@ #include <stdlib.h> #include <unistd.h> #include <netdb.h> +#include <urcu.h> #include <arpa/inet.h> #include <sys/time.h> #include <sys/epoll.h> @@ -36,7 +37,7 @@ struct vnode_info { struct sd_vnode entries[SD_MAX_VNODES]; int nr_vnodes; int nr_zones; - int refcnt; + struct rcu_head rcu; }; struct join_message { @@ -104,7 +105,6 @@ struct work_leave { }) static int event_running; -static struct vnode_info *current_vnode_info; static size_t get_join_message_size(struct join_message *jm) { @@ -155,15 +155,27 @@ int get_max_nr_copies_from(struct sd_node *nodes, int nr_nodes) struct vnode_info *get_vnode_info(void) { - assert(current_vnode_info); - current_vnode_info->refcnt++; - return current_vnode_info; + struct vnode_info *vnodes, *p; + + assert(sys->current_vnode_info); + + vnodes = zalloc(sizeof(*vnodes)); + if (!vnodes) + panic("failed to allocate memory\n"); + + rcu_read_lock(); + + p = rcu_dereference(sys->current_vnode_info); + memcpy(vnodes, p, sizeof(*p)); + + rcu_read_unlock(); + + return vnodes; } void put_vnode_info(struct vnode_info *vnodes) { - if (vnodes && --vnodes->refcnt == 0) - free(vnodes); 
+ free(vnodes); } struct sd_vnode *oid_to_vnode(struct vnode_info *vnode_info, uint64_t oid, @@ -176,10 +188,16 @@ struct sd_vnode *oid_to_vnode(struct vnode_info *vnode_info, uint64_t oid, return &vnode_info->entries[n]; } +static void vnode_info_reclaim(struct rcu_head *head) +{ + struct vnode_info *vnodes; + vnodes = container_of(head, struct vnode_info, rcu); + free(vnodes); +} static int update_vnode_info(void) { - struct vnode_info *vnode_info; + struct vnode_info *vnode_info, *old_vnode_info; vnode_info = zalloc(sizeof(*vnode_info)); if (!vnode_info) { @@ -190,10 +208,12 @@ static int update_vnode_info(void) vnode_info->nr_vnodes = nodes_to_vnodes(sys->nodes, sys->nr_nodes, vnode_info->entries); vnode_info->nr_zones = get_zones_nr_from(sys->nodes, sys->nr_nodes); - vnode_info->refcnt = 1; - put_vnode_info(current_vnode_info); - current_vnode_info = vnode_info; + old_vnode_info = sys->current_vnode_info; + rcu_assign_pointer(sys->current_vnode_info, vnode_info); + if (old_vnode_info) + call_rcu(&old_vnode_info->rcu, vnode_info_reclaim); + return 0; } @@ -841,7 +861,7 @@ static void __sd_join_done(struct event_struct *cevent) } if (sys_stat_halt()) { - if (current_vnode_info->nr_zones >= sys->nr_copies) + if (sys->current_vnode_info->nr_zones >= sys->nr_copies) sys_stat_set(SD_STATUS_OK); } @@ -871,7 +891,7 @@ static void __sd_leave_done(struct event_struct *cevent) start_recovery(sys->epoch); if (sys_can_halt()) { - if (current_vnode_info->nr_zones < sys->nr_copies) + if (sys->current_vnode_info->nr_zones < sys->nr_copies) sys_stat_set(SD_STATUS_HALT); } } diff --git a/sheep/sdnet.c b/sheep/sdnet.c index f59b1ff..c8c9bdb 100644 --- a/sheep/sdnet.c +++ b/sheep/sdnet.c @@ -303,7 +303,9 @@ static void queue_request(struct request *req) * called before we set up current_vnode_info */ if (!is_force_op(req->op)) - req->vnodes = get_vnode_info(); + /* We can reference current_vnode_info safely + * because we are in main thread here */ + req->vnodes = 
sys->current_vnode_info; if (is_io_op(req->op)) { req->work.fn = do_io_request; @@ -379,7 +381,6 @@ static void free_request(struct request *req) sys->outstanding_data_size -= req->data_length; list_del(&req->r_siblings); - put_vnode_info(req->vnodes); free(req->data); free(req); } diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index 2275a93..c3a152b 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -123,9 +123,7 @@ struct cluster_info { struct sd_node nodes[SD_MAX_NODES]; int nr_nodes; - /* this array contains a list of ordered virtual nodes */ - struct sd_vnode vnodes[SD_MAX_VNODES]; - int nr_vnodes; + struct vnode_info *current_vnode_info; struct list_head pending_list; diff --git a/sheep/work.c b/sheep/work.c index 8564cb2..84ce3c2 100644 --- a/sheep/work.c +++ b/sheep/work.c @@ -19,6 +19,7 @@ #include <stdio.h> #include <unistd.h> #include <fcntl.h> +#include <urcu.h> #include <stdlib.h> #include <syscall.h> #include <sys/types.h> @@ -172,6 +173,7 @@ static void *worker_routine(void *arg) /* started this thread */ pthread_mutex_unlock(&wi->startup_lock); + rcu_register_thread(); while (!(wi->q.wq_state & WQ_DEAD)) { pthread_mutex_lock(&wi->pending_lock); @@ -200,6 +202,7 @@ retest: eventfd_write(efd, value); } + rcu_unregister_thread(); pthread_exit(NULL); } -- 1.7.7.6 |