// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2020 Oracle. All rights reserved.
 */

#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/rpc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
/**
 * pcl_free - Release all memory associated with a parsed chunk list
 * @pcl: parsed chunk list
 */
void pcl_free(struct svc_rdma_pcl *pcl)
{
	while (!list_empty(&pcl->cl_chunks)) {
		struct svc_rdma_chunk *chunk;

		chunk = pcl_first_chunk(pcl);
		list_del(&chunk->ch_list);
		kfree(chunk);
	}
}
static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
{
	struct svc_rdma_chunk *chunk;

	chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
	if (!chunk)
		return NULL;

	chunk->ch_position = position;
	chunk->ch_length = 0;
	chunk->ch_payload_length = 0;
	chunk->ch_segcount = 0;
	return chunk;
}
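/*
 * Note (illustrative, not part of the original file): struct_size()
 * computes the size of a structure that ends in a flexible array
 * member, saturating instead of wrapping on arithmetic overflow. The
 * kmalloc() above is the safe equivalent of the open-coded, but
 * overflow-prone, expression:
 *
 *	sizeof(*chunk) + segcount * sizeof(chunk->ch_segments[0])
 */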
static struct svc_rdma_chunk *
pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
{
	struct svc_rdma_chunk *pos;

	pcl_for_each_chunk(pos, pcl) {
		if (pos->ch_position == position)
			return pos;
	}
	return NULL;
}
static void pcl_insert_position(struct svc_rdma_pcl *pcl,
				struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_chunk *pos;

	pcl_for_each_chunk(pos, pcl) {
		if (pos->ch_position > chunk->ch_position)
			break;
	}
	/* Insert before the first chunk with a higher position */
	__list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
	pcl->cl_count++;
}
static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
				 struct svc_rdma_chunk *chunk,
				 u32 handle, u32 length, u64 offset)
{
	struct svc_rdma_segment *segment;

	segment = &chunk->ch_segments[chunk->ch_segcount];
	segment->rs_handle = handle;
	segment->rs_length = length;
	segment->rs_offset = offset;

	trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);

	chunk->ch_length += length;
	chunk->ch_segcount++;
}
/**
 * pcl_alloc_call - Construct a parsed chunk list for the Call body
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Assumptions:
 * - The incoming Read list has already been sanity checked.
 * - cl_count is already set to the number of segments in
 *   the un-decoded list.
 * - The list might not be in order by position.
 *
 * Return values:
 *   %true: Parsed chunk list was successfully constructed, and
 *	    cl_count is updated to be the number of chunks (i.e.
 *	    unique positions) in the Read list.
 *   %false: Memory allocation failed.
 */
bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
	unsigned int i, segcount = pcl->cl_count;

	pcl->cl_count = 0;
	for (i = 0; i < segcount; i++) {
		struct svc_rdma_chunk *chunk;
		u32 position, handle, length;
		u64 offset;

		p++;	/* skip the list discriminator */
		p = xdr_decode_read_segment(p, &position, &handle,
					    &length, &offset);
		if (position != 0)
			continue;

		if (pcl_is_empty(pcl)) {
			chunk = pcl_alloc_chunk(segcount, position);
			if (!chunk)
				return false;
			pcl_insert_position(pcl, chunk);
		} else {
			chunk = list_first_entry(&pcl->cl_chunks,
						 struct svc_rdma_chunk,
						 ch_list);
		}

		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
	}

	return true;
}
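/*
 * Illustrative sketch (not part of this file): the sender-side encoding
 * that xdr_decode_read_segment() above reverses. Each Read segment on
 * the wire is a 32-bit XDR position followed by an RDMA segment: a
 * 32-bit handle, a 32-bit length, and a 64-bit offset. The helper name
 * is hypothetical; <linux/sunrpc/rpc_rdma.h> provides similar inline
 * encoders.
 */
static inline __be32 *example_encode_read_segment(__be32 *p, u32 position,
						  u32 handle, u32 length,
						  u64 offset)
{
	*p++ = cpu_to_be32(position);	/* XDR offset where the payload lands */
	return xdr_encode_rdma_segment(p, handle, length, offset);
}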
/**
 * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Assumptions:
 * - The incoming Read list has already been sanity checked.
 * - cl_count is already set to the number of segments in
 *   the un-decoded list.
 * - The list might not be in order by position.
 *
 * Return values:
 *   %true: Parsed chunk list was successfully constructed, and
 *	    cl_count is updated to be the number of chunks (i.e.
 *	    unique position values) in the Read list.
 *   %false: Memory allocation failed.
 *
 * TODO:
 * - Check for chunk range overlaps
 */
bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
	unsigned int i, segcount = pcl->cl_count;

	pcl->cl_count = 0;
	for (i = 0; i < segcount; i++) {
		struct svc_rdma_chunk *chunk;
		u32 position, handle, length;
		u64 offset;

		p++;	/* skip the list discriminator */
		p = xdr_decode_read_segment(p, &position, &handle,
					    &length, &offset);
		if (position == 0)
			continue;

		chunk = pcl_lookup_position(pcl, position);
		if (!chunk) {
			chunk = pcl_alloc_chunk(segcount, position);
			if (!chunk)
				return false;
			pcl_insert_position(pcl, chunk);
		}

		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
	}

	return true;
}
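/*
 * Worked example (illustrative): a Read list carrying segments with
 * positions { 20, 20, 48 } yields an rc_read_pcl containing two chunks:
 * one at position 20 built from the first two segments, and one at
 * position 48 built from the third. cl_count is updated from 3 to 2.
 */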
/**
 * pcl_alloc_write - Construct a parsed chunk list from a Write list
 * @rctxt: Ingress receive context
 * @pcl: Parsed chunk list to populate
 * @p: Start of an un-decoded Write list
 *
 * Assumptions:
 * - The incoming Write list has already been sanity checked, and
 * - cl_count is set to the number of chunks in the un-decoded list.
 *
 * Return values:
 *   %true: Parsed chunk list was successfully constructed.
 *   %false: Memory allocation failed.
 */
bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
		     struct svc_rdma_pcl *pcl, __be32 *p)
{
	struct svc_rdma_segment *segment;
	struct svc_rdma_chunk *chunk;
	unsigned int i, j;
	u32 segcount;

	for (i = 0; i < pcl->cl_count; i++) {
		p++;	/* skip the list discriminator */
		segcount = be32_to_cpup(p++);

		chunk = pcl_alloc_chunk(segcount, 0);
		if (!chunk)
			return false;
		list_add_tail(&chunk->ch_list, &pcl->cl_chunks);

		for (j = 0; j < segcount; j++) {
			segment = &chunk->ch_segments[j];
			p = xdr_decode_rdma_segment(p, &segment->rs_handle,
						    &segment->rs_length,
						    &segment->rs_offset);
			trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);

			chunk->ch_length += segment->rs_length;
			chunk->ch_segcount++;
		}
	}
	return true;
}
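/*
 * Wire format consumed above (illustrative sketch): the Write list is a
 * sequence of discriminated, counted segment arrays, terminated by a
 * zero discriminator:
 *
 *	1				(discriminator: chunk present)
 *	segcount
 *	  handle, length, offset	(segment 0; offset is 64 bits)
 *	  ...
 *	  handle, length, offset	(segment segcount - 1)
 *	1 ...				(next Write chunk)
 *	0				(end of Write list)
 */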
static int pcl_process_region(const struct xdr_buf *xdr,
			      unsigned int offset, unsigned int length,
			      int (*actor)(const struct xdr_buf *, void *),
			      void *data)
{
	struct xdr_buf subbuf;

	if (!length)
		return 0;
	if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
		return -EMSGSIZE;
	return actor(&subbuf, data);
}
/**
 * pcl_process_nonpayloads - Process non-payload regions inside @xdr
 * @pcl: Chunk list to process
 * @xdr: xdr_buf to process
 * @actor: Function to invoke on each non-payload region
 * @data: Arguments for @actor
 *
 * This mechanism must ignore not only result payloads that were already
 * sent via RDMA Write, but also XDR padding for those payloads that
 * the upper layer has added.
 *
 * Assumptions:
 *  The xdr->len and ch_position fields are aligned to 4-byte multiples.
 *
 * Returns:
 *   On success, zero,
 *   %-EMSGSIZE on XDR buffer overflow, or
 *   The return value of @actor
 */
int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
			    const struct xdr_buf *xdr,
			    int (*actor)(const struct xdr_buf *, void *),
			    void *data)
{
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start;
	int ret;

	chunk = pcl_first_chunk(pcl);

	/* No result payloads were generated */
	if (!chunk || !chunk->ch_payload_length)
		return actor(xdr, data);

	/* Process the region before the first result payload */
	ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
	if (ret < 0)
		return ret;

	/* Process the regions between each middle result payload */
	while ((next = pcl_next_chunk(pcl, chunk))) {
		if (!next->ch_payload_length)
			break;

		start = pcl_chunk_end_offset(chunk);
		ret = pcl_process_region(xdr, start, next->ch_position - start,
					 actor, data);
		if (ret < 0)
			return ret;

		chunk = next;
	}

	/* Process the region after the last result payload */
	start = pcl_chunk_end_offset(chunk);
	ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
	if (ret < 0)
		return ret;

	return 0;
}
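/*
 * Example @actor (an illustrative sketch, not part of this file): sums
 * the bytes in each non-payload region, demonstrating the callback
 * signature that pcl_process_nonpayloads() expects.
 */
static int __maybe_unused example_sum_nonpayloads(const struct xdr_buf *subbuf,
						  void *data)
{
	unsigned int *total = data;

	*total += subbuf->len;
	return 0;
}

/*
 * Typical invocation (hypothetical):
 *
 *	unsigned int total = 0;
 *
 *	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
 *				      example_sum_nonpayloads, &total);
 */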