From 9d13ec65ede775f896c3da1cfa35283afe2f796c Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:19 -0400 Subject: tipc: introduce link entry structure to struct tipc_node struct 'tipc_node' currently contains two arrays for link attributes, one for the link pointers, and one for the usable link MTUs. We now group those into a new struct 'tipc_link_entry', and intoduce one single array consisting of such enties. Apart from being a cosmetic improvement, this is a starting point for the strict master-slave relation between node and link that we will introduce in the following commits. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 163 +++++++++++++++++++++++++++----------------------------- 1 file changed, 78 insertions(+), 85 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 0b1d61a5f853..db46e5d1d156 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -224,126 +224,119 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) * * Link becomes active (alone or shared) or standby, depending on its priority. */ -void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +void tipc_node_link_up(struct tipc_node *n, int bearer_id) { - struct tipc_link **active = &n_ptr->active_links[0]; + struct tipc_link_entry **actv = &n->active_links[0]; + struct tipc_link_entry *le = &n->links[bearer_id]; + struct tipc_link *l = le->link; - n_ptr->working_links++; - n_ptr->action_flags |= TIPC_NOTIFY_LINK_UP; - n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id; + /* Leave room for tunnel header when returning 'mtu' to users: */ + n->links[bearer_id].mtu = l->mtu - INT_H_SIZE; + + n->working_links++; + n->action_flags |= TIPC_NOTIFY_LINK_UP; + n->link_id = l->peer_bearer_id << 16 | l->bearer_id; pr_debug("Established link <%s> on network plane %c\n", - l_ptr->name, l_ptr->net_plane); + l->name, l->net_plane); - if (!active[0]) { - active[0] = active[1] = l_ptr; - node_established_contact(n_ptr); - goto exit; + /* No active links ? => take both active slots */ + if (!actv[0]) { + actv[0] = le; + actv[1] = le; + node_established_contact(n); + return; } - if (l_ptr->priority < active[0]->priority) { - pr_debug("New link <%s> becomes standby\n", l_ptr->name); - goto exit; + if (l->priority < actv[0]->link->priority) { + pr_debug("New link <%s> becomes standby\n", l->name); + return; } - tipc_link_dup_queue_xmit(active[0], l_ptr); - if (l_ptr->priority == active[0]->priority) { - active[0] = l_ptr; - goto exit; + tipc_link_dup_queue_xmit(actv[0]->link, l); + + /* Take one active slot if applicable */ + if (l->priority == actv[0]->link->priority) { + actv[0] = le; + return; } - pr_debug("Old link <%s> becomes standby\n", active[0]->name); - if (active[1] != active[0]) - pr_debug("Old link <%s> becomes standby\n", active[1]->name); - active[0] = active[1] = l_ptr; -exit: - /* Leave room for changeover header when returning 'mtu' to users: */ - n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; - n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE; + /* Higher prio than current active? => take both active slots */ + pr_debug("Old l <%s> becomes standby\n", actv[0]->link->name); + if (actv[1] != actv[0]) + pr_debug("Old link <%s> now standby\n", actv[1]->link->name); + actv[0] = le; + actv[1] = le; } /** - * node_select_active_links - select active link + * node_select_active_links - select which working links should be active */ -static void node_select_active_links(struct tipc_node *n_ptr) +static void node_select_active_links(struct tipc_node *n) { - struct tipc_link **active = &n_ptr->active_links[0]; - u32 i; - u32 highest_prio = 0; + struct tipc_link_entry **actv = &n->active_links[0]; + struct tipc_link *l; + u32 b, highest = 0; - active[0] = active[1] = NULL; - - for (i = 0; i < MAX_BEARERS; i++) { - struct tipc_link *l_ptr = n_ptr->links[i]; + actv[0] = NULL; + actv[1] = NULL; - if (!l_ptr || !tipc_link_is_up(l_ptr) || - (l_ptr->priority < highest_prio)) + for (b = 0; b < MAX_BEARERS; b++) { + l = n->links[b].link; + if (!l || !tipc_link_is_up(l) || (l->priority < highest)) + continue; + if (l->priority > highest) { + highest = l->priority; + actv[0] = &n->links[b]; + actv[1] = &n->links[b]; continue; - - if (l_ptr->priority > highest_prio) { - highest_prio = l_ptr->priority; - active[0] = active[1] = l_ptr; - } else { - active[1] = l_ptr; } + actv[1] = &n->links[b]; } } /** * tipc_node_link_down - handle loss of link */ -void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +void tipc_node_link_down(struct tipc_node *n, int bearer_id) { - struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); - struct tipc_link **active; + struct tipc_link_entry **actv = &n->active_links[0]; + struct tipc_link_entry *le = &n->links[bearer_id]; + struct tipc_link *l = le->link; - n_ptr->working_links--; - n_ptr->action_flags |= TIPC_NOTIFY_LINK_DOWN; - n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id; + n->working_links--; + n->action_flags |= TIPC_NOTIFY_LINK_DOWN; + n->link_id = l->peer_bearer_id << 16 | l->bearer_id; - if (!tipc_link_is_active(l_ptr)) { + if (!tipc_link_is_active(l)) { pr_debug("Lost standby link <%s> on network plane %c\n", - l_ptr->name, l_ptr->net_plane); + l->name, l->net_plane); return; } pr_debug("Lost link <%s> on network plane %c\n", - l_ptr->name, l_ptr->net_plane); - - active = &n_ptr->active_links[0]; - if (active[0] == l_ptr) - active[0] = active[1]; - if (active[1] == l_ptr) - active[1] = active[0]; - if (active[0] == l_ptr) - node_select_active_links(n_ptr); - if (tipc_node_is_up(n_ptr)) - tipc_link_failover_send_queue(l_ptr); - else - node_lost_contact(n_ptr); + l->name, l->net_plane); - /* Leave room for changeover header when returning 'mtu' to users: */ - if (active[0]) { - n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; - n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE; - return; - } - /* Loopback link went down? No fragmentation needed from now on. */ - if (n_ptr->addr == tn->own_addr) { - n_ptr->act_mtus[0] = MAX_MSG_SIZE; - n_ptr->act_mtus[1] = MAX_MSG_SIZE; - } -} + /* Resdistribute active slots if applicable */ + if (actv[0] == le) + actv[0] = actv[1]; + if (actv[1] == le) + actv[1] = actv[0]; -int tipc_node_active_links(struct tipc_node *n_ptr) -{ - return n_ptr->active_links[0] != NULL; + /* Last link of this priority? => select other ones if available */ + if (actv[0] == le) + node_select_active_links(n); + + if (tipc_node_is_up(n)) + tipc_link_failover_send_queue(l); + else + node_lost_contact(n); } -int tipc_node_is_up(struct tipc_node *n_ptr) +bool tipc_node_is_up(struct tipc_node *n) { - return tipc_node_active_links(n_ptr); + return n->active_links[0]; } void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) { - n_ptr->links[l_ptr->bearer_id] = l_ptr; + n_ptr->links[l_ptr->bearer_id].link = l_ptr; n_ptr->link_cnt++; } @@ -352,9 +345,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) int i; for (i = 0; i < MAX_BEARERS; i++) { - if (l_ptr != n_ptr->links[i]) + if (l_ptr != n_ptr->links[i].link) continue; - n_ptr->links[i] = NULL; + n_ptr->links[i].link = NULL; n_ptr->link_cnt--; } } @@ -396,7 +389,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { - struct tipc_link *l_ptr = n_ptr->links[i]; + struct tipc_link *l_ptr = n_ptr->links[i].link; if (!l_ptr) continue; l_ptr->flags &= ~LINK_FAILINGOVER; @@ -453,7 +446,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr, goto exit; tipc_node_lock(node); - link = node->links[bearer_id]; + link = node->links[bearer_id].link; if (link) { strncpy(linkname, link->name, len); err = 0; -- cgit v1.2.3 From d3a43b907ae688af6cb753c53cd7de05f3c1ba85 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:20 -0400 Subject: tipc: move link creation from neighbor discoverer to node As a step towards turning links into node internal entities, we move the creation of links from the neighbor discovery logics to the node's link control logics. We also create an additional entry for the link's media address in the newly introduced struct tipc_link_entry, since this is where it is needed in the upcoming commits. The current copy in struct tipc_link is kept for now, but will be removed later. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index db46e5d1d156..06f642abdf38 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -334,6 +334,33 @@ bool tipc_node_is_up(struct tipc_node *n) return n->active_links[0]; } +void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *b, + bool *link_up, bool *addr_match, + struct tipc_media_addr *maddr) +{ + struct tipc_link *l = n->links[b->identity].link; + struct tipc_media_addr *curr = &n->links[b->identity].maddr; + + *link_up = l && tipc_link_is_up(l); + *addr_match = l && !memcmp(curr, maddr, sizeof(*maddr)); +} + +bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b, + struct tipc_media_addr *maddr) +{ + struct tipc_link *l = n->links[b->identity].link; + struct tipc_media_addr *curr = &n->links[b->identity].maddr; + + if (!l) + l = tipc_link_create(n, b, maddr); + if (!l) + return false; + memcpy(&l->media_addr, maddr, sizeof(*maddr)); + memcpy(curr, maddr, sizeof(*maddr)); + tipc_link_reset(l); + return true; +} + void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) { n_ptr->links[l_ptr->bearer_id].link = l_ptr; -- cgit v1.2.3 From d39bbd445dc44259c77bbbc8aadcce7dcdba39cc Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:21 -0400 Subject: tipc: move link input queue to tipc_node At present, the link input queue and the name distributor receive queues are fields aggregated in struct tipc_link. This is a hazard, because a link might be deleted while a receiving socket still keeps reference to one of the queues. This commit fixes this bug. However, rather than adding yet another reference counter to the critical data path, we move the two queues to safe ground inside struct tipc_node, which is already protected, and let the link code only handle references to the queues. This is also in line with planned later changes in this area. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 06f642abdf38..20ec61ceffac 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -132,6 +132,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); + skb_queue_head_init(&n_ptr->bclink.namedq); __skb_queue_head_init(&n_ptr->bclink.deferdq); hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); list_for_each_entry_rcu(temp_node, &tn->node_list, list) { @@ -350,9 +351,10 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b, { struct tipc_link *l = n->links[b->identity].link; struct tipc_media_addr *curr = &n->links[b->identity].maddr; + struct sk_buff_head *inputq = &n->links[b->identity].inputq; if (!l) - l = tipc_link_create(n, b, maddr); + l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq); if (!l) return false; memcpy(&l->media_addr, maddr, sizeof(*maddr)); -- cgit v1.2.3 From 36e78a463b26c9b8017a2e11dcd6c4b8e34b4161 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:22 -0400 Subject: tipc: use bearer index when looking up active links struct tipc_node currently holds two arrays of link pointers; one, indexed by bearer identity, which contains all links irrespective of current state, and one two-slot array for the currently active link or links. The latter array contains direct pointers into the elements of the former. This has the effect that we cannot know the bearer id of a link when accessing it via the "active_links[]" array without actually dereferencing the pointer, something we want to avoid in some cases. In this commit, we do instead store the bearer identity in the "active_links" array, and use this as an index to find the right element in the overall link entry array. This change should be seen as a preparation for the later commits in this series. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 106 +++++++++++++++++++++++--------------------------------- 1 file changed, 44 insertions(+), 62 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 20ec61ceffac..19729645d494 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -142,6 +142,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) list_add_tail_rcu(&n_ptr->list, &temp_node->list); n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN; n_ptr->signature = INVALID_NODE_SIG; + n_ptr->active_links[0] = INVALID_BEARER_ID; + n_ptr->active_links[1] = INVALID_BEARER_ID; tipc_node_get(n_ptr); exit: spin_unlock_bh(&tn->node_list_lock); @@ -227,12 +229,13 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) */ void tipc_node_link_up(struct tipc_node *n, int bearer_id) { - struct tipc_link_entry **actv = &n->active_links[0]; - struct tipc_link_entry *le = &n->links[bearer_id]; - struct tipc_link *l = le->link; + int *slot0 = &n->active_links[0]; + int *slot1 = &n->active_links[1]; + struct tipc_link_entry *links = n->links; + struct tipc_link *l = n->links[bearer_id].link; /* Leave room for tunnel header when returning 'mtu' to users: */ - n->links[bearer_id].mtu = l->mtu - INT_H_SIZE; + links[bearer_id].mtu = l->mtu - INT_H_SIZE; n->working_links++; n->action_flags |= TIPC_NOTIFY_LINK_UP; @@ -242,55 +245,30 @@ void tipc_node_link_up(struct tipc_node *n, int bearer_id) l->name, l->net_plane); /* No active links ? => take both active slots */ - if (!actv[0]) { - actv[0] = le; - actv[1] = le; + if (*slot0 < 0) { + *slot0 = bearer_id; + *slot1 = bearer_id; node_established_contact(n); return; } - if (l->priority < actv[0]->link->priority) { + + /* Lower prio than current active ? => no slot */ + if (l->priority < links[*slot0].link->priority) { pr_debug("New link <%s> becomes standby\n", l->name); return; } - tipc_link_dup_queue_xmit(actv[0]->link, l); + tipc_link_dup_queue_xmit(links[*slot0].link, l); - /* Take one active slot if applicable */ - if (l->priority == actv[0]->link->priority) { - actv[0] = le; + /* Same prio as current active ? => take one slot */ + if (l->priority == links[*slot0].link->priority) { + *slot0 = bearer_id; return; } - /* Higher prio than current active? => take both active slots */ - pr_debug("Old l <%s> becomes standby\n", actv[0]->link->name); - if (actv[1] != actv[0]) - pr_debug("Old link <%s> now standby\n", actv[1]->link->name); - actv[0] = le; - actv[1] = le; -} - -/** - * node_select_active_links - select which working links should be active - */ -static void node_select_active_links(struct tipc_node *n) -{ - struct tipc_link_entry **actv = &n->active_links[0]; - struct tipc_link *l; - u32 b, highest = 0; - actv[0] = NULL; - actv[1] = NULL; - - for (b = 0; b < MAX_BEARERS; b++) { - l = n->links[b].link; - if (!l || !tipc_link_is_up(l) || (l->priority < highest)) - continue; - if (l->priority > highest) { - highest = l->priority; - actv[0] = &n->links[b]; - actv[1] = &n->links[b]; - continue; - } - actv[1] = &n->links[b]; - } + /* Higher prio than current active => take both active slots */ + pr_debug("Old link <%s> now standby\n", links[*slot0].link->name); + *slot0 = bearer_id; + *slot1 = bearer_id; } /** @@ -298,32 +276,36 @@ static void node_select_active_links(struct tipc_node *n) */ void tipc_node_link_down(struct tipc_node *n, int bearer_id) { - struct tipc_link_entry **actv = &n->active_links[0]; - struct tipc_link_entry *le = &n->links[bearer_id]; - struct tipc_link *l = le->link; + int *slot0 = &n->active_links[0]; + int *slot1 = &n->active_links[1]; + int i, highest = 0; + struct tipc_link *l, *_l; + l = n->links[bearer_id].link; n->working_links--; n->action_flags |= TIPC_NOTIFY_LINK_DOWN; n->link_id = l->peer_bearer_id << 16 | l->bearer_id; - if (!tipc_link_is_active(l)) { - pr_debug("Lost standby link <%s> on network plane %c\n", - l->name, l->net_plane); - return; - } pr_debug("Lost link <%s> on network plane %c\n", l->name, l->net_plane); - /* Resdistribute active slots if applicable */ - if (actv[0] == le) - actv[0] = actv[1]; - if (actv[1] == le) - actv[1] = actv[0]; - - /* Last link of this priority? => select other ones if available */ - if (actv[0] == le) - node_select_active_links(n); - + /* Select new active link if any available */ + *slot0 = INVALID_BEARER_ID; + *slot1 = INVALID_BEARER_ID; + for (i = 0; i < MAX_BEARERS; i++) { + _l = n->links[i].link; + if (!_l || !tipc_link_is_up(_l)) + continue; + if (_l->priority < highest) + continue; + if (_l->priority > highest) { + highest = _l->priority; + *slot0 = i; + *slot1 = i; + continue; + } + *slot1 = i; + } if (tipc_node_is_up(n)) tipc_link_failover_send_queue(l); else @@ -332,7 +314,7 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id) bool tipc_node_is_up(struct tipc_node *n) { - return n->active_links[0]; + return n->active_links[0] != INVALID_BEARER_ID; } void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *b, -- cgit v1.2.3 From af9b028e270fda6fb812d70d17d902297df1ceb5 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:24 -0400 Subject: tipc: make media xmit call outside node spinlock context Currently, message sending is performed through a deep call chain, where the node spinlock is grabbed and held during a significant part of the transmission time. This is clearly detrimental to overall throughput performance; it would be better if we could send the message after the spinlock has been released. In this commit, we do instead let the call revert on the stack after the buffer chain has been added to the transmission queue, whereafter clones of the buffers are transmitted to the device layer outside the spinlock scope. As a further step in our effort to separate the roles of the node and link entities we also move the function tipc_link_xmit() to node.c, and rename it to tipc_node_xmit(). Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 19729645d494..ad759bb034e7 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -563,6 +563,84 @@ msg_full: return -EMSGSIZE; } +static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel, + int *bearer_id, + struct tipc_media_addr **maddr) +{ + int id = n->active_links[sel & 1]; + + if (unlikely(id < 0)) + return NULL; + + *bearer_id = id; + *maddr = &n->links[id].maddr; + return n->links[id].link; +} + +/** + * tipc_node_xmit() is the general link level function for message sending + * @net: the applicable net namespace + * @list: chain of buffers containing message + * @dnode: address of destination node + * @selector: a number used for deterministic link selection + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + */ +int tipc_node_xmit(struct net *net, struct sk_buff_head *list, + u32 dnode, int selector) +{ + struct tipc_link *l = NULL; + struct tipc_node *n; + struct sk_buff_head xmitq; + struct tipc_media_addr *maddr; + int bearer_id; + int rc = -EHOSTUNREACH; + + __skb_queue_head_init(&xmitq); + n = tipc_node_find(net, dnode); + if (likely(n)) { + tipc_node_lock(n); + l = tipc_node_select_link(n, selector, &bearer_id, &maddr); + if (likely(l)) + rc = tipc_link_xmit(l, list, &xmitq); + if (unlikely(rc == -ENOBUFS)) + tipc_link_reset(l); + tipc_node_unlock(n); + tipc_node_put(n); + } + if (likely(!rc)) { + tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); + return 0; + } + if (likely(in_own_node(net, dnode))) { + tipc_sk_rcv(net, list); + return 0; + } + return rc; +} + +/* tipc_node_xmit_skb(): send single buffer to destination + * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE + * messages, which will not be rejected + * The only exception is datagram messages rerouted after secondary + * lookup, which are rare and safe to dispose of anyway. + * TODO: Return real return value, and let callers use + * tipc_wait_for_sendpkt() where applicable + */ +int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, + u32 selector) +{ + struct sk_buff_head head; + int rc; + + skb_queue_head_init(&head); + __skb_queue_tail(&head, skb); + rc = tipc_node_xmit(net, &head, dnode, selector); + if (rc == -ELINKCONG) + kfree_skb(skb); + return 0; +} + int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb) { int err; -- cgit v1.2.3 From d3504c3449fead545e5254bfb11da916f72c4734 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:25 -0400 Subject: tipc: clean up definitions and usage of link flags The status flag LINK_STOPPED is not needed any more, since the mechanism for delayed deletion of links has been removed. Likewise, LINK_STARTED and LINK_START_EVT are unnecessary, because we can just as well start the link timer directly from inside tipc_link_create(). We eliminate these flags in this commit. Instead of the above flags, we now introduce three new link modes, TIPC_LINK_OPEN, TIPC_LINK_BLOCKED and TIPC_LINK_TUNNEL. The values indicate whether, and in the case of TIPC_LINK_TUNNEL, which, messages the link is allowed to receive in this state. TIPC_LINK_BLOCKED also blocks timer-driven protocol messages to be sent out, and any change to the link FSM. Since the modes are mutually exclusive, we convert them to state values, and rename the 'flags' field in struct tipc_link to 'exec_mode'. Finally, we move the #defines for link FSM states and events from link.h into enums inside the file link.c, which is the real usage scope of these definitions. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index ad759bb034e7..b7a4457f653c 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -403,7 +403,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) struct tipc_link *l_ptr = n_ptr->links[i].link; if (!l_ptr) continue; - l_ptr->flags &= ~LINK_FAILINGOVER; + l_ptr->exec_mode = TIPC_LINK_OPEN; l_ptr->failover_checkpt = 0; l_ptr->failover_pkts = 0; kfree_skb(l_ptr->failover_skb); -- cgit v1.2.3 From 8a1577c96f122308ac9b5f195f9f9a7dd74ac541 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:29 -0400 Subject: tipc: move link supervision timer to node level In our effort to move control of the links to the link aggregation layer, we move the perodic link supervision timer to struct tipc_node. The new timer is shared between all links belonging to the node, thus saving resources, while still kicking the FSM on both its pertaining links at each expiration. The current link timer and corresponding functions are removed. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 4 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index b7a4457f653c..77effb233725 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -44,6 +44,7 @@ static void node_lost_contact(struct tipc_node *n_ptr); static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); +static void tipc_node_timeout(unsigned long data); struct tipc_sock_conn { u32 port; @@ -145,11 +146,27 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) n_ptr->active_links[0] = INVALID_BEARER_ID; n_ptr->active_links[1] = INVALID_BEARER_ID; tipc_node_get(n_ptr); + setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); + n_ptr->keepalive_intv = U32_MAX; exit: spin_unlock_bh(&tn->node_list_lock); return n_ptr; } +static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l) +{ + unsigned long tol = l->tolerance; + unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; + unsigned long keepalive_intv = msecs_to_jiffies(intv); + + /* Link with lowest tolerance determines timer interval */ + if (keepalive_intv < n->keepalive_intv) + n->keepalive_intv = keepalive_intv; + + /* Ensure link's abort limit corresponds to current interval */ + l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv); +} + static void tipc_node_delete(struct tipc_node *node) { list_del_rcu(&node->list); @@ -163,8 +180,11 @@ void tipc_node_stop(struct net *net) struct tipc_node *node, *t_node; spin_lock_bh(&tn->node_list_lock); - list_for_each_entry_safe(node, t_node, &tn->node_list, list) + list_for_each_entry_safe(node, t_node, &tn->node_list, list) { + if (del_timer(&node->timer)) + tipc_node_put(node); tipc_node_put(node); + } spin_unlock_bh(&tn->node_list_lock); } @@ -222,6 +242,38 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) tipc_node_put(node); } +/* tipc_node_timeout - handle expiration of node timer + */ +static void tipc_node_timeout(unsigned long data) +{ + struct tipc_node *n = (struct tipc_node *)data; + struct sk_buff_head xmitq; + struct tipc_link *l; + struct tipc_media_addr *maddr; + int bearer_id; + int rc = 0; + + __skb_queue_head_init(&xmitq); + + for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { + tipc_node_lock(n); + l = n->links[bearer_id].link; + if (l) { + /* Link tolerance may change asynchronously: */ + tipc_node_calculate_timer(n, l); + rc = tipc_link_timeout(l, &xmitq); + if (rc & TIPC_LINK_DOWN_EVT) + tipc_link_reset(l); + } + tipc_node_unlock(n); + maddr = &n->links[bearer_id].maddr; + tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); + } + if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) + tipc_node_get(n); + tipc_node_put(n); +} + /** * tipc_node_link_up - handle addition of link * @@ -335,10 +387,16 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b, struct tipc_media_addr *curr = &n->links[b->identity].maddr; struct sk_buff_head *inputq = &n->links[b->identity].inputq; - if (!l) + if (!l) { l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq); - if (!l) - return false; + if (!l) + return false; + tipc_node_calculate_timer(n, l); + if (n->link_cnt == 1) { + if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) + tipc_node_get(n); + } + } memcpy(&l->media_addr, maddr, sizeof(*maddr)); memcpy(curr, maddr, sizeof(*maddr)); tipc_link_reset(l); -- cgit v1.2.3 From 1a20cc254e60e79929ef7edb5cf784df86b46e42 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:30 -0400 Subject: tipc: introduce node contact FSM The logics for determining when a node is permitted to establish and maintain contact with its peer node becomes non-trivial in the presence of multiple parallel links that may come and go independently. A known failure scenario is that one endpoint registers both its links to the peer lost, cleans up it binding table, and prepares for a table update once contact is re-establihed, while the other endpoint may see its links reset and re-established one by one, hence seeing no need to re-synchronize the binding table. To avoid this, a node must not allow re-establishing contact until it has confirmation that even the peer has lost both links. Currently, the mechanism for handling this consists of setting and resetting two state flags from different locations in the code. This solution is hard to understand and maintain. A closer analysis even reveals that it is not completely safe. In this commit we do instead introduce an FSM that keeps track of the conditions for when the node can establish and maintain links. It has six states and four events, and is strictly based on explicit knowledge about the own node's and the peer node's contact states. Only events leading to state change are shown as edges in the figure below. +--------------+ | SELF_UP/ | +---------------->| PEER_COMING |-----------------+ SELF_ | +--------------+ |PEER_ ESTBL_ | | |ESTBL_ CONTACT| SELF_LOST_CONTACT | |CONTACT | v | | +--------------+ | | PEER_ | SELF_DOWN/ | SELF_ | | LOST_ +--| PEER_LEAVING |<--+ LOST_ v +-------------+ CONTACT | +--------------+ | CONTACT +-----------+ | SELF_DOWN/ |<----------+ +----------| SELF_UP/ | | PEER_DOWN |<----------+ +----------| PEER_UP | +-------------+ SELF_ | +--------------+ | PEER_ +-----------+ | LOST_ +--| SELF_LEAVING/|<--+ LOST_ A | CONTACT | PEER_DOWN | CONTACT | | +--------------+ | | A | PEER_ | PEER_LOST_CONTACT | |SELF_ ESTBL_ | | |ESTBL_ CONTACT| +--------------+ |CONTACT +---------------->| PEER_UP/ |-----------------+ | SELF_COMING | +--------------+ Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 125 insertions(+), 5 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 77effb233725..9dbbb5de287b 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -141,7 +141,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) break; } list_add_tail_rcu(&n_ptr->list, &temp_node->list); - n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN; + n_ptr->state = SELF_DOWN_PEER_DOWN; n_ptr->signature = INVALID_NODE_SIG; n_ptr->active_links[0] = INVALID_BEARER_ID; n_ptr->active_links[1] = INVALID_BEARER_ID; @@ -421,8 +421,131 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) } } +/* tipc_node_fsm_evt - node finite state machine + * Determines when contact is allowed with peer node + */ +void tipc_node_fsm_evt(struct tipc_node *n, int evt) +{ + int state = n->state; + + switch (state) { + case SELF_DOWN_PEER_DOWN: + switch (evt) { + case SELF_ESTABL_CONTACT_EVT: + state = SELF_UP_PEER_COMING; + break; + case PEER_ESTABL_CONTACT_EVT: + state = SELF_COMING_PEER_UP; + break; + case SELF_LOST_CONTACT_EVT: + case PEER_LOST_CONTACT_EVT: + break; + default: + pr_err("Unknown node fsm evt %x/%x\n", state, evt); + } + break; + case SELF_UP_PEER_UP: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + default: + pr_err("Unknown node fsm evt %x/%x\n", state, evt); + } + break; + case SELF_DOWN_PEER_LEAVING: + switch (evt) { + case PEER_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_DOWN; + break; + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + case SELF_LOST_CONTACT_EVT: + break; + default: + pr_err("Unknown node fsm evt %x/%x\n", state, evt); + } + break; + case SELF_UP_PEER_COMING: + switch (evt) { + case PEER_ESTABL_CONTACT_EVT: + state = SELF_UP_PEER_UP; + break; + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case SELF_ESTABL_CONTACT_EVT: + case PEER_LOST_CONTACT_EVT: + break; + default: + pr_err("Unknown node fsm evt %x/%x\n", state, evt); + } + break; + case SELF_COMING_PEER_UP: + switch (evt) { + case SELF_ESTABL_CONTACT_EVT: + state = SELF_UP_PEER_UP; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case SELF_LOST_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + default: + pr_err("Unknown node fsm evt %x/%x\n", state, evt); + } + break; + case SELF_LEAVING_PEER_DOWN: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_DOWN; + break; + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + case PEER_LOST_CONTACT_EVT: + break; + default: + pr_err("Unknown node fsm evt %x/%x\n", state, evt); + } + break; + default: + pr_err("Unknown node fsm state %x\n", state); + break; + } + + n->state = state; +} + +bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr) +{ + int state = n->state; + + if (likely(state == SELF_UP_PEER_UP)) + return true; + if (state == SELF_DOWN_PEER_DOWN) + return true; + if (state == SELF_UP_PEER_COMING) + return true; + if (state == SELF_COMING_PEER_UP) + return true; + if (state == SELF_LEAVING_PEER_DOWN) + return false; + if (state == SELF_DOWN_PEER_LEAVING) + if (!msg_peer_is_up(hdr)) + return true; + return false; +} + static void node_established_contact(struct tipc_node *n_ptr) { + tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT); n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP; n_ptr->bclink.oos_state = 0; n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net); @@ -468,11 +591,8 @@ static void node_lost_contact(struct tipc_node *n_ptr) l_ptr->failover_skb = NULL; tipc_link_reset_fragments(l_ptr); } - - n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN; - /* Prevent re-contact with node until cleanup is done */ - n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN; + tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT); /* Notify publications from this node */ n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; -- cgit v1.2.3 From d999297c3dbbe7fdd832f7fa4ec84301e170b3e6 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:31 -0400 Subject: tipc: reduce locking scope during packet reception We convert packet/message reception according to the same principle we have been using for message sending and timeout handling: We move the function tipc_rcv() to node.c, hence handling the initial packet reception at the link aggregation level. The function grabs the node lock, selects the receiving link, and accesses it via a new call tipc_link_rcv(). This function appends buffers to the input queue for delivery upwards, but it may also append outgoing packets to the xmit queue, just as we do during regular message sending. The latter will happen when buffers are forwarded from the link backlog, or when retransmission is requested. Upon return of this function, and after having released the node lock, tipc_rcv() delivers/tranmsits the contents of those queues, but it may also perform actions such as link activation or reset, as indicated by the return flags from the link. This reduces the number of cpu cycles spent inside the node spinlock, and reduces contention on that lock. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 98 insertions(+), 7 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 9dbbb5de287b..e92f84afbf95 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -40,11 +40,13 @@ #include "name_distr.h" #include "socket.h" #include "bcast.h" +#include "discover.h" static void node_lost_contact(struct tipc_node *n_ptr); static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); static void tipc_node_timeout(unsigned long data); +static void tipc_node_fsm_evt(struct tipc_node *n, int evt); struct tipc_sock_conn { u32 port; @@ -141,7 +143,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) break; } list_add_tail_rcu(&n_ptr->list, &temp_node->list); - n_ptr->state = SELF_DOWN_PEER_DOWN; + n_ptr->state = SELF_DOWN_PEER_LEAVING; n_ptr->signature = INVALID_NODE_SIG; n_ptr->active_links[0] = INVALID_BEARER_ID; n_ptr->active_links[1] = INVALID_BEARER_ID; @@ -424,7 +426,7 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) /* tipc_node_fsm_evt - node finite state machine * Determines when contact is allowed with peer node */ -void tipc_node_fsm_evt(struct tipc_node *n, int evt) +static void tipc_node_fsm_evt(struct tipc_node *n, int evt) { int state = n->state; @@ -523,23 +525,36 @@ void tipc_node_fsm_evt(struct tipc_node *n, int evt) n->state = state; } -bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr) +bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l, + struct tipc_msg *hdr) { int state = n->state; if (likely(state == SELF_UP_PEER_UP)) return true; + if (state == SELF_DOWN_PEER_DOWN) return true; - if (state == SELF_UP_PEER_COMING) + + if (state == SELF_UP_PEER_COMING) { + /* If not traffic msg, peer may still be ESTABLISHING */ + if (tipc_link_is_up(l) && msg_is_traffic(hdr)) + tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); return true; + } + if (state == SELF_COMING_PEER_UP) return true; + if (state == SELF_LEAVING_PEER_DOWN) return false; - if (state == SELF_DOWN_PEER_LEAVING) - if (!msg_peer_is_up(hdr)) - return true; + + if (state == SELF_DOWN_PEER_LEAVING) { + if (msg_peer_is_up(hdr)) + return false; + tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); + return true; + } return false; } @@ -819,6 +834,82 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, return 0; } +/** + * tipc_rcv - process TIPC packets/messages arriving from off-node + * @net: the applicable net namespace + * @skb: TIPC packet + * @bearer: pointer to bearer message arrived on + * + * Invoked with no locks held. Bearer pointer must point to a valid bearer + * structure (i.e. cannot be NULL), but bearer can be inactive. + */ +void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) +{ + struct sk_buff_head xmitq; + struct tipc_node *n; + struct tipc_link *l; + struct tipc_msg *hdr; + struct tipc_media_addr *maddr; + int bearer_id = b->identity; + int rc = 0; + + __skb_queue_head_init(&xmitq); + + /* Ensure message is well-formed */ + if (unlikely(!tipc_msg_validate(skb))) + goto discard; + + /* Handle arrival of a non-unicast link packet */ + hdr = buf_msg(skb); + if (unlikely(msg_non_seq(hdr))) { + if (msg_user(hdr) == LINK_CONFIG) + tipc_disc_rcv(net, skb, b); + else + tipc_bclink_rcv(net, skb); + return; + } + + /* Locate neighboring node that sent packet */ + n = tipc_node_find(net, msg_prevnode(hdr)); + if (unlikely(!n)) + goto discard; + tipc_node_lock(n); + + /* Locate link endpoint that should handle packet */ + l = n->links[bearer_id].link; + if (unlikely(!l)) + goto unlock; + + /* Is reception of this packet permitted at the moment ? */ + if (unlikely(n->state != SELF_UP_PEER_UP)) + if (!tipc_node_filter_skb(n, l, hdr)) + goto unlock; + + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) + tipc_bclink_sync_state(n, hdr); + + /* Release acked broadcast messages */ + if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) + tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); + + /* Check protocol and update link state */ + rc = tipc_link_rcv(l, skb, &xmitq); + + if (unlikely(rc & TIPC_LINK_UP_EVT)) + tipc_link_activate(l); + if (unlikely(rc & TIPC_LINK_DOWN_EVT)) + tipc_link_reset(l); + skb = NULL; +unlock: + tipc_node_unlock(n); + tipc_sk_rcv(net, &n->links[bearer_id].inputq); + maddr = &n->links[bearer_id].maddr; + tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); + tipc_node_put(n); +discard: + kfree_skb(skb); +} + int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb) { int err; -- cgit v1.2.3 From cbeb83ca68dcedf69b336fd1c5263658cbe5b51e Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:15 -0400 Subject: tipc: eliminate function tipc_link_activate() The function tipc_link_activate() is redundant, since it mostly performs settings that have already been done in a preceding tipc_link_reset(). There are three exceptions to this: - The actual state change to TIPC_LINK_WORKING. This should anyway be done in the FSM, and not in a separate function. - Registration of the link with the bearer. This should be done by the node, since we don't want the link to have any knowledge about its specific bearer. - Call to tipc_node_link_up() for user access registration. With the new role distribution between link aggregation and link level this becomes the wrong call order; tipc_node_link_up() should instead be called directly as a result of a TIPC_LINK_UP event, hence by the node itself. This commit implements those changes. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index e92f84afbf95..558df25a7fc6 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -295,11 +295,13 @@ void tipc_node_link_up(struct tipc_node *n, int bearer_id) n->action_flags |= TIPC_NOTIFY_LINK_UP; n->link_id = l->peer_bearer_id << 16 | l->bearer_id; + tipc_bearer_add_dest(n->net, bearer_id, n->addr); + pr_debug("Established link <%s> on network plane %c\n", l->name, l->net_plane); /* No active links ? => take both active slots */ - if (*slot0 < 0) { + if (!tipc_node_is_up(n)) { *slot0 = bearer_id; *slot1 = bearer_id; node_established_contact(n); @@ -896,7 +898,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) rc = tipc_link_rcv(l, skb, &xmitq); if (unlikely(rc & TIPC_LINK_UP_EVT)) - tipc_link_activate(l); + tipc_node_link_up(n, bearer_id); if (unlikely(rc & TIPC_LINK_DOWN_EVT)) tipc_link_reset(l); skb = NULL; -- cgit v1.2.3 From 6144a996a65199480eed7521c1c50590c282e78e Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:16 -0400 Subject: tipc: move all link_reset() calls to link aggregation level In line with our effort to let the node level have full control over its links, we want to move all link reset calls from link.c to node.c. Some of the calls can be moved by simply moving the calling function, when this is the right thing to do. For the remaining calls we use the now established technique of returning a TIPC_LINK_DOWN_EVT flag from tipc_link_rcv(), whereafter we perform the reset call when the call returns. This change serves as a preparation for the coming commits. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 3 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 558df25a7fc6..6a0680ba98a9 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -407,6 +407,44 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b, return true; } +void tipc_node_delete_links(struct net *net, int bearer_id) +{ + struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_link *l; + struct tipc_node *n; + + rcu_read_lock(); + list_for_each_entry_rcu(n, &tn->node_list, list) { + tipc_node_lock(n); + l = n->links[bearer_id].link; + if (l) { + tipc_link_reset(l); + n->links[bearer_id].link = NULL; + n->link_cnt--; + } + tipc_node_unlock(n); + kfree(l); + } + rcu_read_unlock(); +} + +static void tipc_node_reset_links(struct tipc_node *n) +{ + char addr_string[16]; + u32 i; + + tipc_node_lock(n); + + pr_warn("Resetting all links to %s\n", + tipc_addr_string_fill(addr_string, n->addr)); + + for (i = 0; i < MAX_BEARERS; i++) { + if (n->links[i].link) + tipc_link_reset(n->links[i].link); + } + tipc_node_unlock(n); +} + void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) { n_ptr->links[l_ptr->bearer_id].link = l_ptr; @@ -721,7 +759,7 @@ void tipc_node_unlock(struct tipc_node *node) tipc_bclink_input(net); if (flags & TIPC_BCAST_RESET) - tipc_link_reset_all(node); + tipc_node_reset_links(node); } /* Caller should hold node lock for the passed node */ @@ -836,6 +874,40 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, return 0; } +/* tipc_node_tnl_init(): handle a received TUNNEL_PROTOCOL packet, + * in order to control parallel link failover or synchronization + */ +static void tipc_node_tnl_init(struct tipc_node *n, int bearer_id, + struct sk_buff *skb) +{ + struct tipc_link *tnl, *pl; + struct tipc_msg *hdr = buf_msg(skb); + u16 oseqno = msg_seqno(hdr); + int pb_id = msg_bearer_id(hdr); + + if (pb_id >= MAX_BEARERS) + return; + + tnl = n->links[bearer_id].link; + if (!tnl) + return; + + /* Ignore if duplicate */ + if (less(oseqno, tnl->rcv_nxt)) + return; + + pl = n->links[pb_id].link; + if (!pl) + return; + + if (msg_type(hdr) == FAILOVER_MSG) { + if (tipc_link_is_up(pl)) { + tipc_link_reset(pl); + pl->exec_mode = TIPC_LINK_BLOCKED; + } + } +} + /** * tipc_rcv - process TIPC packets/messages arriving from off-node * @net: the applicable net namespace @@ -854,6 +926,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) struct tipc_media_addr *maddr; int bearer_id = b->identity; int rc = 0; + int usr; __skb_queue_head_init(&xmitq); @@ -863,8 +936,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) /* Handle arrival of a non-unicast link packet */ hdr = buf_msg(skb); + usr = msg_user(hdr); if (unlikely(msg_non_seq(hdr))) { - if (msg_user(hdr) == LINK_CONFIG) + if (usr == LINK_CONFIG) tipc_disc_rcv(net, skb, b); else tipc_bclink_rcv(net, skb); @@ -877,6 +951,10 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) goto discard; tipc_node_lock(n); + /* Prepare links for tunneled reception if applicable */ + if (unlikely(usr == TUNNEL_PROTOCOL)) + tipc_node_tnl_init(n, bearer_id, skb); + /* Locate link endpoint that should handle packet */ l = n->links[bearer_id].link; if (unlikely(!l)) @@ -887,7 +965,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (!tipc_node_filter_skb(n, l, hdr)) goto unlock; - if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) + if (unlikely(usr == LINK_PROTOCOL)) tipc_bclink_sync_state(n, hdr); /* Release acked broadcast messages */ -- cgit v1.2.3 From 655fb243b8ae5e652f744311bcb6e806e83cea1e Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:17 -0400 Subject: tipc: reverse call order for link_reset()->node_link_down() In many cases the call order when a link is reset goes as follows: tipc_node_xx()->tipc_link_reset()->tipc_node_link_down() This is not the right order if we want the node to be in control, so in this commit we change the order to: tipc_node_xx()->tipc_node_link_down()->tipc_link_reset() The fact that tipc_link_reset() now is called from only one location with a well-defined state will also facilitate later simplifications of tipc_link_reset() and the link FSM. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 6a0680ba98a9..65c2c80cffe7 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -265,7 +265,7 @@ static void tipc_node_timeout(unsigned long data) tipc_node_calculate_timer(n, l); rc = tipc_link_timeout(l, &xmitq); if (rc & TIPC_LINK_DOWN_EVT) - tipc_link_reset(l); + tipc_node_link_down(n, bearer_id); } tipc_node_unlock(n); maddr = &n->links[bearer_id].maddr; @@ -338,10 +338,15 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id) struct tipc_link *l, *_l; l = n->links[bearer_id].link; + if (!l || !tipc_link_is_up(l)) + return; + n->working_links--; n->action_flags |= TIPC_NOTIFY_LINK_DOWN; n->link_id = l->peer_bearer_id << 16 | l->bearer_id; + tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr); + pr_debug("Lost link <%s> on network plane %c\n", l->name, l->net_plane); @@ -352,6 +357,8 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id) _l = n->links[i].link; if (!_l || !tipc_link_is_up(_l)) continue; + if (_l == l) + continue; if (_l->priority < highest) continue; if (_l->priority > highest) { @@ -362,9 +369,13 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id) } *slot1 = i; } + if (tipc_node_is_up(n)) tipc_link_failover_send_queue(l); - else + + tipc_link_reset(l); + + if (!tipc_node_is_up(n)) node_lost_contact(n); } @@ -403,7 +414,7 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b, } memcpy(&l->media_addr, maddr, sizeof(*maddr)); memcpy(curr, maddr, sizeof(*maddr)); - tipc_link_reset(l); + tipc_node_link_down(n, b->identity); return true; } @@ -418,7 +429,7 @@ void tipc_node_delete_links(struct net *net, int bearer_id) tipc_node_lock(n); l = n->links[bearer_id].link; if (l) { - tipc_link_reset(l); + tipc_node_link_down(n, bearer_id); n->links[bearer_id].link = NULL; n->link_cnt--; } @@ -439,8 +450,9 @@ static void tipc_node_reset_links(struct tipc_node *n) tipc_addr_string_fill(addr_string, n->addr)); for (i = 0; i < MAX_BEARERS; i++) { - if (n->links[i].link) - tipc_link_reset(n->links[i].link); + if (!n->links[i].link) + continue; + tipc_node_link_down(n, i); } tipc_node_unlock(n); } @@ -837,7 +849,7 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, if (likely(l)) rc = tipc_link_xmit(l, list, &xmitq); if (unlikely(rc == -ENOBUFS)) - tipc_link_reset(l); + tipc_node_link_down(n, bearer_id); tipc_node_unlock(n); tipc_node_put(n); } @@ -902,7 +914,7 @@ static void tipc_node_tnl_init(struct tipc_node *n, int bearer_id, if (msg_type(hdr) == FAILOVER_MSG) { if (tipc_link_is_up(pl)) { - tipc_link_reset(pl); + tipc_node_link_down(n, pb_id); pl->exec_mode = TIPC_LINK_BLOCKED; } } @@ -978,7 +990,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (unlikely(rc & TIPC_LINK_UP_EVT)) tipc_node_link_up(n, bearer_id); if (unlikely(rc & TIPC_LINK_DOWN_EVT)) - tipc_link_reset(l); + tipc_node_link_down(n, bearer_id); skb = NULL; unlock: tipc_node_unlock(n); -- cgit v1.2.3 From 66996b6c47ed7f6bbb01a768e23fae262c7db8e0 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:18 -0400 Subject: tipc: extend node FSM In the next commit, we will move link synch/failover orchestration to the link aggregation level. In order to do this, we first need to extend the node FSM with two more states, NODE_SYNCHING and NODE_FAILINGOVER, plus four new events to enter and leave those states. This commit introduces this change, without yet making use of it. The node FSM now looks as follows: +-----------------------------------------+ | PEER_DOWN_EVT| | | +------------------------+----------------+ | |SELF_DOWN_EVT | | | | | | | | +-----------+ +-----------+ | | |NODE_ | |NODE_ | | | +----------|FAILINGOVER|<---------|SYNCHING |------------+ | | |SELF_ +-----------+ FAILOVER_+-----------+ PEER_ | | | |DOWN_EVT | A BEGIN_EVT A | DOWN_EVT| | | | | | | | | | | | | | | | | | | | |FAILOVER_|FAILOVER_ |SYNCH_ |SYNCH_ | | | | |END_EVT |BEGIN_EVT |BEGIN_EVT|END_EVT | | | | | | | | | | | | | | | | | | | | | +--------------+ | | | | | +------->| SELF_UP_ |<-------+ | | | | +----------------| PEER_UP |------------------+ | | | | |SELF_DOWN_EVT +--------------+ PEER_DOWN_EVT| | | | | | A A | | | | | | | | | | | | | | PEER_UP_EVT| |SELF_UP_EVT | | | | | | | | | | | V V V | | V V V +------------+ +-----------+ +-----------+ +------------+ |SELF_DOWN_ | |SELF_UP_ | |PEER_UP_ | |PEER_DOWN | |PEER_LEAVING|<------|PEER_COMING| |SELF_COMING|------>|SELF_LEAVING| +------------+ SELF_ +-----------+ +-----------+ PEER_ +------------+ | DOWN_EVT A A DOWN_EVT | | | | | | | | | | SELF_UP_EVT| |PEER_UP_EVT | | | | | | | | | |PEER_DOWN_EVT +--------------+ SELF_DOWN_EVT| +------------------->| SELF_DOWN_ |<--------------------+ | PEER_DOWN | +--------------+ Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 82 insertions(+), 7 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 65c2c80cffe7..6b18d73830ca 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -494,8 +494,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case SELF_LOST_CONTACT_EVT: case PEER_LOST_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_UP_PEER_UP: @@ -506,11 +510,19 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case PEER_LOST_CONTACT_EVT: state = SELF_LEAVING_PEER_DOWN; break; + case NODE_SYNCH_BEGIN_EVT: + state = NODE_SYNCHING; + break; + case NODE_FAILOVER_BEGIN_EVT: + state = NODE_FAILINGOVER; + break; case SELF_ESTABL_CONTACT_EVT: case PEER_ESTABL_CONTACT_EVT: + case NODE_SYNCH_END_EVT: + case NODE_FAILOVER_END_EVT: break; default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_DOWN_PEER_LEAVING: @@ -522,8 +534,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case PEER_ESTABL_CONTACT_EVT: case SELF_LOST_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_UP_PEER_COMING: @@ -537,8 +553,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case SELF_ESTABL_CONTACT_EVT: case PEER_LOST_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_COMING_PEER_UP: @@ -552,8 +572,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case SELF_LOST_CONTACT_EVT: case PEER_ESTABL_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; } break; case SELF_LEAVING_PEER_DOWN: @@ -565,16 +589,67 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt) case PEER_ESTABL_CONTACT_EVT: case PEER_LOST_CONTACT_EVT: break; + case NODE_SYNCH_END_EVT: + case NODE_SYNCH_BEGIN_EVT: + case NODE_FAILOVER_BEGIN_EVT: + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; + } + break; + case NODE_FAILINGOVER: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case NODE_FAILOVER_END_EVT: + state = SELF_UP_PEER_UP; + break; + case NODE_FAILOVER_BEGIN_EVT: + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + case NODE_SYNCH_BEGIN_EVT: + case NODE_SYNCH_END_EVT: default: - pr_err("Unknown node fsm evt %x/%x\n", state, evt); + goto illegal_evt; + } + break; + case NODE_SYNCHING: + switch (evt) { + case SELF_LOST_CONTACT_EVT: + state = SELF_DOWN_PEER_LEAVING; + break; + case PEER_LOST_CONTACT_EVT: + state = SELF_LEAVING_PEER_DOWN; + break; + case NODE_SYNCH_END_EVT: + state = SELF_UP_PEER_UP; + break; + case NODE_FAILOVER_BEGIN_EVT: + state = NODE_FAILINGOVER; + break; + case NODE_SYNCH_BEGIN_EVT: + case SELF_ESTABL_CONTACT_EVT: + case PEER_ESTABL_CONTACT_EVT: + break; + case NODE_FAILOVER_END_EVT: + default: + goto illegal_evt; } break; default: pr_err("Unknown node fsm state %x\n", state); break; } - n->state = state; + return; + +illegal_evt: + pr_err("Illegal node fsm evt %x in state %x\n", evt, state); } bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l, -- cgit v1.2.3 From 6e498158a827fd515b514842e9a06bdf0f75ab86 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:19 -0400 Subject: tipc: move link synch and failover to link aggregation level Link failover and synchronization have until now been handled by the links themselves, forcing them to have knowledge about and to access parallel links in order to make the two algorithms work correctly. In this commit, we move the control part of this functionality to the link aggregation level in node.c, which is the right location for this. As a result, the two algorithms become easier to follow, and the link implementation becomes simpler. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 291 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 190 insertions(+), 101 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 6b18d73830ca..b0372bb107f6 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -42,6 +42,31 @@ #include "bcast.h" #include "discover.h" +/* Node FSM states and events: + */ +enum { + SELF_DOWN_PEER_DOWN = 0xdd, + SELF_UP_PEER_UP = 0xaa, + SELF_DOWN_PEER_LEAVING = 0xd1, + SELF_UP_PEER_COMING = 0xac, + SELF_COMING_PEER_UP = 0xca, + SELF_LEAVING_PEER_DOWN = 0x1d, + NODE_FAILINGOVER = 0xf0, + NODE_SYNCHING = 0xcc +}; + +enum { + SELF_ESTABL_CONTACT_EVT = 0xece, + SELF_LOST_CONTACT_EVT = 0x1ce, + PEER_ESTABL_CONTACT_EVT = 0x9ece, + PEER_LOST_CONTACT_EVT = 0x91ce, + NODE_FAILOVER_BEGIN_EVT = 0xfbe, + NODE_FAILOVER_END_EVT = 0xfee, + NODE_SYNCH_BEGIN_EVT = 0xcbe, + NODE_SYNCH_END_EVT = 0xcee +}; + +static void tipc_node_link_down(struct tipc_node *n, int bearer_id); static void node_lost_contact(struct tipc_node *n_ptr); static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); @@ -281,69 +306,75 @@ static void tipc_node_timeout(unsigned long data) * * Link becomes active (alone or shared) or standby, depending on its priority. */ -void tipc_node_link_up(struct tipc_node *n, int bearer_id) +static void tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; - struct tipc_link_entry *links = n->links; - struct tipc_link *l = n->links[bearer_id].link; - - /* Leave room for tunnel header when returning 'mtu' to users: */ - links[bearer_id].mtu = l->mtu - INT_H_SIZE; + struct tipc_link *ol = node_active_link(n, 0); + struct tipc_link *nl = n->links[bearer_id].link; + if (n->working_links > 1) { + pr_warn("Attempt to establish 3rd link to %x\n", n->addr); + return; + } n->working_links++; n->action_flags |= TIPC_NOTIFY_LINK_UP; - n->link_id = l->peer_bearer_id << 16 | l->bearer_id; + n->link_id = nl->peer_bearer_id << 16 | bearer_id; + + /* Leave room for tunnel header when returning 'mtu' to users: */ + n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; tipc_bearer_add_dest(n->net, bearer_id, n->addr); pr_debug("Established link <%s> on network plane %c\n", - l->name, l->net_plane); + nl->name, nl->net_plane); - /* No active links ? => take both active slots */ - if (!tipc_node_is_up(n)) { + /* First link? => give it both slots */ + if (!ol) { *slot0 = bearer_id; *slot1 = bearer_id; + nl->exec_mode = TIPC_LINK_OPEN; node_established_contact(n); return; } - /* Lower prio than current active ? => no slot */ - if (l->priority < links[*slot0].link->priority) { - pr_debug("New link <%s> becomes standby\n", l->name); - return; - } - tipc_link_dup_queue_xmit(links[*slot0].link, l); - - /* Same prio as current active ? => take one slot */ - if (l->priority == links[*slot0].link->priority) { + /* Second link => redistribute slots */ + if (nl->priority > ol->priority) { + pr_debug("Old link <%s> becomes standby\n", ol->name); *slot0 = bearer_id; - return; + *slot1 = bearer_id; + } else if (nl->priority == ol->priority) { + *slot0 = bearer_id; + } else { + pr_debug("New link <%s> is standby\n", nl->name); } - /* Higher prio than current active => take both active slots */ - pr_debug("Old link <%s> now standby\n", links[*slot0].link->name); - *slot0 = bearer_id; - *slot1 = bearer_id; + /* Prepare synchronization with first link */ + tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq); } /** * tipc_node_link_down - handle loss of link */ -void tipc_node_link_down(struct tipc_node *n, int bearer_id) +static void tipc_node_link_down(struct tipc_node *n, int bearer_id) { int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; + struct tipc_media_addr *maddr = &n->links[bearer_id].maddr; int i, highest = 0; - struct tipc_link *l, *_l; + struct tipc_link *l, *_l, *tnl; + struct sk_buff_head xmitq; l = n->links[bearer_id].link; if (!l || !tipc_link_is_up(l)) return; + __skb_queue_head_init(&xmitq); + n->working_links--; n->action_flags |= TIPC_NOTIFY_LINK_DOWN; - n->link_id = l->peer_bearer_id << 16 | l->bearer_id; + n->link_id = l->peer_bearer_id << 16 | bearer_id; tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr); @@ -370,13 +401,19 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id) *slot1 = i; } - if (tipc_node_is_up(n)) - tipc_link_failover_send_queue(l); + if (!tipc_node_is_up(n)) { + tipc_link_reset(l); + node_lost_contact(n); + return; + } + /* There is still a working link => initiate failover */ + tnl = node_active_link(n, 0); + tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); + n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); + tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq); tipc_link_reset(l); - - if (!tipc_node_is_up(n)) - node_lost_contact(n); + tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr); } bool tipc_node_is_up(struct tipc_node *n) @@ -652,37 +689,22 @@ illegal_evt: pr_err("Illegal node fsm evt %x in state %x\n", evt, state); } -bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l, - struct tipc_msg *hdr) +bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr) { int state = n->state; if (likely(state == SELF_UP_PEER_UP)) return true; - if (state == SELF_DOWN_PEER_DOWN) - return true; - - if (state == SELF_UP_PEER_COMING) { - /* If not traffic msg, peer may still be ESTABLISHING */ - if (tipc_link_is_up(l) && msg_is_traffic(hdr)) - tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); - return true; - } - - if (state == SELF_COMING_PEER_UP) - return true; - if (state == SELF_LEAVING_PEER_DOWN) return false; if (state == SELF_DOWN_PEER_LEAVING) { - if (msg_peer_is_up(hdr)) + if (msg_peer_node_is_up(hdr)) return false; - tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); - return true; } - return false; + + return true; } static void node_established_contact(struct tipc_node *n_ptr) @@ -727,10 +749,8 @@ static void node_lost_contact(struct tipc_node *n_ptr) if (!l_ptr) continue; l_ptr->exec_mode = TIPC_LINK_OPEN; - l_ptr->failover_checkpt = 0; - l_ptr->failover_pkts = 0; - kfree_skb(l_ptr->failover_skb); - l_ptr->failover_skb = NULL; + kfree_skb(l_ptr->failover_reasm_skb); + l_ptr->failover_reasm_skb = NULL; tipc_link_reset_fragments(l_ptr); } /* Prevent re-contact with node until cleanup is done */ @@ -961,38 +981,111 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, return 0; } -/* tipc_node_tnl_init(): handle a received TUNNEL_PROTOCOL packet, - * in order to control parallel link failover or synchronization +/** + * tipc_node_check_state - check and if necessary update node state + * @skb: TIPC packet + * @bearer_id: identity of bearer delivering the packet + * Returns true if state is ok, otherwise consumes buffer and returns false */ -static void tipc_node_tnl_init(struct tipc_node *n, int bearer_id, - struct sk_buff *skb) +static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, + int bearer_id) { - struct tipc_link *tnl, *pl; struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); + int mtyp = msg_type(hdr); u16 oseqno = msg_seqno(hdr); - int pb_id = msg_bearer_id(hdr); + u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); + u16 exp_pkts = msg_msgcnt(hdr); + u16 rcv_nxt, syncpt, dlv_nxt; + int state = n->state; + struct tipc_link *l, *pl = NULL; + struct sk_buff_head; + int i; - if (pb_id >= MAX_BEARERS) - return; + l = n->links[bearer_id].link; + if (!l) + return false; + rcv_nxt = l->rcv_nxt; - tnl = n->links[bearer_id].link; - if (!tnl) - return; - /* Ignore if duplicate */ - if (less(oseqno, tnl->rcv_nxt)) - return; + if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) + return true; - pl = n->links[pb_id].link; - if (!pl) - return; + /* Find parallel link, if any */ + for (i = 0; i < MAX_BEARERS; i++) { + if ((i != bearer_id) && n->links[i].link) { + pl = n->links[i].link; + break; + } + } - if (msg_type(hdr) == FAILOVER_MSG) { - if (tipc_link_is_up(pl)) { - tipc_node_link_down(n, pb_id); + /* Update node accesibility if applicable */ + if (state == SELF_UP_PEER_COMING) { + if (!tipc_link_is_up(l)) + return true; + if (!msg_peer_link_is_up(hdr)) + return true; + tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); + } + + if (state == SELF_DOWN_PEER_LEAVING) { + if (msg_peer_node_is_up(hdr)) + return false; + tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); + } + + /* Ignore duplicate packets */ + if (less(oseqno, rcv_nxt)) + return true; + + /* Initiate or update failover mode if applicable */ + if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { + syncpt = oseqno + exp_pkts - 1; + if (pl && tipc_link_is_up(pl)) { + tipc_node_link_down(n, pl->bearer_id); pl->exec_mode = TIPC_LINK_BLOCKED; } + /* If pkts arrive out of order, use lowest calculated syncpt */ + if (less(syncpt, n->sync_point)) + n->sync_point = syncpt; + } + + /* Open parallel link when tunnel link reaches synch point */ + if ((n->state == NODE_FAILINGOVER) && (more(rcv_nxt, n->sync_point))) { + tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT); + if (pl) + pl->exec_mode = TIPC_LINK_OPEN; + return true; + } + + /* Initiate or update synch mode if applicable */ + if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) { + syncpt = iseqno + exp_pkts - 1; + if (n->state == SELF_UP_PEER_UP) { + n->sync_point = syncpt; + tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT); + } + l->exec_mode = TIPC_LINK_TUNNEL; + if (less(syncpt, n->sync_point)) + n->sync_point = syncpt; } + + /* Open tunnel link when parallel link reaches synch point */ + if ((n->state == NODE_SYNCHING) && (l->exec_mode == TIPC_LINK_TUNNEL)) { + if (pl) + dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq)); + if (!pl || more(dlv_nxt, n->sync_point)) { + tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); + l->exec_mode = TIPC_LINK_OPEN; + return true; + } + if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) + return true; + if (usr == LINK_PROTOCOL) + return true; + return false; + } + return true; } /** @@ -1008,12 +1101,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) { struct sk_buff_head xmitq; struct tipc_node *n; - struct tipc_link *l; - struct tipc_msg *hdr; - struct tipc_media_addr *maddr; + struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); int bearer_id = b->identity; + struct tipc_link_entry *le; int rc = 0; - int usr; __skb_queue_head_init(&xmitq); @@ -1022,8 +1114,6 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) goto discard; /* Handle arrival of a non-unicast link packet */ - hdr = buf_msg(skb); - usr = msg_user(hdr); if (unlikely(msg_non_seq(hdr))) { if (usr == LINK_CONFIG) tipc_disc_rcv(net, skb, b); @@ -1036,42 +1126,41 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) n = tipc_node_find(net, msg_prevnode(hdr)); if (unlikely(!n)) goto discard; - tipc_node_lock(n); + le = &n->links[bearer_id]; - /* Prepare links for tunneled reception if applicable */ - if (unlikely(usr == TUNNEL_PROTOCOL)) - tipc_node_tnl_init(n, bearer_id, skb); + tipc_node_lock(n); - /* Locate link endpoint that should handle packet */ - l = n->links[bearer_id].link; - if (unlikely(!l)) + /* Is reception permitted at the moment ? */ + if (!tipc_node_filter_pkt(n, hdr)) goto unlock; - /* Is reception of this packet permitted at the moment ? */ - if (unlikely(n->state != SELF_UP_PEER_UP)) - if (!tipc_node_filter_skb(n, l, hdr)) - goto unlock; - - if (unlikely(usr == LINK_PROTOCOL)) + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) tipc_bclink_sync_state(n, hdr); /* Release acked broadcast messages */ if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); - /* Check protocol and update link state */ - rc = tipc_link_rcv(l, skb, &xmitq); + /* Check and if necessary update node state */ + if (likely(tipc_node_check_state(n, skb, bearer_id))) { + rc = tipc_link_rcv(le->link, skb, &xmitq); + skb = NULL; + } if (unlikely(rc & TIPC_LINK_UP_EVT)) - tipc_node_link_up(n, bearer_id); + tipc_node_link_up(n, bearer_id, &xmitq); + if (unlikely(rc & TIPC_LINK_DOWN_EVT)) tipc_node_link_down(n, bearer_id); - skb = NULL; unlock: tipc_node_unlock(n); - tipc_sk_rcv(net, &n->links[bearer_id].inputq); - maddr = &n->links[bearer_id].maddr; - tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); + + if (!skb_queue_empty(&le->inputq)) + tipc_sk_rcv(net, &le->inputq); + + if (!skb_queue_empty(&xmitq)) + tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); + tipc_node_put(n); discard: kfree_skb(skb); -- cgit v1.2.3 From 5045f7b9009f1455268b98cecbcc271663934c85 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:20 -0400 Subject: tipc: move protocol message sending away from link FSM The implementation of the link FSM currently takes decisions about and sends out link protocol messages. This is unnecessary, since such actions are not the result of any link state change, and are even decided based on non-FSM state information ("silent_intv_cnt"). We now move the sending of unicast link protocol messages to the function tipc_link_timeout(), and the initial broadcast synchronization message to tipc_node_link_up(). The latter is done because a link instance should not need to know whether it is the first or second link to a destination. Such information is now restricted to and handled by the link aggregation layer in node.c Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 1 + 1 file changed, 1 insertion(+) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index b0372bb107f6..9e20acffb3d4 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -335,6 +335,7 @@ static void tipc_node_link_up(struct tipc_node *n, int bearer_id, *slot0 = bearer_id; *slot1 = bearer_id; nl->exec_mode = TIPC_LINK_OPEN; + tipc_link_build_bcast_sync_msg(nl, xmitq); node_established_contact(n); return; } -- cgit v1.2.3 From 662921cd0a53db4504838dfbb7d996f9e6e94001 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:21 -0400 Subject: tipc: merge link->exec_mode and link->state into one FSM Until now, we have been handling link failover and synchronization by using an additional link state variable, "exec_mode". This variable is not independent of the link FSM state, something causing a risk of inconsistencies, apart from the fact that it clutters the code. The conditions are now in place to define a new link FSM that covers all existing use cases, including failover and synchronization, and eliminate the "exec_mode" field altogether. The FSM must also support non-atomic resetting of links, which will be introduced later. The new link FSM is shown below, with 7 states and 8 events. Only events leading to state change are shown as edges. +------------------------------------+ |RESET_EVT | | | | +--------------+ | +-----------------| SYNCHING |-----------------+ | |FAILURE_EVT +--------------+ PEER_RESET_EVT| | | A | | | | | | | | | | | | | | |SYNCH_ |SYNCH_ | | | |BEGIN_EVT |END_EVT | | | | | | | V | V V | +-------------+ +--------------+ +------------+ | | RESETTING |<---------| ESTABLISHED |--------->| PEER_RESET | | +-------------+ FAILURE_ +--------------+ PEER_ +------------+ | | EVT | A RESET_EVT | | | | | | | | | | | | | +--------------+ | | | RESET_EVT| |RESET_EVT |ESTABLISH_EVT | | | | | | | | | | | | V V | | | +-------------+ +--------------+ RESET_EVT| +--->| RESET |--------->| ESTABLISHING |<----------------+ +-------------+ PEER_ +--------------+ | A RESET_EVT | | | | | | | |FAILOVER_ |FAILOVER_ |FAILOVER_ |BEGIN_EVT |END_EVT |BEGIN_EVT | | | V | | +-------------+ | | FAILINGOVER |<----------------+ +-------------+ These changes are fully backwards compatible. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 9e20acffb3d4..a3ceeda2a80a 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -334,7 +334,6 @@ static void tipc_node_link_up(struct tipc_node *n, int bearer_id, if (!ol) { *slot0 = bearer_id; *slot1 = bearer_id; - nl->exec_mode = TIPC_LINK_OPEN; tipc_link_build_bcast_sync_msg(nl, xmitq); node_established_contact(n); return; @@ -368,7 +367,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id) struct sk_buff_head xmitq; l = n->links[bearer_id].link; - if (!l || !tipc_link_is_up(l)) + if (!l || tipc_link_is_reset(l)) return; __skb_queue_head_init(&xmitq); @@ -414,6 +413,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id) n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq); tipc_link_reset(l); + tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr); } @@ -749,7 +749,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) struct tipc_link *l_ptr = n_ptr->links[i].link; if (!l_ptr) continue; - l_ptr->exec_mode = TIPC_LINK_OPEN; + tipc_link_fsm_evt(l_ptr, LINK_FAILOVER_END_EVT); kfree_skb(l_ptr->failover_reasm_skb); l_ptr->failover_reasm_skb = NULL; tipc_link_reset_fragments(l_ptr); @@ -989,7 +989,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, * Returns true if state is ok, otherwise consumes buffer and returns false */ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, - int bearer_id) + int bearer_id, struct sk_buff_head *xmitq) { struct tipc_msg *hdr = buf_msg(skb); int usr = msg_user(hdr); @@ -1042,42 +1042,47 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Initiate or update failover mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { syncpt = oseqno + exp_pkts - 1; - if (pl && tipc_link_is_up(pl)) { + if (pl && tipc_link_is_up(pl)) tipc_node_link_down(n, pl->bearer_id); - pl->exec_mode = TIPC_LINK_BLOCKED; - } + /* If pkts arrive out of order, use lowest calculated syncpt */ if (less(syncpt, n->sync_point)) n->sync_point = syncpt; } /* Open parallel link when tunnel link reaches synch point */ - if ((n->state == NODE_FAILINGOVER) && (more(rcv_nxt, n->sync_point))) { + if ((n->state == NODE_FAILINGOVER) && !tipc_link_is_failingover(l)) { + if (!more(rcv_nxt, n->sync_point)) + return true; tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT); if (pl) - pl->exec_mode = TIPC_LINK_OPEN; + tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT); return true; } /* Initiate or update synch mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) { syncpt = iseqno + exp_pkts - 1; + if (!tipc_link_is_up(l)) { + tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); + tipc_node_link_up(n, bearer_id, xmitq); + } if (n->state == SELF_UP_PEER_UP) { n->sync_point = syncpt; + tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT); tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT); } - l->exec_mode = TIPC_LINK_TUNNEL; if (less(syncpt, n->sync_point)) n->sync_point = syncpt; } /* Open tunnel link when parallel link reaches synch point */ - if ((n->state == NODE_SYNCHING) && (l->exec_mode == TIPC_LINK_TUNNEL)) { + if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) { if (pl) dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq)); if (!pl || more(dlv_nxt, n->sync_point)) { + tipc_link_fsm_evt(l, LINK_SYNCH_END_EVT); tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); - l->exec_mode = TIPC_LINK_OPEN; return true; } if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) @@ -1143,7 +1148,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); /* Check and if necessary update node state */ - if (likely(tipc_node_check_state(n, skb, bearer_id))) { + if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { rc = tipc_link_rcv(le->link, skb, &xmitq); skb = NULL; } -- cgit v1.2.3 From cf148816acb6def45474001302368eb472995e62 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:22 -0400 Subject: tipc: move received discovery data evaluation inside node.c The node lock is currently grabbed and and released in the function tipc_disc_rcv() in the file discover.c. As a preparation for the next commits, we need to move this node lock handling, along with the code area it is covering, to node.c. This commit introduces this change. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 123 ++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 102 insertions(+), 21 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index a3ceeda2a80a..d03e88f2273b 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -138,7 +138,7 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr) return NULL; } -struct tipc_node *tipc_node_create(struct net *net, u32 addr) +struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities) { struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_node *n_ptr, *temp_node; @@ -154,6 +154,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr) } n_ptr->addr = addr; n_ptr->net = net; + n_ptr->capabilities = capabilities; kref_init(&n_ptr->kref); spin_lock_init(&n_ptr->lock); INIT_HLIST_NODE(&n_ptr->hash); @@ -422,38 +423,118 @@ bool tipc_node_is_up(struct tipc_node *n) return n->active_links[0] != INVALID_BEARER_ID; } -void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *b, - bool *link_up, bool *addr_match, - struct tipc_media_addr *maddr) +void tipc_node_check_dest(struct net *net, u32 onode, + struct tipc_bearer *b, + u16 capabilities, u32 signature, + struct tipc_media_addr *maddr, + bool *respond, bool *dupl_addr) { - struct tipc_link *l = n->links[b->identity].link; - struct tipc_media_addr *curr = &n->links[b->identity].maddr; + struct tipc_node *n; + struct tipc_link *l; + struct tipc_media_addr *curr_maddr; + struct sk_buff_head *inputq; + bool addr_match = false; + bool sign_match = false; + bool link_up = false; + bool accept_addr = false; + + *dupl_addr = false; + *respond = false; + + n = tipc_node_create(net, onode, capabilities); + if (!n) + return; - *link_up = l && tipc_link_is_up(l); - *addr_match = l && !memcmp(curr, maddr, sizeof(*maddr)); -} + tipc_node_lock(n); -bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b, - struct tipc_media_addr *maddr) -{ - struct tipc_link *l = n->links[b->identity].link; - struct tipc_media_addr *curr = &n->links[b->identity].maddr; - struct sk_buff_head *inputq = &n->links[b->identity].inputq; + curr_maddr = &n->links[b->identity].maddr; + inputq = &n->links[b->identity].inputq; + + /* Prepare to validate requesting node's signature and media address */ + l = n->links[b->identity].link; + link_up = l && tipc_link_is_up(l); + addr_match = l && !memcmp(curr_maddr, maddr, sizeof(*maddr)); + sign_match = (signature == n->signature); + + /* These three flags give us eight permutations: */ + + if (sign_match && addr_match && link_up) { + /* All is fine. Do nothing. */ + } else if (sign_match && addr_match && !link_up) { + /* Respond. The link will come up in due time */ + *respond = true; + } else if (sign_match && !addr_match && link_up) { + /* Peer has changed i/f address without rebooting. + * If so, the link will reset soon, and the next + * discovery will be accepted. So we can ignore it. + * It may also be an cloned or malicious peer having + * chosen the same node address and signature as an + * existing one. + * Ignore requests until the link goes down, if ever. + */ + *dupl_addr = true; + } else if (sign_match && !addr_match && !link_up) { + /* Peer link has changed i/f address without rebooting. + * It may also be a cloned or malicious peer; we can't + * distinguish between the two. + * The signature is correct, so we must accept. + */ + accept_addr = true; + *respond = true; + } else if (!sign_match && addr_match && link_up) { + /* Peer node rebooted. Two possibilities: + * - Delayed re-discovery; this link endpoint has already + * reset and re-established contact with the peer, before + * receiving a discovery message from that node. + * (The peer happened to receive one from this node first). + * - The peer came back so fast that our side has not + * discovered it yet. Probing from this side will soon + * reset the link, since there can be no working link + * endpoint at the peer end, and the link will re-establish. + * Accept the signature, since it comes from a known peer. + */ + n->signature = signature; + } else if (!sign_match && addr_match && !link_up) { + /* The peer node has rebooted. + * Accept signature, since it is a known peer. + */ + n->signature = signature; + *respond = true; + } else if (!sign_match && !addr_match && link_up) { + /* Peer rebooted with new address, or a new/duplicate peer. + * Ignore until the link goes down, if ever. + */ + *dupl_addr = true; + } else if (!sign_match && !addr_match && !link_up) { + /* Peer rebooted with new address, or it is a new peer. + * Accept signature and address. + */ + n->signature = signature; + accept_addr = true; + *respond = true; + } + + if (!accept_addr) + goto exit; + /* Now create new link if not already existing */ if (!l) { l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq); - if (!l) - return false; + if (!l) { + *respond = false; + goto exit; + } tipc_node_calculate_timer(n, l); - if (n->link_cnt == 1) { + if (n->link_cnt == 1) if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) tipc_node_get(n); - } } memcpy(&l->media_addr, maddr, sizeof(*maddr)); - memcpy(curr, maddr, sizeof(*maddr)); + memcpy(curr_maddr, maddr, sizeof(*maddr)); tipc_node_link_down(n, b->identity); - return true; +exit: + tipc_node_unlock(n); + tipc_node_put(n); } void tipc_node_delete_links(struct net *net, int bearer_id) -- cgit v1.2.3 From 598411d70f85dcf5b5c6c2369cc48637c251b656 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:23 -0400 Subject: tipc: make resetting of links non-atomic In order to facilitate future improvements to the locking structure, we want to make resetting and establishing of links non-atomic. I.e., the functions tipc_node_link_up() and tipc_node_link_down() should be called from outside the node lock context, and grab/release the node lock themselves. This requires that we can freeze the link state from the moment it is set to RESETTING or PEER_RESET in one lock context until it is set to RESET or ESTABLISHING in a later context. The recently introduced link FSM makes this possible, so we are now ready to introduce the above change. This commit implements this. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 166 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 97 insertions(+), 69 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index d03e88f2273b..cdca57be85bf 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -66,8 +66,12 @@ enum { NODE_SYNCH_END_EVT = 0xcee }; -static void tipc_node_link_down(struct tipc_node *n, int bearer_id); -static void node_lost_contact(struct tipc_node *n_ptr); +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr **maddr); +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, + bool delete); +static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); static void tipc_node_timeout(unsigned long data); @@ -275,9 +279,8 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) static void tipc_node_timeout(unsigned long data) { struct tipc_node *n = (struct tipc_node *)data; + struct tipc_link_entry *le; struct sk_buff_head xmitq; - struct tipc_link *l; - struct tipc_media_addr *maddr; int bearer_id; int rc = 0; @@ -285,17 +288,16 @@ static void tipc_node_timeout(unsigned long data) for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { tipc_node_lock(n); - l = n->links[bearer_id].link; - if (l) { + le = &n->links[bearer_id]; + if (le->link) { /* Link tolerance may change asynchronously: */ - tipc_node_calculate_timer(n, l); - rc = tipc_link_timeout(l, &xmitq); - if (rc & TIPC_LINK_DOWN_EVT) - tipc_node_link_down(n, bearer_id); + tipc_node_calculate_timer(n, le->link); + rc = tipc_link_timeout(le->link, &xmitq); } tipc_node_unlock(n); - maddr = &n->links[bearer_id].maddr; - tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); + tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); + if (rc & TIPC_LINK_DOWN_EVT) + tipc_node_link_down(n, bearer_id, false); } if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) tipc_node_get(n); @@ -303,18 +305,21 @@ static void tipc_node_timeout(unsigned long data) } /** - * tipc_node_link_up - handle addition of link - * + * __tipc_node_link_up - handle addition of link + * Node lock must be held by caller * Link becomes active (alone or shared) or standby, depending on its priority. */ -static void tipc_node_link_up(struct tipc_node *n, int bearer_id, - struct sk_buff_head *xmitq) +static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; struct tipc_link *ol = node_active_link(n, 0); struct tipc_link *nl = n->links[bearer_id].link; + if (!nl || !tipc_link_is_up(nl)) + return; + if (n->working_links > 1) { pr_warn("Attempt to establish 3rd link to %x\n", n->addr); return; @@ -356,28 +361,40 @@ static void tipc_node_link_up(struct tipc_node *n, int bearer_id, } /** - * tipc_node_link_down - handle loss of link + * tipc_node_link_up - handle addition of link + * + * Link becomes active (alone or shared) or standby, depending on its priority. */ -static void tipc_node_link_down(struct tipc_node *n, int bearer_id) +static void tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { + tipc_node_lock(n); + __tipc_node_link_up(n, bearer_id, xmitq); + tipc_node_unlock(n); +} + +/** + * __tipc_node_link_down - handle loss of link + */ +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr **maddr) +{ + struct tipc_link_entry *le = &n->links[*bearer_id]; int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; - struct tipc_media_addr *maddr = &n->links[bearer_id].maddr; int i, highest = 0; struct tipc_link *l, *_l, *tnl; - struct sk_buff_head xmitq; - l = n->links[bearer_id].link; + l = n->links[*bearer_id].link; if (!l || tipc_link_is_reset(l)) return; - __skb_queue_head_init(&xmitq); - n->working_links--; n->action_flags |= TIPC_NOTIFY_LINK_DOWN; - n->link_id = l->peer_bearer_id << 16 | bearer_id; + n->link_id = l->peer_bearer_id << 16 | *bearer_id; - tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr); + tipc_bearer_remove_dest(n->net, *bearer_id, n->addr); pr_debug("Lost link <%s> on network plane %c\n", l->name, l->net_plane); @@ -404,18 +421,40 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id) if (!tipc_node_is_up(n)) { tipc_link_reset(l); - node_lost_contact(n); + node_lost_contact(n, &le->inputq); return; } /* There is still a working link => initiate failover */ tnl = node_active_link(n, 0); - tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); - tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq); + tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); tipc_link_reset(l); tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); - tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr); + tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); + *maddr = &n->links[tnl->bearer_id].maddr; + *bearer_id = tnl->bearer_id; +} + +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) +{ + struct tipc_link_entry *le = &n->links[bearer_id]; + struct tipc_media_addr *maddr; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + tipc_node_lock(n); + __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); + if (delete && le->link) { + kfree(le->link); + le->link = NULL; + n->link_cnt--; + } + tipc_node_unlock(n); + + tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); + tipc_sk_rcv(n->net, &le->inputq); } bool tipc_node_is_up(struct tipc_node *n) @@ -437,7 +476,7 @@ void tipc_node_check_dest(struct net *net, u32 onode, bool sign_match = false; bool link_up = false; bool accept_addr = false; - + bool reset = true; *dupl_addr = false; *respond = false; @@ -460,6 +499,7 @@ void tipc_node_check_dest(struct net *net, u32 onode, if (sign_match && addr_match && link_up) { /* All is fine. Do nothing. */ + reset = false; } else if (sign_match && addr_match && !link_up) { /* Respond. The link will come up in due time */ *respond = true; @@ -531,29 +571,21 @@ void tipc_node_check_dest(struct net *net, u32 onode, } memcpy(&l->media_addr, maddr, sizeof(*maddr)); memcpy(curr_maddr, maddr, sizeof(*maddr)); - tipc_node_link_down(n, b->identity); exit: tipc_node_unlock(n); + if (reset) + tipc_node_link_down(n, b->identity, false); tipc_node_put(n); } void tipc_node_delete_links(struct net *net, int bearer_id) { struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *l; struct tipc_node *n; rcu_read_lock(); list_for_each_entry_rcu(n, &tn->node_list, list) { - tipc_node_lock(n); - l = n->links[bearer_id].link; - if (l) { - tipc_node_link_down(n, bearer_id); - n->links[bearer_id].link = NULL; - n->link_cnt--; - } - tipc_node_unlock(n); - kfree(l); + tipc_node_link_down(n, bearer_id, true); } rcu_read_unlock(); } @@ -561,19 +593,14 @@ void tipc_node_delete_links(struct net *net, int bearer_id) static void tipc_node_reset_links(struct tipc_node *n) { char addr_string[16]; - u32 i; - - tipc_node_lock(n); + int i; pr_warn("Resetting all links to %s\n", tipc_addr_string_fill(addr_string, n->addr)); for (i = 0; i < MAX_BEARERS; i++) { - if (!n->links[i].link) - continue; - tipc_node_link_down(n, i); + tipc_node_link_down(n, i, false); } - tipc_node_unlock(n); } void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) @@ -798,10 +825,12 @@ static void node_established_contact(struct tipc_node *n_ptr) tipc_bclink_add_node(n_ptr->net, n_ptr->addr); } -static void node_lost_contact(struct tipc_node *n_ptr) +static void node_lost_contact(struct tipc_node *n_ptr, + struct sk_buff_head *inputq) { char addr_string[16]; struct tipc_sock_conn *conn, *safe; + struct tipc_link *l; struct list_head *conns = &n_ptr->conn_sks; struct sk_buff *skb; struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); @@ -827,14 +856,11 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { - struct tipc_link *l_ptr = n_ptr->links[i].link; - if (!l_ptr) - continue; - tipc_link_fsm_evt(l_ptr, LINK_FAILOVER_END_EVT); - kfree_skb(l_ptr->failover_reasm_skb); - l_ptr->failover_reasm_skb = NULL; - tipc_link_reset_fragments(l_ptr); + l = n_ptr->links[i].link; + if (l) + tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); } + /* Prevent re-contact with node until cleanup is done */ tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT); @@ -848,7 +874,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) conn->peer_node, conn->port, conn->peer_port, TIPC_ERR_NO_NODE); if (likely(skb)) { - skb_queue_tail(n_ptr->inputq, skb); + skb_queue_tail(inputq, skb); n_ptr->action_flags |= TIPC_MSG_EVT; } list_del(&conn->list); @@ -1025,9 +1051,9 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, l = tipc_node_select_link(n, selector, &bearer_id, &maddr); if (likely(l)) rc = tipc_link_xmit(l, list, &xmitq); - if (unlikely(rc == -ENOBUFS)) - tipc_node_link_down(n, bearer_id); tipc_node_unlock(n); + if (unlikely(rc == -ENOBUFS)) + tipc_node_link_down(n, bearer_id, false); tipc_node_put(n); } if (likely(!rc)) { @@ -1081,8 +1107,8 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, u16 rcv_nxt, syncpt, dlv_nxt; int state = n->state; struct tipc_link *l, *pl = NULL; - struct sk_buff_head; - int i; + struct tipc_media_addr *maddr; + int i, pb_id; l = n->links[bearer_id].link; if (!l) @@ -1123,9 +1149,11 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Initiate or update failover mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { syncpt = oseqno + exp_pkts - 1; - if (pl && tipc_link_is_up(pl)) - tipc_node_link_down(n, pl->bearer_id); - + if (pl && tipc_link_is_up(pl)) { + pb_id = pl->bearer_id; + __tipc_node_link_down(n, &pb_id, xmitq, &maddr); + tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq); + } /* If pkts arrive out of order, use lowest calculated syncpt */ if (less(syncpt, n->sync_point)) n->sync_point = syncpt; @@ -1146,7 +1174,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, syncpt = iseqno + exp_pkts - 1; if (!tipc_link_is_up(l)) { tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - tipc_node_link_up(n, bearer_id, xmitq); + __tipc_node_link_up(n, bearer_id, xmitq); } if (n->state == SELF_UP_PEER_UP) { n->sync_point = syncpt; @@ -1224,7 +1252,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) tipc_bclink_sync_state(n, hdr); - /* Release acked broadcast messages */ + /* Release acked broadcast packets */ if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); @@ -1233,14 +1261,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) rc = tipc_link_rcv(le->link, skb, &xmitq); skb = NULL; } +unlock: + tipc_node_unlock(n); if (unlikely(rc & TIPC_LINK_UP_EVT)) tipc_node_link_up(n, bearer_id, &xmitq); if (unlikely(rc & TIPC_LINK_DOWN_EVT)) - tipc_node_link_down(n, bearer_id); -unlock: - tipc_node_unlock(n); + tipc_node_link_down(n, bearer_id, false); if (!skb_queue_empty(&le->inputq)) tipc_sk_rcv(net, &le->inputq); -- cgit v1.2.3 From 23d8335d786472021b5c733f228c7074208dcfa0 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:24 -0400 Subject: tipc: remove implicit message delivery in node_unlock() After the most recent changes, all access calls to a link which may entail addition of messages to the link's input queue are postpended by an explicit call to tipc_sk_rcv(), using a reference to the correct queue. This means that the potentially hazardous implicit delivery, using tipc_node_unlock() in combination with a binary flag and a cached queue pointer, now has become redundant. This commit removes this implicit delivery mechanism both for regular data messages and for binding table update messages. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index cdca57be85bf..9e9b0938bd17 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -873,10 +873,8 @@ static void node_lost_contact(struct tipc_node *n_ptr, SHORT_H_SIZE, 0, tn->own_addr, conn->peer_node, conn->port, conn->peer_port, TIPC_ERR_NO_NODE); - if (likely(skb)) { + if (likely(skb)) skb_queue_tail(inputq, skb); - n_ptr->action_flags |= TIPC_MSG_EVT; - } list_del(&conn->list); kfree(conn); } @@ -923,27 +921,20 @@ void tipc_node_unlock(struct tipc_node *node) u32 flags = node->action_flags; u32 link_id = 0; struct list_head *publ_list; - struct sk_buff_head *inputq = node->inputq; - struct sk_buff_head *namedq; - if (likely(!flags || (flags == TIPC_MSG_EVT))) { - node->action_flags = 0; + if (likely(!flags)) { spin_unlock_bh(&node->lock); - if (flags == TIPC_MSG_EVT) - tipc_sk_rcv(net, inputq); return; } addr = node->addr; link_id = node->link_id; - namedq = node->namedq; publ_list = &node->publ_list; - node->action_flags &= ~(TIPC_MSG_EVT | - TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | + node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP | TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT | - TIPC_NAMED_MSG_EVT | TIPC_BCAST_RESET); + TIPC_BCAST_RESET); spin_unlock_bh(&node->lock); @@ -964,12 +955,6 @@ void tipc_node_unlock(struct tipc_node *node) tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, link_id, addr); - if (flags & TIPC_MSG_EVT) - tipc_sk_rcv(net, inputq); - - if (flags & TIPC_NAMED_MSG_EVT) - tipc_named_rcv(net, namedq); - if (flags & TIPC_BCAST_MSG_EVT) tipc_bclink_input(net); @@ -1270,6 +1255,9 @@ unlock: if (unlikely(rc & TIPC_LINK_DOWN_EVT)) tipc_node_link_down(n, bearer_id, false); + if (unlikely(!skb_queue_empty(&n->bclink.namedq))) + tipc_named_rcv(net, &n->bclink.namedq); + if (!skb_queue_empty(&le->inputq)) tipc_sk_rcv(net, &le->inputq); -- cgit v1.2.3 From 440d8963cd590ec9387d76a36e60c02da9ed944d Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:26 -0400 Subject: tipc: clean up link creation We simplify the link creation function tipc_link_create() and the way the link struct it is connected to the node struct. In particular, we remove the duplicate initialization of some fields which are anyway set in tipc_link_reset(). Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 48 ++++++++++++++++-------------------------------- 1 file changed, 16 insertions(+), 32 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 9e9b0938bd17..7c191641b44f 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -320,10 +320,6 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, if (!nl || !tipc_link_is_up(nl)) return; - if (n->working_links > 1) { - pr_warn("Attempt to establish 3rd link to %x\n", n->addr); - return; - } n->working_links++; n->action_flags |= TIPC_NOTIFY_LINK_UP; n->link_id = nl->peer_bearer_id << 16 | bearer_id; @@ -470,13 +466,13 @@ void tipc_node_check_dest(struct net *net, u32 onode, { struct tipc_node *n; struct tipc_link *l; - struct tipc_media_addr *curr_maddr; - struct sk_buff_head *inputq; + struct tipc_link_entry *le; bool addr_match = false; bool sign_match = false; bool link_up = false; bool accept_addr = false; bool reset = true; + *dupl_addr = false; *respond = false; @@ -486,13 +482,12 @@ void tipc_node_check_dest(struct net *net, u32 onode, tipc_node_lock(n); - curr_maddr = &n->links[b->identity].maddr; - inputq = &n->links[b->identity].inputq; + le = &n->links[b->identity]; /* Prepare to validate requesting node's signature and media address */ - l = n->links[b->identity].link; + l = le->link; link_up = l && tipc_link_is_up(l); - addr_match = l && !memcmp(curr_maddr, maddr, sizeof(*maddr)); + addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr)); sign_match = (signature == n->signature); /* These three flags give us eight permutations: */ @@ -559,18 +554,25 @@ void tipc_node_check_dest(struct net *net, u32 onode, /* Now create new link if not already existing */ if (!l) { - l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq); - if (!l) { + if (n->link_cnt == 2) { + pr_warn("Cannot establish 3rd link to %x\n", n->addr); + goto exit; + } + if (!tipc_link_create(n, b, mod(tipc_net(net)->random), + tipc_own_addr(net), onode, &le->maddr, + &le->inputq, &n->bclink.namedq, &l)) { *respond = false; goto exit; } + tipc_link_reset(l); + le->link = l; + n->link_cnt++; tipc_node_calculate_timer(n, l); if (n->link_cnt == 1) if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) tipc_node_get(n); } - memcpy(&l->media_addr, maddr, sizeof(*maddr)); - memcpy(curr_maddr, maddr, sizeof(*maddr)); + memcpy(&le->maddr, maddr, sizeof(*maddr)); exit: tipc_node_unlock(n); if (reset) @@ -603,24 +605,6 @@ static void tipc_node_reset_links(struct tipc_node *n) } } -void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) -{ - n_ptr->links[l_ptr->bearer_id].link = l_ptr; - n_ptr->link_cnt++; -} - -void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) -{ - int i; - - for (i = 0; i < MAX_BEARERS; i++) { - if (l_ptr != n_ptr->links[i].link) - continue; - n_ptr->links[i].link = NULL; - n_ptr->link_cnt--; - } -} - /* tipc_node_fsm_evt - node finite state machine * Determines when contact is allowed with peer node */ -- cgit v1.2.3 From 17b2063077a7478e5fd3c34b04a059dbb8474638 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 20 Aug 2015 02:12:54 -0400 Subject: tipc: eliminate risk of premature link setup during failover When a link goes down, and there is still a working link towards its destination node, a failover is initiated, and the failed link is not allowed to re-establish until that procedure is finished. To ensure this, the concerned link endpoints are set to state LINK_FAILINGOVER, and the node endpoints to NODE_FAILINGOVER during the failover period. However, if the link reset is due to a disabled bearer, the corres- ponding link endpoint is deleted, and only the node endpoint knows about the ongoing failover. Now, if the disabled bearer is re-enabled during the failover period, the discovery mechanism may create a new link endpoint that is ready to be established, despite that this is not permitted. This situation may cause both the ongoing failover and any subsequent link synchronization to fail. In this commit, we ensure that a newly created link goes directly to state LINK_FAILINGOVER if the corresponding node state is NODE_FAILINGOVER. This eliminates the problem described above. Furthermore, we tighten the criteria for which packets are allowed to end a failover state in the function tipc_node_check_state(). By checking that the receiving link is up and running, instead of just checking that it is not in failover mode, we eliminate the risk that protocol packets from the re-created link may cause the failover to be prematurely terminated. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 7c191641b44f..004834bd1605 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -565,6 +565,8 @@ void tipc_node_check_dest(struct net *net, u32 onode, goto exit; } tipc_link_reset(l); + if (n->state == NODE_FAILINGOVER) + tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); le->link = l; n->link_cnt++; tipc_node_calculate_timer(n, l); @@ -1129,7 +1131,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, } /* Open parallel link when tunnel link reaches synch point */ - if ((n->state == NODE_FAILINGOVER) && !tipc_link_is_failingover(l)) { + if ((n->state == NODE_FAILINGOVER) && tipc_link_is_up(l)) { if (!more(rcv_nxt, n->sync_point)) return true; tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT); -- cgit v1.2.3 From 5ae2f8e6857968d6dddbd3879ed0a32b860e02d1 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 20 Aug 2015 02:12:55 -0400 Subject: tipc: interrupt link synchronization when a link goes down When we introduced the new link failover/synch mechanism in commit 6e498158a827fd515b514842e9a06bdf0f75ab86 ("tipc: move link synch and failover to link aggregation level"), we missed the case when the non-tunnel link goes down during the link synchronization period. In this case the tunnel link will remain in state LINK_SYNCHING, something leading to unpredictable behavior when the failover procedure is initiated. In this commit, we ensure that the node and remaining link goes back to regular communication state (SELF_UP_PEER_UP/LINK_ESTABLISHED) when one of the parallel links goes down. We also ensure that we don't re-enter synch mode if subsequent SYNCH packets arrive on the remaining link. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 004834bd1605..937cc6192bcf 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -423,6 +423,8 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, /* There is still a working link => initiate failover */ tnl = node_active_link(n, 0); + tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT); + tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); tipc_link_reset(l); @@ -1140,6 +1142,10 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, return true; } + /* No synching needed if only one link */ + if (!pl || !tipc_link_is_up(pl)) + return true; + /* Initiate or update synch mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) { syncpt = iseqno + exp_pkts - 1; @@ -1158,9 +1164,8 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Open tunnel link when parallel link reaches synch point */ if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) { - if (pl) - dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq)); - if (!pl || more(dlv_nxt, n->sync_point)) { + dlv_nxt = pl->rcv_nxt - mod(skb_queue_len(pl->inputq)); + if (more(dlv_nxt, n->sync_point)) { tipc_link_fsm_evt(l, LINK_SYNCH_END_EVT); tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); return true; -- cgit v1.2.3 From 2be80c2d87de789550982e74a11e9f9ff5940845 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 20 Aug 2015 02:12:56 -0400 Subject: tipc: fix stale link problem during synchronization Recent changes to the link synchronization means that we can now just drop packets arriving on the synchronizing link before the synch point is reached. This has lead to significant simplifications to the implementation, but also turns out to have a flip side that we need to consider. Under unlucky circumstances, the two endpoints may end up repeatedly dropping each other's packets, while immediately asking for retransmission of the same packets, just to drop them once more. This pattern will eventually be broken when the synch point is reached on the other link, but before that, the endpoints may have arrived at the retransmission limit (stale counter) that indicates that the link should be broken. We see this happen at rare occasions. The fix for this is to not ask for retransmissions when a link is in state LINK_SYNCHING. The fact that the link has reached this state means that it has already received the first SYNCH packet, and that it knows the synch point. Hence, it doesn't need any more packets until the other link has reached the synch point, whereafter it can go ahead and ask for the missing packets. However, because of the reduced traffic on the synching link that follows this change, it may now take longer to discover that the synch point has been reached. We compensate for this by letting all packets, on any of the links, trig a check for synchronization termination. This is possible because the packets themselves don't contain any information that is needed for discovering this condition. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 937cc6192bcf..703875fd6cde 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1079,7 +1079,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, u16 exp_pkts = msg_msgcnt(hdr); u16 rcv_nxt, syncpt, dlv_nxt; int state = n->state; - struct tipc_link *l, *pl = NULL; + struct tipc_link *l, *tnl, *pl = NULL; struct tipc_media_addr *maddr; int i, pb_id; @@ -1164,12 +1164,20 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Open tunnel link when parallel link reaches synch point */ if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) { + if (tipc_link_is_synching(l)) { + tnl = l; + } else { + tnl = pl; + pl = l; + } dlv_nxt = pl->rcv_nxt - mod(skb_queue_len(pl->inputq)); if (more(dlv_nxt, n->sync_point)) { - tipc_link_fsm_evt(l, LINK_SYNCH_END_EVT); + tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT); tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); return true; } + if (l == pl) + return true; if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) return true; if (usr == LINK_PROTOCOL) -- cgit v1.2.3