root/net/xdp/xsk_queue.h


DEFINITIONS

This source file includes the following definitions:
  1. xskq_nb_invalid_descs
  2. xskq_nb_avail
  3. xskq_nb_free
  4. xskq_has_addrs
  5. xskq_crosses_non_contig_pg
  6. xskq_is_valid_addr
  7. xskq_is_valid_addr_unaligned
  8. xskq_validate_addr
  9. xskq_peek_addr
  10. xskq_discard_addr
  11. xskq_produce_addr
  12. xskq_produce_addr_lazy
  13. xskq_produce_flush_addr_n
  14. xskq_reserve_addr
  15. xskq_is_valid_desc
  16. xskq_validate_desc
  17. xskq_peek_desc
  18. xskq_discard_desc
  19. xskq_produce_batch_desc
  20. xskq_produce_flush_desc
  21. xskq_full_desc
  22. xskq_empty_desc

/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

#define RX_BATCH_SIZE 16
#define LAZY_UPDATE_THRESHOLD 128

struct xdp_ring {
        u32 producer ____cacheline_aligned_in_smp;
        u32 consumer ____cacheline_aligned_in_smp;
        u32 flags;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
        struct xdp_ring ptrs;
        struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
        struct xdp_ring ptrs;
        u64 desc[0] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
        u64 chunk_mask;
        u64 size;
        u32 ring_mask;
        u32 nentries;
        u32 prod_head;
        u32 prod_tail;
        u32 cons_head;
        u32 cons_tail;
        struct xdp_ring *ring;
        u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * ring, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it ensures that the data is written before the
 * producer pointer is updated. If this barrier was missing, the
 * consumer could observe the producer pointer being set and thus load
 * the data before the producer has written the new data. The consumer
 * would in this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we did not have this
 * barrier, some architectures could load stale data, as speculative
 * loads are not discarded: the CPU does not know that there is a
 * dependency between ->producer and the data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data: if ->consumer indicates that there is no
 * room in the buffer, $data is not stored at all, so no explicit
 * barrier is needed.
 *
 * (D) ensures that the load of the data is ordered before the store of
 * the consumer pointer. If we did not have this memory barrier, the
 * producer could observe the consumer pointer being set and overwrite
 * the data with a new value before the consumer got the chance to read
 * the old value. The consumer would thus miss reading the old entry
 * and very likely read the new entry twice, once right now and again
 * after circling through the ring.
 */

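/* In the helpers below, the kernel-as-producer side of this scheme is
 * implemented by the xskq_produce_*() functions (their (A) and (B)
 * annotations), and the kernel-as-consumer side by the xskq_peek_*()
 * and xskq_discard_*() functions (their (C) and (D) annotations).
 */
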
/* Common functions operating for both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
        return q ? q->invalid_descs : 0;
}

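/* Return min(number of entries available for consumption, dcnt). The
 * locally cached producer pointer is only refreshed from the shared
 * ring when the cached view looks empty.
 */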
static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
        u32 entries = q->prod_tail - q->cons_tail;

        if (entries == 0) {
                /* Refresh the local pointer */
                q->prod_tail = READ_ONCE(q->ring->producer);
                entries = q->prod_tail - q->cons_tail;
        }

        return (entries > dcnt) ? dcnt : entries;
}

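/* Return the number of free entries as seen from @producer. The locally
 * cached consumer pointer is only refreshed from the shared ring when
 * fewer than @dcnt entries appear to be free.
 */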
static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
{
        u32 free_entries = q->nentries - (producer - q->cons_tail);

        if (free_entries >= dcnt)
                return free_entries;

        /* Refresh the local tail pointer */
        q->cons_tail = READ_ONCE(q->ring->consumer);
        return q->nentries - (producer - q->cons_tail);
}

static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
{
        u32 entries = q->prod_tail - q->cons_tail;

        if (entries >= cnt)
                return true;

        /* Refresh the local pointer. */
        q->prod_tail = READ_ONCE(q->ring->producer);
        entries = q->prod_tail - q->cons_tail;

        return entries >= cnt;
}

/* UMEM queue */

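/* Return true if a buffer of @length bytes starting at @addr crosses a
 * page boundary into a page that is not marked as physically contiguous
 * (XSK_NEXT_PG_CONTIG_MASK in the page's addr field). Used by the
 * unaligned chunk mode validation below.
 */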
static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr,
                                              u64 length)
{
        bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE;
        bool next_pg_contig =
                (unsigned long)umem->pages[(addr >> PAGE_SHIFT)].addr &
                        XSK_NEXT_PG_CONTIG_MASK;

        return cross_pg && !next_pg_contig;
}

static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
        if (addr >= q->size) {
                q->invalid_descs++;
                return false;
        }

        return true;
}

static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
                                                u64 length,
                                                struct xdp_umem *umem)
{
        u64 base_addr = xsk_umem_extract_addr(addr);

        addr = xsk_umem_add_offset_to_addr(addr);
        if (base_addr >= q->size || addr >= q->size ||
            xskq_crosses_non_contig_pg(umem, addr, length)) {
                q->invalid_descs++;
                return false;
        }

        return true;
}

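/* Skip over invalid entries (bumping cons_tail past them) until a valid
 * address is found or the cached batch is exhausted. Returns @addr on
 * success, NULL otherwise. The valid entry itself is not consumed.
 */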
static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
                                      struct xdp_umem *umem)
{
        while (q->cons_tail != q->cons_head) {
                struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
                unsigned int idx = q->cons_tail & q->ring_mask;

                *addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;

                if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
                        if (xskq_is_valid_addr_unaligned(q, *addr,
                                                         umem->chunk_size_nohr,
                                                         umem))
                                return addr;
                        goto out;
                }

                if (xskq_is_valid_addr(q, *addr))
                        return addr;

out:
                q->cons_tail++;
        }

        return NULL;
}

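/* Return the next valid address without consuming it; the caller must
 * call xskq_discard_addr() once it is done with the entry. When the
 * cached batch is empty, the consumer pointer is published and up to
 * RX_BATCH_SIZE new entries are fetched.
 */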
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr,
                                  struct xdp_umem *umem)
{
        if (q->cons_tail == q->cons_head) {
                smp_mb(); /* D, matches A */
                WRITE_ONCE(q->ring->consumer, q->cons_tail);
                q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

                /* Order consumer and data */
                smp_rmb();
        }

        return xskq_validate_addr(q, addr, umem);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
        q->cons_tail++;
}

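/* Write one address and publish it immediately by updating the shared
 * producer pointer.
 */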
static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
{
        struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

        if (xskq_nb_free(q, q->prod_tail, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        ring->desc[q->prod_tail++ & q->ring_mask] = addr;

        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        WRITE_ONCE(q->ring->producer, q->prod_tail);
        return 0;
}

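/* Lazy variant: the address is written at the local producer head but
 * not yet made visible to the consumer. A later call to
 * xskq_produce_flush_addr_n() publishes the whole batch with a single
 * producer pointer update. The shared consumer pointer is only re-read
 * when the cached free count drops below LAZY_UPDATE_THRESHOLD.
 */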
static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
{
        struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

        if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
                return -ENOSPC;

        /* A, matches D */
        ring->desc[q->prod_head++ & q->ring_mask] = addr;
        return 0;
}

static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
                                             u32 nb_entries)
{
        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        q->prod_tail += nb_entries;
        WRITE_ONCE(q->ring->producer, q->prod_tail);
}

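/* Reserve one entry at the local producer head without writing an
 * address; the entry is filled in and published later.
 */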
static inline int xskq_reserve_addr(struct xsk_queue *q)
{
        if (xskq_nb_free(q, q->prod_head, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        q->prod_head++;
        return 0;
}

/* Rx/Tx queue */

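/* A descriptor is valid if its buffer lies inside the umem, has no
 * option bits set and, depending on the mode, neither crosses a chunk
 * boundary (aligned mode) nor exceeds chunk_size_nohr or spans
 * non-contiguous pages (unaligned mode). Invalid descriptors are
 * counted in invalid_descs.
 */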
static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d,
                                      struct xdp_umem *umem)
{
        if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
                if (!xskq_is_valid_addr_unaligned(q, d->addr, d->len, umem))
                        return false;

                if (d->len > umem->chunk_size_nohr || d->options) {
                        q->invalid_descs++;
                        return false;
                }

                return true;
        }

        if (!xskq_is_valid_addr(q, d->addr))
                return false;

        if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
            d->options) {
                q->invalid_descs++;
                return false;
        }

        return true;
}

static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
                                                  struct xdp_desc *desc,
                                                  struct xdp_umem *umem)
{
        while (q->cons_tail != q->cons_head) {
                struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
                unsigned int idx = q->cons_tail & q->ring_mask;

                *desc = READ_ONCE(ring->desc[idx]);
                if (xskq_is_valid_desc(q, desc, umem))
                        return desc;

                q->cons_tail++;
        }

        return NULL;
}

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
                                              struct xdp_desc *desc,
                                              struct xdp_umem *umem)
{
        if (q->cons_tail == q->cons_head) {
                smp_mb(); /* D, matches A */
                WRITE_ONCE(q->ring->consumer, q->cons_tail);
                q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

                /* Order consumer and data */
                smp_rmb(); /* C, matches B */
        }

        return xskq_validate_desc(q, desc, umem);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
        q->cons_tail++;
}

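/* Write a descriptor at the local producer head without publishing it;
 * xskq_produce_flush_desc() below makes the whole batch visible with a
 * single producer pointer update.
 */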
static inline int xskq_produce_batch_desc(struct xsk_queue *q,
                                          u64 addr, u32 len)
{
        struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
        unsigned int idx;

        if (xskq_nb_free(q, q->prod_head, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        idx = (q->prod_head++) & q->ring_mask;
        ring->desc[idx].addr = addr;
        ring->desc[idx].len = len;

        return 0;
}

static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        q->prod_tail = q->prod_head;
        WRITE_ONCE(q->ring->producer, q->prod_tail);
}

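/* xskq_full_desc() is true when all nentries are available for
 * consumption; xskq_empty_desc() is true when all nentries are free for
 * production.
 */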
static inline bool xskq_full_desc(struct xsk_queue *q)
{
        return xskq_nb_avail(q, q->nentries) == q->nentries;
}

static inline bool xskq_empty_desc(struct xsk_queue *q)
{
        return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */

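The peek/discard and produce/flush pairs above are driven by the XDP socket
code proper (net/xdp/xsk.c and related files). The sketch below is not taken
from that code; it is a minimal, hypothetical caller illustrating the intended
call pattern, with simplified error handling. The function names
xsk_tx_drain_sketch and xsk_rx_give_sketch and the handle_one_frame() callback
are made up for the example; only the xskq_*() calls and their signatures come
from this header.

/* Illustrative sketch only -- not part of the kernel sources. */
#include <linux/errno.h>
#include <net/xdp_sock.h>
#include "xsk_queue.h"

/* Consume Tx descriptors that user space has produced. */
static void xsk_tx_drain_sketch(struct xsk_queue *tx, struct xdp_umem *umem,
                                void (*handle_one_frame)(u64 addr, u32 len))
{
        struct xdp_desc desc;

        /* Peek returns the next valid descriptor without consuming it. */
        while (xskq_peek_desc(tx, &desc, umem)) {
                handle_one_frame(desc.addr, desc.len);
                /* Only now is the entry released back to user space. */
                xskq_discard_desc(tx);
        }
        /* No flush is needed on the consumer side: the consumer pointer
         * is published the next time xskq_peek_desc() finds its cached
         * batch empty.
         */
}

/* Produce a batch of Rx descriptors for user space to consume. */
static int xsk_rx_give_sketch(struct xsk_queue *rx, const u64 *addrs,
                              const u32 *lens, u32 n)
{
        u32 i;

        for (i = 0; i < n; i++) {
                /* Written at the local head only; not yet visible. */
                if (xskq_produce_batch_desc(rx, addrs[i], lens[i]))
                        return -ENOSPC;
        }

        /* One smp_wmb() plus one producer pointer update publishes the
         * whole batch.
         */
        xskq_produce_flush_desc(rx);
        return 0;
}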