/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright (c) 2016 by Delphix. All rights reserved.
 */

/*
 * This file contains code imported from the OFED rds source file recv.c
 * Oracle elects to have and use the contents of rds_recv.c under and governed
 * by the OpenIB.org BSD license (see below for full license text). However,
 * the following notice accompanied the original version of this file:
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <sys/types.h>
#include <sys/rds.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

void
rdsv3_inc_init(struct rdsv3_incoming *inc, struct rdsv3_connection *conn,
    uint32_be_t saddr)
{
	RDSV3_DPRINTF5("rdsv3_inc_init", "Enter(inc: %p, conn: %p)", inc, conn);

	inc->i_refcount = 1;
	list_link_init(&inc->i_item);
	inc->i_conn = conn;
	inc->i_saddr = saddr;
	inc->i_rdma_cookie = 0;
}

void
rdsv3_inc_addref(struct rdsv3_incoming *inc)
{
	RDSV3_DPRINTF4("rdsv3_inc_addref",
	    "addref inc %p ref %d", inc, atomic_get(&inc->i_refcount));
	atomic_inc_32(&inc->i_refcount);
}

void
rdsv3_inc_put(struct rdsv3_incoming *inc)
{
	RDSV3_DPRINTF4("rdsv3_inc_put", "put inc %p ref %d",
	    inc, atomic_get(&inc->i_refcount));
	if (atomic_dec_and_test(&inc->i_refcount)) {
		ASSERT(!list_link_active(&inc->i_item));

		inc->i_conn->c_trans->inc_free(inc);
	}
}
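
/*
 * Reference-count lifecycle note: rdsv3_inc_init() starts an incoming
 * message with a refcount of one, owned by the transport that allocated
 * it.  Consumers such as the socket receive queue take their own hold
 * via rdsv3_inc_addref(), and the final rdsv3_inc_put() returns the
 * message to the owning transport through c_trans->inc_free().
 */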

static void
rdsv3_recv_rcvbuf_delta(struct rdsv3_sock *rs, struct rsock *sk,
    struct rdsv3_cong_map *map,
    int delta, uint16_be_t port)
{
	int now_congested;

	RDSV3_DPRINTF4("rdsv3_recv_rcvbuf_delta",
	    "Enter(rs: %p, map: %p, delta: %d, port: %d)",
	    rs, map, delta, port);

	if (delta == 0)
		return;

	rs->rs_rcv_bytes += delta;
	now_congested = rs->rs_rcv_bytes > rdsv3_sk_rcvbuf(rs);

	RDSV3_DPRINTF5("rdsv3_recv_rcvbuf_delta",
	    "rs %p (%u.%u.%u.%u:%u) recv bytes %d buf %d "
	    "now_cong %d delta %d",
	    rs, NIPQUAD(rs->rs_bound_addr),
	    (int)ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
	    rdsv3_sk_rcvbuf(rs), now_congested, delta);

	/* wasn't -> am congested */
	if (!rs->rs_congested && now_congested) {
		rs->rs_congested = 1;
		rdsv3_cong_set_bit(map, port);
		rdsv3_cong_queue_updates(map);
	}
	/* was -> aren't congested */
	/*
	 * Require more free space before reporting uncongested to prevent
	 * bouncing cong/uncong state too often
	 */
	else if (rs->rs_congested &&
	    (rs->rs_rcv_bytes < (rdsv3_sk_rcvbuf(rs) / 2))) {
		rs->rs_congested = 0;
		rdsv3_cong_clear_bit(map, port);
		rdsv3_cong_queue_updates(map);
	}

	/* do nothing if no change in cong state */

	RDSV3_DPRINTF4("rdsv3_recv_rcvbuf_delta", "Return(rs: %p)", rs);
}
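
/*
 * Worked example of the hysteresis above (illustrative numbers only):
 * with rdsv3_sk_rcvbuf(rs) at 64KB, the port is marked congested once
 * rs_rcv_bytes exceeds 64KB, but is not marked uncongested again until
 * rs_rcv_bytes drains below 32KB (half the buffer).  A single threshold
 * would flap the congestion map on every message near the limit.
 */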

/*
 * Process all extension headers that come with this message.
 */
static void
rdsv3_recv_incoming_exthdrs(struct rdsv3_incoming *inc, struct rdsv3_sock *rs)
{
	struct rdsv3_header *hdr = &inc->i_hdr;
	unsigned int pos = 0, type, len;
	union {
		struct rdsv3_ext_header_version version;
		struct rdsv3_ext_header_rdma rdma;
		struct rdsv3_ext_header_rdma_dest rdma_dest;
	} buffer;

	RDSV3_DPRINTF4("rdsv3_recv_incoming_exthdrs", "Enter");
	while (1) {
		len = sizeof (buffer);
		type = rdsv3_message_next_extension(hdr, &pos, &buffer, &len);
		if (type == RDSV3_EXTHDR_NONE)
			break;
		RDSV3_DPRINTF4("recv_incoming_exthdrs", "type %d", type);
		/* Process extension header here */
		switch (type) {
		case RDSV3_EXTHDR_RDMA:
			rdsv3_rdma_unuse(rs, ntohl(buffer.rdma.h_rdma_rkey),
			    0);
			break;

		case RDSV3_EXTHDR_RDMA_DEST:
			/*
			 * We ignore the size for now. We could stash it
			 * somewhere and use it for error checking.
			 */
			inc->i_rdma_cookie = rdsv3_rdma_make_cookie(
			    ntohl(buffer.rdma_dest.h_rdma_rkey),
			    ntohl(buffer.rdma_dest.h_rdma_offset));
			break;
		}
	}
	RDSV3_DPRINTF4("rdsv3_recv_incoming_exthdrs", "Return");
}
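
/*
 * Note: the union above is sized to hold the largest defined extension
 * header, and len is reset to sizeof (buffer) on each iteration because
 * rdsv3_message_next_extension() appears to treat it as in/out -
 * available buffer space on entry, actual extension length on return
 * (an assumption based on how it is called here).
 */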

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time.  This lets us reassemble in the conn
 * instead of per-flow which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure and
 * we save flow lookup and locking for each frag arrival.  It does mean
 * that small messages will wait behind large ones.  Fragmenting at all
 * is only to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting it from the
 * conn.  This lets loopback, who only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
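
/*
 * Hypothetical sketch of a transport rx path feeding rdsv3_recv_incoming()
 * (the real call sites live in the transports, e.g. the IB client; the
 * ordering below is illustrative only):
 *
 *	rdsv3_inc_init(inc, conn, saddr);
 *	(fill inc->i_hdr from the wire)
 *	rdsv3_recv_incoming(conn, saddr, daddr, inc, KM_NOSLEEP);
 *	rdsv3_inc_put(inc);	(drop the transport's own reference)
 */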
/* ARGSUSED */
void
rdsv3_recv_incoming(struct rdsv3_connection *conn, uint32_be_t saddr,
    uint32_be_t daddr, struct rdsv3_incoming *inc, int gfp)
{
	struct rdsv3_sock *rs = NULL;
	struct rsock *sk;

	inc->i_conn = conn;
	inc->i_rx_jiffies = jiffies;

	RDSV3_DPRINTF5("rdsv3_recv_incoming",
	    "conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
	    "flags 0x%x rx_jiffies %lu", conn,
	    (unsigned long long)conn->c_next_rx_seq,
	    inc,
	    (unsigned long long)ntohll(inc->i_hdr.h_sequence),
	    ntohl(inc->i_hdr.h_len),
	    ntohs(inc->i_hdr.h_sport),
	    ntohs(inc->i_hdr.h_dport),
	    inc->i_hdr.h_flags,
	    inc->i_rx_jiffies);

	/*
	 * Sequence numbers should only increase.  Messages get their
	 * sequence number as they're queued in a sending conn.  They
	 * can be dropped, though, if the sending socket is closed before
	 * they hit the wire.  So sequence numbers can skip forward
	 * under normal operation.  They can also drop back in the conn
	 * failover case as previously sent messages are resent down the
	 * new instance of a conn.  We drop those, otherwise we have
	 * to assume that the next valid seq does not come after a
	 * hole in the fragment stream.
	 *
	 * The headers don't give us a way to realize if fragments of
	 * a message have been dropped.  We assume that frags that arrive
	 * to a flow are part of the current message on the flow that is
	 * being reassembled.  This means that senders can't drop messages
	 * from the sending conn until all their frags are sent.
	 *
	 * XXX we could spend more on the wire to get more robust failure
	 * detection, arguably worth it to avoid data corruption.
	 */
	if (ntohll(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
	    (inc->i_hdr.h_flags & RDSV3_FLAG_RETRANSMITTED)) {
		rdsv3_stats_inc(s_recv_drop_old_seq);
		goto out;
	}
	conn->c_next_rx_seq = ntohll(inc->i_hdr.h_sequence) + 1;

	if (rdsv3_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
		rdsv3_stats_inc(s_recv_ping);
		(void) rdsv3_send_pong(conn, inc->i_hdr.h_sport);
		goto out;
	}

	rs = rdsv3_find_bound(conn, inc->i_hdr.h_dport);
	if (!rs) {
		rdsv3_stats_inc(s_recv_drop_no_sock);
		goto out;
	}

	/* Process extension headers */
	rdsv3_recv_incoming_exthdrs(inc, rs);

	/* We can be racing with rdsv3_release() which marks the socket dead. */
	sk = rdsv3_rs_to_sk(rs);

	/* serialize with rdsv3_release -> sock_orphan */
	rw_enter(&rs->rs_recv_lock, RW_WRITER);
	if (!rdsv3_sk_sock_flag(sk, SOCK_DEAD)) {
		int error, bytes;

		RDSV3_DPRINTF5("rdsv3_recv_incoming",
		    "adding inc %p to rs %p's recv queue", inc, rs);
		rdsv3_stats_inc(s_recv_queued);
		rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
		    ntohl(inc->i_hdr.h_len),
		    inc->i_hdr.h_dport);
		rdsv3_inc_addref(inc);
		list_insert_tail(&rs->rs_recv_queue, inc);
		bytes = rs->rs_rcv_bytes;
		rw_exit(&rs->rs_recv_lock);

		__rdsv3_wake_sk_sleep(sk);

		/* wake up anyone waiting in poll */
		sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL,
		    bytes, 0, &error, NULL);
		if (error != 0) {
			RDSV3_DPRINTF2("rdsv3_recv_incoming",
			    "su_recv returned: %d", error);
		}
	} else {
		rdsv3_stats_inc(s_recv_drop_dead_sock);
		rw_exit(&rs->rs_recv_lock);
	}

out:
	if (rs)
		rdsv3_sock_put(rs);
}
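
/*
 * The early-exit paths above are visible in the stats: s_recv_drop_old_seq
 * (stale retransmits), s_recv_drop_no_sock (no socket bound to the
 * destination port) and s_recv_drop_dead_sock (lost the race with
 * rdsv3_release()).  Pings are counted via s_recv_ping and answered with
 * rdsv3_send_pong() rather than being queued.
 */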

/*
 * be very careful here.  This is being called as the condition in
 * wait_event_*() needs to cope with being called many times.
 */
static int
rdsv3_next_incoming(struct rdsv3_sock *rs, struct rdsv3_incoming **inc)
{
	if (!*inc) {
		rw_enter(&rs->rs_recv_lock, RW_READER);
		if (!list_is_empty(&rs->rs_recv_queue)) {
			*inc = list_head(&rs->rs_recv_queue);
			rdsv3_inc_addref(*inc);
		}
		rw_exit(&rs->rs_recv_lock);
	}

	return (*inc != NULL);
}
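
/*
 * On success *inc holds a reference taken with rdsv3_inc_addref(); the
 * caller (rdsv3_recvmsg()) must drop it with rdsv3_inc_put() once the
 * message has been copied out or abandoned.
 */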

static int
rdsv3_still_queued(struct rdsv3_sock *rs, struct rdsv3_incoming *inc,
    int drop)
{
	struct rsock *sk = rdsv3_rs_to_sk(rs);
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_still_queued", "Enter rs: %p inc: %p drop: %d",
	    rs, inc, drop);

	rw_enter(&rs->rs_recv_lock, RW_WRITER);
	if (list_link_active(&inc->i_item)) {
		ret = 1;
		if (drop) {
			/* XXX make sure this i_conn is reliable */
			rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
			    -ntohl(inc->i_hdr.h_len),
			    inc->i_hdr.h_dport);
			list_remove_node(&inc->i_item);
			rdsv3_inc_put(inc);
		}
	}
	rw_exit(&rs->rs_recv_lock);

	RDSV3_DPRINTF5("rdsv3_still_queued",
	    "inc %p rs %p still %d dropped %d", inc, rs, ret, drop);

	return (ret);
}
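
/*
 * rdsv3_recvmsg() passes drop = !(msg_flags & MSG_PEEK), so a normal read
 * dequeues the message here while MSG_PEEK leaves it on rs_recv_queue for
 * the next caller.  The return value only reports whether the message was
 * still queued on entry, which is how racing readers are detected.
 */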

/*
 * Pull errors off the error queue.
 * If msghdr is NULL, we will just purge the error queue.
 */
static int
rdsv3_notify_queue_get(struct rdsv3_sock *rs, struct msghdr *msghdr)
{
	struct rdsv3_notifier *notifier;
	struct rds_rdma_notify cmsg;
	unsigned int count = 0, max_messages = ~0U;
	list_t copy;
	int err = 0;

	RDSV3_DPRINTF4("rdsv3_notify_queue_get", "Enter(rs: %p)", rs);

	list_create(&copy, sizeof (struct rdsv3_notifier),
	    offsetof(struct rdsv3_notifier, n_list));

	/*
	 * put_cmsg copies to user space and thus may sleep. We can't do this
	 * with rs_lock held, so first grab as many notifications as we can
	 * stuff in the user provided cmsg buffer. We don't try to copy more,
	 * to avoid losing notifications - except when the buffer is so small
	 * that it wouldn't even hold a single notification. Then we give as
	 * much of this single msg as we can squeeze in, and set MSG_CTRUNC.
	 */
	if (msghdr) {
		max_messages =
		    msghdr->msg_controllen / CMSG_SPACE(sizeof (cmsg));
		if (!max_messages)
			max_messages = 1;
	}

	mutex_enter(&rs->rs_lock);
	while (!list_is_empty(&rs->rs_notify_queue) && count < max_messages) {
		notifier = list_remove_head(&rs->rs_notify_queue);
		list_insert_tail(&copy, notifier);
		count++;
	}
	mutex_exit(&rs->rs_lock);

	if (!count)
		return (0);

	while (!list_is_empty(&copy)) {
		notifier = list_remove_head(&copy);

		if (msghdr) {
			cmsg.user_token = notifier->n_user_token;
			cmsg.status = notifier->n_status;

			err = rdsv3_put_cmsg(msghdr, SOL_RDS,
			    RDS_CMSG_RDMA_STATUS, sizeof (cmsg), &cmsg);
			if (err) {
				/* requeue the unprocessed notifier too */
				list_insert_head(&copy, notifier);
				break;
			}
		}

		kmem_free(notifier, sizeof (struct rdsv3_notifier));
	}

	/*
	 * If we bailed out because of an error in put_cmsg,
	 * we may be left with one or more notifications that we
	 * didn't process. Return them to the head of the list.
	 */
	if (!list_is_empty(&copy)) {
		mutex_enter(&rs->rs_lock);
		list_splice(&copy, &rs->rs_notify_queue);
		mutex_exit(&rs->rs_lock);
	}

	RDSV3_DPRINTF4("rdsv3_notify_queue_get", "Return(rs: %p)", rs);

	return (err);
}
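
/*
 * Example of the max_messages computation above (illustrative sizes): if
 * the caller supplies msg_controllen of 512 bytes and
 * CMSG_SPACE(sizeof (struct rds_rdma_notify)) rounds to 48 bytes, up to
 * 10 notifications are dequeued per call.  A control buffer too small for
 * even one notification still gets a single, possibly truncated, entry
 * with MSG_CTRUNC set.
 */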

/*
 * Queue a congestion notification
 */
static int
rdsv3_notify_cong(struct rdsv3_sock *rs, struct msghdr *msghdr)
{
	uint64_t notify = rs->rs_cong_notify;
	int err;

	err = rdsv3_put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
	    sizeof (notify), &notify);
	if (err)
		return (err);

	mutex_enter(&rs->rs_lock);
	rs->rs_cong_notify &= ~notify;
	mutex_exit(&rs->rs_lock);

	return (0);
}
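
/*
 * rs_cong_notify is snapshotted into 'notify' before the cmsg is copied
 * out, and only the snapshotted bits are cleared afterwards; congestion
 * updates that arrive while rdsv3_put_cmsg() may be sleeping are preserved
 * for the next call.
 */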

/*
 * Receive any control messages.
 */
static int
rdsv3_cmsg_recv(struct rdsv3_incoming *inc, struct msghdr *msg)
{
	int ret = 0;

	if (inc->i_rdma_cookie) {
		ret = rdsv3_put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
		    sizeof (inc->i_rdma_cookie), &inc->i_rdma_cookie);
	}

	return (ret);
}

int
rdsv3_recvmsg(struct rdsv3_sock *rs, uio_t *uio,
    struct msghdr *msg, size_t size, int msg_flags)
{
	struct rsock *sk = rdsv3_rs_to_sk(rs);
	int ret = 0;
	struct sockaddr_in *sin = NULL;
	struct rdsv3_incoming *inc = NULL;
	boolean_t nonblock = B_FALSE;

	RDSV3_DPRINTF4("rdsv3_recvmsg",
	    "Enter(rs: %p size: %d msg_flags: 0x%x)", rs, size, msg_flags);

	if ((uio->uio_fmode & (FNDELAY | FNONBLOCK)) ||
	    (msg_flags & MSG_DONTWAIT))
		nonblock = B_TRUE;

	if (msg_flags & MSG_OOB)
		goto out;

	/* mark the first cmsg position */
	if (msg) {
		msg->msg_control = NULL;
	}

	while (1) {
		/*
		 * If there are pending notifications, do those -
		 * and nothing else
		 */
		if (!list_is_empty(&rs->rs_notify_queue)) {
			ret = rdsv3_notify_queue_get(rs, msg);

			if (msg && msg->msg_namelen) {
				sin = kmem_zalloc(sizeof (struct sockaddr_in),
				    KM_SLEEP);
				sin->sin_family = AF_INET_OFFLOAD;
				if (inc) {
					sin->sin_port = inc->i_hdr.h_sport;
					sin->sin_addr.s_addr = inc->i_saddr;
				}
				msg->msg_namelen = sizeof (struct sockaddr_in);
				msg->msg_name = sin;
			}
			break;
		}

		if (rs->rs_cong_notify) {
			ret = rdsv3_notify_cong(rs, msg);
			break;
		}

		if (!rdsv3_next_incoming(rs, &inc)) {
			if (nonblock) {
				ret = -EAGAIN;
				break;
			}

			RDSV3_DPRINTF3("rdsv3_recvmsg",
			    "Before wait (rs: %p)", rs);

#if 0
			ret = rdsv3_wait_sig(sk->sk_sleep,
			    !(list_is_empty(&rs->rs_notify_queue) &&
			    !rs->rs_cong_notify &&
			    !rdsv3_next_incoming(rs, &inc)));
			if (ret == 0) {
				/* signal/timeout pending */
				RDSV3_DPRINTF2("rdsv3_recvmsg",
				    "woke due to signal");
				ret = -ERESTART;
			}
#else
			mutex_enter(&sk->sk_sleep->waitq_mutex);
			sk->sk_sleep->waitq_waiters++;
			while ((list_is_empty(&rs->rs_notify_queue) &&
			    !rs->rs_cong_notify &&
			    !rdsv3_next_incoming(rs, &inc))) {
				ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
				    &sk->sk_sleep->waitq_mutex);
				if (ret == 0) {
					/* signal/timeout pending */
					RDSV3_DPRINTF2("rdsv3_recvmsg",
					    "woke due to signal");
					ret = -ERESTART;
					break;
				}
			}
			sk->sk_sleep->waitq_waiters--;
			mutex_exit(&sk->sk_sleep->waitq_mutex);
#endif

			RDSV3_DPRINTF5("rdsv3_recvmsg",
			    "recvmsg woke rs: %p inc %p ret %d",
			    rs, inc, ret);

			if (ret < 0)
				break;

			/*
			 * if the wakeup was due to rs_notify_queue or
			 * rs_cong_notify then we need to handle those first.
			 */
			continue;
		}

		RDSV3_DPRINTF5("rdsv3_recvmsg",
		    "copying inc %p from %u.%u.%u.%u:%u to user", inc,
		    NIPQUAD(inc->i_conn->c_faddr),
		    ntohs(inc->i_hdr.h_sport));

		ret = inc->i_conn->c_trans->inc_copy_to_user(inc, uio, size);
		if (ret < 0)
			break;

		/*
		 * if the message we just copied isn't at the head of the
		 * recv queue then someone else raced us to return it, try
		 * to get the next message.
		 */
		if (!rdsv3_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
			rdsv3_inc_put(inc);
			inc = NULL;
			rdsv3_stats_inc(s_recv_deliver_raced);
			continue;
		}

		if (ret < ntohl(inc->i_hdr.h_len)) {
			if (msg_flags & MSG_TRUNC)
				ret = ntohl(inc->i_hdr.h_len);
			msg->msg_flags |= MSG_TRUNC;
		}

		if (rdsv3_cmsg_recv(inc, msg)) {
			ret = -EFAULT;
			break;
		}

		rdsv3_stats_inc(s_recv_delivered);

		if (msg->msg_namelen) {
			sin = kmem_alloc(sizeof (struct sockaddr_in), KM_SLEEP);
			sin->sin_family = AF_INET_OFFLOAD;
			sin->sin_port = inc->i_hdr.h_sport;
			sin->sin_addr.s_addr = inc->i_saddr;
			(void) memset(sin->sin_zero, 0,
			    sizeof (sin->sin_zero));
			msg->msg_namelen = sizeof (struct sockaddr_in);
			msg->msg_name = sin;
		}
		break;
	}

	if (inc)
		rdsv3_inc_put(inc);

out:
	if (msg && msg->msg_control == NULL)
		msg->msg_controllen = 0;

	RDSV3_DPRINTF4("rdsv3_recvmsg", "Return(rs: %p, ret: %d)", rs, ret);

	return (ret);
}
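
/*
 * Priority order implemented by the loop above: RDMA completion
 * notifications first, then congestion updates, then message data.  A
 * blocking caller sleeps on sk_sleep until one of the three is available;
 * a nonblocking caller returns -EAGAIN as soon as the receive queue is
 * found empty.
 */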

/*
 * The socket is being shut down and we're asked to drop messages that were
 * queued for recvmsg.  The caller has unbound the socket so the receive path
 * won't queue any more incoming fragments or messages on the socket.
 */
void
rdsv3_clear_recv_queue(struct rdsv3_sock *rs)
{
	struct rsock *sk = rdsv3_rs_to_sk(rs);
	struct rdsv3_incoming *inc, *tmp;

	RDSV3_DPRINTF4("rdsv3_clear_recv_queue", "Enter(rs: %p)", rs);

	rw_enter(&rs->rs_recv_lock, RW_WRITER);
	RDSV3_FOR_EACH_LIST_NODE_SAFE(inc, tmp, &rs->rs_recv_queue, i_item) {
		rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
		    -ntohl(inc->i_hdr.h_len),
		    inc->i_hdr.h_dport);
		list_remove_node(&inc->i_item);
		rdsv3_inc_put(inc);
	}
	rw_exit(&rs->rs_recv_lock);

	RDSV3_DPRINTF4("rdsv3_clear_recv_queue", "Return(rs: %p)", rs);
}

/*
 * inc->i_saddr isn't used here because it is only set in the receive
 * path.
 */
void
rdsv3_inc_info_copy(struct rdsv3_incoming *inc,
    struct rdsv3_info_iterator *iter,
    uint32_be_t saddr, uint32_be_t daddr, int flip)
{
	struct rds_info_message minfo;

	minfo.seq = ntohll(inc->i_hdr.h_sequence);
	minfo.len = ntohl(inc->i_hdr.h_len);

	if (flip) {
		minfo.laddr = daddr;
		minfo.faddr = saddr;
		minfo.lport = inc->i_hdr.h_dport;
		minfo.fport = inc->i_hdr.h_sport;
	} else {
		minfo.laddr = saddr;
		minfo.faddr = daddr;
		minfo.lport = inc->i_hdr.h_sport;
		minfo.fport = inc->i_hdr.h_dport;
	}

	minfo.flags = 0;

	rdsv3_info_copy(iter, &minfo, sizeof (minfo));
}
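
/*
 * 'flip' swaps which header port is reported as local vs. foreign so one
 * copy routine can serve both directions: for a message on a receive
 * queue, h_dport is the local port, so callers reporting received
 * messages are expected to pass flip to land it in minfo.lport (an
 * inference from the assignments above).
 */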