[sheepdog] [PATCH v2 1/2] zookeeper: retry zk_create_seq_node on retryable error
MORITA Kazutaka
morita.kazutaka at gmail.com
Thu May 30 16:27:25 CEST 2013
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
When zoo_create() fails with a retryable error (ZOPERATIONTIMEOUT or
ZCONNECTIONLOSS), this checks the znodes in the ZooKeeper queue and retries
zoo_create() if the requested znode was not created.
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
sheep/cluster/zookeeper.c | 78 +++++++++++++++++++++++++++++++++++++++++----
1 file changed, 71 insertions(+), 7 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index d82a486..5480475 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -58,6 +58,7 @@ struct zk_node {
};
struct zk_event {
+ uint64_t id;
enum zk_event_type type;
struct zk_node sender;
enum cluster_join_result join_result;
@@ -174,11 +175,13 @@ zk_create_node(const char *path, const char *value, int valuelen,
}
/*
- * FIXME: ZOPERATIONTIMEOUT or ZCONNECTIONLOSS will cause duplicate seq node of
- * the same event that we can't handle at all. Panic out in any error case would
- * be the safest option.
+ * Create a znode after adding a unique monotonically increasing sequence number
+ * to the path name.
+ *
+ * Note that the caller has to retry this function when this returns
+ * ZOPERATIONTIMEOUT or ZCONNECTIONLOSS and the znode is not created.
*/
-static inline ZOOAPI void
+static inline ZOOAPI int
zk_create_seq_node(const char *path, const char *value, int valuelen,
char *path_buffer, int path_buffer_len)
{
@@ -186,7 +189,9 @@ zk_create_seq_node(const char *path, const char *value, int valuelen,
rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE,
ZOO_SEQUENCE, path_buffer, path_buffer_len);
if (rc != ZOK)
- panic("failed, path:%s, %s", path, zerror(rc));
+ sd_iprintf("failed, path:%s, %s", path, zerror(rc));
+
+ return rc;
}
static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
@@ -255,15 +260,60 @@ static bool zk_queue_peek(void)
return false;
}
+/* return true if there is a node with 'id' in the queue. */
+static bool zk_find_seq_node(uint64_t id)
+{
+ int rc, len;
+
+ for (int seq = queue_pos; ; seq++) {
+ char seq_path[MAX_NODE_STR_LEN];
+ struct zk_event ev;
+
+ snprintf(seq_path, sizeof(seq_path), QUEUE_ZNODE"/%010"PRId32,
+ seq);
+ len = offsetof(typeof(ev), id) + sizeof(ev.id);
+ rc = zk_get_data(seq_path, &ev, &len);
+ switch (rc) {
+ case ZOK:
+ if (ev.id == id) {
+ sd_dprintf("id %"PRIx64" is found in %s", id,
+ seq_path);
+ return true;
+ }
+ break;
+ case ZNONODE:
+ sd_dprintf("id %"PRIx64" is not found", id);
+ return false;
+ default:
+ panic("failed, %s", zerror(rc));
+ break;
+ }
+ }
+}
+
static void zk_queue_push(struct zk_event *ev)
{
static bool first_push = true;
- int len;
+ int rc, len;
char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
- zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf));
+again:
+ rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf));
+ switch (rc) {
+ case ZOK:
+ /* Success */
+ break;
+ case ZOPERATIONTIMEOUT:
+ case ZCONNECTIONLOSS:
+ if (!zk_find_seq_node(ev->id))
+ /* retry if seq_node was not created */
+ goto again;
+ break;
+ default:
+ panic("failed, path:%s, %s", path, zerror(rc));
+ }
if (first_push) {
int32_t seq;
@@ -418,11 +468,24 @@ static void zk_queue_init(void)
zk_init_node(MEMBER_ZNODE);
}
+/* Calculate a unique 64 bit integer from this_node and the sequence number. */
+static uint64_t get_uniq_id(void)
+{
+ static int seq;
+ uint64_t id, n = uatomic_add_return(&seq, 1);
+
+ id = fnv_64a_buf(&this_node, sizeof(this_node), FNV1A_64_INIT);
+ id = fnv_64a_buf(&n, sizeof(n), id);
+
+ return id;
+}
+
static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
size_t buf_len)
{
struct zk_event ev;
+ ev.id = get_uniq_id();
ev.type = type;
ev.sender = *znode;
ev.buf_len = buf_len;
@@ -478,6 +541,7 @@ static int add_join_event(void *msg, size_t msg_len)
size_t len = msg_len + sizeof(struct sd_node) * SD_MAX_NODES;
assert(len <= SD_MAX_EVENT_BUF_SIZE);
+ ev.id = get_uniq_id();
ev.type = EVENT_JOIN_REQUEST;
ev.sender = this_node;
ev.msg_len = msg_len;
--
1.7.9.5
More information about the sheepdog mailing list