tipc: reduce risk of user starvation during link congestion
The socket code currently handles link congestion by either blocking and trying to send again when the congestion has abated, or just returning to the user with -EAGAIN and letting the user retry later.

This mechanism is prone to starvation, because the wakeup algorithm is non-atomic. In the interval between the link issuing a wakeup signal and the socket waking up to re-attempt the send, other senders may come in and occupy the freed buffer space in the link. This in turn may force a socket to make many send attempts before it succeeds. In extremely loaded systems we have observed latencies of several seconds before a low-priority socket is able to send out a message.

In this commit, we simplify this mechanism and reduce the risk of the described scenario occurring. When a message send is attempted via a congested link, we now let the message be added to the link's backlog queue anyway, thus permitting an oversubscription of one message per source socket. We still create a wakeup item and return an error code, hence instructing the sender to block or stop sending. Only when enough space has been freed up in the link's backlog queue do we issue a wakeup event that allows the sender to continue with the next message, if any.

The fact that a socket can now consider a message sent even when the link returns a congestion code means that the sending socket code can be simplified. Also, since this is a good opportunity to get rid of the obsolete 'mtu change' condition in the three socket send functions, we now choose to refactor those functions completely.

Signed-off-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
committed by David S. Miller
parent 4d8642d896
commit 365ad353c2
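Before the diff itself, here is a minimal userspace sketch (not kernel code, and not part of the patch) of the new sender-side contract this commit introduces: a message offered to a congested link is still queued, so each sender carries at most one "oversubscribed" message and must then wait for a wakeup before sending the next one. All names and limits below are invented for the example; only the ELINKCONG alias mirrors net/tipc/core.h.

#include <errno.h>
#include <stdio.h>

#define ELINKCONG EAGAIN	/* TIPC aliases this in net/tipc/core.h */
#define BACKLOG_LIMIT 4

struct toy_link {
	int backlog_len;	/* messages queued on the link */
	int wakeup_pending;	/* senders waiting for a wakeup event */
};

/* Enqueue unconditionally; tell a congested sender to stop and wait. */
static int toy_link_xmit(struct toy_link *l)
{
	int congested = l->backlog_len >= BACKLOG_LIMIT;

	l->backlog_len++;		/* accepted even under congestion */
	if (congested) {
		l->wakeup_pending++;	/* schedule a wakeup item */
		return -ELINKCONG;	/* sender blocks; msg is NOT lost */
	}
	return 0;
}

/* As the link drains, wake waiters only while space has actually
 * been freed, so a woken sender finds room for its next message. */
static void toy_link_wakeup(struct toy_link *l)
{
	while (l->wakeup_pending && l->backlog_len < BACKLOG_LIMIT) {
		l->wakeup_pending--;
		puts("woke one blocked sender");
	}
}

int main(void)
{
	struct toy_link l = { 0, 0 };

	for (int i = 0; i < 6; i++)
		printf("send %d -> rc %d\n", i, toy_link_xmit(&l));
	l.backlog_len = 0;		/* pretend the link drained */
	toy_link_wakeup(&l);
	return 0;
}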
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -174,7 +174,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
  * and to identified node local sockets
  * @net: the applicable net namespace
  * @list: chain of buffers containing message
- * Consumes the buffer chain, except when returning -ELINKCONG
+ * Consumes the buffer chain.
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
 int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
@@ -197,7 +197,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
 	tipc_bcast_unlock(net);
 
 	/* Don't send to local node if adding to link failed */
-	if (unlikely(rc)) {
+	if (unlikely(rc && (rc != -ELINKCONG))) {
 		__skb_queue_purge(&rcvq);
 		return rc;
 	}
@@ -206,7 +206,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
 	tipc_bcbase_xmit(net, &xmitq);
 	tipc_sk_mcast_rcv(net, &rcvq, &inputq);
 	__skb_queue_purge(list);
-	return 0;
+	return rc;
 }
 
 /* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
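Since a congested broadcast link now still accepts the buffer chain, -ELINKCONG no longer aborts delivery to node-local sockets; only hard errors purge the receive queue, while the congestion code is propagated so the sender can block. A hedged userspace sketch of that control flow (function and variable names invented):

#include <errno.h>
#include <stdio.h>

#define ELINKCONG EAGAIN	/* TIPC aliases this in net/tipc/core.h */

/* rc_from_link mimics the broadcast link's verdict on the chain. */
static int toy_bcast_xmit(int rc_from_link, int *delivered_locally)
{
	if (rc_from_link && rc_from_link != -ELINKCONG) {
		*delivered_locally = 0;	/* hard error: purge rcvq */
		return rc_from_link;
	}
	*delivered_locally = 1;		/* chain was accepted by the link */
	return rc_from_link;		/* may still be -ELINKCONG */
}

int main(void)
{
	int local, rc;

	rc = toy_bcast_xmit(-ELINKCONG, &local);
	printf("congested: rc %d, delivered locally: %d\n", rc, local);
	rc = toy_bcast_xmit(-EHOSTUNREACH, &local);
	printf("no route:  rc %d, delivered locally: %d\n", rc, local);
	return 0;
}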
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -776,60 +776,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
 
 /**
  * link_schedule_user - schedule a message sender for wakeup after congestion
- * @link: congested link
- * @list: message that was attempted sent
+ * @l: congested link
+ * @hdr: header of message that is being sent
  * Create pseudo msg to send back to user when congestion abates
- * Does not consume buffer list
  */
-static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
+static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
 {
-	struct tipc_msg *msg = buf_msg(skb_peek(list));
-	int imp = msg_importance(msg);
-	u32 oport = msg_origport(msg);
-	u32 addr = tipc_own_addr(link->net);
+	u32 dnode = tipc_own_addr(l->net);
+	u32 dport = msg_origport(hdr);
 	struct sk_buff *skb;
 
-	/* This really cannot happen...  */
-	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
-		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		return -ENOBUFS;
-	}
-	/* Non-blocking sender: */
-	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
-		return -ELINKCONG;
-
 	/* Create and schedule wakeup pseudo message */
 	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
-			      addr, addr, oport, 0, 0);
+			      dnode, l->addr, dport, 0, 0);
 	if (!skb)
 		return -ENOBUFS;
-	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
-	TIPC_SKB_CB(skb)->chain_imp = imp;
-	skb_queue_tail(&link->wakeupq, skb);
-	link->stats.link_congs++;
+	msg_set_dest_droppable(buf_msg(skb), true);
+	TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
+	skb_queue_tail(&l->wakeupq, skb);
+	l->stats.link_congs++;
 	return -ELINKCONG;
 }
 
 /**
  * link_prepare_wakeup - prepare users for wakeup after congestion
- * @link: congested link
- * Move a number of waiting users, as permitted by available space in
- * the send queue, from link wait queue to node wait queue for wakeup
+ * @l: congested link
+ * Wake up a number of waiting users, as permitted by available space
+ * in the send queue
  */
 void link_prepare_wakeup(struct tipc_link *l)
 {
-	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
-	int imp, lim;
 	struct sk_buff *skb, *tmp;
+	int imp, i = 0;
 
 	skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
 		imp = TIPC_SKB_CB(skb)->chain_imp;
-		lim = l->backlog[imp].limit;
-		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
-		if ((pnd[imp] + l->backlog[imp].len) >= lim)
+		if (l->backlog[imp].len < l->backlog[imp].limit) {
+			skb_unlink(skb, &l->wakeupq);
+			skb_queue_tail(l->inputq, skb);
+		} else if (i++ > 10) {
 			break;
-		skb_unlink(skb, &l->wakeupq);
-		skb_queue_tail(l->inputq, skb);
+		}
 	}
 }
 
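The rewritten walk above no longer pre-computes per-level pending counts; it simply moves each waiting sender whose importance level has backlog room onto the input queue, and gives up after stepping over ten still-blocked entries so one congested level cannot make the walk unbounded. A small userspace model of that loop (types, queue representation and numbers invented; the real code unlinks skbs rather than printing):

#include <stdio.h>

#define TOY_LEVELS 5	/* stand-in for TIPC's importance levels */

struct toy_backlog { int len, limit; };

/* waiting_imp[i] is the importance of the i-th queued wakeup item. */
static void toy_prepare_wakeup(const int *waiting_imp, int n,
			       const struct toy_backlog *backlog)
{
	int blocked = 0;

	for (int i = 0; i < n; i++) {
		int imp = waiting_imp[i];

		if (backlog[imp].len < backlog[imp].limit)
			printf("wake sender %d (importance %d)\n", i, imp);
		else if (blocked++ > 10)
			break;	/* leave the rest for the next round */
	}
}

int main(void)
{
	const struct toy_backlog backlog[TOY_LEVELS] = {
		{4, 4}, {2, 8}, {16, 16}, {1, 32}, {0, 64}
	};
	const int waiting[] = { 0, 1, 2, 3, 1 };

	toy_prepare_wakeup(waiting, 5, backlog);
	return 0;
}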
@@ -869,8 +856,7 @@ void tipc_link_reset(struct tipc_link *l)
  * @list: chain of buffers containing message
  * @xmitq: returned list of packets to be sent by caller
  *
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain.
  * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
@@ -879,7 +865,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 {
 	struct tipc_msg *hdr = buf_msg(skb_peek(list));
 	unsigned int maxwin = l->window;
-	unsigned int i, imp = msg_importance(hdr);
+	int imp = msg_importance(hdr);
 	unsigned int mtu = l->mtu;
 	u16 ack = l->rcv_nxt - 1;
 	u16 seqno = l->snd_nxt;
@@ -888,19 +874,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 	struct sk_buff_head *backlogq = &l->backlogq;
 	struct sk_buff *skb, *_skb, *bskb;
+	int pkt_cnt = skb_queue_len(list);
+	int rc = 0;
 
-	/* Match msg importance against this and all higher backlog limits: */
-	if (!skb_queue_empty(backlogq)) {
-		for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
-			if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
-				return link_schedule_user(l, list);
-		}
-	}
 	if (unlikely(msg_size(hdr) > mtu)) {
 		skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
+	/* Allow oversubscription of one data msg per source at congestion */
+	if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
+		if (imp == TIPC_SYSTEM_IMPORTANCE) {
+			pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
+			return -ENOBUFS;
+		}
+		rc = link_schedule_user(l, hdr);
+	}
+
+	if (pkt_cnt > 1) {
+		l->stats.sent_fragmented++;
+		l->stats.sent_fragments += pkt_cnt;
+	}
@@ -946,7 +935,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 		skb_queue_splice_tail_init(list, backlogq);
 	}
 	l->snd_nxt = seqno;
-	return 0;
+	return rc;
 }
 
 void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
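The admission path in tipc_link_xmit() above now checks message size first, then permits exactly one message of oversubscription for data importance levels; only overflow at TIPC_SYSTEM_IMPORTANCE is fatal, and the congestion code travels back via rc while the chain is still queued. A compact userspace rendering of that decision (constants and names invented; ELINKCONG aliased as in net/tipc/core.h):

#include <errno.h>
#include <stdio.h>

#define ELINKCONG EAGAIN

enum { TOY_LOW, TOY_MEDIUM, TOY_HIGH, TOY_CRITICAL, TOY_SYSTEM };

struct toy_backlog { int len, limit; };

static int toy_admit(struct toy_backlog *b, int imp, int size, int mtu)
{
	int rc = 0;

	if (size > mtu)
		return -EMSGSIZE;	/* chain dropped, nothing queued */

	if (b[imp].len >= b[imp].limit) {
		if (imp == TOY_SYSTEM)
			return -ENOBUFS; /* genuine link overflow */
		rc = -ELINKCONG;	/* sender must block... */
	}
	b[imp].len++;			/* ...but the message is queued */
	return rc;
}

int main(void)
{
	struct toy_backlog b[] = {
		{4, 4}, {0, 8}, {0, 16}, {0, 32}, {64, 64}
	};

	printf("low at limit:    %d\n", toy_admit(b, TOY_LOW, 100, 1500));
	printf("medium, room:    %d\n", toy_admit(b, TOY_MEDIUM, 100, 1500));
	printf("system overflow: %d\n", toy_admit(b, TOY_SYSTEM, 100, 1500));
	printf("oversized:       %d\n", toy_admit(b, TOY_LOW, 2000, 1500));
	return 0;
}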
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -98,8 +98,6 @@ struct tipc_skb_cb {
 	u32 bytes_read;
 	struct sk_buff *tail;
 	bool validated;
-	bool wakeup_pending;
-	u16 chain_sz;
 	u16 chain_imp;
 	u16 ackers;
 };
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1167,7 +1167,7 @@ msg_full:
  * @list: chain of buffers containing message
  * @dnode: address of destination node
  * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning -ELINKCONG
+ * Consumes the buffer chain.
  * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF
  */
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
@@ -1206,10 +1206,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 		spin_unlock_bh(&le->lock);
 		tipc_node_read_unlock(n);
 
-		if (likely(rc == 0))
-			tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
-		else if (rc == -ENOBUFS)
+		if (unlikely(rc == -ENOBUFS))
 			tipc_node_link_down(n, bearer_id, false);
+		else
+			tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
 
 		tipc_node_put(n);
@@ -1221,20 +1221,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
  * messages, which will not be rejected
  * The only exception is datagram messages rerouted after secondary
  * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
  */
 int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 		       u32 selector)
 {
 	struct sk_buff_head head;
-	int rc;
 
 	skb_queue_head_init(&head);
 	__skb_queue_tail(&head, skb);
-	rc = tipc_node_xmit(net, &head, dnode, selector);
-	if (rc == -ELINKCONG)
-		kfree_skb(skb);
+	tipc_node_xmit(net, &head, dnode, selector);
 	return 0;
 }
File diff suppressed because it is too large