fs/xfs/xfs_pwork.c


DEFINITIONS

This source file includes the following definitions:
  1. xfs_pwork_work
  2. xfs_pwork_init
  3. xfs_pwork_queue
  4. xfs_pwork_destroy
  5. xfs_pwork_poll
  6. xfs_pwork_guess_datadev_parallelism

// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Copyright (C) 2019 Oracle.  All Rights Reserved.
 * Author: Darrick J. Wong <darrick.wong@oracle.com>
 */
#include "xfs.h"
#include "xfs_fs.h"
#include "xfs_shared.h"
#include "xfs_format.h"
#include "xfs_log_format.h"
#include "xfs_trans_resv.h"
#include "xfs_mount.h"
#include "xfs_trace.h"
#include "xfs_sysctl.h"
#include "xfs_pwork.h"
#include <linux/nmi.h>

/*
 * Parallel Work Queue
 * ===================
 *
 * Abstract away the details of running a large and "obviously" parallelizable
 * task across multiple CPUs.  Callers initialize the pwork control object with
 * a desired level of parallelization and a work function.  Next, they embed
 * struct xfs_pwork in whatever structure they use to pass work context to a
 * worker thread and queue that pwork.  The work function will be passed the
 * pwork item when it is run (from process context) and any returned error will
 * be recorded in xfs_pwork_ctl.error.  Work functions should check for errors
 * and abort if necessary; a nonzero xfs_pwork_ctl.error does not stop
 * workqueue item processing.
 *
 * This is the rough equivalent of the xfsprogs workqueue code, though we can't
 * reuse that name here.
 */
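
/*
 * Illustrative sketch of the embedding described above; every name with an
 * "example_" prefix is hypothetical, not part of this file.  A caller embeds
 * struct xfs_pwork in its own work item and recovers its context in the work
 * function with container_of():
 *
 *	struct example_work {
 *		struct xfs_pwork	pwork;	// embedded pwork item
 *		xfs_agnumber_t		agno;	// caller's private context
 *	};
 *
 *	static int
 *	example_work_fn(
 *		struct xfs_mount	*mp,
 *		struct xfs_pwork	*pwork)
 *	{
 *		struct example_work	*ew;
 *
 *		ew = container_of(pwork, struct example_work, pwork);
 *		// ... do the actual work against ew->agno ...
 *		kmem_free(ew);	// if the item was heap-allocated
 *		return 0;
 *	}
 */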

/* Invoke our caller's function. */
static void
xfs_pwork_work(
	struct work_struct	*work)
{
	struct xfs_pwork	*pwork;
	struct xfs_pwork_ctl	*pctl;
	int			error;

	pwork = container_of(work, struct xfs_pwork, work);
	pctl = pwork->pctl;
	error = pctl->work_fn(pctl->mp, pwork);
	if (error && !pctl->error)
		pctl->error = error;
	if (atomic_dec_and_test(&pctl->nr_work))
		wake_up(&pctl->poll_wait);
}
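
/*
 * Because a nonzero pctl->error does not stop items that are already queued,
 * work functions should bail out early once any worker has failed.
 * Continuing the hypothetical sketch above, example_work_fn would typically
 * begin with a check via the xfs_pwork_want_abort() helper from xfs_pwork.h:
 *
 *	if (xfs_pwork_want_abort(pwork))
 *		return 0;	// another worker already failed; skip
 */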

/*
 * Set up control data for parallel work.  @work_fn is the function that will
 * be called.  @tag is used to name the workqueue's kernel threads.
 * @nr_threads is the level of parallelism desired, or 0 for no limit.
 */
int
xfs_pwork_init(
	struct xfs_mount	*mp,
	struct xfs_pwork_ctl	*pctl,
	xfs_pwork_work_fn	work_fn,
	const char		*tag,
	unsigned int		nr_threads)
{
#ifdef DEBUG
	if (xfs_globals.pwork_threads >= 0)
		nr_threads = xfs_globals.pwork_threads;
#endif
	trace_xfs_pwork_init(mp, nr_threads, current->pid);

	pctl->wq = alloc_workqueue("%s-%d", WQ_FREEZABLE, nr_threads, tag,
			current->pid);
	if (!pctl->wq)
		return -ENOMEM;
	pctl->work_fn = work_fn;
	pctl->error = 0;
	pctl->mp = mp;
	atomic_set(&pctl->nr_work, 0);
	init_waitqueue_head(&pctl->poll_wait);

	return 0;
}
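
/*
 * Example call (pctl and example_work_fn are hypothetical, carried over from
 * the sketches above): create a pool with no concurrency limit and bail on
 * allocation failure.  Note that on DEBUG kernels the
 * xfs_globals.pwork_threads knob above can override the requested count.
 *
 *	error = xfs_pwork_init(mp, &pctl, example_work_fn, "example", 0);
 *	if (error)
 *		return error;
 */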

/* Queue some parallel work. */
void
xfs_pwork_queue(
	struct xfs_pwork_ctl	*pctl,
	struct xfs_pwork	*pwork)
{
	INIT_WORK(&pwork->work, xfs_pwork_work);
	pwork->pctl = pctl;
	atomic_inc(&pctl->nr_work);
	queue_work(pctl->wq, &pwork->work);
}
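
/*
 * Sketch of queueing (continuing the hypothetical example): a caller usually
 * allocates one work item per unit of work, here one per AG, and queues each
 * item; nr_work counts how many are outstanding.  The work function would
 * free each item when it finishes with it.
 *
 *	for (agno = 0; agno < mp->m_sb.sb_agcount; agno++) {
 *		struct example_work	*ew;
 *
 *		ew = kmem_zalloc(sizeof(*ew), KM_SLEEP);
 *		ew->agno = agno;
 *		xfs_pwork_queue(&pctl, &ew->pwork);
 *	}
 */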

/* Wait for the work to finish and tear down the control structure. */
int
xfs_pwork_destroy(
	struct xfs_pwork_ctl	*pctl)
{
	destroy_workqueue(pctl->wq);
	pctl->wq = NULL;
	return pctl->error;
}
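
/*
 * Note that destroy_workqueue() drains all queued work before tearing the
 * queue down, so once xfs_pwork_destroy() returns, every work item has run
 * and pctl->error holds the first error any of them reported.  Continuing
 * the sketch:
 *
 *	error = xfs_pwork_destroy(&pctl);	// waits for all workers
 *	if (error)
 *		return error;			// first recorded failure
 */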

/*
 * Wait for the work to finish by polling the completion status, touching the
 * soft lockup watchdog as we wait.  This is for callers such as mount which
 * hold locks.
 */
void
xfs_pwork_poll(
	struct xfs_pwork_ctl	*pctl)
{
	while (wait_event_timeout(pctl->poll_wait,
				atomic_read(&pctl->nr_work) == 0, HZ) == 0)
		touch_softlockup_watchdog();
}
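
/*
 * Sketch of the polling variant (hypothetical caller that must wait with
 * locks held, e.g. during mount): poll until all items finish so the soft
 * lockup watchdog stays quiet, then tear down as usual.
 *
 *	xfs_pwork_queue(&pctl, &ew->pwork);
 *	xfs_pwork_poll(&pctl);		// pets the watchdog while waiting
 *	error = xfs_pwork_destroy(&pctl);
 */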

/*
 * Return the amount of parallelism that the data device can handle, or 0 for
 * no limit.
 */
unsigned int
xfs_pwork_guess_datadev_parallelism(
	struct xfs_mount	*mp)
{
	struct xfs_buftarg	*btp = mp->m_ddev_targp;

	/*
	 * For now we'll go with the most conservative setting possible,
	 * which is two threads for an SSD and one thread everywhere else.
	 */
	return blk_queue_nonrot(btp->bt_bdev->bd_queue) ? 2 : 1;
}
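
/*
 * Example (hypothetical caller): feed the guess into xfs_pwork_init() so the
 * workqueue is sized to the data device.
 *
 *	nr_threads = xfs_pwork_guess_datadev_parallelism(mp);
 *	error = xfs_pwork_init(mp, &pctl, example_work_fn, "example",
 *			nr_threads);
 */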
