root/drivers/block/drbd/drbd_worker.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. drbd_md_endio
  2. drbd_endio_read_sec_final
  3. drbd_endio_write_sec_final
  4. drbd_peer_request_endio
  5. drbd_panic_after_delayed_completion_of_aborted_request
  6. drbd_request_endio
  7. drbd_csum_ee
  8. drbd_csum_bio
  9. w_e_send_csum
  10. read_for_csum
  11. w_resync_timer
  12. resync_timer_fn
  13. fifo_set
  14. fifo_push
  15. fifo_add_val
  16. fifo_alloc
  17. drbd_rs_controller
  18. drbd_rs_number_requests
  19. make_resync_request
  20. make_ov_request
  21. w_ov_finished
  22. w_resync_finished
  23. ping_peer
  24. drbd_resync_finished
  25. move_to_net_ee_or_free
  26. w_e_end_data_req
  27. all_zero
  28. w_e_end_rsdata_req
  29. w_e_end_csum_rs_req
  30. w_e_end_ov_req
  31. drbd_ov_out_of_sync_found
  32. w_e_end_ov_reply
  33. drbd_send_barrier
  34. pd_send_unplug_remote
  35. w_send_write_hint
  36. re_init_if_first_write
  37. maybe_send_barrier
  38. w_send_out_of_sync
  39. w_send_dblock
  40. w_send_read_req
  41. w_restart_disk_io
  42. _drbd_may_sync_now
  43. drbd_pause_after
  44. drbd_resume_next
  45. resume_next_sg
  46. suspend_other_sg
  47. drbd_resync_after_valid
  48. drbd_resync_after_changed
  49. drbd_rs_controller_reset
  50. start_resync_timer_fn
  51. do_start_resync
  52. use_checksum_based_resync
  53. drbd_start_resync
  54. update_on_disk_bitmap
  55. drbd_ldev_destroy
  56. go_diskless
  57. do_md_sync
  58. __update_timing_details
  59. do_device_work
  60. get_work_bits
  61. do_unqueued_work
  62. dequeue_work_batch
  63. wait_for_work
  64. drbd_worker

   1 // SPDX-License-Identifier: GPL-2.0-or-later
   2 /*
   3    drbd_worker.c
   4 
   5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   6 
   7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
  10 
  11 
  12 */
  13 
  14 #include <linux/module.h>
  15 #include <linux/drbd.h>
  16 #include <linux/sched/signal.h>
  17 #include <linux/wait.h>
  18 #include <linux/mm.h>
  19 #include <linux/memcontrol.h>
  20 #include <linux/mm_inline.h>
  21 #include <linux/slab.h>
  22 #include <linux/random.h>
  23 #include <linux/string.h>
  24 #include <linux/scatterlist.h>
  25 
  26 #include "drbd_int.h"
  27 #include "drbd_protocol.h"
  28 #include "drbd_req.h"
  29 
  /* Request generators invoked from w_resync_timer(), defined further down. */
  static int make_ov_request(struct drbd_device *device, int cancel);
  static int make_resync_request(struct drbd_device *device, int cancel);
  32 
  33 /* endio handlers:
  34  *   drbd_md_endio (defined here)
  35  *   drbd_request_endio (defined here)
  36  *   drbd_peer_request_endio (defined here)
  37  *   drbd_bm_endio (defined in drbd_bitmap.c)
  38  *
  39  * For all these callbacks, note the following:
  40  * The callbacks will be called in irq context by the IDE drivers,
  41  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  42  * Try to get the locking right :)
  43  *
  44  */
  45 
  46 /* used for synchronous meta data and bitmap IO
  47  * submitted by drbd_md_sync_page_io()
  48  */
  49 void drbd_md_endio(struct bio *bio)
  50 {
  51         struct drbd_device *device;
  52 
  53         device = bio->bi_private;
  54         device->md_io.error = blk_status_to_errno(bio->bi_status);
  55 
  56         /* special case: drbd_md_read() during drbd_adm_attach() */
  57         if (device->ldev)
  58                 put_ldev(device);
  59         bio_put(bio);
  60 
  61         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  62          * to timeout on the lower level device, and eventually detach from it.
  63          * If this io completion runs after that timeout expired, this
  64          * drbd_md_put_buffer() may allow us to finally try and re-attach.
  65          * During normal operation, this only puts that extra reference
  66          * down to 1 again.
  67          * Make sure we first drop the reference, and only then signal
  68          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  69          * next drbd_md_sync_page_io(), that we trigger the
  70          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
  71          */
  72         drbd_md_put_buffer(device);
  73         device->md_io.done = 1;
  74         wake_up(&device->misc_wait);
  75 }
  76 
  77 /* reads on behalf of the partner,
  78  * "submitted" by the receiver
  79  */
  80 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
  81 {
  82         unsigned long flags = 0;
  83         struct drbd_peer_device *peer_device = peer_req->peer_device;
  84         struct drbd_device *device = peer_device->device;
  85 
  86         spin_lock_irqsave(&device->resource->req_lock, flags);
  87         device->read_cnt += peer_req->i.size >> 9;
  88         list_del(&peer_req->w.list);
  89         if (list_empty(&device->read_ee))
  90                 wake_up(&device->ee_wait);
  91         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
  92                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
  93         spin_unlock_irqrestore(&device->resource->req_lock, flags);
  94 
  95         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
  96         put_ldev(device);
  97 }
  98 
  99 /* writes on behalf of the partner, or resync writes,
 100  * "submitted" by the receiver, final stage.  */
 101 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 102 {
 103         unsigned long flags = 0;
 104         struct drbd_peer_device *peer_device = peer_req->peer_device;
 105         struct drbd_device *device = peer_device->device;
 106         struct drbd_connection *connection = peer_device->connection;
 107         struct drbd_interval i;
 108         int do_wake;
 109         u64 block_id;
 110         int do_al_complete_io;
 111 
 112         /* after we moved peer_req to done_ee,
 113          * we may no longer access it,
 114          * it may be freed/reused already!
 115          * (as soon as we release the req_lock) */
 116         i = peer_req->i;
 117         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 118         block_id = peer_req->block_id;
 119         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 120 
 121         if (peer_req->flags & EE_WAS_ERROR) {
 122                 /* In protocol != C, we usually do not send write acks.
 123                  * In case of a write error, send the neg ack anyways. */
 124                 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
 125                         inc_unacked(device);
 126                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
 127         }
 128 
 129         spin_lock_irqsave(&device->resource->req_lock, flags);
 130         device->writ_cnt += peer_req->i.size >> 9;
 131         list_move_tail(&peer_req->w.list, &device->done_ee);
 132 
 133         /*
 134          * Do not remove from the write_requests tree here: we did not send the
 135          * Ack yet and did not wake possibly waiting conflicting requests.
 136          * Removed from the tree from "drbd_process_done_ee" within the
 137          * appropriate dw.cb (e_end_block/e_end_resync_block) or from
 138          * _drbd_clear_done_ee.
 139          */
 140 
 141         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
 142 
 143         /* FIXME do we want to detach for failed REQ_OP_DISCARD?
 144          * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
 145         if (peer_req->flags & EE_WAS_ERROR)
 146                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
 147 
 148         if (connection->cstate >= C_WF_REPORT_PARAMS) {
 149                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
 150                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
 151                         kref_put(&device->kref, drbd_destroy_device);
 152         }
 153         spin_unlock_irqrestore(&device->resource->req_lock, flags);
 154 
 155         if (block_id == ID_SYNCER)
 156                 drbd_rs_complete_io(device, i.sector);
 157 
 158         if (do_wake)
 159                 wake_up(&device->ee_wait);
 160 
 161         if (do_al_complete_io)
 162                 drbd_al_complete_io(device, &i);
 163 
 164         put_ldev(device);
 165 }
 166 
 167 /* writes on behalf of the partner, or resync writes,
 168  * "submitted" by the receiver.
 169  */
 170 void drbd_peer_request_endio(struct bio *bio)
 171 {
 172         struct drbd_peer_request *peer_req = bio->bi_private;
 173         struct drbd_device *device = peer_req->peer_device->device;
 174         bool is_write = bio_data_dir(bio) == WRITE;
 175         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
 176                           bio_op(bio) == REQ_OP_DISCARD;
 177 
 178         if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
 179                 drbd_warn(device, "%s: error=%d s=%llus\n",
 180                                 is_write ? (is_discard ? "discard" : "write")
 181                                         : "read", bio->bi_status,
 182                                 (unsigned long long)peer_req->i.sector);
 183 
 184         if (bio->bi_status)
 185                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
 186 
 187         bio_put(bio); /* no need for the bio anymore */
 188         if (atomic_dec_and_test(&peer_req->pending_bios)) {
 189                 if (is_write)
 190                         drbd_endio_write_sec_final(peer_req);
 191                 else
 192                         drbd_endio_read_sec_final(peer_req);
 193         }
 194 }
 195 
 196 static void
 197 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
 198 {
 199         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
 200                 device->minor, device->resource->name, device->vnr);
 201 }
 202 
 203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 204  */
 205 void drbd_request_endio(struct bio *bio)
 206 {
 207         unsigned long flags;
 208         struct drbd_request *req = bio->bi_private;
 209         struct drbd_device *device = req->device;
 210         struct bio_and_error m;
 211         enum drbd_req_event what;
 212 
 213         /* If this request was aborted locally before,
 214          * but now was completed "successfully",
 215          * chances are that this caused arbitrary data corruption.
 216          *
 217          * "aborting" requests, or force-detaching the disk, is intended for
 218          * completely blocked/hung local backing devices which do no longer
 219          * complete requests at all, not even do error completions.  In this
 220          * situation, usually a hard-reset and failover is the only way out.
 221          *
 222          * By "aborting", basically faking a local error-completion,
 223          * we allow for a more graceful swichover by cleanly migrating services.
 224          * Still the affected node has to be rebooted "soon".
 225          *
 226          * By completing these requests, we allow the upper layers to re-use
 227          * the associated data pages.
 228          *
 229          * If later the local backing device "recovers", and now DMAs some data
 230          * from disk into the original request pages, in the best case it will
 231          * just put random data into unused pages; but typically it will corrupt
 232          * meanwhile completely unrelated data, causing all sorts of damage.
 233          *
 234          * Which means delayed successful completion,
 235          * especially for READ requests,
 236          * is a reason to panic().
 237          *
 238          * We assume that a delayed *error* completion is OK,
 239          * though we still will complain noisily about it.
 240          */
 241         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 242                 if (__ratelimit(&drbd_ratelimit_state))
 243                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 244 
 245                 if (!bio->bi_status)
 246                         drbd_panic_after_delayed_completion_of_aborted_request(device);
 247         }
 248 
 249         /* to avoid recursion in __req_mod */
 250         if (unlikely(bio->bi_status)) {
 251                 switch (bio_op(bio)) {
 252                 case REQ_OP_WRITE_ZEROES:
 253                 case REQ_OP_DISCARD:
 254                         if (bio->bi_status == BLK_STS_NOTSUPP)
 255                                 what = DISCARD_COMPLETED_NOTSUPP;
 256                         else
 257                                 what = DISCARD_COMPLETED_WITH_ERROR;
 258                         break;
 259                 case REQ_OP_READ:
 260                         if (bio->bi_opf & REQ_RAHEAD)
 261                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
 262                         else
 263                                 what = READ_COMPLETED_WITH_ERROR;
 264                         break;
 265                 default:
 266                         what = WRITE_COMPLETED_WITH_ERROR;
 267                         break;
 268                 }
 269         } else {
 270                 what = COMPLETED_OK;
 271         }
 272 
 273         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
 274         bio_put(bio);
 275 
 276         /* not req_mod(), we need irqsave here! */
 277         spin_lock_irqsave(&device->resource->req_lock, flags);
 278         __req_mod(req, what, &m);
 279         spin_unlock_irqrestore(&device->resource->req_lock, flags);
 280         put_ldev(device);
 281 
 282         if (m.bio)
 283                 complete_master_bio(device, &m);
 284 }
 285 
 286 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
 287 {
 288         SHASH_DESC_ON_STACK(desc, tfm);
 289         struct page *page = peer_req->pages;
 290         struct page *tmp;
 291         unsigned len;
 292         void *src;
 293 
 294         desc->tfm = tfm;
 295 
 296         crypto_shash_init(desc);
 297 
 298         src = kmap_atomic(page);
 299         while ((tmp = page_chain_next(page))) {
 300                 /* all but the last page will be fully used */
 301                 crypto_shash_update(desc, src, PAGE_SIZE);
 302                 kunmap_atomic(src);
 303                 page = tmp;
 304                 src = kmap_atomic(page);
 305         }
 306         /* and now the last, possibly only partially used page */
 307         len = peer_req->i.size & (PAGE_SIZE - 1);
 308         crypto_shash_update(desc, src, len ?: PAGE_SIZE);
 309         kunmap_atomic(src);
 310 
 311         crypto_shash_final(desc, digest);
 312         shash_desc_zero(desc);
 313 }
 314 
 315 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
 316 {
 317         SHASH_DESC_ON_STACK(desc, tfm);
 318         struct bio_vec bvec;
 319         struct bvec_iter iter;
 320 
 321         desc->tfm = tfm;
 322 
 323         crypto_shash_init(desc);
 324 
 325         bio_for_each_segment(bvec, bio, iter) {
 326                 u8 *src;
 327 
 328                 src = kmap_atomic(bvec.bv_page);
 329                 crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
 330                 kunmap_atomic(src);
 331 
 332                 /* REQ_OP_WRITE_SAME has only one segment,
 333                  * checksum the payload only once. */
 334                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
 335                         break;
 336         }
 337         crypto_shash_final(desc, digest);
 338         shash_desc_zero(desc);
 339 }
 340 
 341 /* MAYBE merge common code with w_e_end_ov_req */
 342 static int w_e_send_csum(struct drbd_work *w, int cancel)
 343 {
 344         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 345         struct drbd_peer_device *peer_device = peer_req->peer_device;
 346         struct drbd_device *device = peer_device->device;
 347         int digest_size;
 348         void *digest;
 349         int err = 0;
 350 
 351         if (unlikely(cancel))
 352                 goto out;
 353 
 354         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 355                 goto out;
 356 
 357         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
 358         digest = kmalloc(digest_size, GFP_NOIO);
 359         if (digest) {
 360                 sector_t sector = peer_req->i.sector;
 361                 unsigned int size = peer_req->i.size;
 362                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
 363                 /* Free peer_req and pages before send.
 364                  * In case we block on congestion, we could otherwise run into
 365                  * some distributed deadlock, if the other side blocks on
 366                  * congestion as well, because our receiver blocks in
 367                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
 368                 drbd_free_peer_req(device, peer_req);
 369                 peer_req = NULL;
 370                 inc_rs_pending(device);
 371                 err = drbd_send_drequest_csum(peer_device, sector, size,
 372                                               digest, digest_size,
 373                                               P_CSUM_RS_REQUEST);
 374                 kfree(digest);
 375         } else {
 376                 drbd_err(device, "kmalloc() of digest failed.\n");
 377                 err = -ENOMEM;
 378         }
 379 
 380 out:
 381         if (peer_req)
 382                 drbd_free_peer_req(device, peer_req);
 383 
 384         if (unlikely(err))
 385                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
 386         return err;
 387 }
 388 
 389 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
 390 
 391 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
 392 {
 393         struct drbd_device *device = peer_device->device;
 394         struct drbd_peer_request *peer_req;
 395 
 396         if (!get_ldev(device))
 397                 return -EIO;
 398 
 399         /* GFP_TRY, because if there is no memory available right now, this may
 400          * be rescheduled for later. It is "only" background resync, after all. */
 401         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
 402                                        size, size, GFP_TRY);
 403         if (!peer_req)
 404                 goto defer;
 405 
 406         peer_req->w.cb = w_e_send_csum;
 407         spin_lock_irq(&device->resource->req_lock);
 408         list_add_tail(&peer_req->w.list, &device->read_ee);
 409         spin_unlock_irq(&device->resource->req_lock);
 410 
 411         atomic_add(size >> 9, &device->rs_sect_ev);
 412         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
 413                                      DRBD_FAULT_RS_RD) == 0)
 414                 return 0;
 415 
 416         /* If it failed because of ENOMEM, retry should help.  If it failed
 417          * because bio_add_page failed (probably broken lower level driver),
 418          * retry may or may not help.
 419          * If it does not, you may need to force disconnect. */
 420         spin_lock_irq(&device->resource->req_lock);
 421         list_del(&peer_req->w.list);
 422         spin_unlock_irq(&device->resource->req_lock);
 423 
 424         drbd_free_peer_req(device, peer_req);
 425 defer:
 426         put_ldev(device);
 427         return -EAGAIN;
 428 }
 429 
 430 int w_resync_timer(struct drbd_work *w, int cancel)
 431 {
 432         struct drbd_device *device =
 433                 container_of(w, struct drbd_device, resync_work);
 434 
 435         switch (device->state.conn) {
 436         case C_VERIFY_S:
 437                 make_ov_request(device, cancel);
 438                 break;
 439         case C_SYNC_TARGET:
 440                 make_resync_request(device, cancel);
 441                 break;
 442         }
 443 
 444         return 0;
 445 }
 446 
 447 void resync_timer_fn(struct timer_list *t)
 448 {
 449         struct drbd_device *device = from_timer(device, t, resync_timer);
 450 
 451         drbd_queue_work_if_unqueued(
 452                 &first_peer_device(device)->connection->sender_work,
 453                 &device->resync_work);
 454 }
 455 
 456 static void fifo_set(struct fifo_buffer *fb, int value)
 457 {
 458         int i;
 459 
 460         for (i = 0; i < fb->size; i++)
 461                 fb->values[i] = value;
 462 }
 463 
 464 static int fifo_push(struct fifo_buffer *fb, int value)
 465 {
 466         int ov;
 467 
 468         ov = fb->values[fb->head_index];
 469         fb->values[fb->head_index++] = value;
 470 
 471         if (fb->head_index >= fb->size)
 472                 fb->head_index = 0;
 473 
 474         return ov;
 475 }
 476 
 477 static void fifo_add_val(struct fifo_buffer *fb, int value)
 478 {
 479         int i;
 480 
 481         for (i = 0; i < fb->size; i++)
 482                 fb->values[i] += value;
 483 }
 484 
 485 struct fifo_buffer *fifo_alloc(int fifo_size)
 486 {
 487         struct fifo_buffer *fb;
 488 
 489         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
 490         if (!fb)
 491                 return NULL;
 492 
 493         fb->head_index = 0;
 494         fb->size = fifo_size;
 495         fb->total = 0;
 496 
 497         return fb;
 498 }
 499 
 500 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 501 {
 502         struct disk_conf *dc;
 503         unsigned int want;     /* The number of sectors we want in-flight */
 504         int req_sect; /* Number of sectors to request in this turn */
 505         int correction; /* Number of sectors more we need in-flight */
 506         int cps; /* correction per invocation of drbd_rs_controller() */
 507         int steps; /* Number of time steps to plan ahead */
 508         int curr_corr;
 509         int max_sect;
 510         struct fifo_buffer *plan;
 511 
 512         dc = rcu_dereference(device->ldev->disk_conf);
 513         plan = rcu_dereference(device->rs_plan_s);
 514 
 515         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 516 
 517         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
 518                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 519         } else { /* normal path */
 520                 want = dc->c_fill_target ? dc->c_fill_target :
 521                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 522         }
 523 
 524         correction = want - device->rs_in_flight - plan->total;
 525 
 526         /* Plan ahead */
 527         cps = correction / steps;
 528         fifo_add_val(plan, cps);
 529         plan->total += cps * steps;
 530 
 531         /* What we do in this step */
 532         curr_corr = fifo_push(plan, 0);
 533         plan->total -= curr_corr;
 534 
 535         req_sect = sect_in + curr_corr;
 536         if (req_sect < 0)
 537                 req_sect = 0;
 538 
 539         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 540         if (req_sect > max_sect)
 541                 req_sect = max_sect;
 542 
 543         /*
 544         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 545                  sect_in, device->rs_in_flight, want, correction,
 546                  steps, cps, device->rs_planed, curr_corr, req_sect);
 547         */
 548 
 549         return req_sect;
 550 }
 551 
 552 static int drbd_rs_number_requests(struct drbd_device *device)
 553 {
 554         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 555         int number, mxb;
 556 
 557         sect_in = atomic_xchg(&device->rs_sect_in, 0);
 558         device->rs_in_flight -= sect_in;
 559 
 560         rcu_read_lock();
 561         mxb = drbd_get_max_buffers(device) / 2;
 562         if (rcu_dereference(device->rs_plan_s)->size) {
 563                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
 564                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 565         } else {
 566                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
 567                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 568         }
 569         rcu_read_unlock();
 570 
 571         /* Don't have more than "max-buffers"/2 in-flight.
 572          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
 573          * potentially causing a distributed deadlock on congestion during
 574          * online-verify or (checksum-based) resync, if max-buffers,
 575          * socket buffer sizes and resync rate settings are mis-configured. */
 576 
 577         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
 578          * mxb (as used here, and in drbd_alloc_pages on the peer) is
 579          * "number of pages" (typically also 4k),
 580          * but "rs_in_flight" is in "sectors" (512 Byte). */
 581         if (mxb - device->rs_in_flight/8 < number)
 582                 number = mxb - device->rs_in_flight/8;
 583 
 584         return number;
 585 }
 586 
 587 static int make_resync_request(struct drbd_device *const device, int cancel)
 588 {
 589         struct drbd_peer_device *const peer_device = first_peer_device(device);
 590         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 591         unsigned long bit;
 592         sector_t sector;
 593         const sector_t capacity = drbd_get_capacity(device->this_bdev);
 594         int max_bio_size;
 595         int number, rollback_i, size;
 596         int align, requeue = 0;
 597         int i = 0;
 598         int discard_granularity = 0;
 599 
 600         if (unlikely(cancel))
 601                 return 0;
 602 
 603         if (device->rs_total == 0) {
 604                 /* empty resync? */
 605                 drbd_resync_finished(device);
 606                 return 0;
 607         }
 608 
 609         if (!get_ldev(device)) {
 610                 /* Since we only need to access device->rsync a
 611                    get_ldev_if_state(device,D_FAILED) would be sufficient, but
 612                    to continue resync with a broken disk makes no sense at
 613                    all */
 614                 drbd_err(device, "Disk broke down during resync!\n");
 615                 return 0;
 616         }
 617 
 618         if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
 619                 rcu_read_lock();
 620                 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
 621                 rcu_read_unlock();
 622         }
 623 
 624         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
 625         number = drbd_rs_number_requests(device);
 626         if (number <= 0)
 627                 goto requeue;
 628 
 629         for (i = 0; i < number; i++) {
 630                 /* Stop generating RS requests when half of the send buffer is filled,
 631                  * but notify TCP that we'd like to have more space. */
 632                 mutex_lock(&connection->data.mutex);
 633                 if (connection->data.socket) {
 634                         struct sock *sk = connection->data.socket->sk;
 635                         int queued = sk->sk_wmem_queued;
 636                         int sndbuf = sk->sk_sndbuf;
 637                         if (queued > sndbuf / 2) {
 638                                 requeue = 1;
 639                                 if (sk->sk_socket)
 640                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 641                         }
 642                 } else
 643                         requeue = 1;
 644                 mutex_unlock(&connection->data.mutex);
 645                 if (requeue)
 646                         goto requeue;
 647 
 648 next_sector:
 649                 size = BM_BLOCK_SIZE;
 650                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
 651 
 652                 if (bit == DRBD_END_OF_BITMAP) {
 653                         device->bm_resync_fo = drbd_bm_bits(device);
 654                         put_ldev(device);
 655                         return 0;
 656                 }
 657 
 658                 sector = BM_BIT_TO_SECT(bit);
 659 
 660                 if (drbd_try_rs_begin_io(device, sector)) {
 661                         device->bm_resync_fo = bit;
 662                         goto requeue;
 663                 }
 664                 device->bm_resync_fo = bit + 1;
 665 
 666                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
 667                         drbd_rs_complete_io(device, sector);
 668                         goto next_sector;
 669                 }
 670 
 671 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 672                 /* try to find some adjacent bits.
 673                  * we stop if we have already the maximum req size.
 674                  *
 675                  * Additionally always align bigger requests, in order to
 676                  * be prepared for all stripe sizes of software RAIDs.
 677                  */
 678                 align = 1;
 679                 rollback_i = i;
 680                 while (i < number) {
 681                         if (size + BM_BLOCK_SIZE > max_bio_size)
 682                                 break;
 683 
 684                         /* Be always aligned */
 685                         if (sector & ((1<<(align+3))-1))
 686                                 break;
 687 
 688                         if (discard_granularity && size == discard_granularity)
 689                                 break;
 690 
 691                         /* do not cross extent boundaries */
 692                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 693                                 break;
 694                         /* now, is it actually dirty, after all?
 695                          * caution, drbd_bm_test_bit is tri-state for some
 696                          * obscure reason; ( b == 0 ) would get the out-of-band
 697                          * only accidentally right because of the "oddly sized"
 698                          * adjustment below */
 699                         if (drbd_bm_test_bit(device, bit+1) != 1)
 700                                 break;
 701                         bit++;
 702                         size += BM_BLOCK_SIZE;
 703                         if ((BM_BLOCK_SIZE << align) <= size)
 704                                 align++;
 705                         i++;
 706                 }
 707                 /* if we merged some,
 708                  * reset the offset to start the next drbd_bm_find_next from */
 709                 if (size > BM_BLOCK_SIZE)
 710                         device->bm_resync_fo = bit + 1;
 711 #endif
 712 
 713                 /* adjust very last sectors, in case we are oddly sized */
 714                 if (sector + (size>>9) > capacity)
 715                         size = (capacity-sector)<<9;
 716 
 717                 if (device->use_csums) {
 718                         switch (read_for_csum(peer_device, sector, size)) {
 719                         case -EIO: /* Disk failure */
 720                                 put_ldev(device);
 721                                 return -EIO;
 722                         case -EAGAIN: /* allocation failed, or ldev busy */
 723                                 drbd_rs_complete_io(device, sector);
 724                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
 725                                 i = rollback_i;
 726                                 goto requeue;
 727                         case 0:
 728                                 /* everything ok */
 729                                 break;
 730                         default:
 731                                 BUG();
 732                         }
 733                 } else {
 734                         int err;
 735 
 736                         inc_rs_pending(device);
 737                         err = drbd_send_drequest(peer_device,
 738                                                  size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
 739                                                  sector, size, ID_SYNCER);
 740                         if (err) {
 741                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
 742                                 dec_rs_pending(device);
 743                                 put_ldev(device);
 744                                 return err;
 745                         }
 746                 }
 747         }
 748 
 749         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
 750                 /* last syncer _request_ was sent,
 751                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 752                  * next sync group will resume), as soon as we receive the last
 753                  * resync data block, and the last bit is cleared.
 754                  * until then resync "work" is "inactive" ...
 755                  */
 756                 put_ldev(device);
 757                 return 0;
 758         }
 759 
 760  requeue:
 761         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 762         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 763         put_ldev(device);
 764         return 0;
 765 }
 766 
 767 static int make_ov_request(struct drbd_device *device, int cancel)
 768 {
 769         int number, i, size;
 770         sector_t sector;
 771         const sector_t capacity = drbd_get_capacity(device->this_bdev);
 772         bool stop_sector_reached = false;
 773 
 774         if (unlikely(cancel))
 775                 return 1;
 776 
 777         number = drbd_rs_number_requests(device);
 778 
 779         sector = device->ov_position;
 780         for (i = 0; i < number; i++) {
 781                 if (sector >= capacity)
 782                         return 1;
 783 
 784                 /* We check for "finished" only in the reply path:
 785                  * w_e_end_ov_reply().
 786                  * We need to send at least one request out. */
 787                 stop_sector_reached = i > 0
 788                         && verify_can_do_stop_sector(device)
 789                         && sector >= device->ov_stop_sector;
 790                 if (stop_sector_reached)
 791                         break;
 792 
 793                 size = BM_BLOCK_SIZE;
 794 
 795                 if (drbd_try_rs_begin_io(device, sector)) {
 796                         device->ov_position = sector;
 797                         goto requeue;
 798                 }
 799 
 800                 if (sector + (size>>9) > capacity)
 801                         size = (capacity-sector)<<9;
 802 
 803                 inc_rs_pending(device);
 804                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
 805                         dec_rs_pending(device);
 806                         return 0;
 807                 }
 808                 sector += BM_SECT_PER_BIT;
 809         }
 810         device->ov_position = sector;
 811 
 812  requeue:
 813         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 814         if (i == 0 || !stop_sector_reached)
 815                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 816         return 1;
 817 }
 818 
 819 int w_ov_finished(struct drbd_work *w, int cancel)
 820 {
 821         struct drbd_device_work *dw =
 822                 container_of(w, struct drbd_device_work, w);
 823         struct drbd_device *device = dw->device;
 824         kfree(dw);
 825         ov_out_of_sync_print(device);
 826         drbd_resync_finished(device);
 827 
 828         return 0;
 829 }
 830 
 831 static int w_resync_finished(struct drbd_work *w, int cancel)
 832 {
 833         struct drbd_device_work *dw =
 834                 container_of(w, struct drbd_device_work, w);
 835         struct drbd_device *device = dw->device;
 836         kfree(dw);
 837 
 838         drbd_resync_finished(device);
 839 
 840         return 0;
 841 }
 842 
 843 static void ping_peer(struct drbd_device *device)
 844 {
 845         struct drbd_connection *connection = first_peer_device(device)->connection;
 846 
 847         clear_bit(GOT_PING_ACK, &connection->flags);
 848         request_ping(connection);
 849         wait_event(connection->ping_wait,
 850                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
 851 }
 852 
 853 int drbd_resync_finished(struct drbd_device *device)
 854 {
 855         struct drbd_connection *connection = first_peer_device(device)->connection;
 856         unsigned long db, dt, dbdt;
 857         unsigned long n_oos;
 858         union drbd_state os, ns;
 859         struct drbd_device_work *dw;
 860         char *khelper_cmd = NULL;
 861         int verify_done = 0;
 862 
 863         /* Remove all elements from the resync LRU. Since future actions
 864          * might set bits in the (main) bitmap, then the entries in the
 865          * resync LRU would be wrong. */
 866         if (drbd_rs_del_all(device)) {
 867                 /* In case this is not possible now, most probably because
 868                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
 869                  * queue (or even the read operations for those packets
 870                  * is not finished by now).   Retry in 100ms. */
 871 
 872                 schedule_timeout_interruptible(HZ / 10);
 873                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
 874                 if (dw) {
 875                         dw->w.cb = w_resync_finished;
 876                         dw->device = device;
 877                         drbd_queue_work(&connection->sender_work, &dw->w);
 878                         return 1;
 879                 }
 880                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
 881         }
 882 
 883         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
 884         if (dt <= 0)
 885                 dt = 1;
 886 
 887         db = device->rs_total;
 888         /* adjust for verify start and stop sectors, respective reached position */
 889         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
 890                 db -= device->ov_left;
 891 
 892         dbdt = Bit2KB(db/dt);
 893         device->rs_paused /= HZ;
 894 
 895         if (!get_ldev(device))
 896                 goto out;
 897 
 898         ping_peer(device);
 899 
 900         spin_lock_irq(&device->resource->req_lock);
 901         os = drbd_read_state(device);
 902 
 903         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 904 
 905         /* This protects us against multiple calls (that can happen in the presence
 906            of application IO), and against connectivity loss just before we arrive here. */
 907         if (os.conn <= C_CONNECTED)
 908                 goto out_unlock;
 909 
 910         ns = os;
 911         ns.conn = C_CONNECTED;
 912 
 913         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 914              verify_done ? "Online verify" : "Resync",
 915              dt + device->rs_paused, device->rs_paused, dbdt);
 916 
 917         n_oos = drbd_bm_total_weight(device);
 918 
 919         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 920                 if (n_oos) {
 921                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
 922                               n_oos, Bit2KB(1));
 923                         khelper_cmd = "out-of-sync";
 924                 }
 925         } else {
 926                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
 927 
 928                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 929                         khelper_cmd = "after-resync-target";
 930 
 931                 if (device->use_csums && device->rs_total) {
 932                         const unsigned long s = device->rs_same_csum;
 933                         const unsigned long t = device->rs_total;
 934                         const int ratio =
 935                                 (t == 0)     ? 0 :
 936                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
 937                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
 938                              "transferred %luK total %luK\n",
 939                              ratio,
 940                              Bit2KB(device->rs_same_csum),
 941                              Bit2KB(device->rs_total - device->rs_same_csum),
 942                              Bit2KB(device->rs_total));
 943                 }
 944         }
 945 
 946         if (device->rs_failed) {
 947                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
 948 
 949                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 950                         ns.disk = D_INCONSISTENT;
 951                         ns.pdsk = D_UP_TO_DATE;
 952                 } else {
 953                         ns.disk = D_UP_TO_DATE;
 954                         ns.pdsk = D_INCONSISTENT;
 955                 }
 956         } else {
 957                 ns.disk = D_UP_TO_DATE;
 958                 ns.pdsk = D_UP_TO_DATE;
 959 
 960                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 961                         if (device->p_uuid) {
 962                                 int i;
 963                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 964                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
 965                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
 966                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
 967                         } else {
 968                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
 969                         }
 970                 }
 971 
 972                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 973                         /* for verify runs, we don't update uuids here,
 974                          * so there would be nothing to report. */
 975                         drbd_uuid_set_bm(device, 0UL);
 976                         drbd_print_uuids(device, "updated UUIDs");
 977                         if (device->p_uuid) {
 978                                 /* Now the two UUID sets are equal, update what we
 979                                  * know of the peer. */
 980                                 int i;
 981                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 982                                         device->p_uuid[i] = device->ldev->md.uuid[i];
 983                         }
 984                 }
 985         }
 986 
 987         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
 988 out_unlock:
 989         spin_unlock_irq(&device->resource->req_lock);
 990 
 991         /* If we have been sync source, and have an effective fencing-policy,
 992          * once *all* volumes are back in sync, call "unfence". */
 993         if (os.conn == C_SYNC_SOURCE) {
 994                 enum drbd_disk_state disk_state = D_MASK;
 995                 enum drbd_disk_state pdsk_state = D_MASK;
 996                 enum drbd_fencing_p fp = FP_DONT_CARE;
 997 
 998                 rcu_read_lock();
 999                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1000                 if (fp != FP_DONT_CARE) {
1001                         struct drbd_peer_device *peer_device;
1002                         int vnr;
1003                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1004                                 struct drbd_device *device = peer_device->device;
1005                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1006                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1007                         }
1008                 }
1009                 rcu_read_unlock();
1010                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1011                         conn_khelper(connection, "unfence-peer");
1012         }
1013 
1014         put_ldev(device);
1015 out:
1016         device->rs_total  = 0;
1017         device->rs_failed = 0;
1018         device->rs_paused = 0;
1019 
1020         /* reset start sector, if we reached end of device */
1021         if (verify_done && device->ov_left == 0)
1022                 device->ov_start_sector = 0;
1023 
1024         drbd_md_sync(device);
1025 
1026         if (khelper_cmd)
1027                 drbd_khelper(device, khelper_cmd);
1028 
1029         return 1;
1030 }
1031 
1032 /* helper */
1033 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1034 {
1035         if (drbd_peer_req_has_active_page(peer_req)) {
1036                 /* This might happen if sendpage() has not finished */
1037                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1038                 atomic_add(i, &device->pp_in_use_by_net);
1039                 atomic_sub(i, &device->pp_in_use);
1040                 spin_lock_irq(&device->resource->req_lock);
1041                 list_add_tail(&peer_req->w.list, &device->net_ee);
1042                 spin_unlock_irq(&device->resource->req_lock);
1043                 wake_up(&drbd_pp_wait);
1044         } else
1045                 drbd_free_peer_req(device, peer_req);
1046 }
1047 
1048 /**
1049  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1050  * @w:          work object.
1051  * @cancel:     The connection will be closed anyways
1052  */
1053 int w_e_end_data_req(struct drbd_work *w, int cancel)
1054 {
1055         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1056         struct drbd_peer_device *peer_device = peer_req->peer_device;
1057         struct drbd_device *device = peer_device->device;
1058         int err;
1059 
1060         if (unlikely(cancel)) {
1061                 drbd_free_peer_req(device, peer_req);
1062                 dec_unacked(device);
1063                 return 0;
1064         }
1065 
1066         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1067                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1068         } else {
1069                 if (__ratelimit(&drbd_ratelimit_state))
1070                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1071                             (unsigned long long)peer_req->i.sector);
1072 
1073                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1074         }
1075 
1076         dec_unacked(device);
1077 
1078         move_to_net_ee_or_free(device, peer_req);
1079 
1080         if (unlikely(err))
1081                 drbd_err(device, "drbd_send_block() failed\n");
1082         return err;
1083 }
1084 
1085 static bool all_zero(struct drbd_peer_request *peer_req)
1086 {
1087         struct page *page = peer_req->pages;
1088         unsigned int len = peer_req->i.size;
1089 
1090         page_chain_for_each(page) {
1091                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1092                 unsigned int i, words = l / sizeof(long);
1093                 unsigned long *d;
1094 
1095                 d = kmap_atomic(page);
1096                 for (i = 0; i < words; i++) {
1097                         if (d[i]) {
1098                                 kunmap_atomic(d);
1099                                 return false;
1100                         }
1101                 }
1102                 kunmap_atomic(d);
1103                 len -= l;
1104         }
1105 
1106         return true;
1107 }
1108 
1109 /**
1110  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1111  * @w:          work object.
1112  * @cancel:     The connection will be closed anyways
1113  */
1114 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1115 {
1116         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1117         struct drbd_peer_device *peer_device = peer_req->peer_device;
1118         struct drbd_device *device = peer_device->device;
1119         int err;
1120 
1121         if (unlikely(cancel)) {
1122                 drbd_free_peer_req(device, peer_req);
1123                 dec_unacked(device);
1124                 return 0;
1125         }
1126 
1127         if (get_ldev_if_state(device, D_FAILED)) {
1128                 drbd_rs_complete_io(device, peer_req->i.sector);
1129                 put_ldev(device);
1130         }
1131 
1132         if (device->state.conn == C_AHEAD) {
1133                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1134         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1135                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1136                         inc_rs_pending(device);
1137                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1138                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1139                         else
1140                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1141                 } else {
1142                         if (__ratelimit(&drbd_ratelimit_state))
1143                                 drbd_err(device, "Not sending RSDataReply, "
1144                                     "partner DISKLESS!\n");
1145                         err = 0;
1146                 }
1147         } else {
1148                 if (__ratelimit(&drbd_ratelimit_state))
1149                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1150                             (unsigned long long)peer_req->i.sector);
1151 
1152                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1153 
1154                 /* update resync data with failure */
1155                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1156         }
1157 
1158         dec_unacked(device);
1159 
1160         move_to_net_ee_or_free(device, peer_req);
1161 
1162         if (unlikely(err))
1163                 drbd_err(device, "drbd_send_block() failed\n");
1164         return err;
1165 }
1166 
1167 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1168 {
1169         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1170         struct drbd_peer_device *peer_device = peer_req->peer_device;
1171         struct drbd_device *device = peer_device->device;
1172         struct digest_info *di;
1173         int digest_size;
1174         void *digest = NULL;
1175         int err, eq = 0;
1176 
1177         if (unlikely(cancel)) {
1178                 drbd_free_peer_req(device, peer_req);
1179                 dec_unacked(device);
1180                 return 0;
1181         }
1182 
1183         if (get_ldev(device)) {
1184                 drbd_rs_complete_io(device, peer_req->i.sector);
1185                 put_ldev(device);
1186         }
1187 
1188         di = peer_req->digest;
1189 
1190         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1191                 /* quick hack to try to avoid a race against reconfiguration.
1192                  * a real fix would be much more involved,
1193                  * introducing more locking mechanisms */
1194                 if (peer_device->connection->csums_tfm) {
1195                         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1196                         D_ASSERT(device, digest_size == di->digest_size);
1197                         digest = kmalloc(digest_size, GFP_NOIO);
1198                 }
1199                 if (digest) {
1200                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1201                         eq = !memcmp(digest, di->digest, digest_size);
1202                         kfree(digest);
1203                 }
1204 
1205                 if (eq) {
1206                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1207                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1208                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1209                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1210                 } else {
1211                         inc_rs_pending(device);
1212                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1213                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1214                         kfree(di);
1215                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1216                 }
1217         } else {
1218                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1219                 if (__ratelimit(&drbd_ratelimit_state))
1220                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1221         }
1222 
1223         dec_unacked(device);
1224         move_to_net_ee_or_free(device, peer_req);
1225 
1226         if (unlikely(err))
1227                 drbd_err(device, "drbd_send_block/ack() failed\n");
1228         return err;
1229 }
1230 
1231 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1232 {
1233         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1234         struct drbd_peer_device *peer_device = peer_req->peer_device;
1235         struct drbd_device *device = peer_device->device;
1236         sector_t sector = peer_req->i.sector;
1237         unsigned int size = peer_req->i.size;
1238         int digest_size;
1239         void *digest;
1240         int err = 0;
1241 
1242         if (unlikely(cancel))
1243                 goto out;
1244 
1245         digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1246         digest = kmalloc(digest_size, GFP_NOIO);
1247         if (!digest) {
1248                 err = 1;        /* terminate the connection in case the allocation failed */
1249                 goto out;
1250         }
1251 
1252         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1253                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1254         else
1255                 memset(digest, 0, digest_size);
1256 
1257         /* Free e and pages before send.
1258          * In case we block on congestion, we could otherwise run into
1259          * some distributed deadlock, if the other side blocks on
1260          * congestion as well, because our receiver blocks in
1261          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1262         drbd_free_peer_req(device, peer_req);
1263         peer_req = NULL;
1264         inc_rs_pending(device);
1265         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1266         if (err)
1267                 dec_rs_pending(device);
1268         kfree(digest);
1269 
1270 out:
1271         if (peer_req)
1272                 drbd_free_peer_req(device, peer_req);
1273         dec_unacked(device);
1274         return err;
1275 }
1276 
1277 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1278 {
1279         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1280                 device->ov_last_oos_size += size>>9;
1281         } else {
1282                 device->ov_last_oos_start = sector;
1283                 device->ov_last_oos_size = size>>9;
1284         }
1285         drbd_set_out_of_sync(device, sector, size);
1286 }
1287 
1288 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1289 {
1290         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1291         struct drbd_peer_device *peer_device = peer_req->peer_device;
1292         struct drbd_device *device = peer_device->device;
1293         struct digest_info *di;
1294         void *digest;
1295         sector_t sector = peer_req->i.sector;
1296         unsigned int size = peer_req->i.size;
1297         int digest_size;
1298         int err, eq = 0;
1299         bool stop_sector_reached = false;
1300 
1301         if (unlikely(cancel)) {
1302                 drbd_free_peer_req(device, peer_req);
1303                 dec_unacked(device);
1304                 return 0;
1305         }
1306 
1307         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1308          * the resync lru has been cleaned up already */
1309         if (get_ldev(device)) {
1310                 drbd_rs_complete_io(device, peer_req->i.sector);
1311                 put_ldev(device);
1312         }
1313 
1314         di = peer_req->digest;
1315 
1316         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1317                 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1318                 digest = kmalloc(digest_size, GFP_NOIO);
1319                 if (digest) {
1320                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1321 
1322                         D_ASSERT(device, digest_size == di->digest_size);
1323                         eq = !memcmp(digest, di->digest, digest_size);
1324                         kfree(digest);
1325                 }
1326         }
1327 
1328         /* Free peer_req and pages before send.
1329          * In case we block on congestion, we could otherwise run into
1330          * some distributed deadlock, if the other side blocks on
1331          * congestion as well, because our receiver blocks in
1332          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1333         drbd_free_peer_req(device, peer_req);
1334         if (!eq)
1335                 drbd_ov_out_of_sync_found(device, sector, size);
1336         else
1337                 ov_out_of_sync_print(device);
1338 
1339         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1340                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1341 
1342         dec_unacked(device);
1343 
1344         --device->ov_left;
1345 
1346         /* let's advance progress step marks only for every other megabyte */
1347         if ((device->ov_left & 0x200) == 0x200)
1348                 drbd_advance_rs_marks(device, device->ov_left);
1349 
1350         stop_sector_reached = verify_can_do_stop_sector(device) &&
1351                 (sector + (size>>9)) >= device->ov_stop_sector;
1352 
1353         if (device->ov_left == 0 || stop_sector_reached) {
1354                 ov_out_of_sync_print(device);
1355                 drbd_resync_finished(device);
1356         }
1357 
1358         return err;
1359 }
1360 
1361 /* FIXME
1362  * We need to track the number of pending barrier acks,
1363  * and to be able to wait for them.
1364  * See also comment in drbd_adm_attach before drbd_suspend_io.
1365  */
1366 static int drbd_send_barrier(struct drbd_connection *connection)
1367 {
1368         struct p_barrier *p;
1369         struct drbd_socket *sock;
1370 
1371         sock = &connection->data;
1372         p = conn_prepare_command(connection, sock);
1373         if (!p)
1374                 return -EIO;
1375         p->barrier = connection->send.current_epoch_nr;
1376         p->pad = 0;
1377         connection->send.current_epoch_writes = 0;
1378         connection->send.last_sent_barrier_jif = jiffies;
1379 
1380         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1381 }
1382 
1383 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1384 {
1385         struct drbd_socket *sock = &pd->connection->data;
1386         if (!drbd_prepare_command(pd, sock))
1387                 return -EIO;
1388         return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1389 }
1390 
1391 int w_send_write_hint(struct drbd_work *w, int cancel)
1392 {
1393         struct drbd_device *device =
1394                 container_of(w, struct drbd_device, unplug_work);
1395 
1396         if (cancel)
1397                 return 0;
1398         return pd_send_unplug_remote(first_peer_device(device));
1399 }
1400 
1401 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1402 {
1403         if (!connection->send.seen_any_write_yet) {
1404                 connection->send.seen_any_write_yet = true;
1405                 connection->send.current_epoch_nr = epoch;
1406                 connection->send.current_epoch_writes = 0;
1407                 connection->send.last_sent_barrier_jif = jiffies;
1408         }
1409 }
1410 
1411 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1412 {
1413         /* re-init if first write on this connection */
1414         if (!connection->send.seen_any_write_yet)
1415                 return;
1416         if (connection->send.current_epoch_nr != epoch) {
1417                 if (connection->send.current_epoch_writes)
1418                         drbd_send_barrier(connection);
1419                 connection->send.current_epoch_nr = epoch;
1420         }
1421 }
1422 
1423 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1424 {
1425         struct drbd_request *req = container_of(w, struct drbd_request, w);
1426         struct drbd_device *device = req->device;
1427         struct drbd_peer_device *const peer_device = first_peer_device(device);
1428         struct drbd_connection *const connection = peer_device->connection;
1429         int err;
1430 
1431         if (unlikely(cancel)) {
1432                 req_mod(req, SEND_CANCELED);
1433                 return 0;
1434         }
1435         req->pre_send_jif = jiffies;
1436 
1437         /* this time, no connection->send.current_epoch_writes++;
1438          * If it was sent, it was the closing barrier for the last
1439          * replicated epoch, before we went into AHEAD mode.
1440          * No more barriers will be sent, until we leave AHEAD mode again. */
1441         maybe_send_barrier(connection, req->epoch);
1442 
1443         err = drbd_send_out_of_sync(peer_device, req);
1444         req_mod(req, OOS_HANDED_TO_NETWORK);
1445 
1446         return err;
1447 }
1448 
1449 /**
1450  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1451  * @w:          work object.
1452  * @cancel:     The connection will be closed anyways
1453  */
1454 int w_send_dblock(struct drbd_work *w, int cancel)
1455 {
1456         struct drbd_request *req = container_of(w, struct drbd_request, w);
1457         struct drbd_device *device = req->device;
1458         struct drbd_peer_device *const peer_device = first_peer_device(device);
1459         struct drbd_connection *connection = peer_device->connection;
1460         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1461         int err;
1462 
1463         if (unlikely(cancel)) {
1464                 req_mod(req, SEND_CANCELED);
1465                 return 0;
1466         }
1467         req->pre_send_jif = jiffies;
1468 
1469         re_init_if_first_write(connection, req->epoch);
1470         maybe_send_barrier(connection, req->epoch);
1471         connection->send.current_epoch_writes++;
1472 
1473         err = drbd_send_dblock(peer_device, req);
1474         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1475 
1476         if (do_send_unplug && !err)
1477                 pd_send_unplug_remote(peer_device);
1478 
1479         return err;
1480 }
1481 
1482 /**
1483  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1484  * @w:          work object.
1485  * @cancel:     The connection will be closed anyways
1486  */
1487 int w_send_read_req(struct drbd_work *w, int cancel)
1488 {
1489         struct drbd_request *req = container_of(w, struct drbd_request, w);
1490         struct drbd_device *device = req->device;
1491         struct drbd_peer_device *const peer_device = first_peer_device(device);
1492         struct drbd_connection *connection = peer_device->connection;
1493         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1494         int err;
1495 
1496         if (unlikely(cancel)) {
1497                 req_mod(req, SEND_CANCELED);
1498                 return 0;
1499         }
1500         req->pre_send_jif = jiffies;
1501 
1502         /* Even read requests may close a write epoch,
1503          * if there was any yet. */
1504         maybe_send_barrier(connection, req->epoch);
1505 
1506         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1507                                  (unsigned long)req);
1508 
1509         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1510 
1511         if (do_send_unplug && !err)
1512                 pd_send_unplug_remote(peer_device);
1513 
1514         return err;
1515 }
1516 
1517 int w_restart_disk_io(struct drbd_work *w, int cancel)
1518 {
1519         struct drbd_request *req = container_of(w, struct drbd_request, w);
1520         struct drbd_device *device = req->device;
1521 
1522         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1523                 drbd_al_begin_io(device, &req->i);
1524 
1525         drbd_req_make_private_bio(req, req->master_bio);
1526         bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1527         generic_make_request(req->private_bio);
1528 
1529         return 0;
1530 }
1531 
1532 static int _drbd_may_sync_now(struct drbd_device *device)
1533 {
1534         struct drbd_device *odev = device;
1535         int resync_after;
1536 
1537         while (1) {
1538                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1539                         return 1;
1540                 rcu_read_lock();
1541                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1542                 rcu_read_unlock();
1543                 if (resync_after == -1)
1544                         return 1;
1545                 odev = minor_to_device(resync_after);
1546                 if (!odev)
1547                         return 1;
1548                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1549                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1550                     odev->state.aftr_isp || odev->state.peer_isp ||
1551                     odev->state.user_isp)
1552                         return 0;
1553         }
1554 }
1555 
1556 /**
1557  * drbd_pause_after() - Pause resync on all devices that may not resync now
1558  * @device:     DRBD device.
1559  *
1560  * Called from process context only (admin command and after_state_ch).
1561  */
1562 static bool drbd_pause_after(struct drbd_device *device)
1563 {
1564         bool changed = false;
1565         struct drbd_device *odev;
1566         int i;
1567 
1568         rcu_read_lock();
1569         idr_for_each_entry(&drbd_devices, odev, i) {
1570                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1571                         continue;
1572                 if (!_drbd_may_sync_now(odev) &&
1573                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1574                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1575                         changed = true;
1576         }
1577         rcu_read_unlock();
1578 
1579         return changed;
1580 }
1581 
1582 /**
1583  * drbd_resume_next() - Resume resync on all devices that may resync now
1584  * @device:     DRBD device.
1585  *
1586  * Called from process context only (admin command and worker).
1587  */
1588 static bool drbd_resume_next(struct drbd_device *device)
1589 {
1590         bool changed = false;
1591         struct drbd_device *odev;
1592         int i;
1593 
1594         rcu_read_lock();
1595         idr_for_each_entry(&drbd_devices, odev, i) {
1596                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1597                         continue;
1598                 if (odev->state.aftr_isp) {
1599                         if (_drbd_may_sync_now(odev) &&
1600                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1601                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1602                                 changed = true;
1603                 }
1604         }
1605         rcu_read_unlock();
1606         return changed;
1607 }
1608 
/* Run drbd_resume_next() with all resources locked. */
void resume_next_sg(struct drbd_device *device)
{
        lock_all_resources();
        (void)drbd_resume_next(device);     /* "changed" result not needed here */
        unlock_all_resources();
}
1615 
/* Run drbd_pause_after() with all resources locked. */
void suspend_other_sg(struct drbd_device *device)
{
        lock_all_resources();
        (void)drbd_pause_after(device);     /* "changed" result not needed here */
        unlock_all_resources();
}
1622 
1623 /* caller must lock_all_resources() */
1624 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1625 {
1626         struct drbd_device *odev;
1627         int resync_after;
1628 
1629         if (o_minor == -1)
1630                 return NO_ERROR;
1631         if (o_minor < -1 || o_minor > MINORMASK)
1632                 return ERR_RESYNC_AFTER;
1633 
1634         /* check for loops */
1635         odev = minor_to_device(o_minor);
1636         while (1) {
1637                 if (odev == device)
1638                         return ERR_RESYNC_AFTER_CYCLE;
1639 
1640                 /* You are free to depend on diskless, non-existing,
1641                  * or not yet/no longer existing minors.
1642                  * We only reject dependency loops.
1643                  * We cannot follow the dependency chain beyond a detached or
1644                  * missing minor.
1645                  */
1646                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1647                         return NO_ERROR;
1648 
1649                 rcu_read_lock();
1650                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1651                 rcu_read_unlock();
1652                 /* dependency chain ends here, no cycles. */
1653                 if (resync_after == -1)
1654                         return NO_ERROR;
1655 
1656                 /* follow the dependency chain */
1657                 odev = minor_to_device(resync_after);
1658         }
1659 }
1660 
1661 /* caller must lock_all_resources() */
1662 void drbd_resync_after_changed(struct drbd_device *device)
1663 {
1664         int changed;
1665 
1666         do {
1667                 changed  = drbd_pause_after(device);
1668                 changed |= drbd_resume_next(device);
1669         } while (changed);
1670 }
1671 
1672 void drbd_rs_controller_reset(struct drbd_device *device)
1673 {
1674         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1675         struct fifo_buffer *plan;
1676 
1677         atomic_set(&device->rs_sect_in, 0);
1678         atomic_set(&device->rs_sect_ev, 0);
1679         device->rs_in_flight = 0;
1680         device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1681 
1682         /* Updating the RCU protected object in place is necessary since
1683            this function gets called from atomic context.
1684            It is valid since all other updates also lead to an completely
1685            empty fifo */
1686         rcu_read_lock();
1687         plan = rcu_dereference(device->rs_plan_s);
1688         plan->total = 0;
1689         fifo_set(plan, 0);
1690         rcu_read_unlock();
1691 }
1692 
1693 void start_resync_timer_fn(struct timer_list *t)
1694 {
1695         struct drbd_device *device = from_timer(device, t, start_resync_timer);
1696         drbd_device_post_work(device, RS_START);
1697 }
1698 
1699 static void do_start_resync(struct drbd_device *device)
1700 {
1701         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1702                 drbd_warn(device, "postponing start_resync ...\n");
1703                 device->start_resync_timer.expires = jiffies + HZ/10;
1704                 add_timer(&device->start_resync_timer);
1705                 return;
1706         }
1707 
1708         drbd_start_resync(device, C_SYNC_SOURCE);
1709         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1710 }
1711 
1712 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1713 {
1714         bool csums_after_crash_only;
1715         rcu_read_lock();
1716         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1717         rcu_read_unlock();
1718         return connection->agreed_pro_version >= 89 &&          /* supported? */
1719                 connection->csums_tfm &&                        /* configured? */
1720                 (csums_after_crash_only == false                /* use for each resync? */
1721                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1722 }
1723 
1724 /**
1725  * drbd_start_resync() - Start the resync process
1726  * @device:     DRBD device.
1727  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1728  *
1729  * This function might bring you directly into one of the
1730  * C_PAUSED_SYNC_* states.
1731  */
1732 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1733 {
1734         struct drbd_peer_device *peer_device = first_peer_device(device);
1735         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1736         union drbd_state ns;
1737         int r;
1738 
1739         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1740                 drbd_err(device, "Resync already running!\n");
1741                 return;
1742         }
1743 
1744         if (!connection) {
1745                 drbd_err(device, "No connection to peer, aborting!\n");
1746                 return;
1747         }
1748 
1749         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1750                 if (side == C_SYNC_TARGET) {
1751                         /* Since application IO was locked out during C_WF_BITMAP_T and
1752                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1753                            we check that we might make the data inconsistent. */
1754                         r = drbd_khelper(device, "before-resync-target");
1755                         r = (r >> 8) & 0xff;
1756                         if (r > 0) {
1757                                 drbd_info(device, "before-resync-target handler returned %d, "
1758                                          "dropping connection.\n", r);
1759                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1760                                 return;
1761                         }
1762                 } else /* C_SYNC_SOURCE */ {
1763                         r = drbd_khelper(device, "before-resync-source");
1764                         r = (r >> 8) & 0xff;
1765                         if (r > 0) {
1766                                 if (r == 3) {
1767                                         drbd_info(device, "before-resync-source handler returned %d, "
1768                                                  "ignoring. Old userland tools?", r);
1769                                 } else {
1770                                         drbd_info(device, "before-resync-source handler returned %d, "
1771                                                  "dropping connection.\n", r);
1772                                         conn_request_state(connection,
1773                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1774                                         return;
1775                                 }
1776                         }
1777                 }
1778         }
1779 
1780         if (current == connection->worker.task) {
1781                 /* The worker should not sleep waiting for state_mutex,
1782                    that can take long */
1783                 if (!mutex_trylock(device->state_mutex)) {
1784                         set_bit(B_RS_H_DONE, &device->flags);
1785                         device->start_resync_timer.expires = jiffies + HZ/5;
1786                         add_timer(&device->start_resync_timer);
1787                         return;
1788                 }
1789         } else {
1790                 mutex_lock(device->state_mutex);
1791         }
1792 
1793         lock_all_resources();
1794         clear_bit(B_RS_H_DONE, &device->flags);
1795         /* Did some connection breakage or IO error race with us? */
1796         if (device->state.conn < C_CONNECTED
1797         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1798                 unlock_all_resources();
1799                 goto out;
1800         }
1801 
1802         ns = drbd_read_state(device);
1803 
1804         ns.aftr_isp = !_drbd_may_sync_now(device);
1805 
1806         ns.conn = side;
1807 
1808         if (side == C_SYNC_TARGET)
1809                 ns.disk = D_INCONSISTENT;
1810         else /* side == C_SYNC_SOURCE */
1811                 ns.pdsk = D_INCONSISTENT;
1812 
1813         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1814         ns = drbd_read_state(device);
1815 
1816         if (ns.conn < C_CONNECTED)
1817                 r = SS_UNKNOWN_ERROR;
1818 
1819         if (r == SS_SUCCESS) {
1820                 unsigned long tw = drbd_bm_total_weight(device);
1821                 unsigned long now = jiffies;
1822                 int i;
1823 
1824                 device->rs_failed    = 0;
1825                 device->rs_paused    = 0;
1826                 device->rs_same_csum = 0;
1827                 device->rs_last_sect_ev = 0;
1828                 device->rs_total     = tw;
1829                 device->rs_start     = now;
1830                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1831                         device->rs_mark_left[i] = tw;
1832                         device->rs_mark_time[i] = now;
1833                 }
1834                 drbd_pause_after(device);
1835                 /* Forget potentially stale cached per resync extent bit-counts.
1836                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1837                  * disabled, and know the disk state is ok. */
1838                 spin_lock(&device->al_lock);
1839                 lc_reset(device->resync);
1840                 device->resync_locked = 0;
1841                 device->resync_wenr = LC_FREE;
1842                 spin_unlock(&device->al_lock);
1843         }
1844         unlock_all_resources();
1845 
1846         if (r == SS_SUCCESS) {
1847                 wake_up(&device->al_wait); /* for lc_reset() above */
1848                 /* reset rs_last_bcast when a resync or verify is started,
1849                  * to deal with potential jiffies wrap. */
1850                 device->rs_last_bcast = jiffies - HZ;
1851 
1852                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1853                      drbd_conn_str(ns.conn),
1854                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1855                      (unsigned long) device->rs_total);
1856                 if (side == C_SYNC_TARGET) {
1857                         device->bm_resync_fo = 0;
1858                         device->use_csums = use_checksum_based_resync(connection, device);
1859                 } else {
1860                         device->use_csums = false;
1861                 }
1862 
1863                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1864                  * with w_send_oos, or the sync target will get confused as to
1865                  * how much bits to resync.  We cannot do that always, because for an
1866                  * empty resync and protocol < 95, we need to do it here, as we call
1867                  * drbd_resync_finished from here in that case.
1868                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1869                  * and from after_state_ch otherwise. */
1870                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1871                         drbd_gen_and_send_sync_uuid(peer_device);
1872 
1873                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1874                         /* This still has a race (about when exactly the peers
1875                          * detect connection loss) that can lead to a full sync
1876                          * on next handshake. In 8.3.9 we fixed this with explicit
1877                          * resync-finished notifications, but the fix
1878                          * introduces a protocol change.  Sleeping for some
1879                          * time longer than the ping interval + timeout on the
1880                          * SyncSource, to give the SyncTarget the chance to
1881                          * detect connection loss, then waiting for a ping
1882                          * response (implicit in drbd_resync_finished) reduces
1883                          * the race considerably, but does not solve it. */
1884                         if (side == C_SYNC_SOURCE) {
1885                                 struct net_conf *nc;
1886                                 int timeo;
1887 
1888                                 rcu_read_lock();
1889                                 nc = rcu_dereference(connection->net_conf);
1890                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1891                                 rcu_read_unlock();
1892                                 schedule_timeout_interruptible(timeo);
1893                         }
1894                         drbd_resync_finished(device);
1895                 }
1896 
1897                 drbd_rs_controller_reset(device);
1898                 /* ns.conn may already be != device->state.conn,
1899                  * we may have been paused in between, or become paused until
1900                  * the timer triggers.
1901                  * No matter, that is handled in resync_timer_fn() */
1902                 if (ns.conn == C_SYNC_TARGET)
1903                         mod_timer(&device->resync_timer, jiffies);
1904 
1905                 drbd_md_sync(device);
1906         }
1907         put_ldev(device);
1908 out:
1909         mutex_unlock(device->state_mutex);
1910 }
1911 
1912 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1913 {
1914         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1915         device->rs_last_bcast = jiffies;
1916 
1917         if (!get_ldev(device))
1918                 return;
1919 
1920         drbd_bm_write_lazy(device, 0);
1921         if (resync_done && is_sync_state(device->state.conn))
1922                 drbd_resync_finished(device);
1923 
1924         drbd_bcast_event(device, &sib);
1925         /* update timestamp, in case it took a while to write out stuff */
1926         device->rs_last_bcast = jiffies;
1927         put_ldev(device);
1928 }
1929 
1930 static void drbd_ldev_destroy(struct drbd_device *device)
1931 {
1932         lc_destroy(device->resync);
1933         device->resync = NULL;
1934         lc_destroy(device->act_log);
1935         device->act_log = NULL;
1936 
1937         __acquire(local);
1938         drbd_backing_dev_free(device, device->ldev);
1939         device->ldev = NULL;
1940         __release(local);
1941 
1942         clear_bit(GOING_DISKLESS, &device->flags);
1943         wake_up(&device->misc_wait);
1944 }
1945 
1946 static void go_diskless(struct drbd_device *device)
1947 {
1948         D_ASSERT(device, device->state.disk == D_FAILED);
1949         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1950          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1951          * the protected members anymore, though, so once put_ldev reaches zero
1952          * again, it will be safe to free them. */
1953 
1954         /* Try to write changed bitmap pages, read errors may have just
1955          * set some bits outside the area covered by the activity log.
1956          *
1957          * If we have an IO error during the bitmap writeout,
1958          * we will want a full sync next time, just in case.
1959          * (Do we want a specific meta data flag for this?)
1960          *
1961          * If that does not make it to stable storage either,
1962          * we cannot do anything about that anymore.
1963          *
1964          * We still need to check if both bitmap and ldev are present, we may
1965          * end up here after a failed attach, before ldev was even assigned.
1966          */
1967         if (device->bitmap && device->ldev) {
1968                 /* An interrupted resync or similar is allowed to recounts bits
1969                  * while we detach.
1970                  * Any modifications would not be expected anymore, though.
1971                  */
1972                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1973                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1974                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1975                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1976                                 drbd_md_sync(device);
1977                         }
1978                 }
1979         }
1980 
1981         drbd_force_state(device, NS(disk, D_DISKLESS));
1982 }
1983 
/* Handle MD_SYNC device work: log a warning and sync the meta data.
 * Always returns 0. */
static int do_md_sync(struct drbd_device *device)
{
        drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
        drbd_md_sync(device);

        return 0;
}
1990 
1991 /* only called from drbd_worker thread, no locking */
1992 void __update_timing_details(
1993                 struct drbd_thread_timing_details *tdp,
1994                 unsigned int *cb_nr,
1995                 void *cb,
1996                 const char *fn, const unsigned int line)
1997 {
1998         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1999         struct drbd_thread_timing_details *td = tdp + i;
2000 
2001         td->start_jif = jiffies;
2002         td->cb_addr = cb;
2003         td->caller_fn = fn;
2004         td->line = line;
2005         td->cb_nr = *cb_nr;
2006 
2007         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2008         td = tdp + i;
2009         memset(td, 0, sizeof(*td));
2010 
2011         ++(*cb_nr);
2012 }
2013 
2014 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2015 {
2016         if (test_bit(MD_SYNC, &todo))
2017                 do_md_sync(device);
2018         if (test_bit(RS_DONE, &todo) ||
2019             test_bit(RS_PROGRESS, &todo))
2020                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2021         if (test_bit(GO_DISKLESS, &todo))
2022                 go_diskless(device);
2023         if (test_bit(DESTROY_DISK, &todo))
2024                 drbd_ldev_destroy(device);
2025         if (test_bit(RS_START, &todo))
2026                 do_start_resync(device);
2027 }
2028 
/* All flag bits that stand for a pending piece of per-device work. */
#define DRBD_DEVICE_WORK_MASK                                   \
        ((1UL << GO_DISKLESS) | (1UL << DESTROY_DISK) |         \
         (1UL << MD_SYNC)     | (1UL << RS_START)     |         \
         (1UL << RS_PROGRESS) | (1UL << RS_DONE))
2037 
2038 static unsigned long get_work_bits(unsigned long *flags)
2039 {
2040         unsigned long old, new;
2041         do {
2042                 old = *flags;
2043                 new = old & ~DRBD_DEVICE_WORK_MASK;
2044         } while (cmpxchg(flags, old, new) != old);
2045         return old & DRBD_DEVICE_WORK_MASK;
2046 }
2047 
2048 static void do_unqueued_work(struct drbd_connection *connection)
2049 {
2050         struct drbd_peer_device *peer_device;
2051         int vnr;
2052 
2053         rcu_read_lock();
2054         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2055                 struct drbd_device *device = peer_device->device;
2056                 unsigned long todo = get_work_bits(&device->flags);
2057                 if (!todo)
2058                         continue;
2059 
2060                 kref_get(&device->kref);
2061                 rcu_read_unlock();
2062                 do_device_work(device, todo);
2063                 kref_put(&device->kref, drbd_destroy_device);
2064                 rcu_read_lock();
2065         }
2066         rcu_read_unlock();
2067 }
2068 
2069 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2070 {
2071         spin_lock_irq(&queue->q_lock);
2072         list_splice_tail_init(&queue->q, work_list);
2073         spin_unlock_irq(&queue->q_lock);
2074         return !list_empty(work_list);
2075 }
2076 
2077 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2078 {
2079         DEFINE_WAIT(wait);
2080         struct net_conf *nc;
2081         int uncork, cork;
2082 
2083         dequeue_work_batch(&connection->sender_work, work_list);
2084         if (!list_empty(work_list))
2085                 return;
2086 
2087         /* Still nothing to do?
2088          * Maybe we still need to close the current epoch,
2089          * even if no new requests are queued yet.
2090          *
2091          * Also, poke TCP, just in case.
2092          * Then wait for new work (or signal). */
2093         rcu_read_lock();
2094         nc = rcu_dereference(connection->net_conf);
2095         uncork = nc ? nc->tcp_cork : 0;
2096         rcu_read_unlock();
2097         if (uncork) {
2098                 mutex_lock(&connection->data.mutex);
2099                 if (connection->data.socket)
2100                         drbd_tcp_uncork(connection->data.socket);
2101                 mutex_unlock(&connection->data.mutex);
2102         }
2103 
2104         for (;;) {
2105                 int send_barrier;
2106                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2107                 spin_lock_irq(&connection->resource->req_lock);
2108                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2109                 if (!list_empty(&connection->sender_work.q))
2110                         list_splice_tail_init(&connection->sender_work.q, work_list);
2111                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2112                 if (!list_empty(work_list) || signal_pending(current)) {
2113                         spin_unlock_irq(&connection->resource->req_lock);
2114                         break;
2115                 }
2116 
2117                 /* We found nothing new to do, no to-be-communicated request,
2118                  * no other work item.  We may still need to close the last
2119                  * epoch.  Next incoming request epoch will be connection ->
2120                  * current transfer log epoch number.  If that is different
2121                  * from the epoch of the last request we communicated, it is
2122                  * safe to send the epoch separating barrier now.
2123                  */
2124                 send_barrier =
2125                         atomic_read(&connection->current_tle_nr) !=
2126                         connection->send.current_epoch_nr;
2127                 spin_unlock_irq(&connection->resource->req_lock);
2128 
2129                 if (send_barrier)
2130                         maybe_send_barrier(connection,
2131                                         connection->send.current_epoch_nr + 1);
2132 
2133                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2134                         break;
2135 
2136                 /* drbd_send() may have called flush_signals() */
2137                 if (get_t_state(&connection->worker) != RUNNING)
2138                         break;
2139 
2140                 schedule();
2141                 /* may be woken up for other things but new work, too,
2142                  * e.g. if the current epoch got closed.
2143                  * In which case we send the barrier above. */
2144         }
2145         finish_wait(&connection->sender_work.q_wait, &wait);
2146 
2147         /* someone may have changed the config while we have been waiting above. */
2148         rcu_read_lock();
2149         nc = rcu_dereference(connection->net_conf);
2150         cork = nc ? nc->tcp_cork : 0;
2151         rcu_read_unlock();
2152         mutex_lock(&connection->data.mutex);
2153         if (connection->data.socket) {
2154                 if (cork)
2155                         drbd_tcp_cork(connection->data.socket);
2156                 else if (!uncork)
2157                         drbd_tcp_uncork(connection->data.socket);
2158         }
2159         mutex_unlock(&connection->data.mutex);
2160 }
2161 
/**
 * drbd_worker() - main function of the worker thread of a connection
 * @thi: thread descriptor; the connection served is thi->connection
 *
 * Runs in three phases:
 *  1. While the thread state is RUNNING: fetch work items, handle the
 *     DEVICE_WORK_PENDING flag, and run one work callback per iteration.
 *  2. Once the thread is no longer RUNNING: drain everything still pending,
 *     calling each remaining callback with 1 as its second argument.
 *  3. Call drbd_device_cleanup() for every device reachable through
 *     connection->peer_devices.
 *
 * Return: always 0.
 */
2162 int drbd_worker(struct drbd_thread *thi)
2163 {
2164         struct drbd_connection *connection = thi->connection;
2165         struct drbd_work *w = NULL;
2166         struct drbd_peer_device *peer_device;
             /* Local batch of work items; processed one entry per loop iteration. */
2167         LIST_HEAD(work_list);
2168         int vnr;
2169 
2170         while (get_t_state(thi) == RUNNING) {
2171                 drbd_thread_current_set_cpu(thi);
2172 
                     /*
                      * Only ask for more work once the local batch is used up.
                      * NOTE(review): wait_for_work() presumably blocks until
                      * work is queued or the thread is signalled - confirm
                      * against its definition.
                      */
2173                 if (list_empty(&work_list)) {
2174                         update_worker_timing_details(connection, wait_for_work);
2175                         wait_for_work(connection, &work_list);
2176                 }
2177 
                     /*
                      * Work that is flagged rather than queued: the flag is
                      * cleared atomically before the handler runs.
                      */
2178                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2179                         update_worker_timing_details(connection, do_unqueued_work);
2180                         do_unqueued_work(connection);
2181                 }
2182 
                     /*
                      * A pending signal is discarded.  If the thread is still
                      * supposed to run, the signal was unexpected: warn and
                      * restart the loop.  Otherwise leave the main loop.
                      */
2183                 if (signal_pending(current)) {
2184                         flush_signals(current);
2185                         if (get_t_state(thi) == RUNNING) {
2186                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2187                                 continue;
2188                         }
2189                         break;
2190                 }
2191 
                     /* Re-check the thread state before running a callback. */
2192                 if (get_t_state(thi) != RUNNING)
2193                         break;
2194 
                     /*
                      * Unlink the first item and run its callback.  The second
                      * argument is true while cstate is below
                      * C_WF_REPORT_PARAMS (presumably a "cancel" flag - confirm
                      * against the callback prototype).  A non-zero return
                      * while cstate is at or above C_WF_REPORT_PARAMS forces
                      * the connection into C_NETWORK_FAILURE.
                      */
2195                 if (!list_empty(&work_list)) {
2196                         w = list_first_entry(&work_list, struct drbd_work, list);
2197                         list_del_init(&w->list);
2198                         update_worker_timing_details(connection, w->cb);
2199                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2200                                 continue;
2201                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2202                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2203                 }
2204         }
2205 
             /*
              * Drain phase: keep going until the local batch is empty and
              * DEVICE_WORK_PENDING is clear.  Each pass handles flagged work,
              * then either runs one remaining callback (second argument 1,
              * return value ignored) or, if the batch is empty, pulls more
              * items from connection->sender_work.
              */
2206         do {
2207                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2208                         update_worker_timing_details(connection, do_unqueued_work);
2209                         do_unqueued_work(connection);
2210                 }
2211                 if (!list_empty(&work_list)) {
2212                         w = list_first_entry(&work_list, struct drbd_work, list);
2213                         list_del_init(&w->list);
2214                         update_worker_timing_details(connection, w->cb);
2215                         w->cb(w, 1);
2216                 } else
2217                         dequeue_work_batch(&connection->sender_work, &work_list);
2218         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2219 
             /*
              * Clean up every device of this connection.  The idr is walked
              * under rcu_read_lock(), but the lock is dropped around
              * drbd_device_cleanup() and retaken before the next iteration;
              * an extra kref is held on the device for that window.
              * NOTE(review): presumably because drbd_device_cleanup() may
              * sleep - confirm.
              */
2220         rcu_read_lock();
2221         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2222                 struct drbd_device *device = peer_device->device;
                     /* At this point every device is expected to be diskless and standalone. */
2223                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2224                 kref_get(&device->kref);
2225                 rcu_read_unlock();
2226                 drbd_device_cleanup(device);
2227                 kref_put(&device->kref, drbd_destroy_device);
2228                 rcu_read_lock();
2229         }
2230         rcu_read_unlock();
2231 
2232         return 0;
2233 }

/* [<][>][^][v][top][bottom][index][help] */