[sheepdog] [PATCH 1/3] zookeeper: retry zk_create_seq_node on retryable error
    MORITA Kazutaka 
    morita.kazutaka at gmail.com
       
    Wed May 29 12:37:47 CEST 2013
    
    
  
From: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
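When zoo_create() fails with ZOPERATIONTIMEOUT or ZCONNECTIONLOSS, we cannot
tell whether the sequence znode was actually created.  This patch tags every
event with a unique id, checks the znodes in the zookeeper queue for that id,
and retries zoo_create() only when the requested znode is not created.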
Signed-off-by: MORITA Kazutaka <morita.kazutaka at lab.ntt.co.jp>
---
 sheep/cluster/zookeeper.c |   78 +++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 71 insertions(+), 7 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 1cd39f7..c23291b 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -58,6 +58,7 @@ struct zk_node {
 };
 
 struct zk_event {
+	uint64_t id;
 	enum zk_event_type type;
 	struct zk_node sender;
 	enum cluster_join_result join_result;
@@ -174,11 +175,13 @@ zk_create_node(const char *path, const char *value, int valuelen,
 }
 
 /*
- * FIXME: ZOPERATIONTIMEOUT or ZCONNECTIONLOSS will cause duplicate seq node of
- * the same event that we can't handle at all. Panic out in any error case would
- * be the safest option.
+ * Create a znode after adding a unique monotonically increasing sequence number
+ * to the path name.
+ *
+ * Note that this function can return ZOPERATIONTIMEOUT or ZCONNECTIONLOSS and
+ * the caller has to handle it.
  */
-static inline ZOOAPI void
+static inline ZOOAPI int
 zk_create_seq_node(const char *path, const char *value, int valuelen,
 		   char *path_buffer, int path_buffer_len)
 {
@@ -186,7 +189,9 @@ zk_create_seq_node(const char *path, const char *value, int valuelen,
 	rc = zoo_create(zhandle, path, value, valuelen, &ZOO_OPEN_ACL_UNSAFE,
 			ZOO_SEQUENCE, path_buffer, path_buffer_len);
 	if (rc != ZOK)
-		panic("failed, path:%s, %s", path, zerror(rc));
+		sd_iprintf("failed, path:%s, %s", path, zerror(rc));
+
+	return rc;
 }
 
 static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
@@ -255,15 +260,60 @@ static bool zk_queue_peek(void)
 	return false;
 }
 
+/* return true if there is a node with 'id' in the queue. */
+static bool zk_find_seq_node(uint64_t id)
+{
+	int rc, len;
+
+	for (int seq = queue_pos; ; seq++) {
+		char seq_path[MAX_NODE_STR_LEN];
+		struct zk_event ev;
+
+		snprintf(seq_path, sizeof(seq_path), QUEUE_ZNODE"/%010"PRId32,
+			 seq);
+		len = offsetof(typeof(ev), id) + sizeof(ev.id);
+		rc = zk_get_data(seq_path, &ev, &len);
+		switch (rc) {
+		case ZOK:
+			if (ev.id == id) {
+				sd_dprintf("id %"PRIx64" is found in %s", id,
+					   seq_path);
+				return true;
+			}
+			break;
+		case ZNONODE:
+			sd_dprintf("id %"PRIx64" is not found", id);
+			return false;
+		default:
+			panic("failed, %s", zerror(rc));
+			break;
+		}
+	}
+}
+
 static void zk_queue_push(struct zk_event *ev)
 {
 	static bool first_push = true;
-	int len;
+	int rc, len;
 	char path[MAX_NODE_STR_LEN], buf[MAX_NODE_STR_LEN];
 
 	len = (char *)(ev->buf) - (char *)ev + ev->buf_len;
 	snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
-	zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf));
+again:
+	rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf));
+	switch (rc) {
+	case ZOK:
+		/* Success */
+		break;
+	case ZOPERATIONTIMEOUT:
+	case ZCONNECTIONLOSS:
+		if (!zk_find_seq_node(ev->id))
+			/* retry if seq_node was not created */
+			goto again;
+		break;
+	default:
+		panic("failed, path:%s, %s", path, zerror(rc));
+	}
 	if (first_push) {
 		int32_t seq;
 
@@ -416,11 +466,24 @@ static void zk_queue_init(void)
 	zk_init_node(MEMBER_ZNODE);
 }
 
+/* Calculate a unique 64 bit integer from this_node and the sequence number. */
+static uint64_t get_uniq_id(void)
+{
+	static int seq;
+	uint64_t id, n = uatomic_add_return(&seq, 1);
+
+	id = fnv_64a_buf(&this_node, sizeof(this_node), FNV1A_64_INIT);
+	id = fnv_64a_buf(&n, sizeof(n), id);
+
+	return id;
+}
+
 static int add_event(enum zk_event_type type, struct zk_node *znode, void *buf,
 		     size_t buf_len)
 {
 	struct zk_event ev;
 
+	ev.id = get_uniq_id();
 	ev.type = type;
 	ev.sender = *znode;
 	ev.buf_len = buf_len;
@@ -476,6 +539,7 @@ static int add_join_event(void *msg, size_t msg_len)
 	size_t len = msg_len + sizeof(struct sd_node) * SD_MAX_NODES;
 
 	assert(len <= SD_MAX_EVENT_BUF_SIZE);
+	ev.id = get_uniq_id();
 	ev.type = EVENT_JOIN_REQUEST;
 	ev.sender = this_node;
 	ev.msg_len = msg_len;
-- 
1.7.9.5
    
    
More information about the sheepdog
mailing list