1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
4 (c) 2007 Network Appliance, Inc. All Rights Reserved.
5 (c) 2009 NetApp. All Rights Reserved.
8 ******************************************************************************/
10 #include <linux/tcp.h>
11 #include <linux/slab.h>
12 #include <linux/sunrpc/xprt.h>
13 #include <linux/export.h>
14 #include <linux/sunrpc/bc_xprt.h>
16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
17 #define RPCDBG_FACILITY RPCDBG_TRANS
20 #define BC_MAX_SLOTS 64U
22 unsigned int xprt_bc_max_slots(struct rpc_xprt
*xprt
)
28 * Helper routines that track the number of preallocation elements
31 static inline int xprt_need_to_requeue(struct rpc_xprt
*xprt
)
33 return xprt
->bc_alloc_count
< xprt
->bc_alloc_max
;
37 * Free the preallocated rpc_rqst structure and the memory
38 * buffers hanging off of it.
40 static void xprt_free_allocation(struct rpc_rqst
*req
)
42 struct xdr_buf
*xbufp
;
44 dprintk("RPC: free allocations for req= %p\n", req
);
45 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE
, &req
->rq_bc_pa_state
));
46 xbufp
= &req
->rq_rcv_buf
;
47 free_page((unsigned long)xbufp
->head
[0].iov_base
);
48 xbufp
= &req
->rq_snd_buf
;
49 free_page((unsigned long)xbufp
->head
[0].iov_base
);
53 static void xprt_bc_reinit_xdr_buf(struct xdr_buf
*buf
)
55 buf
->head
[0].iov_len
= PAGE_SIZE
;
56 buf
->tail
[0].iov_len
= 0;
61 buf
->buflen
= PAGE_SIZE
;
64 static int xprt_alloc_xdr_buf(struct xdr_buf
*buf
, gfp_t gfp_flags
)
67 /* Preallocate one XDR receive buffer */
68 page
= alloc_page(gfp_flags
);
71 xdr_buf_init(buf
, page_address(page
), PAGE_SIZE
);
75 static struct rpc_rqst
*xprt_alloc_bc_req(struct rpc_xprt
*xprt
)
77 gfp_t gfp_flags
= GFP_KERNEL
| __GFP_NORETRY
| __GFP_NOWARN
;
80 /* Pre-allocate one backchannel rpc_rqst */
81 req
= kzalloc(sizeof(*req
), gfp_flags
);
87 /* Preallocate one XDR receive buffer */
88 if (xprt_alloc_xdr_buf(&req
->rq_rcv_buf
, gfp_flags
) < 0) {
89 printk(KERN_ERR
"Failed to create bc receive xbuf\n");
92 req
->rq_rcv_buf
.len
= PAGE_SIZE
;
94 /* Preallocate one XDR send buffer */
95 if (xprt_alloc_xdr_buf(&req
->rq_snd_buf
, gfp_flags
) < 0) {
96 printk(KERN_ERR
"Failed to create bc snd xbuf\n");
101 xprt_free_allocation(req
);
106 * Preallocate up to min_reqs structures and related buffers for use
107 * by the backchannel. This function can be called multiple times
108 * when creating new sessions that use the same rpc_xprt. The
109 * preallocated buffers are added to the pool of resources used by
110 * the rpc_xprt. Any one of these resources may be used by an
111 * incoming callback request. It's up to the higher levels in the
112 * stack to enforce that the maximum number of session slots is not
115 * Some callback arguments can be large. For example, a pNFS server
116 * using multiple deviceids. The list can be unbound, but the client
117 * has the ability to tell the server the maximum size of the callback
118 * requests. Each deviceID is 16 bytes, so allocate one page
119 * for the arguments to have enough room to receive a number of these
120 * deviceIDs. The NFS client indicates to the pNFS server that its
121 * callback requests can be up to 4096 bytes in size.
123 int xprt_setup_backchannel(struct rpc_xprt
*xprt
, unsigned int min_reqs
)
125 if (!xprt
->ops
->bc_setup
)
127 return xprt
->ops
->bc_setup(xprt
, min_reqs
);
129 EXPORT_SYMBOL_GPL(xprt_setup_backchannel
);
131 int xprt_setup_bc(struct rpc_xprt
*xprt
, unsigned int min_reqs
)
133 struct rpc_rqst
*req
;
134 struct list_head tmp_list
;
137 dprintk("RPC: setup backchannel transport\n");
139 if (min_reqs
> BC_MAX_SLOTS
)
140 min_reqs
= BC_MAX_SLOTS
;
143 * We use a temporary list to keep track of the preallocated
144 * buffers. Once we're done building the list we splice it
145 * into the backchannel preallocation list off of the rpc_xprt
146 * struct. This helps minimize the amount of time the list
147 * lock is held on the rpc_xprt struct. It also makes cleanup
148 * easier in case of memory allocation errors.
150 INIT_LIST_HEAD(&tmp_list
);
151 for (i
= 0; i
< min_reqs
; i
++) {
152 /* Pre-allocate one backchannel rpc_rqst */
153 req
= xprt_alloc_bc_req(xprt
);
155 printk(KERN_ERR
"Failed to create bc rpc_rqst\n");
159 /* Add the allocated buffer to the tmp list */
160 dprintk("RPC: adding req= %p\n", req
);
161 list_add(&req
->rq_bc_pa_list
, &tmp_list
);
165 * Add the temporary list to the backchannel preallocation list
167 spin_lock(&xprt
->bc_pa_lock
);
168 list_splice(&tmp_list
, &xprt
->bc_pa_list
);
169 xprt
->bc_alloc_count
+= min_reqs
;
170 xprt
->bc_alloc_max
+= min_reqs
;
171 atomic_add(min_reqs
, &xprt
->bc_slot_count
);
172 spin_unlock(&xprt
->bc_pa_lock
);
174 dprintk("RPC: setup backchannel transport done\n");
179 * Memory allocation failed, free the temporary list
181 while (!list_empty(&tmp_list
)) {
182 req
= list_first_entry(&tmp_list
,
185 list_del(&req
->rq_bc_pa_list
);
186 xprt_free_allocation(req
);
189 dprintk("RPC: setup backchannel transport failed\n");
194 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
195 * @xprt: the transport holding the preallocated strucures
196 * @max_reqs: the maximum number of preallocated structures to destroy
198 * Since these structures may have been allocated by multiple calls
199 * to xprt_setup_backchannel, we only destroy up to the maximum number
200 * of reqs specified by the caller.
202 void xprt_destroy_backchannel(struct rpc_xprt
*xprt
, unsigned int max_reqs
)
204 if (xprt
->ops
->bc_destroy
)
205 xprt
->ops
->bc_destroy(xprt
, max_reqs
);
207 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel
);
209 void xprt_destroy_bc(struct rpc_xprt
*xprt
, unsigned int max_reqs
)
211 struct rpc_rqst
*req
= NULL
, *tmp
= NULL
;
213 dprintk("RPC: destroy backchannel transport\n");
218 spin_lock_bh(&xprt
->bc_pa_lock
);
219 xprt
->bc_alloc_max
-= min(max_reqs
, xprt
->bc_alloc_max
);
220 list_for_each_entry_safe(req
, tmp
, &xprt
->bc_pa_list
, rq_bc_pa_list
) {
221 dprintk("RPC: req=%p\n", req
);
222 list_del(&req
->rq_bc_pa_list
);
223 xprt_free_allocation(req
);
224 xprt
->bc_alloc_count
--;
225 atomic_dec(&xprt
->bc_slot_count
);
229 spin_unlock_bh(&xprt
->bc_pa_lock
);
232 dprintk("RPC: backchannel list empty= %s\n",
233 list_empty(&xprt
->bc_pa_list
) ? "true" : "false");
236 static struct rpc_rqst
*xprt_get_bc_request(struct rpc_xprt
*xprt
, __be32 xid
,
237 struct rpc_rqst
*new)
239 struct rpc_rqst
*req
= NULL
;
241 dprintk("RPC: allocate a backchannel request\n");
242 if (list_empty(&xprt
->bc_pa_list
)) {
245 if (atomic_read(&xprt
->bc_slot_count
) >= BC_MAX_SLOTS
)
247 list_add_tail(&new->rq_bc_pa_list
, &xprt
->bc_pa_list
);
248 xprt
->bc_alloc_count
++;
249 atomic_inc(&xprt
->bc_slot_count
);
251 req
= list_first_entry(&xprt
->bc_pa_list
, struct rpc_rqst
,
253 req
->rq_reply_bytes_recvd
= 0;
254 memcpy(&req
->rq_private_buf
, &req
->rq_rcv_buf
,
255 sizeof(req
->rq_private_buf
));
257 req
->rq_connect_cookie
= xprt
->connect_cookie
;
258 dprintk("RPC: backchannel req=%p\n", req
);
264 * Return the preallocated rpc_rqst structure and XDR buffers
265 * associated with this rpc_task.
267 void xprt_free_bc_request(struct rpc_rqst
*req
)
269 struct rpc_xprt
*xprt
= req
->rq_xprt
;
271 xprt
->ops
->bc_free_rqst(req
);
274 void xprt_free_bc_rqst(struct rpc_rqst
*req
)
276 struct rpc_xprt
*xprt
= req
->rq_xprt
;
278 dprintk("RPC: free backchannel req=%p\n", req
);
280 req
->rq_connect_cookie
= xprt
->connect_cookie
- 1;
281 smp_mb__before_atomic();
282 clear_bit(RPC_BC_PA_IN_USE
, &req
->rq_bc_pa_state
);
283 smp_mb__after_atomic();
286 * Return it to the list of preallocations so that it
287 * may be reused by a new callback request.
289 spin_lock_bh(&xprt
->bc_pa_lock
);
290 if (xprt_need_to_requeue(xprt
)) {
291 xprt_bc_reinit_xdr_buf(&req
->rq_snd_buf
);
292 xprt_bc_reinit_xdr_buf(&req
->rq_rcv_buf
);
293 req
->rq_rcv_buf
.len
= PAGE_SIZE
;
294 list_add_tail(&req
->rq_bc_pa_list
, &xprt
->bc_pa_list
);
295 xprt
->bc_alloc_count
++;
296 atomic_inc(&xprt
->bc_slot_count
);
299 spin_unlock_bh(&xprt
->bc_pa_lock
);
302 * The last remaining session was destroyed while this
303 * entry was in use. Free the entry and don't attempt
304 * to add back to the list because there is no need to
305 * have anymore preallocated entries.
307 dprintk("RPC: Last session removed req=%p\n", req
);
308 xprt_free_allocation(req
);
314 * One or more rpc_rqst structure have been preallocated during the
315 * backchannel setup. Buffer space for the send and private XDR buffers
316 * has been preallocated as well. Use xprt_alloc_bc_request to allocate
317 * to this request. Use xprt_free_bc_request to return it.
319 * We know that we're called in soft interrupt context, grab the spin_lock
320 * since there is no need to grab the bottom half spin_lock.
322 * Return an available rpc_rqst, otherwise NULL if non are available.
324 struct rpc_rqst
*xprt_lookup_bc_request(struct rpc_xprt
*xprt
, __be32 xid
)
326 struct rpc_rqst
*req
, *new = NULL
;
329 spin_lock(&xprt
->bc_pa_lock
);
330 list_for_each_entry(req
, &xprt
->bc_pa_list
, rq_bc_pa_list
) {
331 if (req
->rq_connect_cookie
!= xprt
->connect_cookie
)
333 if (req
->rq_xid
== xid
)
336 req
= xprt_get_bc_request(xprt
, xid
, new);
338 spin_unlock(&xprt
->bc_pa_lock
);
341 xprt_free_allocation(new);
345 new = xprt_alloc_bc_req(xprt
);
351 * Add callback request to callback list. Wake a thread
352 * on the first pool (usually the only pool) to handle it.
354 void xprt_complete_bc_request(struct rpc_rqst
*req
, uint32_t copied
)
356 struct rpc_xprt
*xprt
= req
->rq_xprt
;
357 struct svc_serv
*bc_serv
= xprt
->bc_serv
;
359 spin_lock(&xprt
->bc_pa_lock
);
360 list_del(&req
->rq_bc_pa_list
);
361 xprt
->bc_alloc_count
--;
362 spin_unlock(&xprt
->bc_pa_lock
);
364 req
->rq_private_buf
.len
= copied
;
365 set_bit(RPC_BC_PA_IN_USE
, &req
->rq_bc_pa_state
);
367 dprintk("RPC: add callback request to list\n");
369 lwq_enqueue(&req
->rq_bc_list
, &bc_serv
->sv_cb_list
);
370 svc_pool_wake_idle_thread(&bc_serv
->sv_pools
[0]);