/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

#define RX_BATCH_SIZE 16
#define LAZY_UPDATE_THRESHOLD 128
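
/* Ring header shared with user space.  The producer and consumer indices sit
 * on separate cache lines so that the two sides of the ring do not false-share.
 */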
struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[0] ____cacheline_aligned_in_smp;
};
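
/* Kernel-side queue state.  prod_head and cons_head are local cursors used
 * while staging or consuming a batch; prod_tail and cons_tail track what has
 * actually been produced or consumed.  The producer/consumer words shared
 * with user space in *ring are only touched via READ_ONCE()/WRITE_ONCE()
 * when a batch is refreshed or flushed.
 */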
struct xsk_queue {
	u64 chunk_mask;
	u64 size;
	u32 ring_mask;
	u32 nentries;
	u32 prod_head;
	u32 prod_tail;
	u32 cons_head;
	u32 cons_tail;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* Common functions operating for both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}
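
/* Returns min(dcnt, number of entries available for consumption), re-reading
 * the shared producer index only when the cached view looks empty.
 */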
static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
	u32 entries = q->prod_tail - q->cons_tail;

	if (entries == 0) {
		/* Refresh the local pointer */
		q->prod_tail = READ_ONCE(q->ring->producer);
		entries = q->prod_tail - q->cons_tail;
	}

	return (entries > dcnt) ? dcnt : entries;
}
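
/* Returns the number of free slots as seen from @producer, re-reading the
 * shared consumer index only if the cached view cannot satisfy @dcnt.
 */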
static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
{
	u32 free_entries = q->nentries - (producer - q->cons_tail);

	if (free_entries >= dcnt)
		return free_entries;

	/* Refresh the local tail pointer */
	q->cons_tail = READ_ONCE(q->ring->consumer);
	return q->nentries - (producer - q->cons_tail);
}
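
/* UMEM fill and completion queues.  Entries are 64-bit addresses into the
 * UMEM; out-of-range addresses are skipped and counted in invalid_descs.
 */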
static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
	if (addr >= q->size) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
		if (xskq_is_valid_addr(q, *addr))
			return addr;

		q->cons_tail++;
	}

	return NULL;
}
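
/* Peek at the next valid address without consuming it.  The caller uses the
 * address and then calls xskq_discard_addr() to step past it; the shared
 * consumer index is only written back once the cached batch is exhausted.
 * A minimal consumer sketch (use_buffer() is a hypothetical helper, not part
 * of this file):
 *
 *	u64 addr;
 *
 *	while (xskq_peek_addr(q, &addr)) {
 *		use_buffer(addr);
 *		xskq_discard_addr(q);
 *	}
 */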
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
	if (q->cons_tail == q->cons_head) {
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb();
	}

	return xskq_validate_addr(q, addr);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
	q->cons_tail++;
}
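
/* Produce one address and publish it to user space immediately. */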
static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
		return -ENOSPC;

	ring->desc[q->prod_tail++ & q->ring_mask] = addr;

	/* Order producer and data */
	smp_wmb();

	WRITE_ONCE(q->ring->producer, q->prod_tail);
	return 0;
}
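
/* Stage one address at the local producer cursor; the shared producer index
 * is not updated until xskq_produce_flush_addr_n() runs.
 */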
static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
		return -ENOSPC;

	ring->desc[q->prod_head++ & q->ring_mask] = addr;
	return 0;
}

static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
					     u32 nb_entries)
{
	/* Order producer and data */
	smp_wmb();

	q->prod_tail += nb_entries;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline int xskq_reserve_addr(struct xsk_queue *q)
{
	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	q->prod_head++;
	return 0;
}
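
/* Rx/Tx descriptor queues.  A valid descriptor references a buffer that lies
 * entirely within a single UMEM chunk.
 */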
static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
{
	if (!xskq_is_valid_addr(q, d->addr))
		return false;

	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
	    d->options) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
						  struct xdp_desc *desc)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*desc = READ_ONCE(ring->desc[idx]);
		if (xskq_is_valid_desc(q, desc))
			return desc;

		q->cons_tail++;
	}

	return NULL;
}

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
					      struct xdp_desc *desc)
{
	if (q->cons_tail == q->cons_head) {
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb();
	}

	return xskq_validate_desc(q, desc);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
	q->cons_tail++;
}
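
/* Stage one descriptor at the local producer cursor; staged descriptors are
 * published to user space by xskq_produce_flush_desc().
 */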
static inline int xskq_produce_batch_desc(struct xsk_queue *q,
					  u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	unsigned int idx;

	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	idx = (q->prod_head++) & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
	/* Order producer and data */
	smp_wmb();

	q->prod_tail = q->prod_head;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline bool xskq_full_desc(struct xsk_queue *q)
{
	return xskq_nb_avail(q, q->nentries) == q->nentries;
}

static inline bool xskq_empty_desc(struct xsk_queue *q)
{
	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */