[sheepdog] [PATCH v6 1/2] lib: move sockfd cache from sheep to lib
Hitoshi Mitake
mitake.hitoshi at gmail.com
Wed Jul 31 07:47:59 CEST 2013
At Wed, 31 Jul 2013 13:39:41 +0800,
Liu Yuan wrote:
>
> On Wed, Jul 31, 2013 at 02:18:13PM +0900, Hitoshi Mitake wrote:
> > At Wed, 31 Jul 2013 11:36:46 +0800,
> > Liu Yuan wrote:
> > >
> > > On Wed, Jul 31, 2013 at 11:24:03AM +0800, Liu Yuan wrote:
> > > > On Wed, Jul 31, 2013 at 11:43:14AM +0900, Hitoshi Mitake wrote:
> > > > > On some subcommands, collie also issues lots of request to sheeps. So
> > > > > collie can enjoy sockfd caching. For this purpose, this patch moves
> > > > > sockfd from sheep to lib and generalize the interfaces of sockfd.
> > > > >
> > > > > This patch doesn't change anything related to IO NIC in sockfd
> > > > > cache. Because collie can know address and port of IO NIC so
> > > > > potentially use them.
> > > > >
> > > > > Signed-off-by: Hitoshi Mitake <mitake.hitoshi at lab.ntt.co.jp>
> > > > > ---
> > > > >
> > > > > v6:
> > > > > - revive revalidate_node() because it is required for handling corner
> > > > > case of sheep and doesn't harm collie's behavior
> > > > > - trivial cleaning
> > > > >
> > > > > v5:
> > > > > - make the workqueue for cachefd growth an internal thing in
> > > > > lib/sockfd_cache.c
> > > > > - remove revalidate_node(), because this relies on the assumption of
> > > > > sheep
> > > > > - better naming of functions. do_sockfd_cache_{get, put}() ->
> > > > > sockfd_cache_{get, put}_long()
> > > > >
> > > > > include/Makefile.am | 3 +-
> > > > > include/internal_proto.h | 2 +
> > > > > include/sockfd_cache.h | 22 ++
> > > > > lib/Makefile.am | 2 +-
> > > > > {sheep => lib}/sockfd_cache.c | 125 +++++-----
> > > > > sheep/gateway.c | 10 +-
> > > > > sheep/group.c | 2 +-
> > > > > sheep/sheep.c | 9 +-
> > > > > sheep/sheep_priv.h | 12 +-
> > > > > sheep/sockfd_cache.c | 509 +----------------------------------------
> > > > > 10 files changed, 105 insertions(+), 591 deletions(-)
> > > > > create mode 100644 include/sockfd_cache.h
> > > > > copy {sheep => lib}/sockfd_cache.c (89%)
> > > > >
> > > > > diff --git a/include/Makefile.am b/include/Makefile.am
> > > > > index 0acb76e..4d0c229 100644
> > > > > --- a/include/Makefile.am
> > > > > +++ b/include/Makefile.am
> > > > > @@ -2,4 +2,5 @@ MAINTAINERCLEANFILES = Makefile.in config.h.in
> > > > >
> > > > > noinst_HEADERS = bitops.h event.h logger.h sheepdog_proto.h util.h \
> > > > > list.h net.h sheep.h exits.h strbuf.h rbtree.h \
> > > > > - sha1.h option.h internal_proto.h shepherd.h work.h
> > > > > + sha1.h option.h internal_proto.h shepherd.h work.h \
> > > > > + sockfd_cache.h
> > > > > diff --git a/include/internal_proto.h b/include/internal_proto.h
> > > > > index 0463eae..0061007 100644
> > > > > --- a/include/internal_proto.h
> > > > > +++ b/include/internal_proto.h
> > > > > @@ -20,6 +20,8 @@
> > > > > #include <stdint.h>
> > > > > #include <netinet/in.h>
> > > > >
> > > > > +#include "sheepdog_proto.h"
> > > > > +
> > > > > #define SD_SHEEP_PROTO_VER 0x08
> > > > >
> > > > > #define SD_DEFAULT_COPIES 3
> > > > > diff --git a/include/sockfd_cache.h b/include/sockfd_cache.h
> > > > > new file mode 100644
> > > > > index 0000000..96f37ae
> > > > > --- /dev/null
> > > > > +++ b/include/sockfd_cache.h
> > > > > @@ -0,0 +1,22 @@
> > > > > +#ifndef SOCKFD_CACHE_H
> > > > > +#define SOCKFD_CACHE_H
> > > > > +
> > > > > +#include "internal_proto.h"
> > > > > +#include "work.h"
> > > > > +
> > > > > +struct sockfd *sockfd_cache_get(const struct node_id *nid);
> > > > > +void sockfd_cache_put(const struct node_id *nid, struct sockfd *sfd);
> > > > > +void sockfd_node_del(const struct node_id *nid);
> > > > > +void sockfd_cache_del(const struct node_id *nid, struct sockfd *sfd);
> > > > > +void sockfd_cache_add(const struct node_id *nid);
> > > > > +void sockfd_cache_add_group(const struct sd_node *nodes, int nr);
> > > > > +
> > > > > +int sockfd_init(void);
> > > > > +
> > > > > +/* sockfd_cache */
> > > > > +struct sockfd {
> > > > > + int fd;
> > > > > + int idx;
> > > > > +};
> > > > > +
> > > > > +#endif /* SOCKFD_CACHE_H */
> > > > > diff --git a/lib/Makefile.am b/lib/Makefile.am
> > > > > index a95ce4c..22edf6e 100644
> > > > > --- a/lib/Makefile.am
> > > > > +++ b/lib/Makefile.am
> > > > > @@ -5,7 +5,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include
> > > > > noinst_LIBRARIES = libsheepdog.a
> > > > >
> > > > > libsheepdog_a_SOURCES = event.c logger.c net.c util.c rbtree.c strbuf.c \
> > > > > - sha1.c option.c work.c
> > > > > + sha1.c option.c work.c sockfd_cache.c
> > > > >
> > > > > # support for GNU Flymake
> > > > > check-syntax:
> > > > > diff --git a/sheep/sockfd_cache.c b/lib/sockfd_cache.c
> > > > > similarity index 89%
> > > > > copy from sheep/sockfd_cache.c
> > > > > copy to lib/sockfd_cache.c
> > > > > index 13bb7f6..e633e72 100644
> > > > > --- a/sheep/sockfd_cache.c
> > > > > +++ b/lib/sockfd_cache.c
> > > > > @@ -27,7 +27,13 @@
> > > > > * 7 support dual connections to a single node.
> > > > > */
> > > > >
> > > > > -#include "sheep_priv.h"
> > > > > +#include <pthread.h>
> > > > > +
> > > > > +#include "sockfd_cache.h"
> > > > > +#include "work.h"
> > > > > +#include "rbtree.h"
> > > > > +#include "util.h"
> > > > > +#include "sheep.h"
> > > > >
> > > > > struct sockfd_cache {
> > > > > struct rb_root root;
> > > > > @@ -208,20 +214,6 @@ false_out:
> > > > > return false;
> > > > > }
> > > > >
> > > > > -/* When node craches, we should delete it from the cache */
> > > > > -void sockfd_cache_del(const struct node_id *nid)
> > > > > -{
> > > > > - char name[INET6_ADDRSTRLEN];
> > > > > - int n;
> > > > > -
> > > > > - if (!sockfd_cache_destroy(nid))
> > > > > - return;
> > > > > -
> > > > > - n = uatomic_sub_return(&sockfd_cache.count, 1);
> > > > > - addr_to_str(name, sizeof(name), nid->addr, 0);
> > > > > - sd_dprintf("%s:%d, count %d", name, nid->port, n);
> > > > > -}
> > > > > -
> > > > > static void sockfd_cache_add_nolock(const struct node_id *nid)
> > > > > {
> > > > > struct sockfd_cache_entry *new = xmalloc(sizeof(*new));
> > > > > @@ -282,6 +274,8 @@ void sockfd_cache_add(const struct node_id *nid)
> > > > > static uatomic_bool fds_in_grow;
> > > > > static int fds_high_watermark = FDS_WATERMARK(DEFAULT_FDS_COUNT);
> > > > >
> > > > > +static struct work_queue *grow_wq;
> > > > > +
> > > > > static void do_grow_fds(struct work *work)
> > > > > {
> > > > > struct sockfd_cache_entry *entry;
> > > > > @@ -318,6 +312,15 @@ static inline void check_idx(int idx)
> > > > > {
> > > > > struct work *w;
> > > > >
> > > > > + if (!grow_wq) {
> > > > > + sd_dprintf("Request for growing fds is issued, but"
> > > > > + " a work queue for this work is not registered yet.");
> > > > > + sd_dprintf("Initializing work queue in earlier stage"
> > > > > + " is suggested.");
> > > > > +
> > > > > + return;
> > > > > + }
> > > > > +
> > > > > if (idx <= fds_high_watermark)
> > > > > return;
> > > > > if (!uatomic_set_true(&fds_in_grow))
> > > > > @@ -326,7 +329,7 @@ static inline void check_idx(int idx)
> > > > > w = xmalloc(sizeof(*w));
> > > > > w->fn = do_grow_fds;
> > > > > w->done = grow_fds_done;
> > > > > - queue_work(sys->sockfd_wqueue, w);
> > > > > + queue_work(grow_wq, w);
> > > > > }
> > > > >
> > > > > /* Add the node back if it is still alive */
> > > > > @@ -350,7 +353,7 @@ alive:
> > > > > }
> > > > >
> > > > > /* Try to create/get cached IO connection. If failed, fallback to non-IO one */
> > > > > -static struct sockfd *sockfd_cache_get(const struct node_id *nid)
> > > > > +static struct sockfd *sockfd_cache_get_long(const struct node_id *nid)
> > > > > {
> > > > > struct sockfd_cache_entry *entry;
> > > > > struct sockfd *sfd;
> > > > > @@ -372,8 +375,10 @@ grab:
> > > > > */
> > > > > if (!revalidate_node(nid))
> > > > > return NULL;
> > > > > +
> > > > > goto grab;
> > > > > }
> > > > > +
> > > > > check_idx(idx);
> > > > > if (entry->fds[idx].fd != -1) {
> > > > > sd_dprintf("%s:%d, idx %d", name, port, idx);
> > > > > @@ -402,7 +407,7 @@ out:
> > > > > return sfd;
> > > > > }
> > > > >
> > > > > -static void sockfd_cache_put(const struct node_id *nid, int idx)
> > > > > +static void sockfd_cache_put_long(const struct node_id *nid, int idx)
> > > > > {
> > > > > bool use_io = nid->io_port ? true : false;
> > > > > const uint8_t *addr = use_io ? nid->io_addr : nid->addr;
> > > > > @@ -442,20 +447,36 @@ static void sockfd_cache_close(const struct node_id *nid, int idx)
> > > > > }
> > > > >
> > > > > /*
> > > > > + * Create work queue for growing fds.
> > > > > + * Before this function called, growing cannot be done.
> > > > > + */
> > > > > +int sockfd_init(void)
> > > > > +{
> > > > > + grow_wq = create_ordered_work_queue("sockfd_grow");
> > > > > +
> > > > > + if (!grow_wq) {
> > > > > + sd_eprintf("error at creating workqueue for sockfd growth");
> > > > > + return -1;
> > > > > + }
> > > > > +
> > > > > + return 0;
> > > > > +}
> > > > > +
> > > > > +/*
> > > > > * Return a sockfd connected to the node to the caller
> > > > > *
> > > > > * Try to get a 'long' FD as best, which is cached and never closed. If no FD
> > > > > * available, we return a 'short' FD which is supposed to be closed by
> > > > > - * sheep_put_sockfd().
> > > > > + * put_sockfd().
> > > > > *
> > > > > * ret_idx is opaque to the caller, -1 indicates it is a short FD.
> > > > > */
> > > > > -struct sockfd *sheep_get_sockfd(const struct node_id *nid)
> > > > > +struct sockfd *sockfd_cache_get(const struct node_id *nid)
> > > > > {
> > > > > struct sockfd *sfd;
> > > > > int fd;
> > > > >
> > > > > - sfd = sockfd_cache_get(nid);
> > > > > + sfd = sockfd_cache_get_long(nid);
> > > > > if (sfd)
> > > > > return sfd;
> > > > >
> > > > > @@ -473,16 +494,12 @@ struct sockfd *sheep_get_sockfd(const struct node_id *nid)
> > > > >
> > > > > /*
> > > > > * Release a sockfd connected to the node, which is acquired from
> > > > > - * sheep_get_sockfd()
> > > > > + * sockfd_cache_get()
> > > > > *
> > > > > * If it is a long FD, just decrease the refcount to make it available again.
> > > > > * If it is a short FD, close it.
> > > > > - *
> > > > > - * sheep_put_sockfd() or sheep_del_sockfd() should be paired with
> > > > > - * sheep_get_sockfd()
> > > > > */
> > > > > -
> > > > > -void sheep_put_sockfd(const struct node_id *nid, struct sockfd *sfd)
> > > > > +void sockfd_cache_put(const struct node_id *nid, struct sockfd *sfd)
> > > > > {
> > > > > if (sfd->idx == -1) {
> > > > > sd_dprintf("%d", sfd->fd);
> > > > > @@ -491,18 +508,32 @@ void sheep_put_sockfd(const struct node_id *nid, struct sockfd *sfd)
> > > > > return;
> > > > > }
> > > > >
> > > > > - sockfd_cache_put(nid, sfd->idx);
> > > > > + sockfd_cache_put_long(nid, sfd->idx);
> > > > > free(sfd);
> > > > > }
> > > > >
> > > > > +/* Delete all sockfd connected to the node, when node is crashed. */
> > > > > +void sockfd_node_del(const struct node_id *nid)
> > > > > +{
> > > > > + char name[INET6_ADDRSTRLEN];
> > > > > + int n;
> > > > > +
> > > > > + if (!sockfd_cache_destroy(nid))
> > > > > + return;
> > > > > +
> > > > > + n = uatomic_sub_return(&sockfd_cache.count, 1);
> > > > > + addr_to_str(name, sizeof(name), nid->addr, 0);
> > > > > + sd_dprintf("%s:%d, count %d", name, nid->port, n);
> > > > > +}
> > > > > +
> > > > > /*
> > > > > - * Delete a sockfd connected to the node, when node is crashed.
> > > > > + * Delete a sockfd connected to the node.
> > > > > *
> > > > > * If it is a long FD, de-refcount it and tres to destroy all the cached FDs of
> > > > > * this node in the cache.
> > > > > * If it is a short FD, just close it.
> > > > > */
> > > > > -void sheep_del_sockfd(const struct node_id *nid, struct sockfd *sfd)
> > > > > +void sockfd_cache_del(const struct node_id *nid, struct sockfd *sfd)
> > > > > {
> > > > > if (sfd->idx == -1) {
> > > > > sd_dprintf("%d", sfd->fd);
> > > > > @@ -512,38 +543,6 @@ void sheep_del_sockfd(const struct node_id *nid, struct sockfd *sfd)
> > > > > }
> > > > >
> > > > > sockfd_cache_close(nid, sfd->idx);
> > > > > - sockfd_cache_del(nid);
> > > > > + sockfd_cache_put_long(nid, sfd->idx);
> > > > > free(sfd);
> > > > > }
> > > >
> > > > Seems that you changed the behavior of sockfd_cache_del, why? And it is
> > > > confusing and more importantly it is totally wrong, calling sheep_del_sockfd()
> > > > means to delete this node from sockfd cache, and with your change, it doesn't.
> >
> > Thanks for your catching and pointing, this is my mistake.
> >
> > > >
> > > > Please write what have you changed about sockfd_cache in the commit log for
> > > > easier review.
> > > >
> > >
> > > Current sockfd cache is kind of stable, so it is better not to change the code
> > > unless you have reason. I think you can just mainly change some function names
> > > and add sockfd_init to make it work with collie, no?
> >
> > Basically, yes. But I have to do two fixes.
> >
> > 1. Previous sheep_del_sockfd() (current sockfd_cache_del()) calls
> > sockfd_cache_close(). But it is needless because the fd of sfd is
> > closed during sockfd_cache_del(). This is a minor one.
>
> sockfd_cache_close() is needed because it will try to close the fd.
> sockfd_cache_del() will try to destroy the fd if all the fds are *closed*.
> If you don't close the fd, sockfd_cache_del() will fail to destroy the fds.
> sockfd_cache_del() will finally be called again if failred previously.
>
> Your change is wrong to me.
Ah, sorry, this is my misunderstand. As you say, sockfd_cache_close()
is needed for slots_all_free().
>
> >
> > 2. Memory leak in sockfd_cache_destroy(). The function doesn't free
> > entry->fds currently.
>
> Good catch. But this ineed should be on another patch. And sockfd_cache_add_nolock()
> too need to free entry->fds.
>
> I'd suggest introduce a helper function free_entry() to exlude future mistakes
> on it.
OK, I'll do this in the 3rd patch of the next version.
Thanks,
Hitoshi
More information about the sheepdog
mailing list