/*
 * net/tipc/link.c: TIPC link code
 *
 * Copyright (c) 1996-2007, 2012-2015, Ericsson AB
 * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "subscr.h"
#include "link.h"
#include "bcast.h"
#include "socket.h"
#include "name_distr.h"
#include "discover.h"
#include "netlink.h"

#include <linux/pkt_sched.h>

/*
 * Error message prefixes
 */
static const char *link_co_err = "Link changeover error, ";
static const char *link_rst_msg = "Resetting link ";
static const char *link_unk_evt = "Unknown link event ";

static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
	[TIPC_NLA_LINK_UNSPEC]		= { .type = NLA_UNSPEC },
	[TIPC_NLA_LINK_NAME] = {
		.type = NLA_STRING,
		.len = TIPC_MAX_LINK_NAME
	},
	[TIPC_NLA_LINK_MTU]		= { .type = NLA_U32 },
	[TIPC_NLA_LINK_BROADCAST]	= { .type = NLA_FLAG },
	[TIPC_NLA_LINK_UP]		= { .type = NLA_FLAG },
	[TIPC_NLA_LINK_ACTIVE]		= { .type = NLA_FLAG },
	[TIPC_NLA_LINK_PROP]		= { .type = NLA_NESTED },
	[TIPC_NLA_LINK_STATS]		= { .type = NLA_NESTED },
	[TIPC_NLA_LINK_RX]		= { .type = NLA_U32 },
	[TIPC_NLA_LINK_TX]		= { .type = NLA_U32 }
};

/* Properties valid for media, bearer and link */
static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
	[TIPC_NLA_PROP_UNSPEC]	= { .type = NLA_UNSPEC },
	[TIPC_NLA_PROP_PRIO]	= { .type = NLA_U32 },
	[TIPC_NLA_PROP_TOL]	= { .type = NLA_U32 },
	[TIPC_NLA_PROP_WIN]	= { .type = NLA_U32 }
};

/*
 * Interval between NACKs when packets arrive out of order
 */
#define TIPC_NACK_INTV (TIPC_MIN_LINK_WIN * 2)

/*
 * Out-of-range value for link session numbers
 */
#define WILDCARD_SESSION 0x10000

/* State value stored in 'failover_pkts' */
#define FIRST_FAILOVER 0xffffu

/* Link FSM states and events:
 */
enum {
	TIPC_LINK_WORKING,
	TIPC_LINK_PROBING,
	TIPC_LINK_RESETTING,
	TIPC_LINK_ESTABLISHING
};

enum {
	PEER_RESET_EVT = RESET_MSG,
	ACTIVATE_EVT   = ACTIVATE_MSG,
	TRAFFIC_EVT,	/* Any other valid msg from peer */
	SILENCE_EVT	/* Peer was silent during last timer interval */
};
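
/* For orientation, the transitions implemented by tipc_link_fsm_evt()
 * below amount to roughly:
 *
 *   WORKING      --SILENCE_EVT-->    PROBING       (send probe)
 *   WORKING      --PEER_RESET_EVT--> link down     (send ACTIVATE)
 *   PROBING      --TRAFFIC_EVT-->    WORKING
 *   PROBING      --SILENCE_EVT-->    link down     (once abort_limit
 *                                                   probes go unanswered)
 *   RESETTING    --PEER_RESET_EVT--> ESTABLISHING  (send ACTIVATE)
 *   RESETTING    --ACTIVATE_EVT-->   link up
 *   ESTABLISHING --TRAFFIC_EVT-->    link up
 */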

/* Link FSM state checking routines
 */
static int link_working(struct tipc_link *l)
{
	return l->state == TIPC_LINK_WORKING;
}

static int link_probing(struct tipc_link *l)
{
	return l->state == TIPC_LINK_PROBING;
}

static int link_resetting(struct tipc_link *l)
{
	return l->state == TIPC_LINK_RESETTING;
}

static int link_establishing(struct tipc_link *l)
{
	return l->state == TIPC_LINK_ESTABLISHING;
}

static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
			       struct sk_buff_head *xmitq);
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
				      u16 rcvgap, int tolerance, int priority,
				      struct sk_buff_head *xmitq);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
					   struct sk_buff_head *xmitq);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);

/*
 * Simple link routines
 */
static unsigned int align(unsigned int i)
{
	return (i + 3) & ~3u;
}
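
/* Example: align(5) == 8 and align(8) == 8; the failover code below
 * relies on this 4-byte rounding when stepping from one bundled message
 * to the next inside a MSG_BUNDLER buffer.
 */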

static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
{
	struct tipc_node *n = l->owner;

	if (node_active_link(n, 0) != l)
		return node_active_link(n, 0);
	return node_active_link(n, 1);
}

/*
 * Simple non-static link routines (i.e. referenced outside this file)
 */
int tipc_link_is_up(struct tipc_link *l_ptr)
{
	if (!l_ptr)
		return 0;
	return link_working(l_ptr) || link_probing(l_ptr);
}

int tipc_link_is_active(struct tipc_link *l)
{
	struct tipc_node *n = l->owner;

	return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
}

/**
 * tipc_link_create - create a new link
 * @n_ptr: pointer to associated node
 * @b_ptr: pointer to associated bearer
 * @media_addr: media address to use when sending messages over link
 *
 * Returns pointer to link, or NULL on failure.
 */
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
				   struct tipc_bearer *b_ptr,
				   const struct tipc_media_addr *media_addr,
				   struct sk_buff_head *inputq,
				   struct sk_buff_head *namedq)
{
	struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
	struct tipc_link *l_ptr;
	struct tipc_msg *msg;
	char *if_name;
	char addr_string[16];
	u32 peer = n_ptr->addr;

	if (n_ptr->link_cnt >= MAX_BEARERS) {
		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_err("Cannot establish %uth link to %s. Max %u allowed.\n",
		       n_ptr->link_cnt, addr_string, MAX_BEARERS);
		return NULL;
	}

	if (n_ptr->links[b_ptr->identity].link) {
		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_err("Attempt to establish second link on <%s> to %s\n",
		       b_ptr->name, addr_string);
		return NULL;
	}

	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
	if (!l_ptr) {
		pr_warn("Link creation failed, no memory\n");
		return NULL;
	}
	l_ptr->addr = peer;
	if_name = strchr(b_ptr->name, ':') + 1;
	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
		tipc_zone(tn->own_addr), tipc_cluster(tn->own_addr),
		tipc_node(tn->own_addr), if_name,
		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
	/* note: peer i/f name is updated by reset/activate message */
	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
	l_ptr->owner = n_ptr;
	l_ptr->peer_session = WILDCARD_SESSION;
	l_ptr->bearer_id = b_ptr->identity;
	l_ptr->tolerance = b_ptr->tolerance;
	l_ptr->state = TIPC_LINK_RESETTING;

	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
	msg = l_ptr->pmsg;
	tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE,
		      l_ptr->addr);
	msg_set_size(msg, sizeof(l_ptr->proto_msg));
	msg_set_session(msg, (tn->random & 0xffff));
	msg_set_bearer_id(msg, b_ptr->identity);
	strcpy((char *)msg_data(msg), if_name);
	l_ptr->net_plane = b_ptr->net_plane;
	l_ptr->advertised_mtu = b_ptr->mtu;
	l_ptr->mtu = l_ptr->advertised_mtu;
	l_ptr->priority = b_ptr->priority;
	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
	l_ptr->snd_nxt = 1;
	__skb_queue_head_init(&l_ptr->transmq);
	__skb_queue_head_init(&l_ptr->backlogq);
	__skb_queue_head_init(&l_ptr->deferdq);
	skb_queue_head_init(&l_ptr->wakeupq);
	l_ptr->inputq = inputq;
	l_ptr->namedq = namedq;
	skb_queue_head_init(l_ptr->inputq);
	link_reset_statistics(l_ptr);
	tipc_node_attach_link(n_ptr, l_ptr);
	return l_ptr;
}

/**
 * tipc_link_delete - Delete a link
 * @l: link to be deleted
 */
void tipc_link_delete(struct tipc_link *l)
{
	tipc_link_reset(l);
	tipc_link_reset_fragments(l);
	tipc_node_detach_link(l->owner, l);
}

void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *link;
	struct tipc_node *node;

	rcu_read_lock();
	list_for_each_entry_rcu(node, &tn->node_list, list) {
		tipc_node_lock(node);
		link = node->links[bearer_id].link;
		if (link)
			tipc_link_delete(link);
		tipc_node_unlock(node);
	}
	rcu_read_unlock();
}

/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
 *
 * Give a newly added peer node the sequence number where it should
 * start receiving and acking broadcast packets.
 */
static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
					   struct sk_buff_head *xmitq)
{
	struct sk_buff *skb;
	struct sk_buff_head list;

	skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
			      0, l->addr, link_own_addr(l), 0, 0, 0);
	if (!skb)
		return;
	__skb_queue_head_init(&list);
	__skb_queue_tail(&list, skb);
	tipc_link_xmit(l, &list, xmitq);
}

/**
 * tipc_link_fsm_evt - link finite state machine
 * @l: pointer to link
 * @evt: state machine event to be processed
 * @xmitq: queue to prepend created protocol message, if any
 */
static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
			     struct sk_buff_head *xmitq)
{
	int mtyp = 0, rc = 0;
	struct tipc_link *pl;
	enum {
		LINK_RESET     = 1,
		LINK_ACTIVATE  = (1 << 1),
		SND_PROBE      = (1 << 2),
		SND_STATE      = (1 << 3),
		SND_RESET      = (1 << 4),
		SND_ACTIVATE   = (1 << 5),
		SND_BCAST_SYNC = (1 << 6)
	} actions = 0;

	if (l->exec_mode == TIPC_LINK_BLOCKED)
		return rc;

	switch (l->state) {
	case TIPC_LINK_WORKING:
		switch (evt) {
		case TRAFFIC_EVT:
		case ACTIVATE_EVT:
			break;
		case SILENCE_EVT:
			l->state = TIPC_LINK_PROBING;
			actions |= SND_PROBE;
			break;
		case PEER_RESET_EVT:
			actions |= LINK_RESET | SND_ACTIVATE;
			break;
		default:
			pr_debug("%s%u WORKING\n", link_unk_evt, evt);
		}
		break;
	case TIPC_LINK_PROBING:
		switch (evt) {
		case TRAFFIC_EVT:
		case ACTIVATE_EVT:
			l->state = TIPC_LINK_WORKING;
			break;
		case PEER_RESET_EVT:
			actions |= LINK_RESET | SND_ACTIVATE;
			break;
		case SILENCE_EVT:
			if (l->silent_intv_cnt <= l->abort_limit) {
				actions |= SND_PROBE;
				break;
			}
			actions |= LINK_RESET | SND_RESET;
			break;
		default:
			pr_err("%s%u PROBING\n", link_unk_evt, evt);
		}
		break;
	case TIPC_LINK_RESETTING:
		switch (evt) {
		case TRAFFIC_EVT:
			break;
		case ACTIVATE_EVT:
			pl = node_active_link(l->owner, 0);
			if (pl && link_probing(pl))
				break;
			actions |= LINK_ACTIVATE;
			if (!l->owner->working_links)
				actions |= SND_BCAST_SYNC;
			break;
		case PEER_RESET_EVT:
			l->state = TIPC_LINK_ESTABLISHING;
			actions |= SND_ACTIVATE;
			break;
		case SILENCE_EVT:
			actions |= SND_RESET;
			break;
		default:
			pr_err("%s%u in RESETTING\n", link_unk_evt, evt);
		}
		break;
	case TIPC_LINK_ESTABLISHING:
		switch (evt) {
		case TRAFFIC_EVT:
		case ACTIVATE_EVT:
			pl = node_active_link(l->owner, 0);
			if (pl && link_probing(pl))
				break;
			actions |= LINK_ACTIVATE;
			if (!l->owner->working_links)
				actions |= SND_BCAST_SYNC;
			break;
		case PEER_RESET_EVT:
			break;
		case SILENCE_EVT:
			actions |= SND_ACTIVATE;
			break;
		default:
			pr_err("%s%u ESTABLISHING\n", link_unk_evt, evt);
		}
		break;
	default:
		pr_err("Unknown link state %u/%u\n", l->state, evt);
	}

	/* Perform actions as decided by FSM */
	if (actions & LINK_RESET) {
		l->exec_mode = TIPC_LINK_BLOCKED;
		rc |= TIPC_LINK_DOWN_EVT;
	}
	if (actions & LINK_ACTIVATE) {
		l->exec_mode = TIPC_LINK_OPEN;
		rc |= TIPC_LINK_UP_EVT;
	}
	if (actions & (SND_STATE | SND_PROBE))
		mtyp = STATE_MSG;
	if (actions & SND_RESET)
		mtyp = RESET_MSG;
	if (actions & SND_ACTIVATE)
		mtyp = ACTIVATE_MSG;
	if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
		tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
					  0, 0, 0, xmitq);
	if (actions & SND_BCAST_SYNC)
		tipc_link_build_bcast_sync_msg(l, xmitq);
	return rc;
}

/* link_profile_stats - update statistical profiling of traffic
 */
static void link_profile_stats(struct tipc_link *l)
{
	struct sk_buff *skb;
	struct tipc_msg *msg;
	int length;

	/* Update counters used in statistical profiling of send traffic */
	l->stats.accu_queue_sz += skb_queue_len(&l->transmq);
	l->stats.queue_sz_counts++;

	skb = skb_peek(&l->transmq);
	if (!skb)
		return;
	msg = buf_msg(skb);
	length = msg_size(msg);

	if (msg_user(msg) == MSG_FRAGMENTER) {
		if (msg_type(msg) != FIRST_FRAGMENT)
			return;
		length = msg_size(msg_get_wrapped(msg));
	}
	l->stats.msg_lengths_total += length;
	l->stats.msg_length_counts++;
	if (length <= 64)
		l->stats.msg_length_profile[0]++;
	else if (length <= 256)
		l->stats.msg_length_profile[1]++;
	else if (length <= 1024)
		l->stats.msg_length_profile[2]++;
	else if (length <= 4096)
		l->stats.msg_length_profile[3]++;
	else if (length <= 16384)
		l->stats.msg_length_profile[4]++;
	else if (length <= 32768)
		l->stats.msg_length_profile[5]++;
	else
		l->stats.msg_length_profile[6]++;
}

/* tipc_link_timeout - perform periodic task as instructed from node timeout
 */
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
{
	int rc = 0;

	link_profile_stats(l);
	if (l->silent_intv_cnt)
		rc = tipc_link_fsm_evt(l, SILENCE_EVT, xmitq);
	else if (link_working(l) && tipc_bclink_acks_missing(l->owner))
		tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
	l->silent_intv_cnt++;
	return rc;
}
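
/* One silent interval is thus enough to move a WORKING link into
 * PROBING, and more than l->abort_limit consecutive silent intervals
 * in PROBING take the link down (see tipc_link_fsm_evt() above).
 */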

/**
 * link_schedule_user - schedule a message sender for wakeup after congestion
 * @link: congested link
 * @list: message that was attempted sent
 * Create pseudo msg to send back to user when congestion abates
 * Does not consume buffer list
 */
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
{
	struct tipc_msg *msg = buf_msg(skb_peek(list));
	int imp = msg_importance(msg);
	u32 oport = msg_origport(msg);
	u32 addr = link_own_addr(link);
	struct sk_buff *skb;

	/* This really cannot happen... */
	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
		return -ENOBUFS;
	}
	/* Non-blocking sender: */
	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
		return -ELINKCONG;

	/* Create and schedule wakeup pseudo message */
	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
			      addr, addr, oport, 0, 0);
	if (!skb)
		return -ENOBUFS;
	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
	TIPC_SKB_CB(skb)->chain_imp = imp;
	skb_queue_tail(&link->wakeupq, skb);
	link->stats.link_congs++;
	return -ELINKCONG;
}
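
/* The SOCK_WAKEUP pseudo message created above never reaches the wire:
 * it records the blocked chain's origin port, size and importance on
 * link->wakeupq. Once acks free enough backlog space,
 * link_prepare_wakeup() below forwards it to the input queue, where the
 * socket layer treats it as a "congestion abated, retry" signal.
 */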

/**
 * link_prepare_wakeup - prepare users for wakeup after congestion
 * @link: congested link
 * Move a number of waiting users, as permitted by available space in
 * the send queue, from link wait queue to node wait queue for wakeup
 */
void link_prepare_wakeup(struct tipc_link *l)
{
	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
	int imp, lim;
	struct sk_buff *skb, *tmp;

	skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
		imp = TIPC_SKB_CB(skb)->chain_imp;
		lim = l->window + l->backlog[imp].limit;
		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
		if ((pnd[imp] + l->backlog[imp].len) >= lim)
			break;
		skb_unlink(skb, &l->wakeupq);
		skb_queue_tail(l->inputq, skb);
		l->owner->inputq = l->inputq;
		l->owner->action_flags |= TIPC_MSG_EVT;
	}
}
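
/* Illustration of the quota above, assuming the default window of 50:
 * a TIPC_LOW_IMPORTANCE waiter sees lim = 50 + 25 = 75, so pending
 * chains are woken in FIFO order only while their accumulated size plus
 * the current LOW backlog level stays below that cap.
 */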

/**
 * tipc_link_reset_fragments - purge link's inbound message fragments queue
 * @l_ptr: pointer to link
 */
void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{
	kfree_skb(l_ptr->reasm_buf);
	l_ptr->reasm_buf = NULL;
}

void tipc_link_purge_backlog(struct tipc_link *l)
{
	__skb_queue_purge(&l->backlogq);
	l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
	l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
	l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
	l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
	l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
}

/**
 * tipc_link_purge_queues - purge all pkt queues associated with link
 * @l_ptr: pointer to link
 */
void tipc_link_purge_queues(struct tipc_link *l_ptr)
{
	__skb_queue_purge(&l_ptr->deferdq);
	__skb_queue_purge(&l_ptr->transmq);
	tipc_link_purge_backlog(l_ptr);
	tipc_link_reset_fragments(l_ptr);
}

void tipc_link_reset(struct tipc_link *l_ptr)
{
	u32 prev_state = l_ptr->state;
	int was_active_link = tipc_link_is_active(l_ptr);
	struct tipc_node *owner = l_ptr->owner;
	struct tipc_link *pl = tipc_parallel_link(l_ptr);

	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));

	/* Link is down, accept any session */
	l_ptr->peer_session = WILDCARD_SESSION;

	/* Prepare for renewed mtu size negotiation */
	l_ptr->mtu = l_ptr->advertised_mtu;

	l_ptr->state = TIPC_LINK_RESETTING;

	if ((prev_state == TIPC_LINK_RESETTING) ||
	    (prev_state == TIPC_LINK_ESTABLISHING))
		return;

	tipc_node_link_down(l_ptr->owner, l_ptr->bearer_id);
	tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);

	if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
		l_ptr->exec_mode = TIPC_LINK_BLOCKED;
		l_ptr->failover_checkpt = l_ptr->rcv_nxt;
		pl->failover_pkts = FIRST_FAILOVER;
		pl->failover_checkpt = l_ptr->rcv_nxt;
		pl->failover_skb = l_ptr->reasm_buf;
	} else {
		kfree_skb(l_ptr->reasm_buf);
	}
	/* Clean up all queues, except inputq: */
	__skb_queue_purge(&l_ptr->transmq);
	__skb_queue_purge(&l_ptr->deferdq);
	if (!owner->inputq)
		owner->inputq = l_ptr->inputq;
	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
	if (!skb_queue_empty(owner->inputq))
		owner->action_flags |= TIPC_MSG_EVT;
	tipc_link_purge_backlog(l_ptr);
	l_ptr->reasm_buf = NULL;
	l_ptr->rcv_unacked = 0;
	l_ptr->snd_nxt = 1;
	l_ptr->silent_intv_cnt = 0;
	l_ptr->stats.recv_info = 0;
	l_ptr->stale_count = 0;
	link_reset_statistics(l_ptr);
}

void tipc_link_activate(struct tipc_link *link)
{
	struct tipc_node *node = link->owner;

	link->rcv_nxt = 1;
	link->stats.recv_info = 1;
	link->silent_intv_cnt = 0;
	link->state = TIPC_LINK_WORKING;
	link->exec_mode = TIPC_LINK_OPEN;
	tipc_node_link_up(node, link->bearer_id);
	tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
}

/**
 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
 * @link: link to use
 * @list: chain of buffers containing message
 *
 * Consumes the buffer chain, except when returning an error code.
 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
 */
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
		     struct sk_buff_head *list)
{
	struct tipc_msg *msg = buf_msg(skb_peek(list));
	unsigned int maxwin = link->window;
	unsigned int i, imp = msg_importance(msg);
	uint mtu = link->mtu;
	u16 ack = mod(link->rcv_nxt - 1);
	u16 seqno = link->snd_nxt;
	u16 bc_last_in = link->owner->bclink.last_in;
	struct tipc_media_addr *addr = &link->media_addr;
	struct sk_buff_head *transmq = &link->transmq;
	struct sk_buff_head *backlogq = &link->backlogq;
	struct sk_buff *skb, *bskb;

	/* Match msg importance against this and all higher backlog limits: */
	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
		if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
			return link_schedule_user(link, list);
	}
	if (unlikely(msg_size(msg) > mtu))
		return -EMSGSIZE;

	/* Prepare each packet for sending, and add to relevant queue: */
	while (skb_queue_len(list)) {
		skb = skb_peek(list);
		msg = buf_msg(skb);
		msg_set_seqno(msg, seqno);
		msg_set_ack(msg, ack);
		msg_set_bcast_ack(msg, bc_last_in);

		if (likely(skb_queue_len(transmq) < maxwin)) {
			__skb_dequeue(list);
			__skb_queue_tail(transmq, skb);
			tipc_bearer_send(net, link->bearer_id, skb, addr);
			link->rcv_unacked = 0;
			seqno++;
			continue;
		}
		if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
			kfree_skb(__skb_dequeue(list));
			link->stats.sent_bundled++;
			continue;
		}
		if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
			kfree_skb(__skb_dequeue(list));
			__skb_queue_tail(backlogq, bskb);
			link->backlog[msg_importance(buf_msg(bskb))].len++;
			link->stats.sent_bundled++;
			link->stats.sent_bundles++;
			continue;
		}
		link->backlog[imp].len += skb_queue_len(list);
		skb_queue_splice_tail_init(list, backlogq);
	}
	link->snd_nxt = seqno;
	return 0;
}

/**
 * tipc_link_xmit(): enqueue buffer list according to queue situation
 * @link: link to use
 * @list: chain of buffers containing message
 * @xmitq: returned list of packets to be sent by caller
 *
 * Consumes the buffer chain, except when returning -ELINKCONG,
 * since the caller then may want to make more send attempts.
 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
 */
int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
		   struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb_peek(list));
	unsigned int maxwin = l->window;
	unsigned int i, imp = msg_importance(hdr);
	unsigned int mtu = l->mtu;
	u16 ack = l->rcv_nxt - 1;
	u16 seqno = l->snd_nxt;
	u16 bc_last_in = l->owner->bclink.last_in;
	struct sk_buff_head *transmq = &l->transmq;
	struct sk_buff_head *backlogq = &l->backlogq;
	struct sk_buff *skb, *_skb, *bskb;

	/* Match msg importance against this and all higher backlog limits: */
	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
		if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
			return link_schedule_user(l, list);
	}
	if (unlikely(msg_size(hdr) > mtu))
		return -EMSGSIZE;

	/* Prepare each packet for sending, and add to relevant queue: */
	while (skb_queue_len(list)) {
		skb = skb_peek(list);
		hdr = buf_msg(skb);
		msg_set_seqno(hdr, seqno);
		msg_set_ack(hdr, ack);
		msg_set_bcast_ack(hdr, bc_last_in);

		if (likely(skb_queue_len(transmq) < maxwin)) {
			_skb = skb_clone(skb, GFP_ATOMIC);
			if (!_skb)
				return -ENOBUFS;
			__skb_dequeue(list);
			__skb_queue_tail(transmq, skb);
			__skb_queue_tail(xmitq, _skb);
			l->rcv_unacked = 0;
			seqno++;
			continue;
		}
		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
			kfree_skb(__skb_dequeue(list));
			l->stats.sent_bundled++;
			continue;
		}
		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
			kfree_skb(__skb_dequeue(list));
			__skb_queue_tail(backlogq, bskb);
			l->backlog[msg_importance(buf_msg(bskb))].len++;
			l->stats.sent_bundled++;
			l->stats.sent_bundles++;
			continue;
		}
		l->backlog[imp].len += skb_queue_len(list);
		skb_queue_splice_tail_init(list, backlogq);
	}
	l->snd_nxt = seqno;
	return 0;
}
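
/* Admission example for the importance loop used by both xmit variants:
 * a TIPC_LOW_IMPORTANCE message is bounced to link_schedule_user() if
 * any of the LOW, MEDIUM, HIGH, CRITICAL or SYSTEM backlog levels has
 * hit its limit, while a TIPC_CRITICAL_IMPORTANCE message competes only
 * with the CRITICAL and SYSTEM levels. SYSTEM messages check only their
 * own, generously sized level, matching the "always accepted" note above.
 */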

static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
	skb_queue_head_init(list);
	__skb_queue_tail(list, skb);
}

static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
{
	struct sk_buff_head head;

	skb2list(skb, &head);
	return __tipc_link_xmit(link->owner->net, link, &head);
}

/*
 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
 * Receive the sequence number where we should start receiving and
 * acking broadcast packets from a newly added peer node, and open
 * up for reception of such packets.
 *
 * Called with node locked
 */
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
	n->bclink.recv_permitted = true;
	kfree_skb(buf);
}

/*
 * tipc_link_push_packets - push unsent packets to bearer
 *
 * Push out the unsent messages of a link where congestion
 * has abated. Called with node locked.
 */
void tipc_link_push_packets(struct tipc_link *link)
{
	struct sk_buff *skb;
	struct tipc_msg *msg;
	u16 seqno = link->snd_nxt;
	u16 ack = mod(link->rcv_nxt - 1);

	while (skb_queue_len(&link->transmq) < link->window) {
		skb = __skb_dequeue(&link->backlogq);
		if (!skb)
			break;
		msg = buf_msg(skb);
		link->backlog[msg_importance(msg)].len--;
		msg_set_ack(msg, ack);
		msg_set_seqno(msg, seqno);
		seqno = mod(seqno + 1);
		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
		link->rcv_unacked = 0;
		__skb_queue_tail(&link->transmq, skb);
		tipc_bearer_send(link->owner->net, link->bearer_id,
				 skb, &link->media_addr);
	}
	link->snd_nxt = seqno;
}

void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
{
	struct sk_buff *skb, *_skb;
	struct tipc_msg *hdr;
	u16 seqno = l->snd_nxt;
	u16 ack = l->rcv_nxt - 1;

	while (skb_queue_len(&l->transmq) < l->window) {
		skb = skb_peek(&l->backlogq);
		if (!skb)
			break;
		_skb = skb_clone(skb, GFP_ATOMIC);
		if (!_skb)
			break;
		__skb_dequeue(&l->backlogq);
		hdr = buf_msg(skb);
		l->backlog[msg_importance(hdr)].len--;
		__skb_queue_tail(&l->transmq, skb);
		__skb_queue_tail(xmitq, _skb);
		msg_set_ack(hdr, ack);
		msg_set_seqno(hdr, seqno);
		msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
		seqno++;
	}
	l->snd_nxt = seqno;
}

void tipc_link_reset_all(struct tipc_node *node)
{
	char addr_string[16];
	u32 i;

	tipc_node_lock(node);

	pr_warn("Resetting all links to %s\n",
		tipc_addr_string_fill(addr_string, node->addr));

	for (i = 0; i < MAX_BEARERS; i++) {
		if (node->links[i].link) {
			link_print(node->links[i].link, "Resetting link\n");
			tipc_link_reset(node->links[i].link);
		}
	}

	tipc_node_unlock(node);
}

static void link_retransmit_failure(struct tipc_link *l_ptr,
				    struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);
	struct net *net = l_ptr->owner->net;

	pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);

	if (l_ptr->addr) {
		/* Handle failure on standard link */
		link_print(l_ptr, "Resetting link ");
		pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
			msg_user(msg), msg_type(msg), msg_size(msg),
			msg_errcode(msg));
		pr_info("sqno %u, prev: %x, src: %x\n",
			msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
		tipc_link_reset(l_ptr);
	} else {
		/* Handle failure on broadcast link */
		struct tipc_node *n_ptr;
		char addr_string[16];

		pr_info("Msg seq number: %u, ", msg_seqno(msg));
		pr_cont("Outstanding acks: %lu\n",
			(unsigned long) TIPC_SKB_CB(buf)->handle);

		n_ptr = tipc_bclink_retransmit_to(net);

		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_info("Broadcast link info for %s\n", addr_string);
		pr_info("Reception permitted: %d, Acked: %u\n",
			n_ptr->bclink.recv_permitted,
			n_ptr->bclink.acked);
		pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
			n_ptr->bclink.last_in,
			n_ptr->bclink.oos_state,
			n_ptr->bclink.last_sent);

		n_ptr->action_flags |= TIPC_BCAST_RESET;
		l_ptr->stale_count = 0;
	}
}

void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
			  u32 retransmits)
{
	struct tipc_msg *msg;

	if (!skb)
		return;

	msg = buf_msg(skb);

	/* Detect repeated retransmit failures */
	if (l_ptr->last_retransm == msg_seqno(msg)) {
		if (++l_ptr->stale_count > 100) {
			link_retransmit_failure(l_ptr, skb);
			return;
		}
	} else {
		l_ptr->last_retransm = msg_seqno(msg);
		l_ptr->stale_count = 1;
	}

	skb_queue_walk_from(&l_ptr->transmq, skb) {
		if (!retransmits)
			break;
		msg = buf_msg(skb);
		msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
				 &l_ptr->media_addr);
		retransmits--;
		l_ptr->stats.retransmitted++;
	}
}

static int tipc_link_retransm(struct tipc_link *l, int retransm,
			      struct sk_buff_head *xmitq)
{
	struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
	struct tipc_msg *hdr;

	if (!skb)
		return 0;

	/* Detect repeated retransmit failures on same packet */
	if (likely(l->last_retransm != buf_seqno(skb))) {
		l->last_retransm = buf_seqno(skb);
		l->stale_count = 1;
	} else if (++l->stale_count > 100) {
		link_retransmit_failure(l, skb);
		return TIPC_LINK_DOWN_EVT;
	}
	skb_queue_walk(&l->transmq, skb) {
		if (!retransm)
			return 0;
		_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
		if (!_skb)
			return 0;
		hdr = buf_msg(_skb);
		msg_set_ack(hdr, l->rcv_nxt - 1);
		msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
		_skb->priority = TC_PRIO_CONTROL;
		__skb_queue_tail(xmitq, _skb);
		retransm--;
		l->stats.retransmitted++;
	}
	return 0;
}

/* link_synch(): check if all packets arrived before the synch
 *               point have been consumed
 * Returns true if the parallel links are synched, otherwise false
 */
static bool link_synch(struct tipc_link *l)
{
	unsigned int post_synch;
	struct tipc_link *pl;

	pl = tipc_parallel_link(l);
	if (pl == l)
		goto synched;

	/* Was last pre-synch packet added to input queue ? */
	if (less_eq(pl->rcv_nxt, l->synch_point))
		return false;

	/* Is it still in the input queue ? */
	post_synch = mod(pl->rcv_nxt - l->synch_point) - 1;
	if (skb_queue_len(pl->inputq) > post_synch)
		return false;
synched:
	l->exec_mode = TIPC_LINK_OPEN;
	return true;
}
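
/* Example: if the failed link recorded synch_point = 100 and the
 * parallel link has delivered packets up to rcv_nxt = 104, post_synch
 * is 3; the links count as synched once at most three packets (101-103,
 * all sent after the synch point) remain in pl->inputq, i.e. everything
 * at or before the synch point has been consumed by the upper layer.
 */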

/* tipc_data_input - deliver data and name distr msgs to upper layer
 *
 * Consumes buffer if message is of right type
 * Node lock must be held
 */
static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
{
	struct tipc_node *node = link->owner;
	struct tipc_msg *msg = buf_msg(skb);
	u32 dport = msg_destport(msg);

	switch (msg_user(msg)) {
	case TIPC_LOW_IMPORTANCE:
	case TIPC_MEDIUM_IMPORTANCE:
	case TIPC_HIGH_IMPORTANCE:
	case TIPC_CRITICAL_IMPORTANCE:
	case CONN_MANAGER:
		if (tipc_skb_queue_tail(link->inputq, skb, dport)) {
			node->inputq = link->inputq;
			node->action_flags |= TIPC_MSG_EVT;
		}
		return true;
	case NAME_DISTRIBUTOR:
		node->bclink.recv_permitted = true;
		node->namedq = link->namedq;
		skb_queue_tail(link->namedq, skb);
		if (skb_queue_len(link->namedq) == 1)
			node->action_flags |= TIPC_NAMED_MSG_EVT;
		return true;
	case MSG_BUNDLER:
	case TUNNEL_PROTOCOL:
	case MSG_FRAGMENTER:
	case BCAST_PROTOCOL:
		return false;
	default:
		pr_warn("Dropping received illegal msg type\n");
		kfree_skb(skb);
		return false;
	};
}

/* tipc_link_input - process packet that has passed link protocol check
 *
 * Consumes buffer
 * Node lock must be held
 */
static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
{
	struct tipc_node *node = link->owner;
	struct tipc_msg *msg = buf_msg(skb);
	struct sk_buff *iskb;
	int pos = 0;

	switch (msg_user(msg)) {
	case TUNNEL_PROTOCOL:
		if (msg_dup(msg)) {
			link->exec_mode = TIPC_LINK_TUNNEL;
			link->synch_point = msg_seqno(msg_get_wrapped(msg));
			kfree_skb(skb);
			break;
		}
		if (!tipc_link_failover_rcv(link, &skb))
			break;
		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
			tipc_data_input(link, skb);
			break;
		}
	case MSG_BUNDLER:
		link->stats.recv_bundles++;
		link->stats.recv_bundled += msg_msgcnt(msg);

		while (tipc_msg_extract(skb, &iskb, &pos))
			tipc_data_input(link, iskb);
		break;
	case MSG_FRAGMENTER:
		link->stats.recv_fragments++;
		if (tipc_buf_append(&link->reasm_buf, &skb)) {
			link->stats.recv_fragmented++;
			tipc_data_input(link, skb);
		} else if (!link->reasm_buf) {
			tipc_link_reset(link);
		}
		break;
	case BCAST_PROTOCOL:
		tipc_link_sync_rcv(node, skb);
		break;
	default:
		break;
	};
}

static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
{
	bool released = false;
	struct sk_buff *skb, *tmp;

	skb_queue_walk_safe(&l->transmq, skb, tmp) {
		if (more(buf_seqno(skb), acked))
			break;
		__skb_unlink(skb, &l->transmq);
		kfree_skb(skb);
		released = true;
	}
	return released;
}
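
/* Note on sequence arithmetic: seqnos are u16 and wrap at 65536, so
 * more()/less() compare positions modulo 2^16 rather than numerically;
 * e.g. seqno 2 counts as "more" than seqno 65535 once the window has
 * wrapped. A plain '>' here would wrongly stall the release loop at
 * every wraparound.
 */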

/* tipc_link_rcv - process TIPC packets/messages arriving from off-node
 * @link: the link that should handle the message
 * @skb: TIPC packet
 * @xmitq: queue to place packets to be sent after this call
 */
int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
		  struct sk_buff_head *xmitq)
{
	struct sk_buff_head *arrvq = &l->deferdq;
	struct sk_buff *tmp;
	struct tipc_msg *hdr;
	u16 seqno, rcv_nxt;
	int rc = 0;

	if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) {
		if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV))
			tipc_link_build_proto_msg(l, STATE_MSG, 0,
						  0, 0, 0, xmitq);
		return rc;
	}

	skb_queue_walk_safe(arrvq, skb, tmp) {
		hdr = buf_msg(skb);

		/* Verify and update link state */
		if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) {
			__skb_dequeue(arrvq);
			rc |= tipc_link_proto_rcv(l, skb, xmitq);
			continue;
		}

		if (unlikely(!link_working(l))) {
			rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
			if (!link_working(l)) {
				kfree_skb(__skb_dequeue(arrvq));
				continue;
			}
		}

		l->silent_intv_cnt = 0;

		/* Forward queues and wake up waiting users */
		if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
			tipc_link_advance_backlog(l, xmitq);
			if (unlikely(!skb_queue_empty(&l->wakeupq)))
				link_prepare_wakeup(l);
		}

		/* Defer reception if there is a gap in the sequence */
		seqno = msg_seqno(hdr);
		rcv_nxt = l->rcv_nxt;
		if (unlikely(less(rcv_nxt, seqno))) {
			l->stats.deferred_recv++;
			return rc;
		}

		__skb_dequeue(arrvq);

		/* Drop if packet already received */
		if (unlikely(more(rcv_nxt, seqno))) {
			l->stats.duplicates++;
			kfree_skb(skb);
			continue;
		}

		/* Synchronize with parallel link if applicable */
		if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL))
			if (!msg_dup(hdr) && !link_synch(l)) {
				kfree_skb(skb);
				return rc;
			}

		/* Packet can be delivered */
		l->rcv_nxt++;
		l->stats.recv_info++;
		if (unlikely(!tipc_data_input(l, skb)))
			tipc_link_input(l, skb);

		/* Ack at regular intervals */
		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
			l->rcv_unacked = 0;
			l->stats.sent_acks++;
			tipc_link_build_proto_msg(l, STATE_MSG,
						  0, 0, 0, 0, xmitq);
		}
	}
	return rc;
}

/**
 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
 *
 * Returns increase in queue length (i.e. 0 or 1)
 */
u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
{
	struct sk_buff *skb1;
	u16 seq_no = buf_seqno(skb);

	/* Empty queue ? */
	if (skb_queue_empty(list)) {
		__skb_queue_tail(list, skb);
		return 1;
	}

	/* Last ? */
	if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
		__skb_queue_tail(list, skb);
		return 1;
	}

	/* Locate insertion point in queue, then insert; discard if duplicate */
	skb_queue_walk(list, skb1) {
		u16 curr_seqno = buf_seqno(skb1);

		if (seq_no == curr_seqno) {
			kfree_skb(skb);
			return 0;
		}

		if (less(seq_no, curr_seqno))
			break;
	}

	__skb_queue_before(list, skb1, skb);
	return 1;
}
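
/* Example: with a deferred queue holding seqnos [3, 5, 6], an arriving
 * seqno 4 is inserted before 5, giving [3, 4, 5, 6] (returns 1), while
 * a second copy of seqno 5 is freed and the queue left untouched
 * (returns 0).
 */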

/*
 * Send protocol message to the other endpoint.
 */
void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
			  u32 gap, u32 tolerance, u32 priority)
{
	struct sk_buff *skb = NULL;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);
	tipc_link_build_proto_msg(l, msg_typ, probe_msg, gap,
				  tolerance, priority, &xmitq);
	skb = __skb_dequeue(&xmitq);
	if (!skb)
		return;
	tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
	l->rcv_unacked = 0;
	kfree_skb(skb);
}

/* tipc_link_build_proto_msg: prepare link protocol message for transmission
 */
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
				      u16 rcvgap, int tolerance, int priority,
				      struct sk_buff_head *xmitq)
{
	struct sk_buff *skb = NULL;
	struct tipc_msg *hdr = l->pmsg;
	u16 snd_nxt = l->snd_nxt;
	u16 rcv_nxt = l->rcv_nxt;
	u16 rcv_last = rcv_nxt - 1;
	int node_up = l->owner->bclink.recv_permitted;

	/* Don't send protocol message during reset or link failover */
	if (l->exec_mode == TIPC_LINK_BLOCKED)
		return;

	msg_set_type(hdr, mtyp);
	msg_set_net_plane(hdr, l->net_plane);
	msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
	msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net));
	msg_set_link_tolerance(hdr, tolerance);
	msg_set_linkprio(hdr, priority);
	msg_set_redundant_link(hdr, node_up);
	msg_set_seq_gap(hdr, 0);

	/* Compatibility: created msg must not be in sequence with pkt flow */
	msg_set_seqno(hdr, snd_nxt + U16_MAX / 2);

	if (mtyp == STATE_MSG) {
		if (!tipc_link_is_up(l))
			return;
		msg_set_next_sent(hdr, snd_nxt);

		/* Override rcvgap if there are packets in deferred queue */
		if (!skb_queue_empty(&l->deferdq))
			rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt;
		if (rcvgap) {
			msg_set_seq_gap(hdr, rcvgap);
			l->stats.sent_nacks++;
		}
		msg_set_ack(hdr, rcv_last);
		msg_set_probe(hdr, probe);
		if (probe)
			l->stats.sent_probes++;
		l->stats.sent_states++;
	} else {
		/* RESET_MSG or ACTIVATE_MSG */
		msg_set_max_pkt(hdr, l->advertised_mtu);
		msg_set_ack(hdr, l->failover_checkpt - 1);
		msg_set_next_sent(hdr, 1);
	}
	skb = tipc_buf_acquire(msg_size(hdr));
	if (!skb)
		return;
	skb_copy_to_linear_data(skb, hdr, msg_size(hdr));
	skb->priority = TC_PRIO_CONTROL;
	__skb_queue_head(xmitq, skb);
}

/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
 * a different bearer. Owner node is locked.
 */
static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
				  struct tipc_msg *tunnel_hdr,
				  struct tipc_msg *msg,
				  u32 selector)
{
	struct tipc_link *tunnel;
	struct sk_buff *skb;
	u32 length = msg_size(msg);

	tunnel = node_active_link(l_ptr->owner, selector & 1);
	if (!tipc_link_is_up(tunnel)) {
		pr_warn("%stunnel link no longer available\n", link_co_err);
		return;
	}
	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
	skb = tipc_buf_acquire(length + INT_H_SIZE);
	if (!skb) {
		pr_warn("%sunable to send tunnel msg\n", link_co_err);
		return;
	}
	skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
	__tipc_link_xmit_skb(tunnel, skb);
}

/* tipc_link_failover_send_queue(): A link has gone down, but a second
 * link is still active. We can do failover. Tunnel the failing link's
 * whole send queue via the remaining link. This way, we don't lose
 * any packets, and sequence order is preserved for subsequent traffic
 * sent over the remaining link. Owner node is locked.
 */
void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
{
	int msgcount;
	struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0);
	struct tipc_msg tunnel_hdr;
	struct sk_buff *skb;
	int split_bundles;

	if (!tunnel)
		return;

	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL,
		      FAILOVER_MSG, INT_H_SIZE, l_ptr->addr);

	skb_queue_walk(&l_ptr->backlogq, skb) {
		msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt);
		l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1);
	}
	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
	tipc_link_purge_backlog(l_ptr);
	msgcount = skb_queue_len(&l_ptr->transmq);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	msg_set_msgcnt(&tunnel_hdr, msgcount);

	if (skb_queue_empty(&l_ptr->transmq)) {
		skb = tipc_buf_acquire(INT_H_SIZE);
		if (skb) {
			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
			msg_set_size(&tunnel_hdr, INT_H_SIZE);
			__tipc_link_xmit_skb(tunnel, skb);
		} else {
			pr_warn("%sunable to send changeover msg\n",
				link_co_err);
		}
		return;
	}

	split_bundles = (node_active_link(l_ptr->owner, 0) !=
			 node_active_link(l_ptr->owner, 1));

	skb_queue_walk(&l_ptr->transmq, skb) {
		struct tipc_msg *msg = buf_msg(skb);

		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
			struct tipc_msg *m = msg_get_wrapped(msg);
			unchar *pos = (unchar *)m;

			msgcount = msg_msgcnt(msg);
			while (msgcount--) {
				msg_set_seqno(m, msg_seqno(msg));
				tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
						      msg_link_selector(m));
				pos += align(msg_size(m));
				m = (struct tipc_msg *)pos;
			}
		} else {
			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
					      msg_link_selector(msg));
		}
	}
}

/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
 * duplicate of the first link's send queue via the new link. This way, we
 * are guaranteed that currently queued packets from a socket are delivered
 * before future traffic from the same socket, even if this is using the
 * new link. The last arriving copy of each duplicate packet is dropped at
 * the receiving end by the regular protocol check, so packet cardinality
 * and sequence order is preserved per sender/receiver socket pair.
 * Owner node is locked.
 */
void tipc_link_dup_queue_xmit(struct tipc_link *link,
			      struct tipc_link *tnl)
{
	struct sk_buff *skb;
	struct tipc_msg tnl_hdr;
	struct sk_buff_head *queue = &link->transmq;
	int mcnt;
	u16 seqno;

	tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL,
		      SYNCH_MSG, INT_H_SIZE, link->addr);
	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
	msg_set_msgcnt(&tnl_hdr, mcnt);
	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);

tunnel_queue:
	skb_queue_walk(queue, skb) {
		struct sk_buff *outskb;
		struct tipc_msg *msg = buf_msg(skb);
		u32 len = msg_size(msg);

		msg_set_ack(msg, mod(link->rcv_nxt - 1));
		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
		outskb = tipc_buf_acquire(len + INT_H_SIZE);
		if (outskb == NULL) {
			pr_warn("%sunable to send duplicate msg\n",
				link_co_err);
			return;
		}
		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
					       skb->data, len);
		__tipc_link_xmit_skb(tnl, outskb);
		if (!tipc_link_is_up(link))
			return;
	}
	if (queue == &link->backlogq)
		return;
	seqno = link->snd_nxt;
	skb_queue_walk(&link->backlogq, skb) {
		msg_set_seqno(buf_msg(skb), seqno);
		seqno = mod(seqno + 1);
	}
	queue = &link->backlogq;
	goto tunnel_queue;
}

/* tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet
 * Owner node is locked.
 */
static bool tipc_link_failover_rcv(struct tipc_link *link,
				   struct sk_buff **skb)
{
	struct tipc_msg *msg = buf_msg(*skb);
	struct sk_buff *iskb = NULL;
	struct tipc_link *pl = NULL;
	int bearer_id = msg_bearer_id(msg);
	int pos = 0;

	if (msg_type(msg) != FAILOVER_MSG) {
		pr_warn("%sunknown tunnel pkt received\n", link_co_err);
		goto exit;
	}
	if (bearer_id >= MAX_BEARERS)
		goto exit;

	if (bearer_id == link->bearer_id)
		goto exit;

	pl = link->owner->links[bearer_id].link;
	if (pl && tipc_link_is_up(pl))
		tipc_link_reset(pl);

	if (link->failover_pkts == FIRST_FAILOVER)
		link->failover_pkts = msg_msgcnt(msg);

	/* Should we expect an inner packet? */
	if (!link->failover_pkts)
		goto exit;

	if (!tipc_msg_extract(*skb, &iskb, &pos)) {
		pr_warn("%sno inner failover pkt\n", link_co_err);
		*skb = NULL;
		goto exit;
	}
	link->failover_pkts--;
	*skb = NULL;

	/* Was this packet already delivered? */
	if (less(buf_seqno(iskb), link->failover_checkpt)) {
		kfree_skb(iskb);
		iskb = NULL;
		goto exit;
	}
	if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
		link->stats.recv_fragments++;
		tipc_buf_append(&link->failover_skb, &iskb);
	}
exit:
	if (!link->failover_pkts && pl)
		pl->exec_mode = TIPC_LINK_OPEN;
	kfree_skb(*skb);
	*skb = iskb;
	return *skb;
}

/* tipc_link_proto_rcv(): receive link level protocol message
 * Note that network plane id propagates through the network, and may
 * change at any time. The node with lowest numerical id determines
 * the network plane.
 */
static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
			       struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb);
	u16 rcvgap = 0;
	u16 nacked_gap = msg_seq_gap(hdr);
	u16 peers_snd_nxt = msg_next_sent(hdr);
	u16 peers_tol = msg_link_tolerance(hdr);
	u16 peers_prio = msg_linkprio(hdr);
	char *if_name;
	int rc = 0;

	if (l->exec_mode == TIPC_LINK_BLOCKED)
		goto exit;

	if (link_own_addr(l) > msg_prevnode(hdr))
		l->net_plane = msg_net_plane(hdr);

	switch (msg_type(hdr)) {
	case RESET_MSG:

		/* Ignore duplicate RESET with old session number */
		if ((less_eq(msg_session(hdr), l->peer_session)) &&
		    (l->peer_session != WILDCARD_SESSION))
			break;
		/* fall thru' */
	case ACTIVATE_MSG:

		/* Complete own link name with peer's interface name */
		if_name = strrchr(l->name, ':') + 1;
		if (sizeof(l->name) - (if_name - l->name) <= TIPC_MAX_IF_NAME)
			break;
		if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME)
			break;
		strncpy(if_name, msg_data(hdr), TIPC_MAX_IF_NAME);

		/* Update own tolerance if peer indicates a non-zero value */
		if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
			l->tolerance = peers_tol;

		/* Update own priority if peer's priority is higher */
		if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI))
			l->priority = peers_prio;

		l->peer_session = msg_session(hdr);
		l->peer_bearer_id = msg_bearer_id(hdr);
		rc = tipc_link_fsm_evt(l, msg_type(hdr), xmitq);
		if (l->mtu > msg_max_pkt(hdr))
			l->mtu = msg_max_pkt(hdr);
		break;
	case STATE_MSG:

		/* Update own tolerance if peer indicates a non-zero value */
		if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
			l->tolerance = peers_tol;

		l->silent_intv_cnt = 0;
		l->stats.recv_states++;
		if (msg_probe(hdr))
			l->stats.recv_probes++;
		rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
		if (!tipc_link_is_up(l))
			break;

		/* Has peer sent packets we haven't received yet ? */
		if (more(peers_snd_nxt, l->rcv_nxt))
			rcvgap = peers_snd_nxt - l->rcv_nxt;
		if (rcvgap || (msg_probe(hdr)))
			tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
						  0, 0, xmitq);
		tipc_link_release_pkts(l, msg_ack(hdr));

		/* If NACK, retransmit will now start at right position */
		if (nacked_gap) {
			rc |= tipc_link_retransm(l, nacked_gap, xmitq);
			l->stats.recv_nacks++;
		}
		tipc_link_advance_backlog(l, xmitq);
		if (unlikely(!skb_queue_empty(&l->wakeupq)))
			link_prepare_wakeup(l);
	}
exit:
	kfree_skb(skb);
	return rc;
}

void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
{
	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);

	l->window = win;
	l->backlog[TIPC_LOW_IMPORTANCE].limit      = win / 2;
	l->backlog[TIPC_MEDIUM_IMPORTANCE].limit   = win;
	l->backlog[TIPC_HIGH_IMPORTANCE].limit     = win / 2 * 3;
	l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
	l->backlog[TIPC_SYSTEM_IMPORTANCE].limit   = max_bulk;
}

/* tipc_link_find_owner - locate owner node of link by link's name
 * @net: the applicable net namespace
 * @name: pointer to link name string
 * @bearer_id: pointer to index in 'node->links' array where the link was found.
 *
 * Returns pointer to node owning the link, or NULL if no matching link is found.
 */
static struct tipc_node *tipc_link_find_owner(struct net *net,
					      const char *link_name,
					      unsigned int *bearer_id)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *l_ptr;
	struct tipc_node *n_ptr;
	struct tipc_node *found_node = NULL;
	int i;

	*bearer_id = 0;
	rcu_read_lock();
	list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
		tipc_node_lock(n_ptr);
		for (i = 0; i < MAX_BEARERS; i++) {
			l_ptr = n_ptr->links[i].link;
			if (l_ptr && !strcmp(l_ptr->name, link_name)) {
				*bearer_id = i;
				found_node = n_ptr;
				break;
			}
		}
		tipc_node_unlock(n_ptr);
		if (found_node)
			break;
	}
	rcu_read_unlock();

	return found_node;
}

/**
 * link_reset_statistics - reset link statistics
 * @l_ptr: pointer to link
 */
static void link_reset_statistics(struct tipc_link *l_ptr)
{
	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
	l_ptr->stats.sent_info = l_ptr->snd_nxt;
	l_ptr->stats.recv_info = l_ptr->rcv_nxt;
}

static void link_print(struct tipc_link *l, const char *str)
{
	struct sk_buff *hskb = skb_peek(&l->transmq);
	u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt;
	u16 tail = l->snd_nxt - 1;

	pr_info("%s Link <%s>:", str, l->name);

	if (link_probing(l))
		pr_cont(":P\n");
	else if (link_establishing(l))
		pr_cont(":E\n");
	else if (link_resetting(l))
		pr_cont(":R\n");
	else if (link_working(l))
		pr_cont(":W\n");
	else
		pr_cont("\n");

	pr_info("XMTQ: %u [%u-%u], BKLGQ: %u, SNDNX: %u, RCVNX: %u\n",
		skb_queue_len(&l->transmq), head, tail,
		skb_queue_len(&l->backlogq), l->snd_nxt, l->rcv_nxt);
}

/* Parse and validate nested (link) properties valid for media, bearer and link
 */
int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
{
	int err;

	err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
			       tipc_nl_prop_policy);
	if (err)
		return err;

	if (props[TIPC_NLA_PROP_PRIO]) {
		u32 prio;

		prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
		if (prio > TIPC_MAX_LINK_PRI)
			return -EINVAL;
	}

	if (props[TIPC_NLA_PROP_TOL]) {
		u32 tol;

		tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
		if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
			return -EINVAL;
	}

	if (props[TIPC_NLA_PROP_WIN]) {
		u32 win;

		win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
		if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
			return -EINVAL;
	}

	return 0;
}

int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
{
	int err;
	int res = 0;
	int bearer_id;
	char *name;
	struct tipc_link *link;
	struct tipc_node *node;
	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
	struct net *net = sock_net(skb->sk);

	if (!info->attrs[TIPC_NLA_LINK])
		return -EINVAL;

	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
			       info->attrs[TIPC_NLA_LINK],
			       tipc_nl_link_policy);
	if (err)
		return err;

	if (!attrs[TIPC_NLA_LINK_NAME])
		return -EINVAL;

	name = nla_data(attrs[TIPC_NLA_LINK_NAME]);

	if (strcmp(name, tipc_bclink_name) == 0)
		return tipc_nl_bc_link_set(net, attrs);

	node = tipc_link_find_owner(net, name, &bearer_id);
	if (!node)
		return -EINVAL;

	tipc_node_lock(node);

	link = node->links[bearer_id].link;
	if (!link) {
		res = -EINVAL;
		goto out;
	}

	if (attrs[TIPC_NLA_LINK_PROP]) {
		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];

		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
					      props);
		if (err) {
			res = err;
			goto out;
		}

		if (props[TIPC_NLA_PROP_TOL]) {
			u32 tol;

			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
			link->tolerance = tol;
			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0);
		}
		if (props[TIPC_NLA_PROP_PRIO]) {
			u32 prio;

			prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
			link->priority = prio;
			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio);
		}
		if (props[TIPC_NLA_PROP_WIN]) {
			u32 win;

			win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
			tipc_link_set_queue_limits(link, win);
		}
	}

out:
	tipc_node_unlock(node);

	return res;
}

static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
{
	int i;
	struct nlattr *stats;

	struct nla_map {
		u32 key;
		u32 val;
	};

	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, s->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, s->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
		{TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
			s->msg_length_counts : 1},
		{TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
		{TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
		{TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
		{TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
		{TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
		{TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
		{TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
		{TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
		{TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
		{TIPC_NLA_STATS_RX_STATES, s->recv_states},
		{TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
		{TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
		{TIPC_NLA_STATS_TX_STATES, s->sent_states},
		{TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
		{TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, s->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
			(s->accu_queue_sz / s->queue_sz_counts) : 0}
	};

	stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!stats)
		return -EMSGSIZE;

	for (i = 0; i < ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, stats);

	return 0;
msg_full:
	nla_nest_cancel(skb, stats);

	return -EMSGSIZE;
}

/* Caller should hold appropriate locks to protect the link */
static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
			      struct tipc_link *link, int nlflags)
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct nlattr *prop;
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
			  nlflags, TIPC_NL_LINK_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;

	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
			tipc_cluster_mask(tn->own_addr)))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->mtu))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->rcv_nxt))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->snd_nxt))
		goto attr_msg_full;

	if (tipc_link_is_up(link))
		if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
			goto attr_msg_full;
	if (tipc_link_is_active(link))
		if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
			goto attr_msg_full;

	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
	if (!prop)
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
		goto prop_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
		goto prop_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
			link->window))
		goto prop_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
		goto prop_msg_full;
	nla_nest_end(msg->skb, prop);

	err = __tipc_nl_add_stats(msg->skb, &link->stats);
	if (err)
		goto attr_msg_full;

	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);

	return 0;

prop_msg_full:
	nla_nest_cancel(msg->skb, prop);
attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}

/* Caller should hold node lock */
static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
				    struct tipc_node *node, u32 *prev_link)
{
	u32 i;
	int err;

	for (i = *prev_link; i < MAX_BEARERS; i++) {
		*prev_link = i;

		if (!node->links[i].link)
			continue;

		err = __tipc_nl_add_link(net, msg,
					 node->links[i].link, NLM_F_MULTI);
		if (err)
			return err;
	}
	*prev_link = 0;

	return 0;
}

int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
	struct net *net = sock_net(skb->sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_node *node;
	struct tipc_nl_msg msg;
	u32 prev_node = cb->args[0];
	u32 prev_link = cb->args[1];
	int done = cb->args[2];
	int err;

	if (done)
		return 0;

	msg.skb = skb;
	msg.portid = NETLINK_CB(cb->skb).portid;
	msg.seq = cb->nlh->nlmsg_seq;

	rcu_read_lock();
	if (prev_node) {
		node = tipc_node_find(net, prev_node);
		if (!node) {
			/* We never set seq or call nl_dump_check_consistent()
			 * this means that setting prev_seq here will cause the
			 * consistence check to fail in the netlink callback
			 * handler. Resulting in the last NLMSG_DONE message
			 * having the NLM_F_DUMP_INTR flag set.
			 */
			cb->prev_seq = 1;
			goto out;
		}
		tipc_node_put(node);

		list_for_each_entry_continue_rcu(node, &tn->node_list,
						 list) {
			tipc_node_lock(node);
			err = __tipc_nl_add_node_links(net, &msg, node,
						       &prev_link);
			tipc_node_unlock(node);
			if (err)
				goto out;

			prev_node = node->addr;
		}
	} else {
		err = tipc_nl_add_bc_link(net, &msg);
		if (err)
			goto out;

		list_for_each_entry_rcu(node, &tn->node_list, list) {
			tipc_node_lock(node);
			err = __tipc_nl_add_node_links(net, &msg, node,
						       &prev_link);
			tipc_node_unlock(node);
			if (err)
				goto out;

			prev_node = node->addr;
		}
	}
	done = 1;
out:
	rcu_read_unlock();

	cb->args[0] = prev_node;
	cb->args[1] = prev_link;
	cb->args[2] = done;

	return skb->len;
}

int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
{
	struct net *net = genl_info_net(info);
	struct tipc_nl_msg msg;
	char *name;
	int err;

	msg.portid = info->snd_portid;
	msg.seq = info->snd_seq;

	if (!info->attrs[TIPC_NLA_LINK_NAME])
		return -EINVAL;
	name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);

	msg.skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
	if (!msg.skb)
		return -ENOMEM;

	if (strcmp(name, tipc_bclink_name) == 0) {
		err = tipc_nl_add_bc_link(net, &msg);
		if (err) {
			nlmsg_free(msg.skb);
			return err;
		}
	} else {
		int bearer_id;
		struct tipc_node *node;
		struct tipc_link *link;

		node = tipc_link_find_owner(net, name, &bearer_id);
		if (!node) {
			nlmsg_free(msg.skb);
			return -EINVAL;
		}

		tipc_node_lock(node);
		link = node->links[bearer_id].link;
		if (!link) {
			tipc_node_unlock(node);
			nlmsg_free(msg.skb);
			return -EINVAL;
		}

		err = __tipc_nl_add_link(net, &msg, link, 0);
		tipc_node_unlock(node);
		if (err) {
			nlmsg_free(msg.skb);
			return err;
		}
	}

	return genlmsg_reply(msg.skb, info);
}

int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
{
	int err;
	char *link_name;
	unsigned int bearer_id;
	struct tipc_link *link;
	struct tipc_node *node;
	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
	struct net *net = sock_net(skb->sk);

	if (!info->attrs[TIPC_NLA_LINK])
		return -EINVAL;

	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
			       info->attrs[TIPC_NLA_LINK],
			       tipc_nl_link_policy);
	if (err)
		return err;

	if (!attrs[TIPC_NLA_LINK_NAME])
		return -EINVAL;

	link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);

	if (strcmp(link_name, tipc_bclink_name) == 0) {
		err = tipc_bclink_reset_stats(net);
		if (err)
			return err;
		return 0;
	}

	node = tipc_link_find_owner(net, link_name, &bearer_id);
	if (!node)
		return -EINVAL;

	tipc_node_lock(node);

	link = node->links[bearer_id].link;
	if (!link) {
		tipc_node_unlock(node);
		return -EINVAL;
	}

	link_reset_statistics(link);

	tipc_node_unlock(node);

	return 0;
}