/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * This file contains code imported from the OFED rds source file cong.c
 * Oracle elects to have and use the contents of cong.c under and governed
 * by the OpenIB.org BSD license (see below for full license text). However,
 * the following notice accompanied the original version of this file:
 */

/*
 * Copyright (c) 2007 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdsv3_impl.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>
/*
 * This file implements the receive side of the unconventional congestion
 * management in RDS.
 *
 * Messages waiting in the receive queue on the receiving socket are accounted
 * against the socket's SO_RCVBUF option value.  Only the payload bytes in the
 * message are accounted for.  If the number of bytes queued equals or exceeds
 * rcvbuf then the socket is congested.  All sends attempted to this socket's
 * address should block or return -EWOULDBLOCK.
 *
 * Applications are expected to be reasonably tuned such that this situation
 * very rarely occurs.  An application encountering this "back-pressure" is
 * considered a bug.
 *
 * This is implemented by having each node maintain bitmaps which indicate
 * which ports on bound addresses are congested.  As the bitmap changes it is
 * sent through all the connections which terminate in the local address of the
 * bitmap which changed.
 *
 * The bitmaps are allocated as connections are brought up.  This avoids
 * allocation in the interrupt handling path which queues messages on sockets.
 * The dense bitmaps let transports send the entire bitmap on any bitmap change
 * reasonably efficiently.  This is much easier to implement than some
 * finer-grained communication of per-port congestion.  The sender does a very
 * inexpensive bit test to see whether the port it's about to send to is
 * congested or not.
 */
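
/*
 * To make the sender's fast path concrete, here is a sketch (not part of
 * this file) of how a send path consults the peer's map before queueing
 * a message.  The dport variable and the error handling are illustrative
 * assumptions; the real call site lives in the send code:
 *
 *	ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
 *	if (ret)
 *		return (ret);
 *
 * A nonblocking send gets -ENOBUFS back, which the socket layer can
 * surface to the application as EWOULDBLOCK.
 */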
/*
 * Interaction with poll is a tad tricky. We want all processes stuck in
 * poll to wake up and check whether a congested destination became uncongested.
 * The really sad thing is we have no idea which destinations the application
 * wants to send to - we don't even know which rdsv3_connections are involved.
 * So until we implement a more flexible rds poll interface, we have to make
 * do with this:
 * We maintain a global counter that is incremented each time a congestion map
 * update is received. Each rds socket tracks this value, and if rdsv3_poll
 * finds that the saved generation number is smaller than the global generation
 * number, it wakes up the process.
 */
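
/*
 * A minimal sketch of the poll-side check, assuming a per-socket
 * snapshot field (rs_cong_track here is illustrative), which a poll
 * implementation would run before deciding whether to sleep:
 *
 *	if (rdsv3_cong_updated_since(&rs->rs_cong_track))
 *		mask |= (POLLIN | POLLRDNORM);
 *
 * rdsv3_cong_updated_since() below refreshes the snapshot and returns
 * nonzero exactly when an update arrived since the last check.
 */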
static atomic_t	rdsv3_cong_generation = ATOMIC_INIT(0);
/*
 * Congestion monitoring
 */
static struct list rdsv3_cong_monitor;
static krwlock_t rdsv3_cong_monitor_lock;
/*
 * Yes, a global lock.  It's used so infrequently that it's worth keeping it
 * global to simplify the locking.  It's only used in the following
 * circumstances:
 *
 *  - on connection buildup to associate a conn with its maps
 *  - on map changes to inform conns of a new map to send
 *
 * It's sadly ordered under the socket callback lock and the connection lock.
 * Receive paths can mark ports congested from interrupt context so the
 * lock masks interrupts.
 */
static kmutex_t rdsv3_cong_lock;
static struct avl_tree rdsv3_cong_tree;
static struct rdsv3_cong_map *
rdsv3_cong_tree_walk(uint32_be_t addr, struct rdsv3_cong_map *insert)
{
	struct rdsv3_cong_map *map;
	avl_index_t where;

	if (insert) {
		/* insert the caller's map unless one is already present */
		map = avl_find(&rdsv3_cong_tree, insert, &where);
		if (map == NULL) {
			avl_insert(&rdsv3_cong_tree, insert, where);
			return (NULL);
		}
	} else {
		/* pure lookup: search by address with a key on the stack */
		struct rdsv3_cong_map map1;

		map1.m_addr = addr;
		map = avl_find(&rdsv3_cong_tree, &map1, &where);
	}

	return (map);
}
/*
 * There is only ever one bitmap for any address. Connections try to allocate
 * these bitmaps, acquiring pointers to them in the process. The bitmaps are
 * only ever freed as the module is removed after all connections have been
 * freed.
 */
static struct rdsv3_cong_map *
rdsv3_cong_from_addr(uint32_be_t addr)
{
	struct rdsv3_cong_map *map;
	struct rdsv3_cong_map *ret = NULL;
	unsigned long zp;
	unsigned long i;

	RDSV3_DPRINTF4("rdsv3_cong_from_addr", "Enter(addr: %x)", ntohl(addr));

	map = kmem_zalloc(sizeof (struct rdsv3_cong_map), KM_NOSLEEP);
	if (!map)
		return (NULL);

	map->m_addr = addr;
	rdsv3_init_waitqueue(&map->m_waitq);
	list_create(&map->m_conn_list, sizeof (struct rdsv3_connection),
	    offsetof(struct rdsv3_connection, c_map_item));

	for (i = 0; i < RDSV3_CONG_MAP_PAGES; i++) {
		zp = (unsigned long)kmem_zalloc(PAGE_SIZE, KM_NOSLEEP);
		if (zp == 0)
			goto out;
		map->m_page_addrs[i] = zp;
	}

	mutex_enter(&rdsv3_cong_lock);
	ret = rdsv3_cong_tree_walk(addr, map);
	mutex_exit(&rdsv3_cong_lock);

	if (ret == NULL) {
		/* our new map was inserted; don't free it below */
		ret = map;
		map = NULL;
	}

out:
	if (map) {
		/* lost the race or failed a page allocation; clean up */
		for (i = 0; i < RDSV3_CONG_MAP_PAGES && map->m_page_addrs[i];
		    i++)
			kmem_free((void *)map->m_page_addrs[i], PAGE_SIZE);
		kmem_free(map, sizeof (*map));
	}

	RDSV3_DPRINTF5("rdsv3_cong_from_addr", "map %p for addr %x",
	    ret, ntohl(addr));

	return (ret);
}
/*
 * Put the conn on its local map's list.  This is called when the conn is
 * really added to the hash.  It's nested under the rdsv3_conn_lock, sadly.
 */
void
rdsv3_cong_add_conn(struct rdsv3_connection *conn)
{
	RDSV3_DPRINTF4("rdsv3_cong_add_conn", "Enter(conn: %p)", conn);

	RDSV3_DPRINTF5("rdsv3_cong_add_conn", "conn %p now on map %p",
	    conn, conn->c_lcong);
	mutex_enter(&rdsv3_cong_lock);
	list_insert_tail(&conn->c_lcong->m_conn_list, conn);
	mutex_exit(&rdsv3_cong_lock);

	RDSV3_DPRINTF4("rdsv3_cong_add_conn", "Return(conn: %p)", conn);
}
void
rdsv3_cong_remove_conn(struct rdsv3_connection *conn)
{
	RDSV3_DPRINTF4("rdsv3_cong_remove_conn", "Enter(conn: %p)", conn);

	RDSV3_DPRINTF5("rdsv3_cong_remove_conn", "removing conn %p from map %p",
	    conn, conn->c_lcong);
	mutex_enter(&rdsv3_cong_lock);
	list_remove_node(&conn->c_map_item);
	mutex_exit(&rdsv3_cong_lock);

	RDSV3_DPRINTF4("rdsv3_cong_remove_conn", "Return(conn: %p)", conn);
}
int
rdsv3_cong_get_maps(struct rdsv3_connection *conn)
{
	conn->c_lcong = rdsv3_cong_from_addr(conn->c_laddr);
	conn->c_fcong = rdsv3_cong_from_addr(conn->c_faddr);

	if (!(conn->c_lcong && conn->c_fcong))
		return (-ENOMEM);

	return (0);
}
void
rdsv3_cong_queue_updates(struct rdsv3_cong_map *map)
{
	struct rdsv3_connection *conn;

	RDSV3_DPRINTF4("rdsv3_cong_queue_updates", "Enter(map: %p)", map);

	mutex_enter(&rdsv3_cong_lock);

	RDSV3_FOR_EACH_LIST_NODE(conn, &map->m_conn_list, c_map_item) {
		if (!test_and_set_bit(0, &conn->c_map_queued)) {
			rdsv3_stats_inc(s_cong_update_queued);
			(void) rdsv3_send_xmit(conn);
		}
	}

	mutex_exit(&rdsv3_cong_lock);

	RDSV3_DPRINTF4("rdsv3_cong_queue_updates", "Return(map: %p)", map);
}
void
rdsv3_cong_map_updated(struct rdsv3_cong_map *map, uint64_t portmask)
{
	RDSV3_DPRINTF4("rdsv3_cong_map_updated",
	    "waking map %p for %u.%u.%u.%u",
	    map, NIPQUAD(map->m_addr));

	rdsv3_stats_inc(s_cong_update_received);
	atomic_inc_32(&rdsv3_cong_generation);

	/*
	 * The waitqueue_active() check from the imported code is disabled;
	 * wake the waitqueue unconditionally.
	 */
#if 0
	if (waitqueue_active(&map->m_waitq))
#endif
	rdsv3_wake_up(&map->m_waitq);

	if (portmask && !list_is_empty(&rdsv3_cong_monitor)) {
		struct rdsv3_sock *rs;

		rw_enter(&rdsv3_cong_monitor_lock, RW_READER);
		RDSV3_FOR_EACH_LIST_NODE(rs, &rdsv3_cong_monitor,
		    rs_cong_list) {
			mutex_enter(&rs->rs_lock);
			rs->rs_cong_notify |= (rs->rs_cong_mask & portmask);
			rs->rs_cong_mask &= ~portmask;
			mutex_exit(&rs->rs_lock);
			if (rs->rs_cong_notify)
				rdsv3_wake_sk_sleep(rs);
		}
		rw_exit(&rdsv3_cong_monitor_lock);
	}

	RDSV3_DPRINTF4("rdsv3_cong_map_updated", "Return(map: %p)", map);
}
int
rdsv3_cong_updated_since(unsigned long *recent)
{
	unsigned long gen = atomic_get(&rdsv3_cong_generation);

	if (*recent == gen)
		return (0);
	*recent = gen;
	return (1);
}
/*
 * We're called under the locking that protects the socket's receive buffer
 * consumption.  This makes it a lot easier for the caller to only call us
 * when it knows that an existing set bit needs to be cleared, and vice versa.
 * We can't block and we need to deal with concurrent sockets working against
 * the same per-address map.
 */
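
/*
 * A worked example of the page/bit arithmetic used by the three
 * functions below, assuming a 4K PAGE_SIZE (so RDSV3_CONG_MAP_PAGE_BITS
 * is 32768 and two pages cover the whole 16-bit port space): port 40000
 * lands on page i = 40000 / 32768 = 1, at bit off = 40000 % 32768 =
 * 7232 within that page.
 */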
void
rdsv3_cong_set_bit(struct rdsv3_cong_map *map, uint16_be_t port)
{
	unsigned long i;
	unsigned long off;

	RDSV3_DPRINTF4("rdsv3_cong_set_bit",
	    "setting congestion for %u.%u.%u.%u:%u in map %p",
	    NIPQUAD(map->m_addr), ntohs(port), map);

	i = ntohs(port) / RDSV3_CONG_MAP_PAGE_BITS;
	off = ntohs(port) % RDSV3_CONG_MAP_PAGE_BITS;

	set_le_bit(off, (void *)map->m_page_addrs[i]);
}
void
rdsv3_cong_clear_bit(struct rdsv3_cong_map *map, uint16_be_t port)
{
	unsigned long i;
	unsigned long off;

	RDSV3_DPRINTF4("rdsv3_cong_clear_bit",
	    "clearing congestion for %u.%u.%u.%u:%u in map %p\n",
	    NIPQUAD(map->m_addr), ntohs(port), map);

	i = ntohs(port) / RDSV3_CONG_MAP_PAGE_BITS;
	off = ntohs(port) % RDSV3_CONG_MAP_PAGE_BITS;

	clear_le_bit(off, (void *)map->m_page_addrs[i]);
}
static int
rdsv3_cong_test_bit(struct rdsv3_cong_map *map, uint16_be_t port)
{
	unsigned long i;
	unsigned long off;

	i = ntohs(port) / RDSV3_CONG_MAP_PAGE_BITS;
	off = ntohs(port) % RDSV3_CONG_MAP_PAGE_BITS;

	RDSV3_DPRINTF5("rdsv3_cong_test_bit", "port: 0x%x i = %lx off = %lx",
	    ntohs(port), i, off);

	return (test_le_bit(off, (void *)map->m_page_addrs[i]));
}
void
rdsv3_cong_add_socket(struct rdsv3_sock *rs)
{
	RDSV3_DPRINTF4("rdsv3_cong_add_socket", "Enter(rs: %p)", rs);

	rw_enter(&rdsv3_cong_monitor_lock, RW_WRITER);
	if (!list_link_active(&rs->rs_cong_list))
		list_insert_head(&rdsv3_cong_monitor, rs);
	rw_exit(&rdsv3_cong_monitor_lock);
}
void
rdsv3_cong_remove_socket(struct rdsv3_sock *rs)
{
	struct rdsv3_cong_map *map;

	RDSV3_DPRINTF4("rdsv3_cong_remove_socket", "Enter(rs: %p)", rs);

	rw_enter(&rdsv3_cong_monitor_lock, RW_WRITER);
	list_remove_node(&rs->rs_cong_list);
	rw_exit(&rdsv3_cong_monitor_lock);

	/* update congestion map for now-closed port */
	mutex_enter(&rdsv3_cong_lock);
	map = rdsv3_cong_tree_walk(rs->rs_bound_addr, NULL);
	mutex_exit(&rdsv3_cong_lock);

	if (map && rdsv3_cong_test_bit(map, rs->rs_bound_port)) {
		rdsv3_cong_clear_bit(map, rs->rs_bound_port);
		rdsv3_cong_queue_updates(map);
	}
}
int
rdsv3_cong_wait(struct rdsv3_cong_map *map, uint16_be_t port, int nonblock,
    struct rdsv3_sock *rs)
{
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_cong_wait", "Enter(rs: %p, mode: %d)",
	    rs, nonblock);

	if (!rdsv3_cong_test_bit(map, port))
		return (0);
	if (nonblock) {
		if (rs && rs->rs_cong_monitor) {
			/*
			 * It would have been nice to have an atomic set_bit on
			 * a uint64_t.
			 */
			mutex_enter(&rs->rs_lock);
			rs->rs_cong_mask |= RDS_CONG_MONITOR_MASK(ntohs(port));
			mutex_exit(&rs->rs_lock);

			/*
			 * Test again - a congestion update may have arrived in
			 * the meantime.
			 */
			if (!rdsv3_cong_test_bit(map, port))
				return (0);
		}
		rdsv3_stats_inc(s_cong_send_error);
		return (-ENOBUFS);
	}

	rdsv3_stats_inc(s_cong_send_blocked);
	RDSV3_DPRINTF3("rdsv3_cong_wait", "waiting on map %p for port %u",
	    map, ntohs(port));

#if 0
	ret = rdsv3_wait_sig(&map->m_waitq, !rdsv3_cong_test_bit(map, port));
#endif
	/* open-coded wait on the map's waitqueue, interruptible by signals */
	mutex_enter(&map->m_waitq.waitq_mutex);
	map->m_waitq.waitq_waiters++;
	while (rdsv3_cong_test_bit(map, port)) {
		ret = cv_wait_sig(&map->m_waitq.waitq_cv,
		    &map->m_waitq.waitq_mutex);
		if (ret == 0) {
			/* interrupted by a signal */
			ret = -ERESTART;
			break;
		}
		ret = 0;
	}
	map->m_waitq.waitq_waiters--;
	mutex_exit(&map->m_waitq.waitq_mutex);

	return (ret);
}
void
rdsv3_cong_exit(void)
{
	struct rdsv3_cong_map *map;
	unsigned long i;

	RDSV3_DPRINTF4("rdsv3_cong_exit", "Enter");

	while ((map = avl_first(&rdsv3_cong_tree)) != NULL) {
		RDSV3_DPRINTF5("rdsv3_cong_exit", "freeing map %p\n", map);
		avl_remove(&rdsv3_cong_tree, map);
		for (i = 0; i < RDSV3_CONG_MAP_PAGES && map->m_page_addrs[i];
		    i++)
			kmem_free((void *)map->m_page_addrs[i], PAGE_SIZE);
		kmem_free(map, sizeof (*map));
	}

	RDSV3_DPRINTF4("rdsv3_cong_exit", "Return");
}
/*
 * Allocate an RDS message containing a congestion update.
 */
struct rdsv3_message *
rdsv3_cong_update_alloc(struct rdsv3_connection *conn)
{
	struct rdsv3_cong_map *map = conn->c_lcong;
	struct rdsv3_message *rm;

	rm = rdsv3_message_map_pages(map->m_page_addrs, RDSV3_CONG_MAP_BYTES);
	if (!IS_ERR(rm))
		rm->m_inc.i_hdr.h_flags = RDSV3_FLAG_CONG_BITMAP;

	return (rm);
}
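
/*
 * Usage sketch (illustrative; the real call site is in the transport's
 * send path): when rdsv3_cong_queue_updates() above has set bit 0 of
 * conn->c_map_queued, the transport would do roughly:
 *
 *	rm = rdsv3_cong_update_alloc(conn);
 *	if (!IS_ERR(rm))
 *		... transmit rm as a RDSV3_FLAG_CONG_BITMAP message ...
 */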
static int
rdsv3_cong_compare(const void *map1, const void *map2)
{
#define	addr1	((struct rdsv3_cong_map *)map1)->m_addr
#define	addr2	((struct rdsv3_cong_map *)map2)->m_addr

	/* AVL comparator: order congestion maps by address */
	if (addr1 < addr2)
		return (-1);
	if (addr1 > addr2)
		return (1);
	return (0);

#undef	addr1
#undef	addr2
}
void
rdsv3_cong_init(void)
{
	list_create(&rdsv3_cong_monitor, sizeof (struct rdsv3_sock),
	    offsetof(struct rdsv3_sock, rs_cong_list));
	rw_init(&rdsv3_cong_monitor_lock, NULL, RW_DRIVER, NULL);
	mutex_init(&rdsv3_cong_lock, NULL, MUTEX_DRIVER, NULL);
	avl_create(&rdsv3_cong_tree, rdsv3_cong_compare,
	    sizeof (struct rdsv3_cong_map), offsetof(struct rdsv3_cong_map,
	    m_rb_node));
}