root/include/linux/ptr_ring.h

/* [<][>][^][v][top][bottom][index][help] */

INCLUDED FROM


DEFINITIONS

This source file includes the following definitions.
  1. __ptr_ring_full
  2. ptr_ring_full
  3. ptr_ring_full_irq
  4. ptr_ring_full_any
  5. ptr_ring_full_bh
  6. __ptr_ring_produce
  7. ptr_ring_produce
  8. ptr_ring_produce_irq
  9. ptr_ring_produce_any
  10. ptr_ring_produce_bh
  11. __ptr_ring_peek
  12. __ptr_ring_empty
  13. ptr_ring_empty
  14. ptr_ring_empty_irq
  15. ptr_ring_empty_any
  16. ptr_ring_empty_bh
  17. __ptr_ring_discard_one
  18. __ptr_ring_consume
  19. __ptr_ring_consume_batched
  20. ptr_ring_consume
  21. ptr_ring_consume_irq
  22. ptr_ring_consume_any
  23. ptr_ring_consume_bh
  24. ptr_ring_consume_batched
  25. ptr_ring_consume_batched_irq
  26. ptr_ring_consume_batched_any
  27. ptr_ring_consume_batched_bh
  28. __ptr_ring_init_queue_alloc
  29. __ptr_ring_set_size
  30. ptr_ring_init
  31. ptr_ring_unconsume
  32. __ptr_ring_swap_queue
  33. ptr_ring_resize
  34. ptr_ring_resize_multiple
  35. ptr_ring_cleanup

   1 /* SPDX-License-Identifier: GPL-2.0-or-later */
   2 /*
   3  *      Definitions for the 'struct ptr_ring' datastructure.
   4  *
   5  *      Author:
   6  *              Michael S. Tsirkin <mst@redhat.com>
   7  *
   8  *      Copyright (C) 2016 Red Hat, Inc.
   9  *
  10  *      This is a limited-size FIFO maintaining pointers in FIFO order, with
  11  *      one CPU producing entries and another consuming entries from a FIFO.
  12  *
  13  *      This implementation tries to minimize cache-contention when there is a
  14  *      single producer and a single consumer CPU.
  15  */
  16 
  17 #ifndef _LINUX_PTR_RING_H
  18 #define _LINUX_PTR_RING_H 1
  19 
  20 #ifdef __KERNEL__
  21 #include <linux/spinlock.h>
  22 #include <linux/cache.h>
  23 #include <linux/types.h>
  24 #include <linux/compiler.h>
  25 #include <linux/slab.h>
  26 #include <asm/errno.h>
  27 #endif
  28 
  29 struct ptr_ring {
  30         int producer ____cacheline_aligned_in_smp;
  31         spinlock_t producer_lock;
  32         int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
  33         int consumer_tail; /* next entry to invalidate */
  34         spinlock_t consumer_lock;
  35         /* Shared consumer/producer data */
  36         /* Read-only by both the producer and the consumer */
  37         int size ____cacheline_aligned_in_smp; /* max entries in queue */
  38         int batch; /* number of entries to consume in a batch */
  39         void **queue;
  40 };
  41 
  42 /* Note: callers invoking this in a loop must use a compiler barrier,
  43  * for example cpu_relax().
  44  *
  45  * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
  46  * see e.g. ptr_ring_full.
  47  */
  48 static inline bool __ptr_ring_full(struct ptr_ring *r)
  49 {
  50         return r->queue[r->producer];
  51 }
  52 
  53 static inline bool ptr_ring_full(struct ptr_ring *r)
  54 {
  55         bool ret;
  56 
  57         spin_lock(&r->producer_lock);
  58         ret = __ptr_ring_full(r);
  59         spin_unlock(&r->producer_lock);
  60 
  61         return ret;
  62 }
  63 
  64 static inline bool ptr_ring_full_irq(struct ptr_ring *r)
  65 {
  66         bool ret;
  67 
  68         spin_lock_irq(&r->producer_lock);
  69         ret = __ptr_ring_full(r);
  70         spin_unlock_irq(&r->producer_lock);
  71 
  72         return ret;
  73 }
  74 
  75 static inline bool ptr_ring_full_any(struct ptr_ring *r)
  76 {
  77         unsigned long flags;
  78         bool ret;
  79 
  80         spin_lock_irqsave(&r->producer_lock, flags);
  81         ret = __ptr_ring_full(r);
  82         spin_unlock_irqrestore(&r->producer_lock, flags);
  83 
  84         return ret;
  85 }
  86 
  87 static inline bool ptr_ring_full_bh(struct ptr_ring *r)
  88 {
  89         bool ret;
  90 
  91         spin_lock_bh(&r->producer_lock);
  92         ret = __ptr_ring_full(r);
  93         spin_unlock_bh(&r->producer_lock);
  94 
  95         return ret;
  96 }
  97 
  98 /* Note: callers invoking this in a loop must use a compiler barrier,
  99  * for example cpu_relax(). Callers must hold producer_lock.
 100  * Callers are responsible for making sure pointer that is being queued
 101  * points to a valid data.
 102  */
 103 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
 104 {
 105         if (unlikely(!r->size) || r->queue[r->producer])
 106                 return -ENOSPC;
 107 
 108         /* Make sure the pointer we are storing points to a valid data. */
 109         /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
 110         smp_wmb();
 111 
 112         WRITE_ONCE(r->queue[r->producer++], ptr);
 113         if (unlikely(r->producer >= r->size))
 114                 r->producer = 0;
 115         return 0;
 116 }
 117 
 118 /*
 119  * Note: resize (below) nests producer lock within consumer lock, so if you
 120  * consume in interrupt or BH context, you must disable interrupts/BH when
 121  * calling this.
 122  */
 123 static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
 124 {
 125         int ret;
 126 
 127         spin_lock(&r->producer_lock);
 128         ret = __ptr_ring_produce(r, ptr);
 129         spin_unlock(&r->producer_lock);
 130 
 131         return ret;
 132 }
 133 
 134 static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
 135 {
 136         int ret;
 137 
 138         spin_lock_irq(&r->producer_lock);
 139         ret = __ptr_ring_produce(r, ptr);
 140         spin_unlock_irq(&r->producer_lock);
 141 
 142         return ret;
 143 }
 144 
 145 static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
 146 {
 147         unsigned long flags;
 148         int ret;
 149 
 150         spin_lock_irqsave(&r->producer_lock, flags);
 151         ret = __ptr_ring_produce(r, ptr);
 152         spin_unlock_irqrestore(&r->producer_lock, flags);
 153 
 154         return ret;
 155 }
 156 
 157 static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
 158 {
 159         int ret;
 160 
 161         spin_lock_bh(&r->producer_lock);
 162         ret = __ptr_ring_produce(r, ptr);
 163         spin_unlock_bh(&r->producer_lock);
 164 
 165         return ret;
 166 }
 167 
 168 static inline void *__ptr_ring_peek(struct ptr_ring *r)
 169 {
 170         if (likely(r->size))
 171                 return READ_ONCE(r->queue[r->consumer_head]);
 172         return NULL;
 173 }
 174 
 175 /*
 176  * Test ring empty status without taking any locks.
 177  *
 178  * NB: This is only safe to call if ring is never resized.
 179  *
 180  * However, if some other CPU consumes ring entries at the same time, the value
 181  * returned is not guaranteed to be correct.
 182  *
 183  * In this case - to avoid incorrectly detecting the ring
 184  * as empty - the CPU consuming the ring entries is responsible
 185  * for either consuming all ring entries until the ring is empty,
 186  * or synchronizing with some other CPU and causing it to
 187  * re-test __ptr_ring_empty and/or consume the ring enteries
 188  * after the synchronization point.
 189  *
 190  * Note: callers invoking this in a loop must use a compiler barrier,
 191  * for example cpu_relax().
 192  */
 193 static inline bool __ptr_ring_empty(struct ptr_ring *r)
 194 {
 195         if (likely(r->size))
 196                 return !r->queue[READ_ONCE(r->consumer_head)];
 197         return true;
 198 }
 199 
 200 static inline bool ptr_ring_empty(struct ptr_ring *r)
 201 {
 202         bool ret;
 203 
 204         spin_lock(&r->consumer_lock);
 205         ret = __ptr_ring_empty(r);
 206         spin_unlock(&r->consumer_lock);
 207 
 208         return ret;
 209 }
 210 
 211 static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
 212 {
 213         bool ret;
 214 
 215         spin_lock_irq(&r->consumer_lock);
 216         ret = __ptr_ring_empty(r);
 217         spin_unlock_irq(&r->consumer_lock);
 218 
 219         return ret;
 220 }
 221 
 222 static inline bool ptr_ring_empty_any(struct ptr_ring *r)
 223 {
 224         unsigned long flags;
 225         bool ret;
 226 
 227         spin_lock_irqsave(&r->consumer_lock, flags);
 228         ret = __ptr_ring_empty(r);
 229         spin_unlock_irqrestore(&r->consumer_lock, flags);
 230 
 231         return ret;
 232 }
 233 
 234 static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
 235 {
 236         bool ret;
 237 
 238         spin_lock_bh(&r->consumer_lock);
 239         ret = __ptr_ring_empty(r);
 240         spin_unlock_bh(&r->consumer_lock);
 241 
 242         return ret;
 243 }
 244 
 245 /* Must only be called after __ptr_ring_peek returned !NULL */
 246 static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 247 {
 248         /* Fundamentally, what we want to do is update consumer
 249          * index and zero out the entry so producer can reuse it.
 250          * Doing it naively at each consume would be as simple as:
 251          *       consumer = r->consumer;
 252          *       r->queue[consumer++] = NULL;
 253          *       if (unlikely(consumer >= r->size))
 254          *               consumer = 0;
 255          *       r->consumer = consumer;
 256          * but that is suboptimal when the ring is full as producer is writing
 257          * out new entries in the same cache line.  Defer these updates until a
 258          * batch of entries has been consumed.
 259          */
 260         /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
 261          * to work correctly.
 262          */
 263         int consumer_head = r->consumer_head;
 264         int head = consumer_head++;
 265 
 266         /* Once we have processed enough entries invalidate them in
 267          * the ring all at once so producer can reuse their space in the ring.
 268          * We also do this when we reach end of the ring - not mandatory
 269          * but helps keep the implementation simple.
 270          */
 271         if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
 272                      consumer_head >= r->size)) {
 273                 /* Zero out entries in the reverse order: this way we touch the
 274                  * cache line that producer might currently be reading the last;
 275                  * producer won't make progress and touch other cache lines
 276                  * besides the first one until we write out all entries.
 277                  */
 278                 while (likely(head >= r->consumer_tail))
 279                         r->queue[head--] = NULL;
 280                 r->consumer_tail = consumer_head;
 281         }
 282         if (unlikely(consumer_head >= r->size)) {
 283                 consumer_head = 0;
 284                 r->consumer_tail = 0;
 285         }
 286         /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
 287         WRITE_ONCE(r->consumer_head, consumer_head);
 288 }
 289 
 290 static inline void *__ptr_ring_consume(struct ptr_ring *r)
 291 {
 292         void *ptr;
 293 
 294         /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
 295          * accessing data through the pointer is up to date. Pairs
 296          * with smp_wmb in __ptr_ring_produce.
 297          */
 298         ptr = __ptr_ring_peek(r);
 299         if (ptr)
 300                 __ptr_ring_discard_one(r);
 301 
 302         return ptr;
 303 }
 304 
 305 static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
 306                                              void **array, int n)
 307 {
 308         void *ptr;
 309         int i;
 310 
 311         for (i = 0; i < n; i++) {
 312                 ptr = __ptr_ring_consume(r);
 313                 if (!ptr)
 314                         break;
 315                 array[i] = ptr;
 316         }
 317 
 318         return i;
 319 }
 320 
 321 /*
 322  * Note: resize (below) nests producer lock within consumer lock, so if you
 323  * call this in interrupt or BH context, you must disable interrupts/BH when
 324  * producing.
 325  */
 326 static inline void *ptr_ring_consume(struct ptr_ring *r)
 327 {
 328         void *ptr;
 329 
 330         spin_lock(&r->consumer_lock);
 331         ptr = __ptr_ring_consume(r);
 332         spin_unlock(&r->consumer_lock);
 333 
 334         return ptr;
 335 }
 336 
 337 static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
 338 {
 339         void *ptr;
 340 
 341         spin_lock_irq(&r->consumer_lock);
 342         ptr = __ptr_ring_consume(r);
 343         spin_unlock_irq(&r->consumer_lock);
 344 
 345         return ptr;
 346 }
 347 
 348 static inline void *ptr_ring_consume_any(struct ptr_ring *r)
 349 {
 350         unsigned long flags;
 351         void *ptr;
 352 
 353         spin_lock_irqsave(&r->consumer_lock, flags);
 354         ptr = __ptr_ring_consume(r);
 355         spin_unlock_irqrestore(&r->consumer_lock, flags);
 356 
 357         return ptr;
 358 }
 359 
 360 static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
 361 {
 362         void *ptr;
 363 
 364         spin_lock_bh(&r->consumer_lock);
 365         ptr = __ptr_ring_consume(r);
 366         spin_unlock_bh(&r->consumer_lock);
 367 
 368         return ptr;
 369 }
 370 
 371 static inline int ptr_ring_consume_batched(struct ptr_ring *r,
 372                                            void **array, int n)
 373 {
 374         int ret;
 375 
 376         spin_lock(&r->consumer_lock);
 377         ret = __ptr_ring_consume_batched(r, array, n);
 378         spin_unlock(&r->consumer_lock);
 379 
 380         return ret;
 381 }
 382 
 383 static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
 384                                                void **array, int n)
 385 {
 386         int ret;
 387 
 388         spin_lock_irq(&r->consumer_lock);
 389         ret = __ptr_ring_consume_batched(r, array, n);
 390         spin_unlock_irq(&r->consumer_lock);
 391 
 392         return ret;
 393 }
 394 
 395 static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
 396                                                void **array, int n)
 397 {
 398         unsigned long flags;
 399         int ret;
 400 
 401         spin_lock_irqsave(&r->consumer_lock, flags);
 402         ret = __ptr_ring_consume_batched(r, array, n);
 403         spin_unlock_irqrestore(&r->consumer_lock, flags);
 404 
 405         return ret;
 406 }
 407 
 408 static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
 409                                               void **array, int n)
 410 {
 411         int ret;
 412 
 413         spin_lock_bh(&r->consumer_lock);
 414         ret = __ptr_ring_consume_batched(r, array, n);
 415         spin_unlock_bh(&r->consumer_lock);
 416 
 417         return ret;
 418 }
 419 
 420 /* Cast to structure type and call a function without discarding from FIFO.
 421  * Function must return a value.
 422  * Callers must take consumer_lock.
 423  */
 424 #define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
 425 
 426 #define PTR_RING_PEEK_CALL(r, f) ({ \
 427         typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
 428         \
 429         spin_lock(&(r)->consumer_lock); \
 430         __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
 431         spin_unlock(&(r)->consumer_lock); \
 432         __PTR_RING_PEEK_CALL_v; \
 433 })
 434 
 435 #define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
 436         typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
 437         \
 438         spin_lock_irq(&(r)->consumer_lock); \
 439         __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
 440         spin_unlock_irq(&(r)->consumer_lock); \
 441         __PTR_RING_PEEK_CALL_v; \
 442 })
 443 
 444 #define PTR_RING_PEEK_CALL_BH(r, f) ({ \
 445         typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
 446         \
 447         spin_lock_bh(&(r)->consumer_lock); \
 448         __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
 449         spin_unlock_bh(&(r)->consumer_lock); \
 450         __PTR_RING_PEEK_CALL_v; \
 451 })
 452 
 453 #define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
 454         typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
 455         unsigned long __PTR_RING_PEEK_CALL_f;\
 456         \
 457         spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
 458         __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
 459         spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
 460         __PTR_RING_PEEK_CALL_v; \
 461 })
 462 
 463 /* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 464  * documentation for vmalloc for which of them are legal.
 465  */
 466 static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
 467 {
 468         if (size > KMALLOC_MAX_SIZE / sizeof(void *))
 469                 return NULL;
 470         return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
 471 }
 472 
 473 static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
 474 {
 475         r->size = size;
 476         r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
 477         /* We need to set batch at least to 1 to make logic
 478          * in __ptr_ring_discard_one work correctly.
 479          * Batching too much (because ring is small) would cause a lot of
 480          * burstiness. Needs tuning, for now disable batching.
 481          */
 482         if (r->batch > r->size / 2 || !r->batch)
 483                 r->batch = 1;
 484 }
 485 
 486 static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
 487 {
 488         r->queue = __ptr_ring_init_queue_alloc(size, gfp);
 489         if (!r->queue)
 490                 return -ENOMEM;
 491 
 492         __ptr_ring_set_size(r, size);
 493         r->producer = r->consumer_head = r->consumer_tail = 0;
 494         spin_lock_init(&r->producer_lock);
 495         spin_lock_init(&r->consumer_lock);
 496 
 497         return 0;
 498 }
 499 
 500 /*
 501  * Return entries into ring. Destroy entries that don't fit.
 502  *
 503  * Note: this is expected to be a rare slow path operation.
 504  *
 505  * Note: producer lock is nested within consumer lock, so if you
 506  * resize you must make sure all uses nest correctly.
 507  * In particular if you consume ring in interrupt or BH context, you must
 508  * disable interrupts/BH when doing so.
 509  */
 510 static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
 511                                       void (*destroy)(void *))
 512 {
 513         unsigned long flags;
 514         int head;
 515 
 516         spin_lock_irqsave(&r->consumer_lock, flags);
 517         spin_lock(&r->producer_lock);
 518 
 519         if (!r->size)
 520                 goto done;
 521 
 522         /*
 523          * Clean out buffered entries (for simplicity). This way following code
 524          * can test entries for NULL and if not assume they are valid.
 525          */
 526         head = r->consumer_head - 1;
 527         while (likely(head >= r->consumer_tail))
 528                 r->queue[head--] = NULL;
 529         r->consumer_tail = r->consumer_head;
 530 
 531         /*
 532          * Go over entries in batch, start moving head back and copy entries.
 533          * Stop when we run into previously unconsumed entries.
 534          */
 535         while (n) {
 536                 head = r->consumer_head - 1;
 537                 if (head < 0)
 538                         head = r->size - 1;
 539                 if (r->queue[head]) {
 540                         /* This batch entry will have to be destroyed. */
 541                         goto done;
 542                 }
 543                 r->queue[head] = batch[--n];
 544                 r->consumer_tail = head;
 545                 /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
 546                 WRITE_ONCE(r->consumer_head, head);
 547         }
 548 
 549 done:
 550         /* Destroy all entries left in the batch. */
 551         while (n)
 552                 destroy(batch[--n]);
 553         spin_unlock(&r->producer_lock);
 554         spin_unlock_irqrestore(&r->consumer_lock, flags);
 555 }
 556 
 557 static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
 558                                            int size, gfp_t gfp,
 559                                            void (*destroy)(void *))
 560 {
 561         int producer = 0;
 562         void **old;
 563         void *ptr;
 564 
 565         while ((ptr = __ptr_ring_consume(r)))
 566                 if (producer < size)
 567                         queue[producer++] = ptr;
 568                 else if (destroy)
 569                         destroy(ptr);
 570 
 571         if (producer >= size)
 572                 producer = 0;
 573         __ptr_ring_set_size(r, size);
 574         r->producer = producer;
 575         r->consumer_head = 0;
 576         r->consumer_tail = 0;
 577         old = r->queue;
 578         r->queue = queue;
 579 
 580         return old;
 581 }
 582 
 583 /*
 584  * Note: producer lock is nested within consumer lock, so if you
 585  * resize you must make sure all uses nest correctly.
 586  * In particular if you consume ring in interrupt or BH context, you must
 587  * disable interrupts/BH when doing so.
 588  */
 589 static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
 590                                   void (*destroy)(void *))
 591 {
 592         unsigned long flags;
 593         void **queue = __ptr_ring_init_queue_alloc(size, gfp);
 594         void **old;
 595 
 596         if (!queue)
 597                 return -ENOMEM;
 598 
 599         spin_lock_irqsave(&(r)->consumer_lock, flags);
 600         spin_lock(&(r)->producer_lock);
 601 
 602         old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
 603 
 604         spin_unlock(&(r)->producer_lock);
 605         spin_unlock_irqrestore(&(r)->consumer_lock, flags);
 606 
 607         kvfree(old);
 608 
 609         return 0;
 610 }
 611 
 612 /*
 613  * Note: producer lock is nested within consumer lock, so if you
 614  * resize you must make sure all uses nest correctly.
 615  * In particular if you consume ring in interrupt or BH context, you must
 616  * disable interrupts/BH when doing so.
 617  */
 618 static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
 619                                            unsigned int nrings,
 620                                            int size,
 621                                            gfp_t gfp, void (*destroy)(void *))
 622 {
 623         unsigned long flags;
 624         void ***queues;
 625         int i;
 626 
 627         queues = kmalloc_array(nrings, sizeof(*queues), gfp);
 628         if (!queues)
 629                 goto noqueues;
 630 
 631         for (i = 0; i < nrings; ++i) {
 632                 queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
 633                 if (!queues[i])
 634                         goto nomem;
 635         }
 636 
 637         for (i = 0; i < nrings; ++i) {
 638                 spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
 639                 spin_lock(&(rings[i])->producer_lock);
 640                 queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
 641                                                   size, gfp, destroy);
 642                 spin_unlock(&(rings[i])->producer_lock);
 643                 spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
 644         }
 645 
 646         for (i = 0; i < nrings; ++i)
 647                 kvfree(queues[i]);
 648 
 649         kfree(queues);
 650 
 651         return 0;
 652 
 653 nomem:
 654         while (--i >= 0)
 655                 kvfree(queues[i]);
 656 
 657         kfree(queues);
 658 
 659 noqueues:
 660         return -ENOMEM;
 661 }
 662 
 663 static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
 664 {
 665         void *ptr;
 666 
 667         if (destroy)
 668                 while ((ptr = ptr_ring_consume(r)))
 669                         destroy(ptr);
 670         kvfree(r->queue);
 671 }
 672 
 673 #endif /* _LINUX_PTR_RING_H  */

/* [<][>][^][v][top][bottom][index][help] */