root/drivers/md/dm-kcopyd.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. dm_get_kcopyd_subjob_size
  2. io_job_start
  3. io_job_finish
  4. wake
  5. alloc_pl
  6. free_pl
  7. kcopyd_put_pages
  8. kcopyd_get_pages
  9. drop_pages
  10. client_reserve_pages
  11. client_free_pages
  12. dm_kcopyd_init
  13. dm_kcopyd_exit
  14. pop_io_job
  15. pop
  16. push
  17. push_head
  18. run_complete_job
  19. complete_io
  20. run_io_job
  21. run_pages_job
  22. process_jobs
  23. do_work
  24. dispatch_job
  25. segment_complete
  26. split_job
  27. dm_kcopyd_copy
  28. dm_kcopyd_zero
  29. dm_kcopyd_prepare_callback
  30. dm_kcopyd_do_callback
  31. kcopyd_cancel
  32. dm_kcopyd_client_create
  33. dm_kcopyd_client_destroy

   1 /*
   2  * Copyright (C) 2002 Sistina Software (UK) Limited.
   3  * Copyright (C) 2006 Red Hat GmbH
   4  *
   5  * This file is released under the GPL.
   6  *
   7  * Kcopyd provides a simple interface for copying an area of one
   8  * block-device to one or more other block-devices, with an asynchronous
   9  * completion notification.
  10  */
  11 
  12 #include <linux/types.h>
  13 #include <linux/atomic.h>
  14 #include <linux/blkdev.h>
  15 #include <linux/fs.h>
  16 #include <linux/init.h>
  17 #include <linux/list.h>
  18 #include <linux/mempool.h>
  19 #include <linux/module.h>
  20 #include <linux/pagemap.h>
  21 #include <linux/slab.h>
  22 #include <linux/vmalloc.h>
  23 #include <linux/workqueue.h>
  24 #include <linux/mutex.h>
  25 #include <linux/delay.h>
  26 #include <linux/device-mapper.h>
  27 #include <linux/dm-kcopyd.h>
  28 
  29 #include "dm-core.h"
  30 
  31 #define SPLIT_COUNT     8
  32 #define MIN_JOBS        8
  33 
  34 #define DEFAULT_SUB_JOB_SIZE_KB 512
  35 #define MAX_SUB_JOB_SIZE_KB     1024
  36 
  37 static unsigned kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB;
  38 
  39 module_param(kcopyd_subjob_size_kb, uint, S_IRUGO | S_IWUSR);
  40 MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients");
  41 
  42 static unsigned dm_get_kcopyd_subjob_size(void)
  43 {
  44         unsigned sub_job_size_kb;
  45 
  46         sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb,
  47                                                 DEFAULT_SUB_JOB_SIZE_KB,
  48                                                 MAX_SUB_JOB_SIZE_KB);
  49 
  50         return sub_job_size_kb << 1;
  51 }
  52 
  53 /*-----------------------------------------------------------------
  54  * Each kcopyd client has its own little pool of preallocated
  55  * pages for kcopyd io.
  56  *---------------------------------------------------------------*/
  57 struct dm_kcopyd_client {
  58         struct page_list *pages;
  59         unsigned nr_reserved_pages;
  60         unsigned nr_free_pages;
  61         unsigned sub_job_size;
  62 
  63         struct dm_io_client *io_client;
  64 
  65         wait_queue_head_t destroyq;
  66 
  67         mempool_t job_pool;
  68 
  69         struct workqueue_struct *kcopyd_wq;
  70         struct work_struct kcopyd_work;
  71 
  72         struct dm_kcopyd_throttle *throttle;
  73 
  74         atomic_t nr_jobs;
  75 
  76 /*
  77  * We maintain four lists of jobs:
  78  *
  79  * i)   jobs waiting for pages
  80  * ii)  jobs that have pages, and are waiting for the io to be issued.
  81  * iii) jobs that don't need to do any IO and just run a callback
  82  * iv) jobs that have completed.
  83  *
  84  * All four of these are protected by job_lock.
  85  */
  86         spinlock_t job_lock;
  87         struct list_head callback_jobs;
  88         struct list_head complete_jobs;
  89         struct list_head io_jobs;
  90         struct list_head pages_jobs;
  91 };
  92 
  93 static struct page_list zero_page_list;
  94 
  95 static DEFINE_SPINLOCK(throttle_spinlock);
  96 
  97 /*
  98  * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
  99  * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 100  * by 2.
 101  */
 102 #define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ
 103 
 104 /*
 105  * Sleep this number of milliseconds.
 106  *
 107  * The value was decided experimentally.
 108  * Smaller values seem to cause an increased copy rate above the limit.
 109  * The reason for this is unknown but possibly due to jiffies rounding errors
 110  * or read/write cache inside the disk.
 111  */
 112 #define SLEEP_MSEC                      100
 113 
 114 /*
 115  * Maximum number of sleep events. There is a theoretical livelock if more
 116  * kcopyd clients do work simultaneously which this limit avoids.
 117  */
 118 #define MAX_SLEEPS                      10
 119 
 120 static void io_job_start(struct dm_kcopyd_throttle *t)
 121 {
 122         unsigned throttle, now, difference;
 123         int slept = 0, skew;
 124 
 125         if (unlikely(!t))
 126                 return;
 127 
 128 try_again:
 129         spin_lock_irq(&throttle_spinlock);
 130 
 131         throttle = READ_ONCE(t->throttle);
 132 
 133         if (likely(throttle >= 100))
 134                 goto skip_limit;
 135 
 136         now = jiffies;
 137         difference = now - t->last_jiffies;
 138         t->last_jiffies = now;
 139         if (t->num_io_jobs)
 140                 t->io_period += difference;
 141         t->total_period += difference;
 142 
 143         /*
 144          * Maintain sane values if we got a temporary overflow.
 145          */
 146         if (unlikely(t->io_period > t->total_period))
 147                 t->io_period = t->total_period;
 148 
 149         if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
 150                 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
 151                 t->total_period >>= shift;
 152                 t->io_period >>= shift;
 153         }
 154 
 155         skew = t->io_period - throttle * t->total_period / 100;
 156 
 157         if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
 158                 slept++;
 159                 spin_unlock_irq(&throttle_spinlock);
 160                 msleep(SLEEP_MSEC);
 161                 goto try_again;
 162         }
 163 
 164 skip_limit:
 165         t->num_io_jobs++;
 166 
 167         spin_unlock_irq(&throttle_spinlock);
 168 }
 169 
 170 static void io_job_finish(struct dm_kcopyd_throttle *t)
 171 {
 172         unsigned long flags;
 173 
 174         if (unlikely(!t))
 175                 return;
 176 
 177         spin_lock_irqsave(&throttle_spinlock, flags);
 178 
 179         t->num_io_jobs--;
 180 
 181         if (likely(READ_ONCE(t->throttle) >= 100))
 182                 goto skip_limit;
 183 
 184         if (!t->num_io_jobs) {
 185                 unsigned now, difference;
 186 
 187                 now = jiffies;
 188                 difference = now - t->last_jiffies;
 189                 t->last_jiffies = now;
 190 
 191                 t->io_period += difference;
 192                 t->total_period += difference;
 193 
 194                 /*
 195                  * Maintain sane values if we got a temporary overflow.
 196                  */
 197                 if (unlikely(t->io_period > t->total_period))
 198                         t->io_period = t->total_period;
 199         }
 200 
 201 skip_limit:
 202         spin_unlock_irqrestore(&throttle_spinlock, flags);
 203 }
 204 
 205 
 206 static void wake(struct dm_kcopyd_client *kc)
 207 {
 208         queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
 209 }
 210 
 211 /*
 212  * Obtain one page for the use of kcopyd.
 213  */
 214 static struct page_list *alloc_pl(gfp_t gfp)
 215 {
 216         struct page_list *pl;
 217 
 218         pl = kmalloc(sizeof(*pl), gfp);
 219         if (!pl)
 220                 return NULL;
 221 
 222         pl->page = alloc_page(gfp);
 223         if (!pl->page) {
 224                 kfree(pl);
 225                 return NULL;
 226         }
 227 
 228         return pl;
 229 }
 230 
 231 static void free_pl(struct page_list *pl)
 232 {
 233         __free_page(pl->page);
 234         kfree(pl);
 235 }
 236 
 237 /*
 238  * Add the provided pages to a client's free page list, releasing
 239  * back to the system any beyond the reserved_pages limit.
 240  */
 241 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
 242 {
 243         struct page_list *next;
 244 
 245         do {
 246                 next = pl->next;
 247 
 248                 if (kc->nr_free_pages >= kc->nr_reserved_pages)
 249                         free_pl(pl);
 250                 else {
 251                         pl->next = kc->pages;
 252                         kc->pages = pl;
 253                         kc->nr_free_pages++;
 254                 }
 255 
 256                 pl = next;
 257         } while (pl);
 258 }
 259 
 260 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
 261                             unsigned int nr, struct page_list **pages)
 262 {
 263         struct page_list *pl;
 264 
 265         *pages = NULL;
 266 
 267         do {
 268                 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
 269                 if (unlikely(!pl)) {
 270                         /* Use reserved pages */
 271                         pl = kc->pages;
 272                         if (unlikely(!pl))
 273                                 goto out_of_memory;
 274                         kc->pages = pl->next;
 275                         kc->nr_free_pages--;
 276                 }
 277                 pl->next = *pages;
 278                 *pages = pl;
 279         } while (--nr);
 280 
 281         return 0;
 282 
 283 out_of_memory:
 284         if (*pages)
 285                 kcopyd_put_pages(kc, *pages);
 286         return -ENOMEM;
 287 }
 288 
 289 /*
 290  * These three functions resize the page pool.
 291  */
 292 static void drop_pages(struct page_list *pl)
 293 {
 294         struct page_list *next;
 295 
 296         while (pl) {
 297                 next = pl->next;
 298                 free_pl(pl);
 299                 pl = next;
 300         }
 301 }
 302 
 303 /*
 304  * Allocate and reserve nr_pages for the use of a specific client.
 305  */
 306 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
 307 {
 308         unsigned i;
 309         struct page_list *pl = NULL, *next;
 310 
 311         for (i = 0; i < nr_pages; i++) {
 312                 next = alloc_pl(GFP_KERNEL);
 313                 if (!next) {
 314                         if (pl)
 315                                 drop_pages(pl);
 316                         return -ENOMEM;
 317                 }
 318                 next->next = pl;
 319                 pl = next;
 320         }
 321 
 322         kc->nr_reserved_pages += nr_pages;
 323         kcopyd_put_pages(kc, pl);
 324 
 325         return 0;
 326 }
 327 
 328 static void client_free_pages(struct dm_kcopyd_client *kc)
 329 {
 330         BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
 331         drop_pages(kc->pages);
 332         kc->pages = NULL;
 333         kc->nr_free_pages = kc->nr_reserved_pages = 0;
 334 }
 335 
 336 /*-----------------------------------------------------------------
 337  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 338  * for this reason we use a mempool to prevent the client from
 339  * ever having to do io (which could cause a deadlock).
 340  *---------------------------------------------------------------*/
 341 struct kcopyd_job {
 342         struct dm_kcopyd_client *kc;
 343         struct list_head list;
 344         unsigned long flags;
 345 
 346         /*
 347          * Error state of the job.
 348          */
 349         int read_err;
 350         unsigned long write_err;
 351 
 352         /*
 353          * Either READ or WRITE
 354          */
 355         int rw;
 356         struct dm_io_region source;
 357 
 358         /*
 359          * The destinations for the transfer.
 360          */
 361         unsigned int num_dests;
 362         struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
 363 
 364         struct page_list *pages;
 365 
 366         /*
 367          * Set this to ensure you are notified when the job has
 368          * completed.  'context' is for callback to use.
 369          */
 370         dm_kcopyd_notify_fn fn;
 371         void *context;
 372 
 373         /*
 374          * These fields are only used if the job has been split
 375          * into more manageable parts.
 376          */
 377         struct mutex lock;
 378         atomic_t sub_jobs;
 379         sector_t progress;
 380         sector_t write_offset;
 381 
 382         struct kcopyd_job *master_job;
 383 };
 384 
 385 static struct kmem_cache *_job_cache;
 386 
 387 int __init dm_kcopyd_init(void)
 388 {
 389         _job_cache = kmem_cache_create("kcopyd_job",
 390                                 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
 391                                 __alignof__(struct kcopyd_job), 0, NULL);
 392         if (!_job_cache)
 393                 return -ENOMEM;
 394 
 395         zero_page_list.next = &zero_page_list;
 396         zero_page_list.page = ZERO_PAGE(0);
 397 
 398         return 0;
 399 }
 400 
 401 void dm_kcopyd_exit(void)
 402 {
 403         kmem_cache_destroy(_job_cache);
 404         _job_cache = NULL;
 405 }
 406 
 407 /*
 408  * Functions to push and pop a job onto the head of a given job
 409  * list.
 410  */
 411 static struct kcopyd_job *pop_io_job(struct list_head *jobs,
 412                                      struct dm_kcopyd_client *kc)
 413 {
 414         struct kcopyd_job *job;
 415 
 416         /*
 417          * For I/O jobs, pop any read, any write without sequential write
 418          * constraint and sequential writes that are at the right position.
 419          */
 420         list_for_each_entry(job, jobs, list) {
 421                 if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
 422                         list_del(&job->list);
 423                         return job;
 424                 }
 425 
 426                 if (job->write_offset == job->master_job->write_offset) {
 427                         job->master_job->write_offset += job->source.count;
 428                         list_del(&job->list);
 429                         return job;
 430                 }
 431         }
 432 
 433         return NULL;
 434 }
 435 
 436 static struct kcopyd_job *pop(struct list_head *jobs,
 437                               struct dm_kcopyd_client *kc)
 438 {
 439         struct kcopyd_job *job = NULL;
 440         unsigned long flags;
 441 
 442         spin_lock_irqsave(&kc->job_lock, flags);
 443 
 444         if (!list_empty(jobs)) {
 445                 if (jobs == &kc->io_jobs)
 446                         job = pop_io_job(jobs, kc);
 447                 else {
 448                         job = list_entry(jobs->next, struct kcopyd_job, list);
 449                         list_del(&job->list);
 450                 }
 451         }
 452         spin_unlock_irqrestore(&kc->job_lock, flags);
 453 
 454         return job;
 455 }
 456 
 457 static void push(struct list_head *jobs, struct kcopyd_job *job)
 458 {
 459         unsigned long flags;
 460         struct dm_kcopyd_client *kc = job->kc;
 461 
 462         spin_lock_irqsave(&kc->job_lock, flags);
 463         list_add_tail(&job->list, jobs);
 464         spin_unlock_irqrestore(&kc->job_lock, flags);
 465 }
 466 
 467 
 468 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
 469 {
 470         unsigned long flags;
 471         struct dm_kcopyd_client *kc = job->kc;
 472 
 473         spin_lock_irqsave(&kc->job_lock, flags);
 474         list_add(&job->list, jobs);
 475         spin_unlock_irqrestore(&kc->job_lock, flags);
 476 }
 477 
 478 /*
 479  * These three functions process 1 item from the corresponding
 480  * job list.
 481  *
 482  * They return:
 483  * < 0: error
 484  *   0: success
 485  * > 0: can't process yet.
 486  */
 487 static int run_complete_job(struct kcopyd_job *job)
 488 {
 489         void *context = job->context;
 490         int read_err = job->read_err;
 491         unsigned long write_err = job->write_err;
 492         dm_kcopyd_notify_fn fn = job->fn;
 493         struct dm_kcopyd_client *kc = job->kc;
 494 
 495         if (job->pages && job->pages != &zero_page_list)
 496                 kcopyd_put_pages(kc, job->pages);
 497         /*
 498          * If this is the master job, the sub jobs have already
 499          * completed so we can free everything.
 500          */
 501         if (job->master_job == job) {
 502                 mutex_destroy(&job->lock);
 503                 mempool_free(job, &kc->job_pool);
 504         }
 505         fn(read_err, write_err, context);
 506 
 507         if (atomic_dec_and_test(&kc->nr_jobs))
 508                 wake_up(&kc->destroyq);
 509 
 510         cond_resched();
 511 
 512         return 0;
 513 }
 514 
 515 static void complete_io(unsigned long error, void *context)
 516 {
 517         struct kcopyd_job *job = (struct kcopyd_job *) context;
 518         struct dm_kcopyd_client *kc = job->kc;
 519 
 520         io_job_finish(kc->throttle);
 521 
 522         if (error) {
 523                 if (op_is_write(job->rw))
 524                         job->write_err |= error;
 525                 else
 526                         job->read_err = 1;
 527 
 528                 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
 529                         push(&kc->complete_jobs, job);
 530                         wake(kc);
 531                         return;
 532                 }
 533         }
 534 
 535         if (op_is_write(job->rw))
 536                 push(&kc->complete_jobs, job);
 537 
 538         else {
 539                 job->rw = WRITE;
 540                 push(&kc->io_jobs, job);
 541         }
 542 
 543         wake(kc);
 544 }
 545 
 546 /*
 547  * Request io on as many buffer heads as we can currently get for
 548  * a particular job.
 549  */
 550 static int run_io_job(struct kcopyd_job *job)
 551 {
 552         int r;
 553         struct dm_io_request io_req = {
 554                 .bi_op = job->rw,
 555                 .bi_op_flags = 0,
 556                 .mem.type = DM_IO_PAGE_LIST,
 557                 .mem.ptr.pl = job->pages,
 558                 .mem.offset = 0,
 559                 .notify.fn = complete_io,
 560                 .notify.context = job,
 561                 .client = job->kc->io_client,
 562         };
 563 
 564         /*
 565          * If we need to write sequentially and some reads or writes failed,
 566          * no point in continuing.
 567          */
 568         if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
 569             job->master_job->write_err) {
 570                 job->write_err = job->master_job->write_err;
 571                 return -EIO;
 572         }
 573 
 574         io_job_start(job->kc->throttle);
 575 
 576         if (job->rw == READ)
 577                 r = dm_io(&io_req, 1, &job->source, NULL);
 578         else
 579                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
 580 
 581         return r;
 582 }
 583 
 584 static int run_pages_job(struct kcopyd_job *job)
 585 {
 586         int r;
 587         unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
 588 
 589         r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
 590         if (!r) {
 591                 /* this job is ready for io */
 592                 push(&job->kc->io_jobs, job);
 593                 return 0;
 594         }
 595 
 596         if (r == -ENOMEM)
 597                 /* can't complete now */
 598                 return 1;
 599 
 600         return r;
 601 }
 602 
 603 /*
 604  * Run through a list for as long as possible.  Returns the count
 605  * of successful jobs.
 606  */
 607 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
 608                         int (*fn) (struct kcopyd_job *))
 609 {
 610         struct kcopyd_job *job;
 611         int r, count = 0;
 612 
 613         while ((job = pop(jobs, kc))) {
 614 
 615                 r = fn(job);
 616 
 617                 if (r < 0) {
 618                         /* error this rogue job */
 619                         if (op_is_write(job->rw))
 620                                 job->write_err = (unsigned long) -1L;
 621                         else
 622                                 job->read_err = 1;
 623                         push(&kc->complete_jobs, job);
 624                         wake(kc);
 625                         break;
 626                 }
 627 
 628                 if (r > 0) {
 629                         /*
 630                          * We couldn't service this job ATM, so
 631                          * push this job back onto the list.
 632                          */
 633                         push_head(jobs, job);
 634                         break;
 635                 }
 636 
 637                 count++;
 638         }
 639 
 640         return count;
 641 }
 642 
 643 /*
 644  * kcopyd does this every time it's woken up.
 645  */
 646 static void do_work(struct work_struct *work)
 647 {
 648         struct dm_kcopyd_client *kc = container_of(work,
 649                                         struct dm_kcopyd_client, kcopyd_work);
 650         struct blk_plug plug;
 651         unsigned long flags;
 652 
 653         /*
 654          * The order that these are called is *very* important.
 655          * complete jobs can free some pages for pages jobs.
 656          * Pages jobs when successful will jump onto the io jobs
 657          * list.  io jobs call wake when they complete and it all
 658          * starts again.
 659          */
 660         spin_lock_irqsave(&kc->job_lock, flags);
 661         list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
 662         spin_unlock_irqrestore(&kc->job_lock, flags);
 663 
 664         blk_start_plug(&plug);
 665         process_jobs(&kc->complete_jobs, kc, run_complete_job);
 666         process_jobs(&kc->pages_jobs, kc, run_pages_job);
 667         process_jobs(&kc->io_jobs, kc, run_io_job);
 668         blk_finish_plug(&plug);
 669 }
 670 
 671 /*
 672  * If we are copying a small region we just dispatch a single job
 673  * to do the copy, otherwise the io has to be split up into many
 674  * jobs.
 675  */
 676 static void dispatch_job(struct kcopyd_job *job)
 677 {
 678         struct dm_kcopyd_client *kc = job->kc;
 679         atomic_inc(&kc->nr_jobs);
 680         if (unlikely(!job->source.count))
 681                 push(&kc->callback_jobs, job);
 682         else if (job->pages == &zero_page_list)
 683                 push(&kc->io_jobs, job);
 684         else
 685                 push(&kc->pages_jobs, job);
 686         wake(kc);
 687 }
 688 
 689 static void segment_complete(int read_err, unsigned long write_err,
 690                              void *context)
 691 {
 692         /* FIXME: tidy this function */
 693         sector_t progress = 0;
 694         sector_t count = 0;
 695         struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
 696         struct kcopyd_job *job = sub_job->master_job;
 697         struct dm_kcopyd_client *kc = job->kc;
 698 
 699         mutex_lock(&job->lock);
 700 
 701         /* update the error */
 702         if (read_err)
 703                 job->read_err = 1;
 704 
 705         if (write_err)
 706                 job->write_err |= write_err;
 707 
 708         /*
 709          * Only dispatch more work if there hasn't been an error.
 710          */
 711         if ((!job->read_err && !job->write_err) ||
 712             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
 713                 /* get the next chunk of work */
 714                 progress = job->progress;
 715                 count = job->source.count - progress;
 716                 if (count) {
 717                         if (count > kc->sub_job_size)
 718                                 count = kc->sub_job_size;
 719 
 720                         job->progress += count;
 721                 }
 722         }
 723         mutex_unlock(&job->lock);
 724 
 725         if (count) {
 726                 int i;
 727 
 728                 *sub_job = *job;
 729                 sub_job->write_offset = progress;
 730                 sub_job->source.sector += progress;
 731                 sub_job->source.count = count;
 732 
 733                 for (i = 0; i < job->num_dests; i++) {
 734                         sub_job->dests[i].sector += progress;
 735                         sub_job->dests[i].count = count;
 736                 }
 737 
 738                 sub_job->fn = segment_complete;
 739                 sub_job->context = sub_job;
 740                 dispatch_job(sub_job);
 741 
 742         } else if (atomic_dec_and_test(&job->sub_jobs)) {
 743 
 744                 /*
 745                  * Queue the completion callback to the kcopyd thread.
 746                  *
 747                  * Some callers assume that all the completions are called
 748                  * from a single thread and don't race with each other.
 749                  *
 750                  * We must not call the callback directly here because this
 751                  * code may not be executing in the thread.
 752                  */
 753                 push(&kc->complete_jobs, job);
 754                 wake(kc);
 755         }
 756 }
 757 
 758 /*
 759  * Create some sub jobs to share the work between them.
 760  */
 761 static void split_job(struct kcopyd_job *master_job)
 762 {
 763         int i;
 764 
 765         atomic_inc(&master_job->kc->nr_jobs);
 766 
 767         atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
 768         for (i = 0; i < SPLIT_COUNT; i++) {
 769                 master_job[i + 1].master_job = master_job;
 770                 segment_complete(0, 0u, &master_job[i + 1]);
 771         }
 772 }
 773 
 774 void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
 775                     unsigned int num_dests, struct dm_io_region *dests,
 776                     unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
 777 {
 778         struct kcopyd_job *job;
 779         int i;
 780 
 781         /*
 782          * Allocate an array of jobs consisting of one master job
 783          * followed by SPLIT_COUNT sub jobs.
 784          */
 785         job = mempool_alloc(&kc->job_pool, GFP_NOIO);
 786         mutex_init(&job->lock);
 787 
 788         /*
 789          * set up for the read.
 790          */
 791         job->kc = kc;
 792         job->flags = flags;
 793         job->read_err = 0;
 794         job->write_err = 0;
 795 
 796         job->num_dests = num_dests;
 797         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
 798 
 799         /*
 800          * If one of the destination is a host-managed zoned block device,
 801          * we need to write sequentially. If one of the destination is a
 802          * host-aware device, then leave it to the caller to choose what to do.
 803          */
 804         if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
 805                 for (i = 0; i < job->num_dests; i++) {
 806                         if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
 807                                 set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
 808                                 break;
 809                         }
 810                 }
 811         }
 812 
 813         /*
 814          * If we need to write sequentially, errors cannot be ignored.
 815          */
 816         if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
 817             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
 818                 clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);
 819 
 820         if (from) {
 821                 job->source = *from;
 822                 job->pages = NULL;
 823                 job->rw = READ;
 824         } else {
 825                 memset(&job->source, 0, sizeof job->source);
 826                 job->source.count = job->dests[0].count;
 827                 job->pages = &zero_page_list;
 828 
 829                 /*
 830                  * Use WRITE ZEROES to optimize zeroing if all dests support it.
 831                  */
 832                 job->rw = REQ_OP_WRITE_ZEROES;
 833                 for (i = 0; i < job->num_dests; i++)
 834                         if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
 835                                 job->rw = WRITE;
 836                                 break;
 837                         }
 838         }
 839 
 840         job->fn = fn;
 841         job->context = context;
 842         job->master_job = job;
 843         job->write_offset = 0;
 844 
 845         if (job->source.count <= kc->sub_job_size)
 846                 dispatch_job(job);
 847         else {
 848                 job->progress = 0;
 849                 split_job(job);
 850         }
 851 }
 852 EXPORT_SYMBOL(dm_kcopyd_copy);
 853 
 854 void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
 855                     unsigned num_dests, struct dm_io_region *dests,
 856                     unsigned flags, dm_kcopyd_notify_fn fn, void *context)
 857 {
 858         dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
 859 }
 860 EXPORT_SYMBOL(dm_kcopyd_zero);
 861 
 862 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
 863                                  dm_kcopyd_notify_fn fn, void *context)
 864 {
 865         struct kcopyd_job *job;
 866 
 867         job = mempool_alloc(&kc->job_pool, GFP_NOIO);
 868 
 869         memset(job, 0, sizeof(struct kcopyd_job));
 870         job->kc = kc;
 871         job->fn = fn;
 872         job->context = context;
 873         job->master_job = job;
 874 
 875         atomic_inc(&kc->nr_jobs);
 876 
 877         return job;
 878 }
 879 EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
 880 
 881 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
 882 {
 883         struct kcopyd_job *job = j;
 884         struct dm_kcopyd_client *kc = job->kc;
 885 
 886         job->read_err = read_err;
 887         job->write_err = write_err;
 888 
 889         push(&kc->callback_jobs, job);
 890         wake(kc);
 891 }
 892 EXPORT_SYMBOL(dm_kcopyd_do_callback);
 893 
 894 /*
 895  * Cancels a kcopyd job, eg. someone might be deactivating a
 896  * mirror.
 897  */
 898 #if 0
 899 int kcopyd_cancel(struct kcopyd_job *job, int block)
 900 {
 901         /* FIXME: finish */
 902         return -1;
 903 }
 904 #endif  /*  0  */
 905 
 906 /*-----------------------------------------------------------------
 907  * Client setup
 908  *---------------------------------------------------------------*/
 909 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
 910 {
 911         int r;
 912         unsigned reserve_pages;
 913         struct dm_kcopyd_client *kc;
 914 
 915         kc = kzalloc(sizeof(*kc), GFP_KERNEL);
 916         if (!kc)
 917                 return ERR_PTR(-ENOMEM);
 918 
 919         spin_lock_init(&kc->job_lock);
 920         INIT_LIST_HEAD(&kc->callback_jobs);
 921         INIT_LIST_HEAD(&kc->complete_jobs);
 922         INIT_LIST_HEAD(&kc->io_jobs);
 923         INIT_LIST_HEAD(&kc->pages_jobs);
 924         kc->throttle = throttle;
 925 
 926         r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
 927         if (r)
 928                 goto bad_slab;
 929 
 930         INIT_WORK(&kc->kcopyd_work, do_work);
 931         kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
 932         if (!kc->kcopyd_wq) {
 933                 r = -ENOMEM;
 934                 goto bad_workqueue;
 935         }
 936 
 937         kc->sub_job_size = dm_get_kcopyd_subjob_size();
 938         reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE);
 939 
 940         kc->pages = NULL;
 941         kc->nr_reserved_pages = kc->nr_free_pages = 0;
 942         r = client_reserve_pages(kc, reserve_pages);
 943         if (r)
 944                 goto bad_client_pages;
 945 
 946         kc->io_client = dm_io_client_create();
 947         if (IS_ERR(kc->io_client)) {
 948                 r = PTR_ERR(kc->io_client);
 949                 goto bad_io_client;
 950         }
 951 
 952         init_waitqueue_head(&kc->destroyq);
 953         atomic_set(&kc->nr_jobs, 0);
 954 
 955         return kc;
 956 
 957 bad_io_client:
 958         client_free_pages(kc);
 959 bad_client_pages:
 960         destroy_workqueue(kc->kcopyd_wq);
 961 bad_workqueue:
 962         mempool_exit(&kc->job_pool);
 963 bad_slab:
 964         kfree(kc);
 965 
 966         return ERR_PTR(r);
 967 }
 968 EXPORT_SYMBOL(dm_kcopyd_client_create);
 969 
 970 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
 971 {
 972         /* Wait for completion of all jobs submitted by this client. */
 973         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
 974 
 975         BUG_ON(!list_empty(&kc->callback_jobs));
 976         BUG_ON(!list_empty(&kc->complete_jobs));
 977         BUG_ON(!list_empty(&kc->io_jobs));
 978         BUG_ON(!list_empty(&kc->pages_jobs));
 979         destroy_workqueue(kc->kcopyd_wq);
 980         dm_io_client_destroy(kc->io_client);
 981         client_free_pages(kc);
 982         mempool_exit(&kc->job_pool);
 983         kfree(kc);
 984 }
 985 EXPORT_SYMBOL(dm_kcopyd_client_destroy);

/* [<][>][^][v][top][bottom][index][help] */