// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Copyright (C) 2017, Microsoft Corporation.
 *
 * Author(s): Long Li <longli@microsoft.com>
 */
#include <linux/module.h>
#include <linux/highmem.h>
#include "smbdirect.h"
#include "cifs_debug.h"
#include "cifsproto.h"
#include "smb2proto.h"

static struct smbd_response *get_empty_queue_buffer(
		struct smbd_connection *info);
static struct smbd_response *get_receive_buffer(
		struct smbd_connection *info);
static void put_receive_buffer(
		struct smbd_connection *info,
		struct smbd_response *response);
static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
static void destroy_receive_buffers(struct smbd_connection *info);

static void put_empty_packet(
		struct smbd_connection *info, struct smbd_response *response);
static void enqueue_reassembly(
		struct smbd_connection *info,
		struct smbd_response *response, int data_length);
static struct smbd_response *_get_first_reassembly(
		struct smbd_connection *info);

static int smbd_post_recv(
		struct smbd_connection *info,
		struct smbd_response *response);

static int smbd_post_send_empty(struct smbd_connection *info);
static int smbd_post_send_data(
		struct smbd_connection *info,
		struct kvec *iov, int n_vec, int remaining_data_length);
static int smbd_post_send_page(struct smbd_connection *info,
		struct page *page, unsigned long offset,
		size_t size, int remaining_data_length);

static void destroy_mr_list(struct smbd_connection *info);
static int allocate_mr_list(struct smbd_connection *info);

/* SMBD version number */
#define SMBD_V1	0x0100

/* Port numbers for SMBD transport */
#define SMB_PORT	445
#define SMBD_PORT	5445

/* Address lookup and resolve timeout in ms */
#define RDMA_RESOLVE_TIMEOUT	5000

/* SMBD negotiation timeout in seconds */
#define SMBD_NEGOTIATE_TIMEOUT	120

/* SMBD minimum receive size and fragmented size defined in [MS-SMBD] */
#define SMBD_MIN_RECEIVE_SIZE		128
#define SMBD_MIN_FRAGMENTED_SIZE	131072

/*
 * Default maximum number of RDMA read/write outstanding on this connection
 * This value is possibly decreased during QP creation on hardware limit
 */
#define SMBD_CM_RESPONDER_RESOURCES	32

/* Maximum number of retries on data transfer operations */
#define SMBD_CM_RETRY			6
/* No need to retry on Receiver Not Ready since SMBD manages credits */
#define SMBD_CM_RNR_RETRY		0

/*
 * User configurable initial values per SMBD transport connection
 * as defined in [MS-SMBD] 3.1.1.1
 * Those may change after a SMBD negotiation
 */
/* The local peer's maximum number of credits to grant to the peer */
int smbd_receive_credit_max = 255;

/* The remote peer's credit request of local peer */
int smbd_send_credit_target = 255;

/* The maximum single message size can be sent to remote peer */
int smbd_max_send_size = 1364;

/* The maximum fragmented upper-layer payload receive size supported */
int smbd_max_fragmented_recv_size = 1024 * 1024;

/* The maximum single-message size which can be received */
int smbd_max_receive_size = 8192;

/* The timeout to initiate send of a keepalive message on idle */
int smbd_keep_alive_interval = 120;

/*
 * User configurable initial values for RDMA transport
 * The actual values used may be lower and are limited to hardware capabilities
 */
/* Default maximum number of SGEs in a RDMA write/read */
int smbd_max_frmr_depth = 2048;

/* If payload is less than this byte, use RDMA send/recv not read/write */
int rdma_readwrite_threshold = 4096;

/* Transport logging functions
 * Logging are defined as classes. They can be OR'ed to define the actual
 * logging level via module parameter smbd_logging_class
 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 * log_rdma_event()
 */
#define LOG_OUTGOING			0x1
#define LOG_INCOMING			0x2
#define LOG_READ			0x4
#define LOG_WRITE			0x8
#define LOG_RDMA_SEND			0x10
#define LOG_RDMA_RECV			0x20
#define LOG_KEEP_ALIVE			0x40
#define LOG_RDMA_EVENT			0x80
#define LOG_RDMA_MR			0x100
static unsigned int smbd_logging_class;
module_param(smbd_logging_class, uint, 0644);
MODULE_PARM_DESC(smbd_logging_class,
	"Logging class for SMBD transport 0x0 to 0x100");

#define ERR		0x0
#define INFO		0x1
static unsigned int smbd_logging_level = ERR;
module_param(smbd_logging_level, uint, 0644);
MODULE_PARM_DESC(smbd_logging_level,
	"Logging level for SMBD transport, 0 (default): error, 1: info");

#define log_rdma(level, class, fmt, args...)				\
do {									\
	if (level <= smbd_logging_level || class & smbd_logging_class) \
		cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
} while (0)

#define log_outgoing(level, fmt, args...) \
		log_rdma(level, LOG_OUTGOING, fmt, ##args)
#define log_incoming(level, fmt, args...) \
		log_rdma(level, LOG_INCOMING, fmt, ##args)
#define log_read(level, fmt, args...)	log_rdma(level, LOG_READ, fmt, ##args)
#define log_write(level, fmt, args...)	log_rdma(level, LOG_WRITE, fmt, ##args)
#define log_rdma_send(level, fmt, args...) \
		log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
#define log_rdma_recv(level, fmt, args...) \
		log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
#define log_keep_alive(level, fmt, args...) \
		log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
#define log_rdma_event(level, fmt, args...) \
		log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
#define log_rdma_mr(level, fmt, args...) \
		log_rdma(level, LOG_RDMA_MR, fmt, ##args)
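
/*
 * For example, loading the module with cifs.smbd_logging_class=0x30
 * (LOG_RDMA_SEND | LOG_RDMA_RECV) routes every log_rdma_send() and
 * log_rdma_recv() call through cifs_dbg(), even with smbd_logging_level
 * left at its default of ERR.
 */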

static void smbd_disconnect_rdma_work(struct work_struct *work)
{
	struct smbd_connection *info =
		container_of(work, struct smbd_connection, disconnect_work);

	if (info->transport_status == SMBD_CONNECTED) {
		info->transport_status = SMBD_DISCONNECTING;
		rdma_disconnect(info->id);
	}
}

static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
{
	queue_work(info->workqueue, &info->disconnect_work);
}

/* Upcall from RDMA CM */
static int smbd_conn_upcall(
		struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct smbd_connection *info = id->context;

	log_rdma_event(INFO, "event=%d status=%d\n",
		event->event, event->status);

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		info->ri_rc = 0;
		complete(&info->ri_done);
		break;

	case RDMA_CM_EVENT_ADDR_ERROR:
		info->ri_rc = -EHOSTUNREACH;
		complete(&info->ri_done);
		break;

	case RDMA_CM_EVENT_ROUTE_ERROR:
		info->ri_rc = -ENETUNREACH;
		complete(&info->ri_done);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		log_rdma_event(INFO, "connected event=%d\n", event->event);
		info->transport_status = SMBD_CONNECTED;
		wake_up_interruptible(&info->conn_wait);
		break;

	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
	case RDMA_CM_EVENT_REJECTED:
		log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
		info->transport_status = SMBD_DISCONNECTED;
		wake_up_interruptible(&info->conn_wait);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
	case RDMA_CM_EVENT_DISCONNECTED:
		/* This happens when we fail the negotiation */
		if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
			info->transport_status = SMBD_DISCONNECTED;
			wake_up(&info->conn_wait);
			break;
		}

		info->transport_status = SMBD_DISCONNECTED;
		wake_up_interruptible(&info->disconn_wait);
		wake_up_interruptible(&info->wait_reassembly_queue);
		wake_up_interruptible_all(&info->wait_send_queue);
		break;

	default:
		break;
	}

	return 0;
}

/* Upcall from RDMA QP */
static void
smbd_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct smbd_connection *info = context;

	log_rdma_event(ERR, "%s on device %s info %p\n",
		ib_event_msg(event->event), event->device->name, info);

	switch (event->event) {
	case IB_EVENT_CQ_ERR:
	case IB_EVENT_QP_FATAL:
		smbd_disconnect_rdma_connection(info);
		break;

	default:
		break;
	}
}

static inline void *smbd_request_payload(struct smbd_request *request)
{
	return (void *)request->packet;
}

static inline void *smbd_response_payload(struct smbd_response *response)
{
	return (void *)response->packet;
}

/* Called when a RDMA send is done */
static void send_done(struct ib_cq *cq, struct ib_wc *wc)
{
	int i;
	struct smbd_request *request =
		container_of(wc->wr_cqe, struct smbd_request, cqe);

	log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
		request, wc->status);

	if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
		log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
			wc->status, wc->opcode);
		smbd_disconnect_rdma_connection(request->info);
	}

	for (i = 0; i < request->num_sge; i++)
		ib_dma_unmap_single(request->info->id->device,
			request->sge[i].addr,
			request->sge[i].length,
			DMA_TO_DEVICE);

	if (atomic_dec_and_test(&request->info->send_pending))
		wake_up(&request->info->wait_send_pending);

	wake_up(&request->info->wait_post_send);

	mempool_free(request, request->info->request_mempool);
}
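
/*
 * send_pending is incremented before each ib_post_send() and dropped in
 * send_done(); smbd_destroy() waits on wait_send_pending until the counter
 * reaches zero before tearing down the QP, so every signaled send must
 * complete through this path.
 */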

static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
{
	log_rdma_event(INFO, "resp message min_version %u max_version %u negotiated_version %u credits_requested %u credits_granted %u status %u max_readwrite_size %u preferred_send_size %u max_receive_size %u max_fragmented_size %u\n",
		       resp->min_version, resp->max_version,
		       resp->negotiated_version, resp->credits_requested,
		       resp->credits_granted, resp->status,
		       resp->max_readwrite_size, resp->preferred_send_size,
		       resp->max_receive_size, resp->max_fragmented_size);
}

/*
 * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
 * response, packet_length: the negotiation response message
 * return value: true if negotiation is a success, false if failed
 */
static bool process_negotiation_response(
		struct smbd_response *response, int packet_length)
{
	struct smbd_connection *info = response->info;
	struct smbd_negotiate_resp *packet = smbd_response_payload(response);

	if (packet_length < sizeof(struct smbd_negotiate_resp)) {
		log_rdma_event(ERR,
			"error: packet_length=%d\n", packet_length);
		return false;
	}

	if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
		log_rdma_event(ERR, "error: negotiated_version=%x\n",
			le16_to_cpu(packet->negotiated_version));
		return false;
	}
	info->protocol = le16_to_cpu(packet->negotiated_version);

	if (packet->credits_requested == 0) {
		log_rdma_event(ERR, "error: credits_requested==0\n");
		return false;
	}
	info->receive_credit_target = le16_to_cpu(packet->credits_requested);

	if (packet->credits_granted == 0) {
		log_rdma_event(ERR, "error: credits_granted==0\n");
		return false;
	}
	atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));

	atomic_set(&info->receive_credits, 0);

	if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
		log_rdma_event(ERR, "error: preferred_send_size=%d\n",
			le32_to_cpu(packet->preferred_send_size));
		return false;
	}
	info->max_receive_size = le32_to_cpu(packet->preferred_send_size);

	if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
		log_rdma_event(ERR, "error: max_receive_size=%d\n",
			le32_to_cpu(packet->max_receive_size));
		return false;
	}
	info->max_send_size = min_t(int, info->max_send_size,
		le32_to_cpu(packet->max_receive_size));

	if (le32_to_cpu(packet->max_fragmented_size) <
			SMBD_MIN_FRAGMENTED_SIZE) {
		log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
			le32_to_cpu(packet->max_fragmented_size));
		return false;
	}
	info->max_fragmented_send_size =
		le32_to_cpu(packet->max_fragmented_size);
	info->rdma_readwrite_threshold =
		rdma_readwrite_threshold > info->max_fragmented_send_size ?
		info->max_fragmented_send_size :
		rdma_readwrite_threshold;

	info->max_readwrite_size = min_t(u32,
			le32_to_cpu(packet->max_readwrite_size),
			info->max_frmr_depth * PAGE_SIZE);
	info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;

	return true;
}
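
/*
 * For example, with the default module parameters the client offers
 * max_send_size = 1364 and max_receive_size = 8192; after negotiation
 * max_send_size is clamped to the peer's advertised max_receive_size and
 * max_receive_size is replaced by the peer's preferred_send_size, so both
 * sides agree on per-message limits before any data is transferred.
 */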

static void smbd_post_send_credits(struct work_struct *work)
{
	int ret = 0;
	int use_receive_queue = 1;
	int rc;
	struct smbd_response *response;
	struct smbd_connection *info =
		container_of(work, struct smbd_connection,
			post_send_credits_work);

	if (info->transport_status != SMBD_CONNECTED) {
		wake_up(&info->wait_receive_queues);
		return;
	}

	if (info->receive_credit_target >
		atomic_read(&info->receive_credits)) {
		while (true) {
			if (use_receive_queue)
				response = get_receive_buffer(info);
			else
				response = get_empty_queue_buffer(info);
			if (!response) {
				/* now switch to empty packet queue */
				if (use_receive_queue) {
					use_receive_queue = 0;
					continue;
				} else
					break;
			}

			response->type = SMBD_TRANSFER_DATA;
			response->first_segment = false;
			rc = smbd_post_recv(info, response);
			if (rc) {
				log_rdma_recv(ERR,
					"post_recv failed rc=%d\n", rc);
				put_receive_buffer(info, response);
				break;
			}

			ret++;
		}
	}

	spin_lock(&info->lock_new_credits_offered);
	info->new_credits_offered += ret;
	spin_unlock(&info->lock_new_credits_offered);

	/* Promptly send an immediate packet as defined in [MS-SMBD] 3.1.1.1 */
	info->send_immediate = true;
	if (atomic_read(&info->receive_credits) <
		info->receive_credit_target - 1) {
		if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
		    info->send_immediate) {
			log_keep_alive(INFO, "send an empty message\n");
			smbd_post_send_empty(info);
		}
	}
}
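
/*
 * Receive buffers are taken first from the pre-allocated receive queue and,
 * once that is exhausted, from the empty packet queue that recv_done()
 * refills; each successfully posted receive becomes one credit accumulated
 * in new_credits_offered for the next outgoing packet to grant.
 */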

/* Called from softirq, when recv is done */
static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct smbd_data_transfer *data_transfer;
	struct smbd_response *response =
		container_of(wc->wr_cqe, struct smbd_response, cqe);
	struct smbd_connection *info = response->info;
	int data_length = 0;

	log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d byte_len=%d pkey_index=%x\n",
		      response, response->type, wc->status, wc->opcode,
		      wc->byte_len, wc->pkey_index);

	if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
		log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
			wc->status, wc->opcode);
		smbd_disconnect_rdma_connection(info);
		goto error;
	}

	ib_dma_sync_single_for_cpu(
		info->id->device,
		response->sge.addr,
		response->sge.length,
		DMA_FROM_DEVICE);

	switch (response->type) {
	/* SMBD negotiation response */
	case SMBD_NEGOTIATE_RESP:
		dump_smbd_negotiate_resp(smbd_response_payload(response));
		info->full_packet_received = true;
		info->negotiate_done =
			process_negotiation_response(response, wc->byte_len);
		complete(&info->negotiate_completion);
		break;

	/* SMBD data transfer packet */
	case SMBD_TRANSFER_DATA:
		data_transfer = smbd_response_payload(response);
		data_length = le32_to_cpu(data_transfer->data_length);

		/*
		 * If this is a packet with data payload place the data in
		 * reassembly queue and wake up the reading thread
		 */
		if (data_length) {
			if (info->full_packet_received)
				response->first_segment = true;

			if (le32_to_cpu(data_transfer->remaining_data_length))
				info->full_packet_received = false;
			else
				info->full_packet_received = true;

			enqueue_reassembly(
				info,
				response,
				data_length);
		} else
			put_empty_packet(info, response);

		if (data_length)
			wake_up_interruptible(&info->wait_reassembly_queue);

		atomic_dec(&info->receive_credits);
		info->receive_credit_target =
			le16_to_cpu(data_transfer->credits_requested);
		if (le16_to_cpu(data_transfer->credits_granted)) {
			atomic_add(le16_to_cpu(data_transfer->credits_granted),
				&info->send_credits);
			/*
			 * We have new send credits granted from remote peer
			 * If any sender is waiting for credits, unblock it
			 */
			wake_up_interruptible(&info->wait_send_queue);
		}

		log_incoming(INFO, "data flags %d data_offset %d data_length %d remaining_data_length %d\n",
			     le16_to_cpu(data_transfer->flags),
			     le32_to_cpu(data_transfer->data_offset),
			     le32_to_cpu(data_transfer->data_length),
			     le32_to_cpu(data_transfer->remaining_data_length));

		/* Send a KEEP_ALIVE response right away if requested */
		info->keep_alive_requested = KEEP_ALIVE_NONE;
		if (le16_to_cpu(data_transfer->flags) &
				SMB_DIRECT_RESPONSE_REQUESTED) {
			info->keep_alive_requested = KEEP_ALIVE_PENDING;
		}

		return;

	default:
		log_rdma_recv(ERR,
			"unexpected response type=%d\n", response->type);
	}

error:
	put_receive_buffer(info, response);
}

static struct rdma_cm_id *smbd_create_id(
		struct smbd_connection *info,
		struct sockaddr *dstaddr, int port)
{
	struct rdma_cm_id *id;
	int rc;
	__be16 *sport;

	id = rdma_create_id(&init_net, smbd_conn_upcall, info,
		RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
		return id;
	}

	if (dstaddr->sa_family == AF_INET6)
		sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
	else
		sport = &((struct sockaddr_in *)dstaddr)->sin_port;

	*sport = htons(port);

	init_completion(&info->ri_done);
	info->ri_rc = -ETIMEDOUT;

	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
		RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(
		&info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
	rc = info->ri_rc;
	if (rc) {
		log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
		goto out;
	}

	info->ri_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(
		&info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
	rc = info->ri_rc;
	if (rc) {
		log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
		goto out;
	}

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Test if FRWR (Fast Registration Work Requests) is supported on the device
 * This implementation requires FRWR on RDMA read/write
 * return value: true if it is supported
 */
static bool frwr_is_supported(struct ib_device_attr *attrs)
{
	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		return false;
	if (attrs->max_fast_reg_page_list_len == 0)
		return false;
	return true;
}

static int smbd_ia_open(
		struct smbd_connection *info,
		struct sockaddr *dstaddr, int port)
{
	int rc;

	info->id = smbd_create_id(info, dstaddr, port);
	if (IS_ERR(info->id)) {
		rc = PTR_ERR(info->id);
		goto out1;
	}

	if (!frwr_is_supported(&info->id->device->attrs)) {
		log_rdma_event(ERR, "Fast Registration Work Requests (FRWR) is not supported\n");
		log_rdma_event(ERR, "Device capability flags = %llx max_fast_reg_page_list_len = %u\n",
			       info->id->device->attrs.device_cap_flags,
			       info->id->device->attrs.max_fast_reg_page_list_len);
		rc = -EPROTONOSUPPORT;
		goto out2;
	}
	info->max_frmr_depth = min_t(int,
		smbd_max_frmr_depth,
		info->id->device->attrs.max_fast_reg_page_list_len);
	info->mr_type = IB_MR_TYPE_MEM_REG;
	if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
		info->mr_type = IB_MR_TYPE_SG_GAPS;

	info->pd = ib_alloc_pd(info->id->device, 0);
	if (IS_ERR(info->pd)) {
		rc = PTR_ERR(info->pd);
		log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
		goto out2;
	}

	return 0;

out2:
	rdma_destroy_id(info->id);
	info->id = NULL;

out1:
	return rc;
}

/*
 * Send a negotiation request message to the peer
 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 * After negotiation, the transport is connected and ready for
 * carrying upper layer SMB payload
 */
static int smbd_post_send_negotiate_req(struct smbd_connection *info)
{
	struct ib_send_wr send_wr;
	int rc = -ENOMEM;
	struct smbd_request *request;
	struct smbd_negotiate_req *packet;

	request = mempool_alloc(info->request_mempool, GFP_KERNEL);
	if (!request)
		return rc;

	request->info = info;

	packet = smbd_request_payload(request);
	packet->min_version = cpu_to_le16(SMBD_V1);
	packet->max_version = cpu_to_le16(SMBD_V1);
	packet->reserved = 0;
	packet->credits_requested = cpu_to_le16(info->send_credit_target);
	packet->preferred_send_size = cpu_to_le32(info->max_send_size);
	packet->max_receive_size = cpu_to_le32(info->max_receive_size);
	packet->max_fragmented_size =
		cpu_to_le32(info->max_fragmented_recv_size);

	request->num_sge = 1;
	request->sge[0].addr = ib_dma_map_single(
				info->id->device, (void *)packet,
				sizeof(*packet), DMA_TO_DEVICE);
	if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
		rc = -EIO;
		goto dma_mapping_failed;
	}

	request->sge[0].length = sizeof(*packet);
	request->sge[0].lkey = info->pd->local_dma_lkey;

	ib_dma_sync_single_for_device(
		info->id->device, request->sge[0].addr,
		request->sge[0].length, DMA_TO_DEVICE);

	request->cqe.done = send_done;

	send_wr.next = NULL;
	send_wr.wr_cqe = &request->cqe;
	send_wr.sg_list = request->sge;
	send_wr.num_sge = request->num_sge;
	send_wr.opcode = IB_WR_SEND;
	send_wr.send_flags = IB_SEND_SIGNALED;

	log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
		request->sge[0].addr,
		request->sge[0].length, request->sge[0].lkey);

	atomic_inc(&info->send_pending);
	rc = ib_post_send(info->id->qp, &send_wr, NULL);
	if (!rc)
		return 0;

	/* if we reach here, post send failed */
	log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
	atomic_dec(&info->send_pending);
	ib_dma_unmap_single(info->id->device, request->sge[0].addr,
		request->sge[0].length, DMA_TO_DEVICE);

	smbd_disconnect_rdma_connection(info);

dma_mapping_failed:
	mempool_free(request, info->request_mempool);
	return rc;
}

/*
 * Extend the credits to remote peer
 * This implements [MS-SMBD] 3.1.5.9
 * The idea is that we should extend credits to remote peer as quickly as
 * it's allowed, to maintain data flow. We allocate as much receive
 * buffer as possible, and extend the receive credits to remote peer
 * return value: the new credits being granted.
 */
static int manage_credits_prior_sending(struct smbd_connection *info)
{
	int new_credits;

	spin_lock(&info->lock_new_credits_offered);
	new_credits = info->new_credits_offered;
	info->new_credits_offered = 0;
	spin_unlock(&info->lock_new_credits_offered);

	return new_credits;
}

/*
 * Check if we need to send a KEEP_ALIVE message
 * The idle connection timer triggers a KEEP_ALIVE message when expires
 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flag to have peer send
 * back a response
 * return value:
 * 1 if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 * 0 otherwise
 */
static int manage_keep_alive_before_sending(struct smbd_connection *info)
{
	if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
		info->keep_alive_requested = KEEP_ALIVE_SENT;
		return 1;
	}
	return 0;
}
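
/*
 * Credit flow in one direction: put_receive_buffer() schedules
 * smbd_post_send_credits(), which posts receives and accumulates
 * new_credits_offered; manage_credits_prior_sending() then folds that count
 * into the credits_granted field of the next packet built in
 * smbd_post_send_sgl().
 */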

/* Post the send request */
static int smbd_post_send(struct smbd_connection *info,
		struct smbd_request *request)
{
	struct ib_send_wr send_wr;
	int rc, i;

	for (i = 0; i < request->num_sge; i++) {
		log_rdma_send(INFO,
			"rdma_request sge[%d] addr=%llu length=%u\n",
			i, request->sge[i].addr, request->sge[i].length);
		ib_dma_sync_single_for_device(
			info->id->device,
			request->sge[i].addr,
			request->sge[i].length,
			DMA_TO_DEVICE);
	}

	request->cqe.done = send_done;

	send_wr.next = NULL;
	send_wr.wr_cqe = &request->cqe;
	send_wr.sg_list = request->sge;
	send_wr.num_sge = request->num_sge;
	send_wr.opcode = IB_WR_SEND;
	send_wr.send_flags = IB_SEND_SIGNALED;

	rc = ib_post_send(info->id->qp, &send_wr, NULL);
	if (rc) {
		log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
		smbd_disconnect_rdma_connection(info);
		rc = -EAGAIN;
	} else
		/* Reset timer for idle connection after packet is sent */
		mod_delayed_work(info->workqueue, &info->idle_timer_work,
			info->keep_alive_interval*HZ);

	return rc;
}

static int smbd_post_send_sgl(struct smbd_connection *info,
	struct scatterlist *sgl, int data_length, int remaining_data_length)
{
	int num_sgs;
	int i, rc;
	int header_length;
	struct smbd_request *request;
	struct smbd_data_transfer *packet;
	int new_credits;
	struct scatterlist *sg;

wait_credit:
	/* Wait for send credits. A SMBD packet needs one credit */
	rc = wait_event_interruptible(info->wait_send_queue,
		atomic_read(&info->send_credits) > 0 ||
		info->transport_status != SMBD_CONNECTED);
	if (rc)
		goto err_wait_credit;

	if (info->transport_status != SMBD_CONNECTED) {
		log_outgoing(ERR, "disconnected not sending on wait_credit\n");
		rc = -EAGAIN;
		goto err_wait_credit;
	}
	if (unlikely(atomic_dec_return(&info->send_credits) < 0)) {
		atomic_inc(&info->send_credits);
		goto wait_credit;
	}

wait_send_queue:
	wait_event(info->wait_post_send,
		atomic_read(&info->send_pending) < info->send_credit_target ||
		info->transport_status != SMBD_CONNECTED);

	if (info->transport_status != SMBD_CONNECTED) {
		log_outgoing(ERR, "disconnected not sending on wait_send_queue\n");
		rc = -EAGAIN;
		goto err_wait_send_queue;
	}

	if (unlikely(atomic_inc_return(&info->send_pending) >
				info->send_credit_target)) {
		atomic_dec(&info->send_pending);
		goto wait_send_queue;
	}

	request = mempool_alloc(info->request_mempool, GFP_KERNEL);
	if (!request) {
		rc = -ENOMEM;
		goto err_alloc;
	}

	request->info = info;

	/* Fill in the packet header */
	packet = smbd_request_payload(request);
	packet->credits_requested = cpu_to_le16(info->send_credit_target);

	new_credits = manage_credits_prior_sending(info);
	atomic_add(new_credits, &info->receive_credits);
	packet->credits_granted = cpu_to_le16(new_credits);

	info->send_immediate = false;

	packet->flags = 0;
	if (manage_keep_alive_before_sending(info))
		packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);

	packet->reserved = 0;
	if (!data_length)
		packet->data_offset = 0;
	else
		packet->data_offset = cpu_to_le32(24);
	packet->data_length = cpu_to_le32(data_length);
	packet->remaining_data_length = cpu_to_le32(remaining_data_length);

	log_outgoing(INFO, "credits_requested=%d credits_granted=%d data_offset=%d data_length=%d remaining_data_length=%d\n",
		     le16_to_cpu(packet->credits_requested),
		     le16_to_cpu(packet->credits_granted),
		     le32_to_cpu(packet->data_offset),
		     le32_to_cpu(packet->data_length),
		     le32_to_cpu(packet->remaining_data_length));

	/* Map the packet to DMA */
	header_length = sizeof(struct smbd_data_transfer);
	/* If this is a packet without payload, don't send padding */
	if (!data_length)
		header_length = offsetof(struct smbd_data_transfer, padding);

	request->num_sge = 1;
	request->sge[0].addr = ib_dma_map_single(info->id->device,
						 (void *)packet,
						 header_length,
						 DMA_TO_DEVICE);
	if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
		rc = -EIO;
		request->sge[0].addr = 0;
		goto err_dma;
	}

	request->sge[0].length = header_length;
	request->sge[0].lkey = info->pd->local_dma_lkey;

	/* Fill in the packet data payload */
	num_sgs = sgl ? sg_nents(sgl) : 0;
	for_each_sg(sgl, sg, num_sgs, i) {
		request->sge[i+1].addr =
			ib_dma_map_page(info->id->device, sg_page(sg),
			       sg->offset, sg->length, DMA_TO_DEVICE);
		if (ib_dma_mapping_error(
				info->id->device, request->sge[i+1].addr)) {
			rc = -EIO;
			request->sge[i+1].addr = 0;
			goto err_dma;
		}
		request->sge[i+1].length = sg->length;
		request->sge[i+1].lkey = info->pd->local_dma_lkey;
		request->num_sge++;
	}

	rc = smbd_post_send(info, request);
	if (!rc)
		return 0;

err_dma:
	for (i = 0; i < request->num_sge; i++)
		if (request->sge[i].addr)
			ib_dma_unmap_single(info->id->device,
					    request->sge[i].addr,
					    request->sge[i].length,
					    DMA_TO_DEVICE);
	mempool_free(request, info->request_mempool);

	/* roll back receive credits and credits to be offered */
	spin_lock(&info->lock_new_credits_offered);
	info->new_credits_offered += new_credits;
	spin_unlock(&info->lock_new_credits_offered);
	atomic_sub(new_credits, &info->receive_credits);

err_alloc:
	if (atomic_dec_and_test(&info->send_pending))
		wake_up(&info->wait_send_pending);

err_wait_send_queue:
	/* roll back send credits and pending */
	atomic_inc(&info->send_credits);

err_wait_credit:
	return rc;
}

/*
 * Send a page
 * page: the page to send
 * offset: offset in the page to send
 * size: length in the page to send
 * remaining_data_length: remaining data to send in this payload
 */
static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
		unsigned long offset, size_t size, int remaining_data_length)
{
	struct scatterlist sgl;

	sg_init_table(&sgl, 1);
	sg_set_page(&sgl, page, size, offset);

	return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
}

/*
 * Send an empty message
 * Empty message is used to extend credits to the peer and to keep the
 * connection alive while there is no upper layer payload to send at the time
 */
static int smbd_post_send_empty(struct smbd_connection *info)
{
	info->count_send_empty++;
	return smbd_post_send_sgl(info, NULL, 0, 0);
}

/*
 * Send a data buffer
 * iov: the iov array describing the data buffers
 * n_vec: number of iov array
 * remaining_data_length: remaining data to send following this packet
 * in segmented SMBD packet
 */
static int smbd_post_send_data(
	struct smbd_connection *info, struct kvec *iov, int n_vec,
	int remaining_data_length)
{
	int i;
	u32 data_length = 0;
	struct scatterlist sgl[SMBDIRECT_MAX_SGE];

	if (n_vec > SMBDIRECT_MAX_SGE) {
		cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
		return -EINVAL;
	}

	sg_init_table(sgl, n_vec);
	for (i = 0; i < n_vec; i++) {
		data_length += iov[i].iov_len;
		sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
	}

	return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
}

/*
 * Post a receive request to the transport
 * The remote peer can only send data when a receive request is posted
 * The interaction is controlled by send/receive credit system
 */
static int smbd_post_recv(
		struct smbd_connection *info, struct smbd_response *response)
{
	struct ib_recv_wr recv_wr;
	int rc = -EIO;

	response->sge.addr = ib_dma_map_single(
				info->id->device, response->packet,
				info->max_receive_size, DMA_FROM_DEVICE);
	if (ib_dma_mapping_error(info->id->device, response->sge.addr))
		return rc;

	response->sge.length = info->max_receive_size;
	response->sge.lkey = info->pd->local_dma_lkey;

	response->cqe.done = recv_done;

	recv_wr.wr_cqe = &response->cqe;
	recv_wr.next = NULL;
	recv_wr.sg_list = &response->sge;
	recv_wr.num_sge = 1;

	rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
	if (rc) {
		ib_dma_unmap_single(info->id->device, response->sge.addr,
				    response->sge.length, DMA_FROM_DEVICE);
		smbd_disconnect_rdma_connection(info);
		log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
	}

	return rc;
}

/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
static int smbd_negotiate(struct smbd_connection *info)
{
	int rc;
	struct smbd_response *response = get_receive_buffer(info);

	response->type = SMBD_NEGOTIATE_RESP;
	rc = smbd_post_recv(info, response);
	log_rdma_event(INFO, "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x iov.lkey=%x\n",
		       rc, response->sge.addr,
		       response->sge.length, response->sge.lkey);
	if (rc)
		return rc;

	init_completion(&info->negotiate_completion);
	info->negotiate_done = false;
	rc = smbd_post_send_negotiate_req(info);
	if (rc)
		return rc;

	rc = wait_for_completion_interruptible_timeout(
		&info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
	log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);

	if (info->negotiate_done)
		return 0;

	if (rc == 0)
		rc = -ETIMEDOUT;
	else if (rc == -ERESTARTSYS)
		rc = -EINTR;
	else
		rc = -ENOTCONN;

	return rc;
}

static void put_empty_packet(
		struct smbd_connection *info, struct smbd_response *response)
{
	spin_lock(&info->empty_packet_queue_lock);
	list_add_tail(&response->list, &info->empty_packet_queue);
	info->count_empty_packet_queue++;
	spin_unlock(&info->empty_packet_queue_lock);

	queue_work(info->workqueue, &info->post_send_credits_work);
}

/*
 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
 * This is a queue for reassembling upper layer payload and present to upper
 * layer. All the incoming payload go to the reassembly queue, regardless of
 * if reassembly is required. The upper layer code reads from the queue for
 * all incoming payloads.
 * Put a received packet to the reassembly queue
 * response: the packet received
 * data_length: the size of payload in this packet
 */
static void enqueue_reassembly(
	struct smbd_connection *info,
	struct smbd_response *response,
	int data_length)
{
	spin_lock(&info->reassembly_queue_lock);
	list_add_tail(&response->list, &info->reassembly_queue);
	info->reassembly_queue_length++;
	/*
	 * Make sure reassembly_data_length is updated after list and
	 * reassembly_queue_length are updated. On the dequeue side
	 * reassembly_data_length is checked without a lock to determine
	 * if reassembly_queue_length and list is up to date
	 */
	virt_wmb();
	info->reassembly_data_length += data_length;
	spin_unlock(&info->reassembly_queue_lock);
	info->count_reassembly_queue++;
	info->count_enqueue_reassembly_queue++;
}
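
/*
 * The write barrier above pairs with the read barrier taken in
 * smbd_recv_buf() before it reads reassembly_queue_length, which is what
 * lets the dequeue side test reassembly_data_length without holding
 * reassembly_queue_lock.
 */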

/*
 * Get the first entry at the front of reassembly queue
 * Caller is responsible for locking
 * return value: the first entry if any, NULL if queue is empty
 */
static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
{
	struct smbd_response *ret = NULL;

	if (!list_empty(&info->reassembly_queue)) {
		ret = list_first_entry(
			&info->reassembly_queue,
			struct smbd_response, list);
	}
	return ret;
}

static struct smbd_response *get_empty_queue_buffer(
		struct smbd_connection *info)
{
	struct smbd_response *ret = NULL;
	unsigned long flags;

	spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
	if (!list_empty(&info->empty_packet_queue)) {
		ret = list_first_entry(
			&info->empty_packet_queue,
			struct smbd_response, list);
		list_del(&ret->list);
		info->count_empty_packet_queue--;
	}
	spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);

	return ret;
}

/*
 * Get a receive buffer
 * For each remote send, we need to post a receive. The receive buffers are
 * pre-allocated in advance.
 * return value: the receive buffer, NULL if none is available
 */
static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
{
	struct smbd_response *ret = NULL;
	unsigned long flags;

	spin_lock_irqsave(&info->receive_queue_lock, flags);
	if (!list_empty(&info->receive_queue)) {
		ret = list_first_entry(
			&info->receive_queue,
			struct smbd_response, list);
		list_del(&ret->list);
		info->count_receive_queue--;
		info->count_get_receive_buffer++;
	}
	spin_unlock_irqrestore(&info->receive_queue_lock, flags);

	return ret;
}

/*
 * Return a receive buffer
 * Upon returning of a receive buffer, we can post new receive and extend
 * more receive credits to remote peer. This is done immediately after a
 * receive buffer is returned.
 */
static void put_receive_buffer(
	struct smbd_connection *info, struct smbd_response *response)
{
	unsigned long flags;

	ib_dma_unmap_single(info->id->device, response->sge.addr,
		response->sge.length, DMA_FROM_DEVICE);

	spin_lock_irqsave(&info->receive_queue_lock, flags);
	list_add_tail(&response->list, &info->receive_queue);
	info->count_receive_queue++;
	info->count_put_receive_buffer++;
	spin_unlock_irqrestore(&info->receive_queue_lock, flags);

	queue_work(info->workqueue, &info->post_send_credits_work);
}

/* Preallocate all receive buffer on transport establishment */
static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
{
	int i;
	struct smbd_response *response;

	INIT_LIST_HEAD(&info->reassembly_queue);
	spin_lock_init(&info->reassembly_queue_lock);
	info->reassembly_data_length = 0;
	info->reassembly_queue_length = 0;

	INIT_LIST_HEAD(&info->receive_queue);
	spin_lock_init(&info->receive_queue_lock);
	info->count_receive_queue = 0;

	INIT_LIST_HEAD(&info->empty_packet_queue);
	spin_lock_init(&info->empty_packet_queue_lock);
	info->count_empty_packet_queue = 0;

	init_waitqueue_head(&info->wait_receive_queues);

	for (i = 0; i < num_buf; i++) {
		response = mempool_alloc(info->response_mempool, GFP_KERNEL);
		if (!response)
			goto allocate_failed;

		response->info = info;
		list_add_tail(&response->list, &info->receive_queue);
		info->count_receive_queue++;
	}

	return 0;

allocate_failed:
	while (!list_empty(&info->receive_queue)) {
		response = list_first_entry(
				&info->receive_queue,
				struct smbd_response, list);
		list_del(&response->list);
		info->count_receive_queue--;

		mempool_free(response, info->response_mempool);
	}
	return -ENOMEM;
}

static void destroy_receive_buffers(struct smbd_connection *info)
{
	struct smbd_response *response;

	while ((response = get_receive_buffer(info)))
		mempool_free(response, info->response_mempool);

	while ((response = get_empty_queue_buffer(info)))
		mempool_free(response, info->response_mempool);
}

/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
static void idle_connection_timer(struct work_struct *work)
{
	struct smbd_connection *info = container_of(
					work, struct smbd_connection,
					idle_timer_work.work);

	if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
		log_keep_alive(ERR,
			"error status info->keep_alive_requested=%d\n",
			info->keep_alive_requested);
		smbd_disconnect_rdma_connection(info);
		return;
	}

	log_keep_alive(INFO, "about to send an empty idle message\n");
	smbd_post_send_empty(info);

	/* Setup the next idle timeout work */
	queue_delayed_work(info->workqueue, &info->idle_timer_work,
			info->keep_alive_interval*HZ);
}

/*
 * Destroy the transport and related RDMA and memory resources
 * Need to go through all the pending counters and make sure no one is using
 * the transport while it is destroyed
 */
void smbd_destroy(struct TCP_Server_Info *server)
{
	struct smbd_connection *info = server->smbd_conn;
	struct smbd_response *response;
	unsigned long flags;

	if (!info) {
		log_rdma_event(INFO, "rdma session already destroyed\n");
		return;
	}

	log_rdma_event(INFO, "destroying rdma session\n");
	if (info->transport_status != SMBD_DISCONNECTED) {
		rdma_disconnect(server->smbd_conn->id);
		log_rdma_event(INFO, "wait for transport being disconnected\n");
		wait_event_interruptible(
			info->disconn_wait,
			info->transport_status == SMBD_DISCONNECTED);
	}

	log_rdma_event(INFO, "destroying qp\n");
	ib_drain_qp(info->id->qp);
	rdma_destroy_qp(info->id);

	log_rdma_event(INFO, "cancelling idle timer\n");
	cancel_delayed_work_sync(&info->idle_timer_work);

	log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
	wait_event(info->wait_send_pending,
		atomic_read(&info->send_pending) == 0);

	/* It's not possible for upper layer to get to reassembly */
	log_rdma_event(INFO, "drain the reassembly queue\n");
	do {
		spin_lock_irqsave(&info->reassembly_queue_lock, flags);
		response = _get_first_reassembly(info);
		if (response) {
			list_del(&response->list);
			spin_unlock_irqrestore(
				&info->reassembly_queue_lock, flags);
			put_receive_buffer(info, response);
		} else
			spin_unlock_irqrestore(
				&info->reassembly_queue_lock, flags);
	} while (response);
	info->reassembly_data_length = 0;

	log_rdma_event(INFO, "free receive buffers\n");
	wait_event(info->wait_receive_queues,
		info->count_receive_queue + info->count_empty_packet_queue
			== info->receive_credit_max);
	destroy_receive_buffers(info);

	/*
	 * For performance reasons, memory registration and deregistration
	 * are not locked by srv_mutex. It is possible some processes are
	 * blocked on transport srv_mutex while holding memory registration.
	 * Release the transport srv_mutex to allow them to hit the failure
	 * path when sending data, and then release memory registrations.
	 */
	log_rdma_event(INFO, "freeing mr list\n");
	wake_up_interruptible_all(&info->wait_mr);
	while (atomic_read(&info->mr_used_count)) {
		mutex_unlock(&server->srv_mutex);
		msleep(1000);
		mutex_lock(&server->srv_mutex);
	}
	destroy_mr_list(info);

	ib_free_cq(info->send_cq);
	ib_free_cq(info->recv_cq);
	ib_dealloc_pd(info->pd);
	rdma_destroy_id(info->id);

	mempool_destroy(info->request_mempool);
	kmem_cache_destroy(info->request_cache);

	mempool_destroy(info->response_mempool);
	kmem_cache_destroy(info->response_cache);

	info->transport_status = SMBD_DESTROYED;

	destroy_workqueue(info->workqueue);
	log_rdma_event(INFO, "rdma session destroyed\n");
	kfree(info);
}

/*
 * Reconnect this SMBD connection, called from upper layer
 * return value: 0 on success, or actual error code
 */
int smbd_reconnect(struct TCP_Server_Info *server)
{
	log_rdma_event(INFO, "reconnecting rdma session\n");

	if (!server->smbd_conn) {
		log_rdma_event(INFO, "rdma session already destroyed\n");
		goto create_conn;
	}

	/*
	 * This is possible if transport is disconnected and we haven't received
	 * notification from RDMA, but upper layer has detected timeout
	 */
	if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
		log_rdma_event(INFO, "disconnecting transport\n");
		smbd_destroy(server);
	}

create_conn:
	log_rdma_event(INFO, "creating rdma session\n");
	server->smbd_conn = smbd_get_connection(
		server, (struct sockaddr *) &server->dstaddr);

	if (server->smbd_conn)
		cifs_dbg(VFS, "RDMA transport re-established\n");

	return server->smbd_conn ? 0 : -ENOENT;
}

static void destroy_caches_and_workqueue(struct smbd_connection *info)
{
	destroy_receive_buffers(info);
	destroy_workqueue(info->workqueue);
	mempool_destroy(info->response_mempool);
	kmem_cache_destroy(info->response_cache);
	mempool_destroy(info->request_mempool);
	kmem_cache_destroy(info->request_cache);
}

#define MAX_NAME_LEN	80
static int allocate_caches_and_workqueue(struct smbd_connection *info)
{
	char name[MAX_NAME_LEN];
	int rc;

	scnprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
	info->request_cache =
		kmem_cache_create(
			name,
			sizeof(struct smbd_request) +
				sizeof(struct smbd_data_transfer),
			0, SLAB_HWCACHE_ALIGN, NULL);
	if (!info->request_cache)
		return -ENOMEM;

	info->request_mempool =
		mempool_create(info->send_credit_target, mempool_alloc_slab,
			mempool_free_slab, info->request_cache);
	if (!info->request_mempool)
		goto out1;

	scnprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
	info->response_cache =
		kmem_cache_create(
			name,
			sizeof(struct smbd_response) +
				info->max_receive_size,
			0, SLAB_HWCACHE_ALIGN, NULL);
	if (!info->response_cache)
		goto out2;

	info->response_mempool =
		mempool_create(info->receive_credit_max, mempool_alloc_slab,
			mempool_free_slab, info->response_cache);
	if (!info->response_mempool)
		goto out3;

	scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
	info->workqueue = create_workqueue(name);
	if (!info->workqueue)
		goto out4;

	rc = allocate_receive_buffers(info, info->receive_credit_max);
	if (rc) {
		log_rdma_event(ERR, "failed to allocate receive buffers\n");
		goto out5;
	}

	return 0;

out5:
	destroy_workqueue(info->workqueue);
out4:
	mempool_destroy(info->response_mempool);
out3:
	kmem_cache_destroy(info->response_cache);
out2:
	mempool_destroy(info->request_mempool);
out1:
	kmem_cache_destroy(info->request_cache);
	return -ENOMEM;
}

/* Create a SMBD connection, called by upper layer */
static struct smbd_connection *_smbd_get_connection(
	struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
{
	int rc;
	struct smbd_connection *info;
	struct rdma_conn_param conn_param;
	struct ib_qp_init_attr qp_attr;
	struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
	struct ib_port_immutable port_immutable;
	u32 ird_ord_hdr[2];

	info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
	if (!info)
		return NULL;

	info->transport_status = SMBD_CONNECTING;
	rc = smbd_ia_open(info, dstaddr, port);
	if (rc) {
		log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
		goto create_id_failed;
	}

	if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
	    smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
		log_rdma_event(ERR, "consider lowering send_credit_target = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
			       smbd_send_credit_target,
			       info->id->device->attrs.max_cqe,
			       info->id->device->attrs.max_qp_wr);
		goto config_failed;
	}

	if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
	    smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
		log_rdma_event(ERR, "consider lowering receive_credit_max = %d. Possible CQE overrun, device reporting max_cqe %d max_qp_wr %d\n",
			       smbd_receive_credit_max,
			       info->id->device->attrs.max_cqe,
			       info->id->device->attrs.max_qp_wr);
		goto config_failed;
	}

	info->receive_credit_max = smbd_receive_credit_max;
	info->send_credit_target = smbd_send_credit_target;
	info->max_send_size = smbd_max_send_size;
	info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
	info->max_receive_size = smbd_max_receive_size;
	info->keep_alive_interval = smbd_keep_alive_interval;

	if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
		log_rdma_event(ERR,
			"warning: device max_send_sge = %d too small\n",
			info->id->device->attrs.max_send_sge);
		log_rdma_event(ERR, "Queue Pair creation may fail\n");
	}
	if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
		log_rdma_event(ERR,
			"warning: device max_recv_sge = %d too small\n",
			info->id->device->attrs.max_recv_sge);
		log_rdma_event(ERR, "Queue Pair creation may fail\n");
	}

	info->send_cq = NULL;
	info->recv_cq = NULL;
	info->send_cq =
		ib_alloc_cq_any(info->id->device, info,
				info->send_credit_target, IB_POLL_SOFTIRQ);
	if (IS_ERR(info->send_cq)) {
		info->send_cq = NULL;
		goto alloc_cq_failed;
	}

	info->recv_cq =
		ib_alloc_cq_any(info->id->device, info,
				info->receive_credit_max, IB_POLL_SOFTIRQ);
	if (IS_ERR(info->recv_cq)) {
		info->recv_cq = NULL;
		goto alloc_cq_failed;
	}

	memset(&qp_attr, 0, sizeof(qp_attr));
	qp_attr.event_handler = smbd_qp_async_error_upcall;
	qp_attr.qp_context = info;
	qp_attr.cap.max_send_wr = info->send_credit_target;
	qp_attr.cap.max_recv_wr = info->receive_credit_max;
	qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
	qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
	qp_attr.cap.max_inline_data = 0;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = info->send_cq;
	qp_attr.recv_cq = info->recv_cq;
	qp_attr.port_num = ~0;

	rc = rdma_create_qp(info->id, info->pd, &qp_attr);
	if (rc) {
		log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
		goto create_qp_failed;
	}

	memset(&conn_param, 0, sizeof(conn_param));
	conn_param.initiator_depth = 0;

	conn_param.responder_resources =
		info->id->device->attrs.max_qp_rd_atom
			< SMBD_CM_RESPONDER_RESOURCES ?
		info->id->device->attrs.max_qp_rd_atom :
		SMBD_CM_RESPONDER_RESOURCES;
	info->responder_resources = conn_param.responder_resources;
	log_rdma_mr(INFO, "responder_resources=%d\n",
		info->responder_resources);

	/* Need to send IRD/ORD in private data for iWARP */
	info->id->device->ops.get_port_immutable(
		info->id->device, info->id->port_num, &port_immutable);
	if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
		ird_ord_hdr[0] = info->responder_resources;
		ird_ord_hdr[1] = 1;
		conn_param.private_data = ird_ord_hdr;
		conn_param.private_data_len = sizeof(ird_ord_hdr);
	} else {
		conn_param.private_data = NULL;
		conn_param.private_data_len = 0;
	}

	conn_param.retry_count = SMBD_CM_RETRY;
	conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
	conn_param.flow_control = 0;

	log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
		&addr_in->sin_addr, port);

	init_waitqueue_head(&info->conn_wait);
	init_waitqueue_head(&info->disconn_wait);
	init_waitqueue_head(&info->wait_reassembly_queue);
	rc = rdma_connect(info->id, &conn_param);
	if (rc) {
		log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
		goto rdma_connect_failed;
	}

	wait_event_interruptible(
		info->conn_wait, info->transport_status != SMBD_CONNECTING);

	if (info->transport_status != SMBD_CONNECTED) {
		log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
		goto rdma_connect_failed;
	}

	log_rdma_event(INFO, "rdma_connect connected\n");

	rc = allocate_caches_and_workqueue(info);
	if (rc) {
		log_rdma_event(ERR, "cache allocation failed\n");
		goto allocate_cache_failed;
	}

	init_waitqueue_head(&info->wait_send_queue);
	INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
	queue_delayed_work(info->workqueue, &info->idle_timer_work,
		info->keep_alive_interval*HZ);

	init_waitqueue_head(&info->wait_send_pending);
	atomic_set(&info->send_pending, 0);

	init_waitqueue_head(&info->wait_post_send);

	INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
	INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
	info->new_credits_offered = 0;
	spin_lock_init(&info->lock_new_credits_offered);

	rc = smbd_negotiate(info);
	if (rc) {
		log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
		goto negotiation_failed;
	}

	rc = allocate_mr_list(info);
	if (rc) {
		log_rdma_mr(ERR, "memory registration allocation failed\n");
		goto allocate_mr_failed;
	}

	return info;

allocate_mr_failed:
	/* At this point, need to do a full transport shutdown */
	smbd_destroy(server);
	return NULL;

negotiation_failed:
	cancel_delayed_work_sync(&info->idle_timer_work);
	destroy_caches_and_workqueue(info);
	info->transport_status = SMBD_NEGOTIATE_FAILED;
	init_waitqueue_head(&info->conn_wait);
	rdma_disconnect(info->id);
	wait_event(info->conn_wait,
		info->transport_status == SMBD_DISCONNECTED);

allocate_cache_failed:
rdma_connect_failed:
	rdma_destroy_qp(info->id);

create_qp_failed:
alloc_cq_failed:
	if (info->send_cq)
		ib_free_cq(info->send_cq);
	if (info->recv_cq)
		ib_free_cq(info->recv_cq);

config_failed:
	ib_dealloc_pd(info->pd);
	rdma_destroy_id(info->id);

create_id_failed:
	kfree(info);
	return NULL;
}

struct smbd_connection *smbd_get_connection(
	struct TCP_Server_Info *server, struct sockaddr *dstaddr)
{
	struct smbd_connection *ret;
	int port = SMBD_PORT;

try_again:
	ret = _smbd_get_connection(server, dstaddr, port);

	/* Try SMB_PORT if SMBD_PORT doesn't work */
	if (!ret && port == SMBD_PORT) {
		port = SMB_PORT;
		goto try_again;
	}
	return ret;
}
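
/*
 * In other words, the connection is first attempted on the SMB Direct port
 * (SMBD_PORT, 5445) and, if that fails, retried once on the conventional
 * SMB port (SMB_PORT) before giving up.
 */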

/*
 * Receive data from receive reassembly queue
 * All the incoming data packets are placed in reassembly queue
 * buf: the buffer to read data into
 * size: the length of data to read
 * return value: actual data read
 * Note: this implementation copies the data from reassembly queue to receive
 * buffers used by upper layer. This is not the optimal code path. A better way
 * to do it is to not have upper layer allocate its receive buffers but rather
 * borrow the buffer from reassembly queue, and return it after data is
 * consumed. But this will require more changes to upper layer code, and also
 * need to consider packet boundaries while they are still being reassembled.
 */
static int smbd_recv_buf(struct smbd_connection *info, char *buf,
		unsigned int size)
{
	struct smbd_response *response;
	struct smbd_data_transfer *data_transfer;
	int to_copy, to_read, data_read, offset;
	u32 data_length, remaining_data_length, data_offset;
	int rc;

again:
	/*
	 * No need to hold the reassembly queue lock all the time as we are
	 * the only one reading from the front of the queue. The transport
	 * may add more entries to the back of the queue at the same time
	 */
	log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
		info->reassembly_data_length);
	if (info->reassembly_data_length >= size) {
		int queue_length;
		int queue_removed = 0;

		/*
		 * Need to make sure reassembly_data_length is read before
		 * reading reassembly_queue_length and calling
		 * _get_first_reassembly. This call is lock free
		 * as we never read at the end of the queue which are being
		 * updated in SOFTIRQ as more data is received
		 */
		virt_rmb();
		queue_length = info->reassembly_queue_length;
		data_read = 0;
		to_read = size;
		offset = info->first_entry_offset;
		while (data_read < size) {
			response = _get_first_reassembly(info);
			data_transfer = smbd_response_payload(response);
			data_length = le32_to_cpu(data_transfer->data_length);
			remaining_data_length =
				le32_to_cpu(
					data_transfer->remaining_data_length);
			data_offset = le32_to_cpu(data_transfer->data_offset);

			/*
			 * The upper layer expects RFC1002 length at the
			 * beginning of the payload. Return it to indicate
			 * the total length of the packet. This minimizes the
			 * change to upper layer packet processing logic. This
			 * will eventually be removed when an intermediate
			 * transport layer is added
			 */
			if (response->first_segment && size == 4) {
				unsigned int rfc1002_len =
					data_length + remaining_data_length;
				*((__be32 *)buf) = cpu_to_be32(rfc1002_len);
				data_read = 4;
				response->first_segment = false;
				log_read(INFO, "returning rfc1002 length %d\n",
					rfc1002_len);
				goto read_rfc1002_done;
			}

			to_copy = min_t(int, data_length - offset, to_read);
			memcpy(
				buf + data_read,
				(char *)data_transfer + data_offset + offset,
				to_copy);

			/* move on to the next buffer? */
			if (to_copy == data_length - offset) {
				queue_length--;
				/*
				 * No need to lock if we are not at the
				 * end of the queue
				 */
				if (queue_length)
					list_del(&response->list);
				else {
					spin_lock_irq(
						&info->reassembly_queue_lock);
					list_del(&response->list);
					spin_unlock_irq(
						&info->reassembly_queue_lock);
				}
				queue_removed++;
				info->count_reassembly_queue--;
				info->count_dequeue_reassembly_queue++;
				put_receive_buffer(info, response);
				offset = 0;
				log_read(INFO, "put_receive_buffer offset=0\n");
			} else
				offset += to_copy;

			to_read -= to_copy;
			data_read += to_copy;

			log_read(INFO, "_get_first_reassembly memcpy %d bytes data_transfer_length-offset=%d after that to_read=%d data_read=%d offset=%d\n",
				 to_copy, data_length - offset,
				 to_read, data_read, offset);
		}

		spin_lock_irq(&info->reassembly_queue_lock);
		info->reassembly_data_length -= data_read;
		info->reassembly_queue_length -= queue_removed;
		spin_unlock_irq(&info->reassembly_queue_lock);

		info->first_entry_offset = offset;
		log_read(INFO, "returning to thread data_read=%d reassembly_data_length=%d first_entry_offset=%d\n",
			 data_read, info->reassembly_data_length,
			 info->first_entry_offset);
read_rfc1002_done:
		return data_read;
	}

	log_read(INFO, "wait_event on more data\n");
	rc = wait_event_interruptible(
		info->wait_reassembly_queue,
		info->reassembly_data_length >= size ||
			info->transport_status != SMBD_CONNECTED);
	/* Don't return any data if interrupted */
	if (rc)
		return rc;

	if (info->transport_status != SMBD_CONNECTED) {
		log_read(ERR, "disconnected\n");
		return -ECONNABORTED;
	}

	goto again;
}

/*
 * Receive a page from receive reassembly queue
 * page: the page to read data into
 * to_read: the length of data to read
 * return value: actual data read
 */
static int smbd_recv_page(struct smbd_connection *info,
		struct page *page, unsigned int page_offset,
		unsigned int to_read)
{
	int ret;
	char *to_address;
	void *page_address;

	/* make sure we have the page ready for read */
	ret = wait_event_interruptible(
		info->wait_reassembly_queue,
		info->reassembly_data_length >= to_read ||
			info->transport_status != SMBD_CONNECTED);
	if (ret)
		return ret;

	/* now we can read from reassembly queue and not sleep */
	page_address = kmap_atomic(page);
	to_address = (char *) page_address + page_offset;

	log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
		page, to_address, to_read);

	ret = smbd_recv_buf(info, to_address, to_read);
	kunmap_atomic(page_address);

	return ret;
}

/*
 * Receive data from transport
 * msg: a msghdr pointing to the buffer, can be ITER_KVEC or ITER_BVEC
 * return: total bytes read, or 0. SMB Direct will not do partial read.
 */
int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
{
	char *buf;
	struct page *page;
	unsigned int to_read, page_offset;
	int rc;

	if (iov_iter_rw(&msg->msg_iter) == WRITE) {
		/* It's a bug in upper layer to get there */
		cifs_dbg(VFS, "Invalid msg iter dir %u\n",
			 iov_iter_rw(&msg->msg_iter));
		rc = -EINVAL;
		goto out;
	}

	switch (iov_iter_type(&msg->msg_iter)) {
	case ITER_KVEC:
		buf = msg->msg_iter.kvec->iov_base;
		to_read = msg->msg_iter.kvec->iov_len;
		rc = smbd_recv_buf(info, buf, to_read);
		break;

	case ITER_BVEC:
		page = msg->msg_iter.bvec->bv_page;
		page_offset = msg->msg_iter.bvec->bv_offset;
		to_read = msg->msg_iter.bvec->bv_len;
		rc = smbd_recv_page(info, page, page_offset, to_read);
		break;

	default:
		/* It's a bug in upper layer to get there */
		cifs_dbg(VFS, "Invalid msg type %d\n",
			 iov_iter_type(&msg->msg_iter));
		rc = -EINVAL;
	}

out:
	/* SMBDirect will read it all or nothing */
	if (rc > 0)
		msg->msg_iter.count = 0;
	return rc;
}
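
/*
 * Callers hand smbd_recv() a msghdr whose iterator is either ITER_KVEC
 * (typically the SMB header/body buffers) or ITER_BVEC (a page of payload);
 * on success msg_iter.count is cleared because the transport never returns
 * a partial read.
 */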

/*
 * Send data to the transport
 * Each rqst is transported as an SMBDirect payload
 * rqst: the data to write
 * return value: 0 if successfully written, otherwise error code
 */
int smbd_send(struct TCP_Server_Info *server,
	int num_rqst, struct smb_rqst *rqst_array)
{
	struct smbd_connection *info = server->smbd_conn;
	struct kvec vec;
	int nvecs;
	int size;
	unsigned int buflen, remaining_data_length;
	int start, i, j;
	int max_iov_size =
		info->max_send_size - sizeof(struct smbd_data_transfer);
	struct kvec *iov;
	int rc;
	struct smb_rqst *rqst;
	int rqst_idx;

	if (info->transport_status != SMBD_CONNECTED) {
		rc = -EAGAIN;
		goto done;
	}

	/*
	 * Add in the page array if there is one. The caller needs to set
	 * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
	 * ends at a page boundary
	 */
	remaining_data_length = 0;
	for (i = 0; i < num_rqst; i++)
		remaining_data_length += smb_rqst_len(server, &rqst_array[i]);

	if (remaining_data_length > info->max_fragmented_send_size) {
		log_write(ERR, "payload size %d > max size %d\n",
			remaining_data_length, info->max_fragmented_send_size);
		rc = -EINVAL;
		goto done;
	}

	log_write(INFO, "num_rqst=%d total length=%u\n",
			num_rqst, remaining_data_length);

	rqst_idx = 0;
next_rqst:
	rqst = &rqst_array[rqst_idx];
	iov = rqst->rq_iov;

	cifs_dbg(FYI, "Sending smb (RDMA): idx=%d smb_len=%lu\n",
		rqst_idx, smb_rqst_len(server, rqst));
	for (i = 0; i < rqst->rq_nvec; i++)
		dump_smb(iov[i].iov_base, iov[i].iov_len);

	log_write(INFO, "rqst_idx=%d nvec=%d rqst->rq_npages=%d rq_pagesz=%d rq_tailsz=%d buflen=%lu\n",
		rqst_idx, rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
		rqst->rq_tailsz, smb_rqst_len(server, rqst));

	start = i = 0;
	buflen = 0;
	while (true) {
		buflen += iov[i].iov_len;
		if (buflen > max_iov_size) {
			if (i > start) {
				remaining_data_length -=
					(buflen-iov[i].iov_len);
				log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
					start, i, i - start,
					remaining_data_length);
				rc = smbd_post_send_data(
					info, &iov[start], i-start,
					remaining_data_length);
				if (rc)
					goto done;
			} else {
				/* iov[start] is too big, break it */
				nvecs = (buflen+max_iov_size-1)/max_iov_size;
				log_write(INFO, "iov[%d] iov_base=%p buflen=%d break to %d vectors\n",
					start, iov[start].iov_base,
					buflen, nvecs);
				for (j = 0; j < nvecs; j++) {
					vec.iov_base =
						(char *)iov[start].iov_base +
						j*max_iov_size;
					vec.iov_len = max_iov_size;
					if (j == nvecs-1)
						vec.iov_len =
							buflen -
							max_iov_size*(nvecs-1);
					remaining_data_length -= vec.iov_len;
					log_write(INFO,
						"sending vec j=%d iov_base=%p iov_len=%zu remaining_data_length=%d\n",
						j, vec.iov_base, vec.iov_len,
						remaining_data_length);
					rc = smbd_post_send_data(
						info, &vec, 1,
						remaining_data_length);
					if (rc)
						goto done;
				}
				i++;
				if (i == rqst->rq_nvec)
					break;
			}
			start = i;
			buflen = 0;
		} else {
			i++;
			if (i == rqst->rq_nvec) {
				/* send out all remaining vecs */
				remaining_data_length -= buflen;
				log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
					start, i, i - start,
					remaining_data_length);
				rc = smbd_post_send_data(info, &iov[start],
					i-start, remaining_data_length);
				if (rc)
					goto done;
				break;
			}
		}
		log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
	}

	/* now sending pages if there are any */
	for (i = 0; i < rqst->rq_npages; i++) {
		unsigned int offset;

		rqst_page_get_length(rqst, i, &buflen, &offset);
		nvecs = (buflen + max_iov_size - 1) / max_iov_size;
		log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
			buflen, nvecs);
		for (j = 0; j < nvecs; j++) {
			size = max_iov_size;
			if (j == nvecs-1)
				size = buflen - j*max_iov_size;
			remaining_data_length -= size;
			log_write(INFO, "sending pages i=%d offset=%d size=%d remaining_data_length=%d\n",
				i, j*max_iov_size + offset, size,
				remaining_data_length);
			rc = smbd_post_send_page(
				info, rqst->rq_pages[i],
				j*max_iov_size + offset,
				size, remaining_data_length);
			if (rc)
				goto done;
		}
	}

	rqst_idx++;
	if (rqst_idx < num_rqst)
		goto next_rqst;

done:
	/*
	 * As an optimization, we don't wait for individual I/Os to finish
	 * before sending the next one.
	 * Send them all and wait for the pending send count to get to 0,
	 * which means all the I/Os have gone out and we are good to return
	 */

	wait_event(info->wait_send_pending,
		atomic_read(&info->send_pending) == 0);

	return rc;
}
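
/*
 * Worked example of the fragmentation above, using hypothetical numbers:
 * with max_iov_size = 1024 and a single 2500-byte iov, nvecs =
 * (2500 + 1024 - 1) / 1024 = 3, producing sends of 1024, 1024 and
 * 2500 - 2 * 1024 = 452 bytes. remaining_data_length is decremented before
 * each post, so every SMBDirect data transfer header tells the peer how much
 * of the fragmented payload is still to come.
 */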

static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct smbd_mr *mr;
	struct ib_cqe *cqe;

	if (wc->status) {
		log_rdma_mr(ERR, "status=%d\n", wc->status);
		cqe = wc->wr_cqe;
		mr = container_of(cqe, struct smbd_mr, cqe);
		smbd_disconnect_rdma_connection(mr->conn);
	}
}

/*
 * The work queue function that recovers MRs
 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
 * again. Both calls are slow, so finish them in a workqueue. This will not
 * block the I/O path.
 * There is one workqueue that recovers MRs, so there is no need to lock, as
 * the I/O requests calling smbd_register_mr will never update the links in
 * mr_list.
 */
static void smbd_mr_recovery_work(struct work_struct *work)
{
	struct smbd_connection *info =
		container_of(work, struct smbd_connection, mr_recovery_work);
	struct smbd_mr *smbdirect_mr;
	int rc;

	list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
		if (smbdirect_mr->state == MR_ERROR) {

			/* recover this MR entry */
			rc = ib_dereg_mr(smbdirect_mr->mr);
			if (rc) {
				log_rdma_mr(ERR,
					"ib_dereg_mr failed rc=%x\n",
					rc);
				smbd_disconnect_rdma_connection(info);
				continue;
			}

			smbdirect_mr->mr = ib_alloc_mr(
				info->pd, info->mr_type,
				info->max_frmr_depth);
			if (IS_ERR(smbdirect_mr->mr)) {
				log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
					info->mr_type,
					info->max_frmr_depth);
				smbd_disconnect_rdma_connection(info);
				continue;
			}
		} else
			/* This MR is being used, don't recover it */
			continue;

		smbdirect_mr->state = MR_READY;

		/* smbdirect_mr->state is updated by this function
		 * and is read and updated by I/O issuing CPUs trying
		 * to get an MR; the call to atomic_inc_return
		 * implies a memory barrier and guarantees this
		 * value is updated before waking up any calls to
		 * get_mr() from the I/O issuing CPUs
		 */
		if (atomic_inc_return(&info->mr_ready_count) == 1)
			wake_up_interruptible(&info->wait_mr);
	}
}
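
/*
 * Summary of the MR state machine implied by the code in this file
 * (descriptive only): an MR starts as MR_READY in allocate_mr_list(), becomes
 * MR_REGISTERED when get_mr() hands it to an I/O, moves to MR_INVALIDATED
 * once local or remote invalidation completes, or to MR_ERROR if registration
 * or invalidation fails, and returns to MR_READY either directly in
 * smbd_deregister_mr() or via the recovery work above.
 */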

static void destroy_mr_list(struct smbd_connection *info)
{
	struct smbd_mr *mr, *tmp;

	cancel_work_sync(&info->mr_recovery_work);
	list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
		if (mr->state == MR_INVALIDATED)
			ib_dma_unmap_sg(info->id->device, mr->sgl,
				mr->sgl_count, mr->dir);
		ib_dereg_mr(mr->mr);
		kfree(mr->sgl);
		kfree(mr);
	}
}

/*
 * Allocate MRs used for RDMA read/write
 * The number of MRs will not exceed the hardware capability in
 * responder_resources. All MRs are kept in mr_list. An MR can be recovered
 * after it's used; recovery is done in smbd_mr_recovery_work. The content of
 * a list entry changes as MRs are used and recovered for I/O, but the list
 * links will not change.
 */
static int allocate_mr_list(struct smbd_connection *info)
{
	int i;
	struct smbd_mr *smbdirect_mr, *tmp;

	INIT_LIST_HEAD(&info->mr_list);
	init_waitqueue_head(&info->wait_mr);
	spin_lock_init(&info->mr_list_lock);
	atomic_set(&info->mr_ready_count, 0);
	atomic_set(&info->mr_used_count, 0);
	init_waitqueue_head(&info->wait_for_mr_cleanup);
	/* Allocate more MRs (2x) than hardware responder_resources */
	for (i = 0; i < info->responder_resources * 2; i++) {
		smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
		if (!smbdirect_mr)
			goto out;
		smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
					info->max_frmr_depth);
		if (IS_ERR(smbdirect_mr->mr)) {
			log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
				info->mr_type, info->max_frmr_depth);
			goto out;
		}
		smbdirect_mr->sgl = kcalloc(
					info->max_frmr_depth,
					sizeof(struct scatterlist),
					GFP_KERNEL);
		if (!smbdirect_mr->sgl) {
			log_rdma_mr(ERR, "failed to allocate sgl\n");
			ib_dereg_mr(smbdirect_mr->mr);
			goto out;
		}
		smbdirect_mr->state = MR_READY;
		smbdirect_mr->conn = info;

		list_add_tail(&smbdirect_mr->list, &info->mr_list);
		atomic_inc(&info->mr_ready_count);
	}
	INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
	return 0;

out:
	kfree(smbdirect_mr);

	list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
		ib_dereg_mr(smbdirect_mr->mr);
		kfree(smbdirect_mr->sgl);
		kfree(smbdirect_mr);
	}
	return -ENOMEM;
}
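
/*
 * Worked example of the sizing above, using a hypothetical value: if
 * info->responder_resources is 16, the loop allocates 32 MRs, each backed by
 * an sgl of max_frmr_depth scatterlist entries, so a single I/O can register
 * at most max_frmr_depth pages (enforced by the check at the top of
 * smbd_register_mr()).
 */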

/*
 * Get an MR from mr_list. This function waits until there is at least one
 * MR available in the list. It may access the list while the
 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
 * as they never modify the same places. However, there may be several CPUs
 * issuing I/O trying to get an MR at the same time; mr_list_lock is used to
 * protect against that.
 */
static struct smbd_mr *get_mr(struct smbd_connection *info)
{
	struct smbd_mr *ret;
	int rc;
again:
	rc = wait_event_interruptible(info->wait_mr,
		atomic_read(&info->mr_ready_count) ||
		info->transport_status != SMBD_CONNECTED);
	if (rc) {
		log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
		return NULL;
	}

	if (info->transport_status != SMBD_CONNECTED) {
		log_rdma_mr(ERR, "info->transport_status=%x\n",
			info->transport_status);
		return NULL;
	}

	spin_lock(&info->mr_list_lock);
	list_for_each_entry(ret, &info->mr_list, list) {
		if (ret->state == MR_READY) {
			ret->state = MR_REGISTERED;
			spin_unlock(&info->mr_list_lock);
			atomic_dec(&info->mr_ready_count);
			atomic_inc(&info->mr_used_count);
			return ret;
		}
	}

	spin_unlock(&info->mr_list_lock);
	/*
	 * It is possible that we could fail to get an MR because other
	 * processes may try to acquire an MR at the same time. If this is
	 * the case, retry it.
	 */
	goto again;
}
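
/*
 * Note on the accounting above (descriptive only): mr_ready_count counts MRs
 * currently in MR_READY state, while mr_used_count counts MRs handed out by
 * get_mr() and not yet released (normally in smbd_deregister_mr(), or in the
 * smbd_register_mr() error path). The "goto again" retry covers the window
 * where another CPU claims the last ready MR between the wait_event and the
 * list walk.
 */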

/*
 * Register memory for RDMA read/write
 * pages[]: the list of pages to register memory with
 * num_pages: the number of pages to register
 * tailsz: if non-zero, the bytes to register in the last page
 * writing: true if this is a RDMA write (SMB read), false for RDMA read
 * need_invalidate: true if this MR needs to be locally invalidated after I/O
 * return value: the MR registered, NULL if failed.
 */
struct smbd_mr *smbd_register_mr(
	struct smbd_connection *info, struct page *pages[], int num_pages,
	int offset, int tailsz, bool writing, bool need_invalidate)
{
	struct smbd_mr *smbdirect_mr;
	int rc, i;
	enum dma_data_direction dir;
	struct ib_reg_wr *reg_wr;

	if (num_pages > info->max_frmr_depth) {
		log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
			num_pages, info->max_frmr_depth);
		return NULL;
	}

	smbdirect_mr = get_mr(info);
	if (!smbdirect_mr) {
		log_rdma_mr(ERR, "get_mr returning NULL\n");
		return NULL;
	}
	smbdirect_mr->need_invalidate = need_invalidate;
	smbdirect_mr->sgl_count = num_pages;
	sg_init_table(smbdirect_mr->sgl, num_pages);

	log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
			num_pages, offset, tailsz);

	if (num_pages == 1) {
		sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
		goto skip_multiple_pages;
	}

	/* We have at least two pages to register */
	sg_set_page(
		&smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
	i = 1;
	while (i < num_pages - 1) {
		sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
		i++;
	}
	sg_set_page(&smbdirect_mr->sgl[i], pages[i],
		tailsz ? tailsz : PAGE_SIZE, 0);

skip_multiple_pages:
	dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
	smbdirect_mr->dir = dir;
	rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
	if (!rc) {
		log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
			num_pages, dir, rc);
		goto dma_map_error;
	}

	rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
		NULL, PAGE_SIZE);
	if (rc != num_pages) {
		log_rdma_mr(ERR,
			"ib_map_mr_sg failed rc = %d num_pages = %x\n",
			rc, num_pages);
		goto map_mr_error;
	}

	ib_update_fast_reg_key(smbdirect_mr->mr,
		ib_inc_rkey(smbdirect_mr->mr->rkey));
	reg_wr = &smbdirect_mr->wr;
	reg_wr->wr.opcode = IB_WR_REG_MR;
	smbdirect_mr->cqe.done = register_mr_done;
	reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
	reg_wr->wr.num_sge = 0;
	reg_wr->wr.send_flags = IB_SEND_SIGNALED;
	reg_wr->mr = smbdirect_mr->mr;
	reg_wr->key = smbdirect_mr->mr->rkey;
	reg_wr->access = writing ?
			IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
			IB_ACCESS_REMOTE_READ;

	/*
	 * There is no need to wait for completion of ib_post_send
	 * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
	 * on the next ib_post_send when we actually send I/O to the remote peer
	 */
	rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
	if (!rc)
		return smbdirect_mr;

	log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
		rc, reg_wr->key);

	/* If all failed, attempt to recover this MR by setting it MR_ERROR */
map_mr_error:
	ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
		smbdirect_mr->sgl_count, smbdirect_mr->dir);

dma_map_error:
	smbdirect_mr->state = MR_ERROR;
	if (atomic_dec_and_test(&info->mr_used_count))
		wake_up(&info->wait_for_mr_cleanup);

	smbd_disconnect_rdma_connection(info);

	return NULL;
}
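
/*
 * Illustrative sketch only (kept out of the build): the expected pairing of
 * smbd_register_mr() and smbd_deregister_mr() for a direct RDMA write (an
 * SMB read on the wire). The helper name, example_pages/example_npages and
 * the offset/tailsz values are hypothetical; the registered rkey would be
 * advertised to the peer in the upper-layer request.
 */
#if 0
static int example_direct_read(struct smbd_connection *info,
			       struct page **example_pages, int example_npages)
{
	struct smbd_mr *mr;
	int rc;

	/*
	 * writing=true: the peer RDMA-writes into these pages (SMB read);
	 * need_invalidate=true: locally invalidate once the I/O is done
	 */
	mr = smbd_register_mr(info, example_pages, example_npages,
			      0 /* offset in first page */,
			      PAGE_SIZE /* whole last page */,
			      true, true);
	if (!mr)
		return -EAGAIN;

	/*
	 * ... send the upper-layer request carrying mr->mr->rkey, wait for
	 * the peer to RDMA-write the data and respond ...
	 */

	rc = smbd_deregister_mr(mr);
	return rc;
}
#endif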

static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct smbd_mr *smbdirect_mr;
	struct ib_cqe *cqe;

	cqe = wc->wr_cqe;
	smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
	smbdirect_mr->state = MR_INVALIDATED;
	if (wc->status != IB_WC_SUCCESS) {
		log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
		smbdirect_mr->state = MR_ERROR;
	}
	complete(&smbdirect_mr->invalidate_done);
}

/*
 * Deregister an MR after I/O is done
 * This function may wait if remote invalidation is not used
 * and we have to locally invalidate the buffer to prevent data from being
 * modified by the remote peer after the upper layer consumes it
 */
int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
{
	struct ib_send_wr *wr;
	struct smbd_connection *info = smbdirect_mr->conn;
	int rc = 0;

	if (smbdirect_mr->need_invalidate) {
		/* Need to finish local invalidation before returning */
		wr = &smbdirect_mr->inv_wr;
		wr->opcode = IB_WR_LOCAL_INV;
		smbdirect_mr->cqe.done = local_inv_done;
		wr->wr_cqe = &smbdirect_mr->cqe;
		wr->num_sge = 0;
		wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
		wr->send_flags = IB_SEND_SIGNALED;

		init_completion(&smbdirect_mr->invalidate_done);
		rc = ib_post_send(info->id->qp, wr, NULL);
		if (rc) {
			log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
			smbd_disconnect_rdma_connection(info);
			goto done;
		}
		wait_for_completion(&smbdirect_mr->invalidate_done);
		smbdirect_mr->need_invalidate = false;
	} else
		/*
		 * For remote invalidation, just set it to MR_INVALIDATED
		 * and defer to mr_recovery_work to recover the MR for next use
		 */
		smbdirect_mr->state = MR_INVALIDATED;

	if (smbdirect_mr->state == MR_INVALIDATED) {
		ib_dma_unmap_sg(
			info->id->device, smbdirect_mr->sgl,
			smbdirect_mr->sgl_count,
			smbdirect_mr->dir);
		smbdirect_mr->state = MR_READY;
		if (atomic_inc_return(&info->mr_ready_count) == 1)
			wake_up_interruptible(&info->wait_mr);
	} else
		/*
		 * Schedule the work to do MR recovery for future I/Os; MR
		 * recovery is slow and we don't want it to block current I/O
		 */
		queue_work(info->workqueue, &info->mr_recovery_work);

done:
	if (atomic_dec_and_test(&info->mr_used_count))
		wake_up(&info->wait_for_mr_cleanup);

	return rc;
}