Merge tag 'block-5.11-2021-01-10' of git://git.kernel.dk/linux-block
[linux/fpc-iii.git] / samples / bpf / xsk_fwd.c
blob1cd97c84c337bac054b146f43399107ffb824185
1 // SPDX-License-Identifier: GPL-2.0
2 /* Copyright(c) 2020 Intel Corporation. */
4 #define _GNU_SOURCE
5 #include <poll.h>
6 #include <pthread.h>
7 #include <signal.h>
8 #include <sched.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/mman.h>
13 #include <sys/resource.h>
14 #include <sys/socket.h>
15 #include <sys/types.h>
16 #include <time.h>
17 #include <unistd.h>
18 #include <getopt.h>
19 #include <netinet/ether.h>
20 #include <net/if.h>
22 #include <linux/bpf.h>
23 #include <linux/if_link.h>
24 #include <linux/if_xdp.h>
26 #include <bpf/libbpf.h>
27 #include <bpf/xsk.h>
28 #include <bpf/bpf.h>
30 #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
32 typedef __u64 u64;
33 typedef __u32 u32;
34 typedef __u16 u16;
35 typedef __u8 u8;
37 /* This program illustrates the packet forwarding between multiple AF_XDP
38 * sockets in multi-threaded environment. All threads are sharing a common
39 * buffer pool, with each socket having its own private buffer cache.
41 * Example 1: Single thread handling two sockets. The packets received by socket
42 * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
43 * QB), while the packets received by socket B are forwarded to socket A. The
44 * thread is running on CPU core X:
46 * ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
48 * Example 2: Two threads, each handling two sockets. The thread running on CPU
49 * core X forwards all the packets received by socket A to socket B, and all the
50 * packets received by socket B to socket A. The thread running on CPU core Y is
51 * performing the same packet forwarding between sockets C and D:
53 * ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
54 * -c CX -c CY
58 * Buffer pool and buffer cache
60 * For packet forwarding, the packet buffers are typically allocated from the
61 * pool for packet reception and freed back to the pool for further reuse once
62 * the packet transmission is completed.
64 * The buffer pool is shared between multiple threads. In order to minimize the
65 * access latency to the shared buffer pool, each thread creates one (or
66 * several) buffer caches, which, unlike the buffer pool, are private to the
67 * thread that creates them and therefore cannot be shared with other threads.
68 * The access to the shared pool is only needed either (A) when the cache gets
69 * empty due to repeated buffer allocations and it needs to be replenished from
70 * the pool, or (B) when the cache gets full due to repeated buffer free and it
71 * needs to be flushed back to the pool.
73 * In a packet forwarding system, a packet received on any input port can
74 * potentially be transmitted on any output port, depending on the forwarding
75 * configuration. For AF_XDP sockets, for this to work with zero-copy of the
76 * packet buffers, it is required that the buffer pool memory fits into the
77 * UMEM area shared by all the sockets.
80 struct bpool_params {
81 u32 n_buffers;
82 u32 buffer_size;
83 int mmap_flags;
85 u32 n_users_max;
86 u32 n_buffers_per_slab;
89 /* This buffer pool implementation organizes the buffers into equally sized
90 * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
91 * pool that are completely filled with buffer pointers (full slabs).
93 * Each buffer cache has a slab for buffer allocation and a slab for buffer
94 * free, with both of these slabs initially empty. When the cache's allocation
95 * slab goes empty, it is swapped with one of the available full slabs from the
96 * pool, if any is available. When the cache's free slab goes full, it is
97 * swapped for one of the empty slabs from the pool, which is guaranteed to
98 * succeed.
100 * Partially filled slabs never get traded between the cache and the pool
101 * (except when the cache itself is destroyed), which enables fast operation
102 * through pointer swapping.
104 struct bpool {
105 struct bpool_params params;
106 pthread_mutex_t lock;
107 void *addr;
109 u64 **slabs;
110 u64 **slabs_reserved;
111 u64 *buffers;
112 u64 *buffers_reserved;
114 u64 n_slabs;
115 u64 n_slabs_reserved;
116 u64 n_buffers;
118 u64 n_slabs_available;
119 u64 n_slabs_reserved_available;
121 struct xsk_umem_config umem_cfg;
122 struct xsk_ring_prod umem_fq;
123 struct xsk_ring_cons umem_cq;
124 struct xsk_umem *umem;
127 static struct bpool *
128 bpool_init(struct bpool_params *params,
129 struct xsk_umem_config *umem_cfg)
131 struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
132 u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
133 u64 slabs_size, slabs_reserved_size;
134 u64 buffers_size, buffers_reserved_size;
135 u64 total_size, i;
136 struct bpool *bp;
137 u8 *p;
138 int status;
140 /* mmap prep. */
141 if (setrlimit(RLIMIT_MEMLOCK, &r))
142 return NULL;
144 /* bpool internals dimensioning. */
145 n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
146 params->n_buffers_per_slab;
147 n_slabs_reserved = params->n_users_max * 2;
148 n_buffers = n_slabs * params->n_buffers_per_slab;
149 n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
151 slabs_size = n_slabs * sizeof(u64 *);
152 slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
153 buffers_size = n_buffers * sizeof(u64);
154 buffers_reserved_size = n_buffers_reserved * sizeof(u64);
156 total_size = sizeof(struct bpool) +
157 slabs_size + slabs_reserved_size +
158 buffers_size + buffers_reserved_size;
160 /* bpool memory allocation. */
161 p = calloc(total_size, sizeof(u8));
162 if (!p)
163 return NULL;
165 /* bpool memory initialization. */
166 bp = (struct bpool *)p;
167 memcpy(&bp->params, params, sizeof(*params));
168 bp->params.n_buffers = n_buffers;
170 bp->slabs = (u64 **)&p[sizeof(struct bpool)];
171 bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
172 slabs_size];
173 bp->buffers = (u64 *)&p[sizeof(struct bpool) +
174 slabs_size + slabs_reserved_size];
175 bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
176 slabs_size + slabs_reserved_size + buffers_size];
178 bp->n_slabs = n_slabs;
179 bp->n_slabs_reserved = n_slabs_reserved;
180 bp->n_buffers = n_buffers;
182 for (i = 0; i < n_slabs; i++)
183 bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
184 bp->n_slabs_available = n_slabs;
186 for (i = 0; i < n_slabs_reserved; i++)
187 bp->slabs_reserved[i] = &bp->buffers_reserved[i *
188 params->n_buffers_per_slab];
189 bp->n_slabs_reserved_available = n_slabs_reserved;
191 for (i = 0; i < n_buffers; i++)
192 bp->buffers[i] = i * params->buffer_size;
194 /* lock. */
195 status = pthread_mutex_init(&bp->lock, NULL);
196 if (status) {
197 free(p);
198 return NULL;
201 /* mmap. */
202 bp->addr = mmap(NULL,
203 n_buffers * params->buffer_size,
204 PROT_READ | PROT_WRITE,
205 MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
208 if (bp->addr == MAP_FAILED) {
209 pthread_mutex_destroy(&bp->lock);
210 free(p);
211 return NULL;
214 /* umem. */
215 status = xsk_umem__create(&bp->umem,
216 bp->addr,
217 bp->params.n_buffers * bp->params.buffer_size,
218 &bp->umem_fq,
219 &bp->umem_cq,
220 umem_cfg);
221 if (status) {
222 munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
223 pthread_mutex_destroy(&bp->lock);
224 free(p);
225 return NULL;
227 memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
229 return bp;
232 static void
233 bpool_free(struct bpool *bp)
235 if (!bp)
236 return;
238 xsk_umem__delete(bp->umem);
239 munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
240 pthread_mutex_destroy(&bp->lock);
241 free(bp);
244 struct bcache {
245 struct bpool *bp;
247 u64 *slab_cons;
248 u64 *slab_prod;
250 u64 n_buffers_cons;
251 u64 n_buffers_prod;
254 static u32
255 bcache_slab_size(struct bcache *bc)
257 struct bpool *bp = bc->bp;
259 return bp->params.n_buffers_per_slab;
262 static struct bcache *
263 bcache_init(struct bpool *bp)
265 struct bcache *bc;
267 bc = calloc(1, sizeof(struct bcache));
268 if (!bc)
269 return NULL;
271 bc->bp = bp;
272 bc->n_buffers_cons = 0;
273 bc->n_buffers_prod = 0;
275 pthread_mutex_lock(&bp->lock);
276 if (bp->n_slabs_reserved_available == 0) {
277 pthread_mutex_unlock(&bp->lock);
278 free(bc);
279 return NULL;
282 bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
283 bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
284 bp->n_slabs_reserved_available -= 2;
285 pthread_mutex_unlock(&bp->lock);
287 return bc;
290 static void
291 bcache_free(struct bcache *bc)
293 struct bpool *bp;
295 if (!bc)
296 return;
298 /* In order to keep this example simple, the case of freeing any
299 * existing buffers from the cache back to the pool is ignored.
302 bp = bc->bp;
303 pthread_mutex_lock(&bp->lock);
304 bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
305 bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
306 bp->n_slabs_reserved_available += 2;
307 pthread_mutex_unlock(&bp->lock);
309 free(bc);
312 /* To work correctly, the implementation requires that the *n_buffers* input
313 * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
314 * is typically the case, with one exception taking place when large number of
315 * buffers are allocated at init time (e.g. for the UMEM fill queue setup).
317 static inline u32
318 bcache_cons_check(struct bcache *bc, u32 n_buffers)
320 struct bpool *bp = bc->bp;
321 u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
322 u64 n_buffers_cons = bc->n_buffers_cons;
323 u64 n_slabs_available;
324 u64 *slab_full;
327 * Consumer slab is not empty: Use what's available locally. Do not
328 * look for more buffers from the pool when the ask can only be
329 * partially satisfied.
331 if (n_buffers_cons)
332 return (n_buffers_cons < n_buffers) ?
333 n_buffers_cons :
334 n_buffers;
337 * Consumer slab is empty: look to trade the current consumer slab
338 * (full) for a full slab from the pool, if any is available.
340 pthread_mutex_lock(&bp->lock);
341 n_slabs_available = bp->n_slabs_available;
342 if (!n_slabs_available) {
343 pthread_mutex_unlock(&bp->lock);
344 return 0;
347 n_slabs_available--;
348 slab_full = bp->slabs[n_slabs_available];
349 bp->slabs[n_slabs_available] = bc->slab_cons;
350 bp->n_slabs_available = n_slabs_available;
351 pthread_mutex_unlock(&bp->lock);
353 bc->slab_cons = slab_full;
354 bc->n_buffers_cons = n_buffers_per_slab;
355 return n_buffers;
358 static inline u64
359 bcache_cons(struct bcache *bc)
361 u64 n_buffers_cons = bc->n_buffers_cons - 1;
362 u64 buffer;
364 buffer = bc->slab_cons[n_buffers_cons];
365 bc->n_buffers_cons = n_buffers_cons;
366 return buffer;
369 static inline void
370 bcache_prod(struct bcache *bc, u64 buffer)
372 struct bpool *bp = bc->bp;
373 u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
374 u64 n_buffers_prod = bc->n_buffers_prod;
375 u64 n_slabs_available;
376 u64 *slab_empty;
379 * Producer slab is not yet full: store the current buffer to it.
381 if (n_buffers_prod < n_buffers_per_slab) {
382 bc->slab_prod[n_buffers_prod] = buffer;
383 bc->n_buffers_prod = n_buffers_prod + 1;
384 return;
388 * Producer slab is full: trade the cache's current producer slab
389 * (full) for an empty slab from the pool, then store the current
390 * buffer to the new producer slab. As one full slab exists in the
391 * cache, it is guaranteed that there is at least one empty slab
392 * available in the pool.
394 pthread_mutex_lock(&bp->lock);
395 n_slabs_available = bp->n_slabs_available;
396 slab_empty = bp->slabs[n_slabs_available];
397 bp->slabs[n_slabs_available] = bc->slab_prod;
398 bp->n_slabs_available = n_slabs_available + 1;
399 pthread_mutex_unlock(&bp->lock);
401 slab_empty[0] = buffer;
402 bc->slab_prod = slab_empty;
403 bc->n_buffers_prod = 1;
407 * Port
409 * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
410 * packet forwarding to happen with no packet buffer copy, all the sockets need
411 * to share the same UMEM area, which is used as the buffer pool memory.
413 #ifndef MAX_BURST_RX
414 #define MAX_BURST_RX 64
415 #endif
417 #ifndef MAX_BURST_TX
418 #define MAX_BURST_TX 64
419 #endif
421 struct burst_rx {
422 u64 addr[MAX_BURST_RX];
423 u32 len[MAX_BURST_RX];
426 struct burst_tx {
427 u64 addr[MAX_BURST_TX];
428 u32 len[MAX_BURST_TX];
429 u32 n_pkts;
432 struct port_params {
433 struct xsk_socket_config xsk_cfg;
434 struct bpool *bp;
435 const char *iface;
436 u32 iface_queue;
439 struct port {
440 struct port_params params;
442 struct bcache *bc;
444 struct xsk_ring_cons rxq;
445 struct xsk_ring_prod txq;
446 struct xsk_ring_prod umem_fq;
447 struct xsk_ring_cons umem_cq;
448 struct xsk_socket *xsk;
449 int umem_fq_initialized;
451 u64 n_pkts_rx;
452 u64 n_pkts_tx;
455 static void
456 port_free(struct port *p)
458 if (!p)
459 return;
461 /* To keep this example simple, the code to free the buffers from the
462 * socket's receive and transmit queues, as well as from the UMEM fill
463 * and completion queues, is not included.
466 if (p->xsk)
467 xsk_socket__delete(p->xsk);
469 bcache_free(p->bc);
471 free(p);
474 static struct port *
475 port_init(struct port_params *params)
477 struct port *p;
478 u32 umem_fq_size, pos = 0;
479 int status, i;
481 /* Memory allocation and initialization. */
482 p = calloc(sizeof(struct port), 1);
483 if (!p)
484 return NULL;
486 memcpy(&p->params, params, sizeof(p->params));
487 umem_fq_size = params->bp->umem_cfg.fill_size;
489 /* bcache. */
490 p->bc = bcache_init(params->bp);
491 if (!p->bc ||
492 (bcache_slab_size(p->bc) < umem_fq_size) ||
493 (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
494 port_free(p);
495 return NULL;
498 /* xsk socket. */
499 status = xsk_socket__create_shared(&p->xsk,
500 params->iface,
501 params->iface_queue,
502 params->bp->umem,
503 &p->rxq,
504 &p->txq,
505 &p->umem_fq,
506 &p->umem_cq,
507 &params->xsk_cfg);
508 if (status) {
509 port_free(p);
510 return NULL;
513 /* umem fq. */
514 xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
516 for (i = 0; i < umem_fq_size; i++)
517 *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
518 bcache_cons(p->bc);
520 xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
521 p->umem_fq_initialized = 1;
523 return p;
526 static inline u32
527 port_rx_burst(struct port *p, struct burst_rx *b)
529 u32 n_pkts, pos, i;
531 /* Free buffers for FQ replenish. */
532 n_pkts = ARRAY_SIZE(b->addr);
534 n_pkts = bcache_cons_check(p->bc, n_pkts);
535 if (!n_pkts)
536 return 0;
538 /* RXQ. */
539 n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
540 if (!n_pkts) {
541 if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
542 struct pollfd pollfd = {
543 .fd = xsk_socket__fd(p->xsk),
544 .events = POLLIN,
547 poll(&pollfd, 1, 0);
549 return 0;
552 for (i = 0; i < n_pkts; i++) {
553 b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
554 b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
557 xsk_ring_cons__release(&p->rxq, n_pkts);
558 p->n_pkts_rx += n_pkts;
560 /* UMEM FQ. */
561 for ( ; ; ) {
562 int status;
564 status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
565 if (status == n_pkts)
566 break;
568 if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
569 struct pollfd pollfd = {
570 .fd = xsk_socket__fd(p->xsk),
571 .events = POLLIN,
574 poll(&pollfd, 1, 0);
578 for (i = 0; i < n_pkts; i++)
579 *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
580 bcache_cons(p->bc);
582 xsk_ring_prod__submit(&p->umem_fq, n_pkts);
584 return n_pkts;
587 static inline void
588 port_tx_burst(struct port *p, struct burst_tx *b)
590 u32 n_pkts, pos, i;
591 int status;
593 /* UMEM CQ. */
594 n_pkts = p->params.bp->umem_cfg.comp_size;
596 n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
598 for (i = 0; i < n_pkts; i++) {
599 u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
601 bcache_prod(p->bc, addr);
604 xsk_ring_cons__release(&p->umem_cq, n_pkts);
606 /* TXQ. */
607 n_pkts = b->n_pkts;
609 for ( ; ; ) {
610 status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
611 if (status == n_pkts)
612 break;
614 if (xsk_ring_prod__needs_wakeup(&p->txq))
615 sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
616 NULL, 0);
619 for (i = 0; i < n_pkts; i++) {
620 xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
621 xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
624 xsk_ring_prod__submit(&p->txq, n_pkts);
625 if (xsk_ring_prod__needs_wakeup(&p->txq))
626 sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
627 p->n_pkts_tx += n_pkts;
631 * Thread
633 * Packet forwarding threads.
635 #ifndef MAX_PORTS_PER_THREAD
636 #define MAX_PORTS_PER_THREAD 16
637 #endif
639 struct thread_data {
640 struct port *ports_rx[MAX_PORTS_PER_THREAD];
641 struct port *ports_tx[MAX_PORTS_PER_THREAD];
642 u32 n_ports_rx;
643 struct burst_rx burst_rx;
644 struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
645 u32 cpu_core_id;
646 int quit;
/*
 * Exchange the source and destination MAC addresses of the Ethernet header
 * found at the start of *data*. The rest of the frame is left untouched.
 */
static void swap_mac_addresses(void *data)
{
	struct ether_header *hdr = data;
	struct ether_addr saved_src;

	memcpy(&saved_src, hdr->ether_shost, sizeof(saved_src));
	memcpy(hdr->ether_shost, hdr->ether_dhost, sizeof(saved_src));
	memcpy(hdr->ether_dhost, &saved_src, sizeof(saved_src));
}
661 static void *
662 thread_func(void *arg)
664 struct thread_data *t = arg;
665 cpu_set_t cpu_cores;
666 u32 i;
668 CPU_ZERO(&cpu_cores);
669 CPU_SET(t->cpu_core_id, &cpu_cores);
670 pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);
672 for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
673 struct port *port_rx = t->ports_rx[i];
674 struct port *port_tx = t->ports_tx[i];
675 struct burst_rx *brx = &t->burst_rx;
676 struct burst_tx *btx = &t->burst_tx[i];
677 u32 n_pkts, j;
679 /* RX. */
680 n_pkts = port_rx_burst(port_rx, brx);
681 if (!n_pkts)
682 continue;
684 /* Process & TX. */
685 for (j = 0; j < n_pkts; j++) {
686 u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
687 u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
688 addr);
690 swap_mac_addresses(pkt);
692 btx->addr[btx->n_pkts] = brx->addr[j];
693 btx->len[btx->n_pkts] = brx->len[j];
694 btx->n_pkts++;
696 if (btx->n_pkts == MAX_BURST_TX) {
697 port_tx_burst(port_tx, btx);
698 btx->n_pkts = 0;
703 return NULL;
707 * Process
709 static const struct bpool_params bpool_params_default = {
710 .n_buffers = 64 * 1024,
711 .buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
712 .mmap_flags = 0,
714 .n_users_max = 16,
715 .n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
718 static const struct xsk_umem_config umem_cfg_default = {
719 .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
720 .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
721 .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
722 .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
723 .flags = 0,
726 static const struct port_params port_params_default = {
727 .xsk_cfg = {
728 .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
729 .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
730 .libbpf_flags = 0,
731 .xdp_flags = XDP_FLAGS_DRV_MODE,
732 .bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
735 .bp = NULL,
736 .iface = NULL,
737 .iface_queue = 0,
740 #ifndef MAX_PORTS
741 #define MAX_PORTS 64
742 #endif
744 #ifndef MAX_THREADS
745 #define MAX_THREADS 64
746 #endif
748 static struct bpool_params bpool_params;
749 static struct xsk_umem_config umem_cfg;
750 static struct bpool *bp;
752 static struct port_params port_params[MAX_PORTS];
753 static struct port *ports[MAX_PORTS];
754 static u64 n_pkts_rx[MAX_PORTS];
755 static u64 n_pkts_tx[MAX_PORTS];
756 static int n_ports;
758 static pthread_t threads[MAX_THREADS];
759 static struct thread_data thread_data[MAX_THREADS];
760 static int n_threads;
762 static void
763 print_usage(char *prog_name)
765 const char *usage =
766 "Usage:\n"
767 "\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
768 "\n"
769 "-c CORE CPU core to run a packet forwarding thread\n"
770 " on. May be invoked multiple times.\n"
771 "\n"
772 "-b SIZE Number of buffers in the buffer pool shared\n"
773 " by all the forwarding threads. Default: %u.\n"
774 "\n"
775 "-i INTERFACE Network interface. Each (INTERFACE, QUEUE)\n"
776 " pair specifies one forwarding port. May be\n"
777 " invoked multiple times.\n"
778 "\n"
779 "-q QUEUE Network interface queue for RX and TX. Each\n"
780 " (INTERFACE, QUEUE) pair specified one\n"
781 " forwarding port. Default: %u. May be invoked\n"
782 " multiple times.\n"
783 "\n";
784 printf(usage,
785 prog_name,
786 bpool_params_default.n_buffers,
787 port_params_default.iface_queue);
790 static int
791 parse_args(int argc, char **argv)
793 struct option lgopts[] = {
794 { NULL, 0, 0, 0 }
796 int opt, option_index;
798 /* Parse the input arguments. */
799 for ( ; ;) {
800 opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
801 if (opt == EOF)
802 break;
804 switch (opt) {
805 case 'b':
806 bpool_params.n_buffers = atoi(optarg);
807 break;
809 case 'c':
810 if (n_threads == MAX_THREADS) {
811 printf("Max number of threads (%d) reached.\n",
812 MAX_THREADS);
813 return -1;
816 thread_data[n_threads].cpu_core_id = atoi(optarg);
817 n_threads++;
818 break;
820 case 'i':
821 if (n_ports == MAX_PORTS) {
822 printf("Max number of ports (%d) reached.\n",
823 MAX_PORTS);
824 return -1;
827 port_params[n_ports].iface = optarg;
828 port_params[n_ports].iface_queue = 0;
829 n_ports++;
830 break;
832 case 'q':
833 if (n_ports == 0) {
834 printf("No port specified for queue.\n");
835 return -1;
837 port_params[n_ports - 1].iface_queue = atoi(optarg);
838 break;
840 default:
841 printf("Illegal argument.\n");
842 return -1;
846 optind = 1; /* reset getopt lib */
848 /* Check the input arguments. */
849 if (!n_ports) {
850 printf("No ports specified.\n");
851 return -1;
854 if (!n_threads) {
855 printf("No threads specified.\n");
856 return -1;
859 if (n_ports % n_threads) {
860 printf("Ports cannot be evenly distributed to threads.\n");
861 return -1;
864 return 0;
867 static void
868 print_port(u32 port_id)
870 struct port *port = ports[port_id];
872 printf("Port %u: interface = %s, queue = %u\n",
873 port_id, port->params.iface, port->params.iface_queue);
876 static void
877 print_thread(u32 thread_id)
879 struct thread_data *t = &thread_data[thread_id];
880 u32 i;
882 printf("Thread %u (CPU core %u): ",
883 thread_id, t->cpu_core_id);
885 for (i = 0; i < t->n_ports_rx; i++) {
886 struct port *port_rx = t->ports_rx[i];
887 struct port *port_tx = t->ports_tx[i];
889 printf("(%s, %u) -> (%s, %u), ",
890 port_rx->params.iface,
891 port_rx->params.iface_queue,
892 port_tx->params.iface,
893 port_tx->params.iface_queue);
896 printf("\n");
/* Print the horizontal rule of the statistics table. */
static void
print_port_stats_separator(void)
{
	const char *dashes4 = "----";
	const char *dashes12 = "------------";
	const char *dashes13 = "-------------";

	printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
	       dashes4, dashes12, dashes13, dashes12, dashes13);
}
/* Print the column titles of the statistics table between two rules. */
static void
print_port_stats_header(void)
{
	print_port_stats_separator();
	printf("| %4s | %12s | %13s | %12s | %13s |\n",
	       "Port", "RX packets", "RX rate (pps)",
	       "TX packets", "TX_rate (pps)");
	print_port_stats_separator();
}
/* Close the statistics table with a rule followed by an empty line. */
static void
print_port_stats_trailer(void)
{
	print_port_stats_separator();
	printf("\n");
}
930 static void
931 print_port_stats(int port_id, u64 ns_diff)
933 struct port *p = ports[port_id];
934 double rx_pps, tx_pps;
936 rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
937 tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
939 printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
940 port_id,
941 p->n_pkts_rx,
942 rx_pps,
943 p->n_pkts_tx,
944 tx_pps);
946 n_pkts_rx[port_id] = p->n_pkts_rx;
947 n_pkts_tx[port_id] = p->n_pkts_tx;
950 static void
951 print_port_stats_all(u64 ns_diff)
953 int i;
955 print_port_stats_header();
956 for (i = 0; i < n_ports; i++)
957 print_port_stats(i, ns_diff);
958 print_port_stats_trailer();
/*
 * Termination request flag. It is written by the signal handler and polled
 * by the main loop, hence volatile sig_atomic_t rather than a plain int.
 */
static volatile sig_atomic_t quit;

/* Signal handler: only records that termination was requested. */
static void
signal_handler(int sig)
{
	(void)sig;
	quit = 1;
}
969 static void remove_xdp_program(void)
971 int i;
973 for (i = 0 ; i < n_ports; i++)
974 bpf_set_link_xdp_fd(if_nametoindex(port_params[i].iface), -1,
975 port_params[i].xsk_cfg.xdp_flags);
978 int main(int argc, char **argv)
980 struct timespec time;
981 u64 ns0;
982 int i;
984 /* Parse args. */
985 memcpy(&bpool_params, &bpool_params_default,
986 sizeof(struct bpool_params));
987 memcpy(&umem_cfg, &umem_cfg_default,
988 sizeof(struct xsk_umem_config));
989 for (i = 0; i < MAX_PORTS; i++)
990 memcpy(&port_params[i], &port_params_default,
991 sizeof(struct port_params));
993 if (parse_args(argc, argv)) {
994 print_usage(argv[0]);
995 return -1;
998 /* Buffer pool initialization. */
999 bp = bpool_init(&bpool_params, &umem_cfg);
1000 if (!bp) {
1001 printf("Buffer pool initialization failed.\n");
1002 return -1;
1004 printf("Buffer pool created successfully.\n");
1006 /* Ports initialization. */
1007 for (i = 0; i < MAX_PORTS; i++)
1008 port_params[i].bp = bp;
1010 for (i = 0; i < n_ports; i++) {
1011 ports[i] = port_init(&port_params[i]);
1012 if (!ports[i]) {
1013 printf("Port %d initialization failed.\n", i);
1014 return -1;
1016 print_port(i);
1018 printf("All ports created successfully.\n");
1020 /* Threads. */
1021 for (i = 0; i < n_threads; i++) {
1022 struct thread_data *t = &thread_data[i];
1023 u32 n_ports_per_thread = n_ports / n_threads, j;
1025 for (j = 0; j < n_ports_per_thread; j++) {
1026 t->ports_rx[j] = ports[i * n_ports_per_thread + j];
1027 t->ports_tx[j] = ports[i * n_ports_per_thread +
1028 (j + 1) % n_ports_per_thread];
1031 t->n_ports_rx = n_ports_per_thread;
1033 print_thread(i);
1036 for (i = 0; i < n_threads; i++) {
1037 int status;
1039 status = pthread_create(&threads[i],
1040 NULL,
1041 thread_func,
1042 &thread_data[i]);
1043 if (status) {
1044 printf("Thread %d creation failed.\n", i);
1045 return -1;
1048 printf("All threads created successfully.\n");
1050 /* Print statistics. */
1051 signal(SIGINT, signal_handler);
1052 signal(SIGTERM, signal_handler);
1053 signal(SIGABRT, signal_handler);
1055 clock_gettime(CLOCK_MONOTONIC, &time);
1056 ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
1057 for ( ; !quit; ) {
1058 u64 ns1, ns_diff;
1060 sleep(1);
1061 clock_gettime(CLOCK_MONOTONIC, &time);
1062 ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
1063 ns_diff = ns1 - ns0;
1064 ns0 = ns1;
1066 print_port_stats_all(ns_diff);
1069 /* Threads completion. */
1070 printf("Quit.\n");
1071 for (i = 0; i < n_threads; i++)
1072 thread_data[i].quit = 1;
1074 for (i = 0; i < n_threads; i++)
1075 pthread_join(threads[i], NULL);
1077 for (i = 0; i < n_ports; i++)
1078 port_free(ports[i]);
1080 bpool_free(bp);
1082 remove_xdp_program();
1084 return 0;