1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC Tx data buffering.
4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
5 * Written by David Howells (dhowells@redhat.com)
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
10 #include <linux/slab.h>
11 #include "ar-internal.h"
13 static atomic_t rxrpc_txbuf_debug_ids
;
14 atomic_t rxrpc_nr_txbuf
;
17 * Allocate and partially initialise a data transmission buffer.
19 struct rxrpc_txbuf
*rxrpc_alloc_data_txbuf(struct rxrpc_call
*call
, size_t data_size
,
20 size_t data_align
, gfp_t gfp
)
22 struct rxrpc_wire_header
*whdr
;
23 struct rxrpc_txbuf
*txb
;
27 txb
= kmalloc(sizeof(*txb
), gfp
);
31 hoff
= round_up(sizeof(*whdr
), data_align
) - sizeof(*whdr
);
32 total
= hoff
+ sizeof(*whdr
) + data_size
;
34 data_align
= umax(data_align
, L1_CACHE_BYTES
);
35 mutex_lock(&call
->conn
->tx_data_alloc_lock
);
36 buf
= page_frag_alloc_align(&call
->conn
->tx_data_alloc
, total
, gfp
,
38 mutex_unlock(&call
->conn
->tx_data_alloc_lock
);
46 INIT_LIST_HEAD(&txb
->call_link
);
47 INIT_LIST_HEAD(&txb
->tx_link
);
48 refcount_set(&txb
->ref
, 1);
49 txb
->last_sent
= KTIME_MIN
;
50 txb
->call_debug_id
= call
->debug_id
;
51 txb
->debug_id
= atomic_inc_return(&rxrpc_txbuf_debug_ids
);
52 txb
->space
= data_size
;
54 txb
->offset
= sizeof(*whdr
);
55 txb
->flags
= call
->conn
->out_clientflag
;
57 txb
->seq
= call
->tx_prepared
+ 1;
61 txb
->kvec
[0].iov_base
= whdr
;
62 txb
->kvec
[0].iov_len
= sizeof(*whdr
);
64 whdr
->epoch
= htonl(call
->conn
->proto
.epoch
);
65 whdr
->cid
= htonl(call
->cid
);
66 whdr
->callNumber
= htonl(call
->call_id
);
67 whdr
->seq
= htonl(txb
->seq
);
68 whdr
->type
= RXRPC_PACKET_TYPE_DATA
;
71 whdr
->securityIndex
= call
->security_ix
;
73 whdr
->serviceId
= htons(call
->dest_srx
.srx_service
);
75 trace_rxrpc_txbuf(txb
->debug_id
, txb
->call_debug_id
, txb
->seq
, 1,
76 rxrpc_txbuf_alloc_data
);
78 atomic_inc(&rxrpc_nr_txbuf
);
83 * Allocate and partially initialise an ACK packet.
85 struct rxrpc_txbuf
*rxrpc_alloc_ack_txbuf(struct rxrpc_call
*call
, size_t sack_size
)
87 struct rxrpc_wire_header
*whdr
;
88 struct rxrpc_acktrailer
*trailer
;
89 struct rxrpc_ackpacket
*ack
;
90 struct rxrpc_txbuf
*txb
;
91 gfp_t gfp
= rcu_read_lock_held() ? GFP_ATOMIC
| __GFP_NOWARN
: GFP_NOFS
;
92 void *buf
, *buf2
= NULL
;
95 txb
= kmalloc(sizeof(*txb
), gfp
);
99 buf
= page_frag_alloc(&call
->local
->tx_alloc
,
100 sizeof(*whdr
) + sizeof(*ack
) + 1 + 3 + sizeof(*trailer
), gfp
);
107 buf2
= page_frag_alloc(&call
->local
->tx_alloc
, sack_size
, gfp
);
116 ack
= buf
+ sizeof(*whdr
);
117 filler
= buf
+ sizeof(*whdr
) + sizeof(*ack
) + 1;
118 trailer
= buf
+ sizeof(*whdr
) + sizeof(*ack
) + 1 + 3;
120 INIT_LIST_HEAD(&txb
->call_link
);
121 INIT_LIST_HEAD(&txb
->tx_link
);
122 refcount_set(&txb
->ref
, 1);
123 txb
->call_debug_id
= call
->debug_id
;
124 txb
->debug_id
= atomic_inc_return(&rxrpc_txbuf_debug_ids
);
126 txb
->len
= sizeof(*whdr
) + sizeof(*ack
) + 3 + sizeof(*trailer
);
128 txb
->flags
= call
->conn
->out_clientflag
;
134 txb
->kvec
[0].iov_base
= whdr
;
135 txb
->kvec
[0].iov_len
= sizeof(*whdr
) + sizeof(*ack
);
136 txb
->kvec
[1].iov_base
= buf2
;
137 txb
->kvec
[1].iov_len
= sack_size
;
138 txb
->kvec
[2].iov_base
= filler
;
139 txb
->kvec
[2].iov_len
= 3 + sizeof(*trailer
);
141 whdr
->epoch
= htonl(call
->conn
->proto
.epoch
);
142 whdr
->cid
= htonl(call
->cid
);
143 whdr
->callNumber
= htonl(call
->call_id
);
145 whdr
->type
= RXRPC_PACKET_TYPE_ACK
;
147 whdr
->userStatus
= 0;
148 whdr
->securityIndex
= call
->security_ix
;
150 whdr
->serviceId
= htons(call
->dest_srx
.srx_service
);
152 get_page(virt_to_head_page(trailer
));
154 trace_rxrpc_txbuf(txb
->debug_id
, txb
->call_debug_id
, txb
->seq
, 1,
155 rxrpc_txbuf_alloc_ack
);
156 atomic_inc(&rxrpc_nr_txbuf
);
160 void rxrpc_get_txbuf(struct rxrpc_txbuf
*txb
, enum rxrpc_txbuf_trace what
)
164 __refcount_inc(&txb
->ref
, &r
);
165 trace_rxrpc_txbuf(txb
->debug_id
, txb
->call_debug_id
, txb
->seq
, r
+ 1, what
);
168 void rxrpc_see_txbuf(struct rxrpc_txbuf
*txb
, enum rxrpc_txbuf_trace what
)
170 int r
= refcount_read(&txb
->ref
);
172 trace_rxrpc_txbuf(txb
->debug_id
, txb
->call_debug_id
, txb
->seq
, r
, what
);
175 static void rxrpc_free_txbuf(struct rxrpc_txbuf
*txb
)
179 trace_rxrpc_txbuf(txb
->debug_id
, txb
->call_debug_id
, txb
->seq
, 0,
181 for (i
= 0; i
< txb
->nr_kvec
; i
++)
182 if (txb
->kvec
[i
].iov_base
)
183 page_frag_free(txb
->kvec
[i
].iov_base
);
185 atomic_dec(&rxrpc_nr_txbuf
);
188 void rxrpc_put_txbuf(struct rxrpc_txbuf
*txb
, enum rxrpc_txbuf_trace what
)
190 unsigned int debug_id
, call_debug_id
;
196 debug_id
= txb
->debug_id
;
197 call_debug_id
= txb
->call_debug_id
;
199 dead
= __refcount_dec_and_test(&txb
->ref
, &r
);
200 trace_rxrpc_txbuf(debug_id
, call_debug_id
, seq
, r
- 1, what
);
202 rxrpc_free_txbuf(txb
);
207 * Shrink the transmit buffer.
209 void rxrpc_shrink_call_tx_buffer(struct rxrpc_call
*call
)
211 struct rxrpc_txbuf
*txb
;
212 rxrpc_seq_t hard_ack
= smp_load_acquire(&call
->acks_hard_ack
);
215 _enter("%x/%x/%x", call
->tx_bottom
, call
->acks_hard_ack
, call
->tx_top
);
217 while ((txb
= list_first_entry_or_null(&call
->tx_buffer
,
218 struct rxrpc_txbuf
, call_link
))) {
219 hard_ack
= smp_load_acquire(&call
->acks_hard_ack
);
220 if (before(hard_ack
, txb
->seq
))
223 if (txb
->seq
!= call
->tx_bottom
+ 1)
224 rxrpc_see_txbuf(txb
, rxrpc_txbuf_see_out_of_step
);
225 ASSERTCMP(txb
->seq
, ==, call
->tx_bottom
+ 1);
226 smp_store_release(&call
->tx_bottom
, call
->tx_bottom
+ 1);
227 list_del_rcu(&txb
->call_link
);
229 trace_rxrpc_txqueue(call
, rxrpc_txqueue_dequeue
);
231 rxrpc_put_txbuf(txb
, rxrpc_txbuf_put_rotated
);
232 if (after(call
->acks_hard_ack
, call
->tx_bottom
+ 128))
237 wake_up(&call
->waitq
);