1/* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24*/ 25 26#include <linux/module.h> 27#include <linux/drbd.h> 28#include <linux/sched.h> 29#include <linux/wait.h> 30#include <linux/mm.h> 31#include <linux/memcontrol.h> 32#include <linux/mm_inline.h> 33#include <linux/slab.h> 34#include <linux/random.h> 35#include <linux/string.h> 36#include <linux/scatterlist.h> 37 38#include "drbd_int.h" 39#include "drbd_protocol.h" 40#include "drbd_req.h" 41 42static int make_ov_request(struct drbd_device *, int); 43static int make_resync_request(struct drbd_device *, int); 44 45/* endio handlers: 46 * drbd_md_endio (defined here) 47 * drbd_request_endio (defined here) 48 * drbd_peer_request_endio (defined here) 49 * drbd_bm_endio (defined in drbd_bitmap.c) 50 * 51 * For all these callbacks, note the following: 52 * The callbacks will be called in irq context by the IDE drivers, 53 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 54 * Try to get the locking right :) 55 * 56 */ 57 58 59/* About the global_state_lock 60 Each state transition on an device holds a read lock. In case we have 61 to evaluate the resync after dependencies, we grab a write lock, because 62 we need stable states on all devices for that. */ 63rwlock_t global_state_lock; 64 65/* used for synchronous meta data and bitmap IO 66 * submitted by drbd_md_sync_page_io() 67 */ 68void drbd_md_endio(struct bio *bio) 69{ 70 struct drbd_device *device; 71 72 device = bio->bi_private; 73 device->md_io.error = bio->bi_error; 74 75 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able 76 * to timeout on the lower level device, and eventually detach from it. 77 * If this io completion runs after that timeout expired, this 78 * drbd_md_put_buffer() may allow us to finally try and re-attach. 79 * During normal operation, this only puts that extra reference 80 * down to 1 again. 81 * Make sure we first drop the reference, and only then signal 82 * completion, or we may (in drbd_al_read_log()) cycle so fast into the 83 * next drbd_md_sync_page_io(), that we trigger the 84 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there. 85 */ 86 drbd_md_put_buffer(device); 87 device->md_io.done = 1; 88 wake_up(&device->misc_wait); 89 bio_put(bio); 90 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */ 91 put_ldev(device); 92} 93 94/* reads on behalf of the partner, 95 * "submitted" by the receiver 96 */ 97static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local) 98{ 99 unsigned long flags = 0; 100 struct drbd_peer_device *peer_device = peer_req->peer_device; 101 struct drbd_device *device = peer_device->device; 102 103 spin_lock_irqsave(&device->resource->req_lock, flags); 104 device->read_cnt += peer_req->i.size >> 9; 105 list_del(&peer_req->w.list); 106 if (list_empty(&device->read_ee)) 107 wake_up(&device->ee_wait); 108 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 109 __drbd_chk_io_error(device, DRBD_READ_ERROR); 110 spin_unlock_irqrestore(&device->resource->req_lock, flags); 111 112 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w); 113 put_ldev(device); 114} 115 116/* writes on behalf of the partner, or resync writes, 117 * "submitted" by the receiver, final stage. */ 118void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local) 119{ 120 unsigned long flags = 0; 121 struct drbd_peer_device *peer_device = peer_req->peer_device; 122 struct drbd_device *device = peer_device->device; 123 struct drbd_interval i; 124 int do_wake; 125 u64 block_id; 126 int do_al_complete_io; 127 128 /* after we moved peer_req to done_ee, 129 * we may no longer access it, 130 * it may be freed/reused already! 131 * (as soon as we release the req_lock) */ 132 i = peer_req->i; 133 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; 134 block_id = peer_req->block_id; 135 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 136 137 spin_lock_irqsave(&device->resource->req_lock, flags); 138 device->writ_cnt += peer_req->i.size >> 9; 139 list_move_tail(&peer_req->w.list, &device->done_ee); 140 141 /* 142 * Do not remove from the write_requests tree here: we did not send the 143 * Ack yet and did not wake possibly waiting conflicting requests. 144 * Removed from the tree from "drbd_process_done_ee" within the 145 * appropriate dw.cb (e_end_block/e_end_resync_block) or from 146 * _drbd_clear_done_ee. 147 */ 148 149 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee); 150 151 /* FIXME do we want to detach for failed REQ_DISCARD? 152 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */ 153 if (peer_req->flags & EE_WAS_ERROR) 154 __drbd_chk_io_error(device, DRBD_WRITE_ERROR); 155 spin_unlock_irqrestore(&device->resource->req_lock, flags); 156 157 if (block_id == ID_SYNCER) 158 drbd_rs_complete_io(device, i.sector); 159 160 if (do_wake) 161 wake_up(&device->ee_wait); 162 163 if (do_al_complete_io) 164 drbd_al_complete_io(device, &i); 165 166 wake_asender(peer_device->connection); 167 put_ldev(device); 168} 169 170/* writes on behalf of the partner, or resync writes, 171 * "submitted" by the receiver. 172 */ 173void drbd_peer_request_endio(struct bio *bio) 174{ 175 struct drbd_peer_request *peer_req = bio->bi_private; 176 struct drbd_device *device = peer_req->peer_device->device; 177 int is_write = bio_data_dir(bio) == WRITE; 178 int is_discard = !!(bio->bi_rw & REQ_DISCARD); 179 180 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state)) 181 drbd_warn(device, "%s: error=%d s=%llus\n", 182 is_write ? (is_discard ? "discard" : "write") 183 : "read", bio->bi_error, 184 (unsigned long long)peer_req->i.sector); 185 186 if (bio->bi_error) 187 set_bit(__EE_WAS_ERROR, &peer_req->flags); 188 189 bio_put(bio); /* no need for the bio anymore */ 190 if (atomic_dec_and_test(&peer_req->pending_bios)) { 191 if (is_write) 192 drbd_endio_write_sec_final(peer_req); 193 else 194 drbd_endio_read_sec_final(peer_req); 195 } 196} 197 198/* read, readA or write requests on R_PRIMARY coming from drbd_make_request 199 */ 200void drbd_request_endio(struct bio *bio) 201{ 202 unsigned long flags; 203 struct drbd_request *req = bio->bi_private; 204 struct drbd_device *device = req->device; 205 struct bio_and_error m; 206 enum drbd_req_event what; 207 208 /* If this request was aborted locally before, 209 * but now was completed "successfully", 210 * chances are that this caused arbitrary data corruption. 211 * 212 * "aborting" requests, or force-detaching the disk, is intended for 213 * completely blocked/hung local backing devices which do no longer 214 * complete requests at all, not even do error completions. In this 215 * situation, usually a hard-reset and failover is the only way out. 216 * 217 * By "aborting", basically faking a local error-completion, 218 * we allow for a more graceful swichover by cleanly migrating services. 219 * Still the affected node has to be rebooted "soon". 220 * 221 * By completing these requests, we allow the upper layers to re-use 222 * the associated data pages. 223 * 224 * If later the local backing device "recovers", and now DMAs some data 225 * from disk into the original request pages, in the best case it will 226 * just put random data into unused pages; but typically it will corrupt 227 * meanwhile completely unrelated data, causing all sorts of damage. 228 * 229 * Which means delayed successful completion, 230 * especially for READ requests, 231 * is a reason to panic(). 232 * 233 * We assume that a delayed *error* completion is OK, 234 * though we still will complain noisily about it. 235 */ 236 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) { 237 if (__ratelimit(&drbd_ratelimit_state)) 238 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n"); 239 240 if (!bio->bi_error) 241 panic("possible random memory corruption caused by delayed completion of aborted local request\n"); 242 } 243 244 /* to avoid recursion in __req_mod */ 245 if (unlikely(bio->bi_error)) { 246 if (bio->bi_rw & REQ_DISCARD) 247 what = (bio->bi_error == -EOPNOTSUPP) 248 ? DISCARD_COMPLETED_NOTSUPP 249 : DISCARD_COMPLETED_WITH_ERROR; 250 else 251 what = (bio_data_dir(bio) == WRITE) 252 ? WRITE_COMPLETED_WITH_ERROR 253 : (bio_rw(bio) == READ) 254 ? READ_COMPLETED_WITH_ERROR 255 : READ_AHEAD_COMPLETED_WITH_ERROR; 256 } else 257 what = COMPLETED_OK; 258 259 bio_put(req->private_bio); 260 req->private_bio = ERR_PTR(bio->bi_error); 261 262 /* not req_mod(), we need irqsave here! */ 263 spin_lock_irqsave(&device->resource->req_lock, flags); 264 __req_mod(req, what, &m); 265 spin_unlock_irqrestore(&device->resource->req_lock, flags); 266 put_ldev(device); 267 268 if (m.bio) 269 complete_master_bio(device, &m); 270} 271 272void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest) 273{ 274 struct hash_desc desc; 275 struct scatterlist sg; 276 struct page *page = peer_req->pages; 277 struct page *tmp; 278 unsigned len; 279 280 desc.tfm = tfm; 281 desc.flags = 0; 282 283 sg_init_table(&sg, 1); 284 crypto_hash_init(&desc); 285 286 while ((tmp = page_chain_next(page))) { 287 /* all but the last page will be fully used */ 288 sg_set_page(&sg, page, PAGE_SIZE, 0); 289 crypto_hash_update(&desc, &sg, sg.length); 290 page = tmp; 291 } 292 /* and now the last, possibly only partially used page */ 293 len = peer_req->i.size & (PAGE_SIZE - 1); 294 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); 295 crypto_hash_update(&desc, &sg, sg.length); 296 crypto_hash_final(&desc, digest); 297} 298 299void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest) 300{ 301 struct hash_desc desc; 302 struct scatterlist sg; 303 struct bio_vec bvec; 304 struct bvec_iter iter; 305 306 desc.tfm = tfm; 307 desc.flags = 0; 308 309 sg_init_table(&sg, 1); 310 crypto_hash_init(&desc); 311 312 bio_for_each_segment(bvec, bio, iter) { 313 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset); 314 crypto_hash_update(&desc, &sg, sg.length); 315 } 316 crypto_hash_final(&desc, digest); 317} 318 319/* MAYBE merge common code with w_e_end_ov_req */ 320static int w_e_send_csum(struct drbd_work *w, int cancel) 321{ 322 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 323 struct drbd_peer_device *peer_device = peer_req->peer_device; 324 struct drbd_device *device = peer_device->device; 325 int digest_size; 326 void *digest; 327 int err = 0; 328 329 if (unlikely(cancel)) 330 goto out; 331 332 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0)) 333 goto out; 334 335 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm); 336 digest = kmalloc(digest_size, GFP_NOIO); 337 if (digest) { 338 sector_t sector = peer_req->i.sector; 339 unsigned int size = peer_req->i.size; 340 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 341 /* Free peer_req and pages before send. 342 * In case we block on congestion, we could otherwise run into 343 * some distributed deadlock, if the other side blocks on 344 * congestion as well, because our receiver blocks in 345 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 346 drbd_free_peer_req(device, peer_req); 347 peer_req = NULL; 348 inc_rs_pending(device); 349 err = drbd_send_drequest_csum(peer_device, sector, size, 350 digest, digest_size, 351 P_CSUM_RS_REQUEST); 352 kfree(digest); 353 } else { 354 drbd_err(device, "kmalloc() of digest failed.\n"); 355 err = -ENOMEM; 356 } 357 358out: 359 if (peer_req) 360 drbd_free_peer_req(device, peer_req); 361 362 if (unlikely(err)) 363 drbd_err(device, "drbd_send_drequest(..., csum) failed\n"); 364 return err; 365} 366 367#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 368 369static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size) 370{ 371 struct drbd_device *device = peer_device->device; 372 struct drbd_peer_request *peer_req; 373 374 if (!get_ldev(device)) 375 return -EIO; 376 377 /* GFP_TRY, because if there is no memory available right now, this may 378 * be rescheduled for later. It is "only" background resync, after all. */ 379 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector, 380 size, true /* has real payload */, GFP_TRY); 381 if (!peer_req) 382 goto defer; 383 384 peer_req->w.cb = w_e_send_csum; 385 spin_lock_irq(&device->resource->req_lock); 386 list_add_tail(&peer_req->w.list, &device->read_ee); 387 spin_unlock_irq(&device->resource->req_lock); 388 389 atomic_add(size >> 9, &device->rs_sect_ev); 390 if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0) 391 return 0; 392 393 /* If it failed because of ENOMEM, retry should help. If it failed 394 * because bio_add_page failed (probably broken lower level driver), 395 * retry may or may not help. 396 * If it does not, you may need to force disconnect. */ 397 spin_lock_irq(&device->resource->req_lock); 398 list_del(&peer_req->w.list); 399 spin_unlock_irq(&device->resource->req_lock); 400 401 drbd_free_peer_req(device, peer_req); 402defer: 403 put_ldev(device); 404 return -EAGAIN; 405} 406 407int w_resync_timer(struct drbd_work *w, int cancel) 408{ 409 struct drbd_device *device = 410 container_of(w, struct drbd_device, resync_work); 411 412 switch (device->state.conn) { 413 case C_VERIFY_S: 414 make_ov_request(device, cancel); 415 break; 416 case C_SYNC_TARGET: 417 make_resync_request(device, cancel); 418 break; 419 } 420 421 return 0; 422} 423 424void resync_timer_fn(unsigned long data) 425{ 426 struct drbd_device *device = (struct drbd_device *) data; 427 428 drbd_queue_work_if_unqueued( 429 &first_peer_device(device)->connection->sender_work, 430 &device->resync_work); 431} 432 433static void fifo_set(struct fifo_buffer *fb, int value) 434{ 435 int i; 436 437 for (i = 0; i < fb->size; i++) 438 fb->values[i] = value; 439} 440 441static int fifo_push(struct fifo_buffer *fb, int value) 442{ 443 int ov; 444 445 ov = fb->values[fb->head_index]; 446 fb->values[fb->head_index++] = value; 447 448 if (fb->head_index >= fb->size) 449 fb->head_index = 0; 450 451 return ov; 452} 453 454static void fifo_add_val(struct fifo_buffer *fb, int value) 455{ 456 int i; 457 458 for (i = 0; i < fb->size; i++) 459 fb->values[i] += value; 460} 461 462struct fifo_buffer *fifo_alloc(int fifo_size) 463{ 464 struct fifo_buffer *fb; 465 466 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO); 467 if (!fb) 468 return NULL; 469 470 fb->head_index = 0; 471 fb->size = fifo_size; 472 fb->total = 0; 473 474 return fb; 475} 476 477static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in) 478{ 479 struct disk_conf *dc; 480 unsigned int want; /* The number of sectors we want in-flight */ 481 int req_sect; /* Number of sectors to request in this turn */ 482 int correction; /* Number of sectors more we need in-flight */ 483 int cps; /* correction per invocation of drbd_rs_controller() */ 484 int steps; /* Number of time steps to plan ahead */ 485 int curr_corr; 486 int max_sect; 487 struct fifo_buffer *plan; 488 489 dc = rcu_dereference(device->ldev->disk_conf); 490 plan = rcu_dereference(device->rs_plan_s); 491 492 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 493 494 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */ 495 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps; 496 } else { /* normal path */ 497 want = dc->c_fill_target ? dc->c_fill_target : 498 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10); 499 } 500 501 correction = want - device->rs_in_flight - plan->total; 502 503 /* Plan ahead */ 504 cps = correction / steps; 505 fifo_add_val(plan, cps); 506 plan->total += cps * steps; 507 508 /* What we do in this step */ 509 curr_corr = fifo_push(plan, 0); 510 plan->total -= curr_corr; 511 512 req_sect = sect_in + curr_corr; 513 if (req_sect < 0) 514 req_sect = 0; 515 516 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ; 517 if (req_sect > max_sect) 518 req_sect = max_sect; 519 520 /* 521 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 522 sect_in, device->rs_in_flight, want, correction, 523 steps, cps, device->rs_planed, curr_corr, req_sect); 524 */ 525 526 return req_sect; 527} 528 529static int drbd_rs_number_requests(struct drbd_device *device) 530{ 531 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 532 int number, mxb; 533 534 sect_in = atomic_xchg(&device->rs_sect_in, 0); 535 device->rs_in_flight -= sect_in; 536 537 rcu_read_lock(); 538 mxb = drbd_get_max_buffers(device) / 2; 539 if (rcu_dereference(device->rs_plan_s)->size) { 540 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9); 541 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 542 } else { 543 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate; 544 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 545 } 546 rcu_read_unlock(); 547 548 /* Don't have more than "max-buffers"/2 in-flight. 549 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(), 550 * potentially causing a distributed deadlock on congestion during 551 * online-verify or (checksum-based) resync, if max-buffers, 552 * socket buffer sizes and resync rate settings are mis-configured. */ 553 554 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k), 555 * mxb (as used here, and in drbd_alloc_pages on the peer) is 556 * "number of pages" (typically also 4k), 557 * but "rs_in_flight" is in "sectors" (512 Byte). */ 558 if (mxb - device->rs_in_flight/8 < number) 559 number = mxb - device->rs_in_flight/8; 560 561 return number; 562} 563 564static int make_resync_request(struct drbd_device *const device, int cancel) 565{ 566 struct drbd_peer_device *const peer_device = first_peer_device(device); 567 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; 568 unsigned long bit; 569 sector_t sector; 570 const sector_t capacity = drbd_get_capacity(device->this_bdev); 571 int max_bio_size; 572 int number, rollback_i, size; 573 int align, requeue = 0; 574 int i = 0; 575 576 if (unlikely(cancel)) 577 return 0; 578 579 if (device->rs_total == 0) { 580 /* empty resync? */ 581 drbd_resync_finished(device); 582 return 0; 583 } 584 585 if (!get_ldev(device)) { 586 /* Since we only need to access device->rsync a 587 get_ldev_if_state(device,D_FAILED) would be sufficient, but 588 to continue resync with a broken disk makes no sense at 589 all */ 590 drbd_err(device, "Disk broke down during resync!\n"); 591 return 0; 592 } 593 594 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9; 595 number = drbd_rs_number_requests(device); 596 if (number <= 0) 597 goto requeue; 598 599 for (i = 0; i < number; i++) { 600 /* Stop generating RS requests when half of the send buffer is filled, 601 * but notify TCP that we'd like to have more space. */ 602 mutex_lock(&connection->data.mutex); 603 if (connection->data.socket) { 604 struct sock *sk = connection->data.socket->sk; 605 int queued = sk->sk_wmem_queued; 606 int sndbuf = sk->sk_sndbuf; 607 if (queued > sndbuf / 2) { 608 requeue = 1; 609 if (sk->sk_socket) 610 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 611 } 612 } else 613 requeue = 1; 614 mutex_unlock(&connection->data.mutex); 615 if (requeue) 616 goto requeue; 617 618next_sector: 619 size = BM_BLOCK_SIZE; 620 bit = drbd_bm_find_next(device, device->bm_resync_fo); 621 622 if (bit == DRBD_END_OF_BITMAP) { 623 device->bm_resync_fo = drbd_bm_bits(device); 624 put_ldev(device); 625 return 0; 626 } 627 628 sector = BM_BIT_TO_SECT(bit); 629 630 if (drbd_try_rs_begin_io(device, sector)) { 631 device->bm_resync_fo = bit; 632 goto requeue; 633 } 634 device->bm_resync_fo = bit + 1; 635 636 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) { 637 drbd_rs_complete_io(device, sector); 638 goto next_sector; 639 } 640 641#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE 642 /* try to find some adjacent bits. 643 * we stop if we have already the maximum req size. 644 * 645 * Additionally always align bigger requests, in order to 646 * be prepared for all stripe sizes of software RAIDs. 647 */ 648 align = 1; 649 rollback_i = i; 650 while (i < number) { 651 if (size + BM_BLOCK_SIZE > max_bio_size) 652 break; 653 654 /* Be always aligned */ 655 if (sector & ((1<<(align+3))-1)) 656 break; 657 658 /* do not cross extent boundaries */ 659 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 660 break; 661 /* now, is it actually dirty, after all? 662 * caution, drbd_bm_test_bit is tri-state for some 663 * obscure reason; ( b == 0 ) would get the out-of-band 664 * only accidentally right because of the "oddly sized" 665 * adjustment below */ 666 if (drbd_bm_test_bit(device, bit+1) != 1) 667 break; 668 bit++; 669 size += BM_BLOCK_SIZE; 670 if ((BM_BLOCK_SIZE << align) <= size) 671 align++; 672 i++; 673 } 674 /* if we merged some, 675 * reset the offset to start the next drbd_bm_find_next from */ 676 if (size > BM_BLOCK_SIZE) 677 device->bm_resync_fo = bit + 1; 678#endif 679 680 /* adjust very last sectors, in case we are oddly sized */ 681 if (sector + (size>>9) > capacity) 682 size = (capacity-sector)<<9; 683 684 if (device->use_csums) { 685 switch (read_for_csum(peer_device, sector, size)) { 686 case -EIO: /* Disk failure */ 687 put_ldev(device); 688 return -EIO; 689 case -EAGAIN: /* allocation failed, or ldev busy */ 690 drbd_rs_complete_io(device, sector); 691 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 692 i = rollback_i; 693 goto requeue; 694 case 0: 695 /* everything ok */ 696 break; 697 default: 698 BUG(); 699 } 700 } else { 701 int err; 702 703 inc_rs_pending(device); 704 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST, 705 sector, size, ID_SYNCER); 706 if (err) { 707 drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); 708 dec_rs_pending(device); 709 put_ldev(device); 710 return err; 711 } 712 } 713 } 714 715 if (device->bm_resync_fo >= drbd_bm_bits(device)) { 716 /* last syncer _request_ was sent, 717 * but the P_RS_DATA_REPLY not yet received. sync will end (and 718 * next sync group will resume), as soon as we receive the last 719 * resync data block, and the last bit is cleared. 720 * until then resync "work" is "inactive" ... 721 */ 722 put_ldev(device); 723 return 0; 724 } 725 726 requeue: 727 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 728 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 729 put_ldev(device); 730 return 0; 731} 732 733static int make_ov_request(struct drbd_device *device, int cancel) 734{ 735 int number, i, size; 736 sector_t sector; 737 const sector_t capacity = drbd_get_capacity(device->this_bdev); 738 bool stop_sector_reached = false; 739 740 if (unlikely(cancel)) 741 return 1; 742 743 number = drbd_rs_number_requests(device); 744 745 sector = device->ov_position; 746 for (i = 0; i < number; i++) { 747 if (sector >= capacity) 748 return 1; 749 750 /* We check for "finished" only in the reply path: 751 * w_e_end_ov_reply(). 752 * We need to send at least one request out. */ 753 stop_sector_reached = i > 0 754 && verify_can_do_stop_sector(device) 755 && sector >= device->ov_stop_sector; 756 if (stop_sector_reached) 757 break; 758 759 size = BM_BLOCK_SIZE; 760 761 if (drbd_try_rs_begin_io(device, sector)) { 762 device->ov_position = sector; 763 goto requeue; 764 } 765 766 if (sector + (size>>9) > capacity) 767 size = (capacity-sector)<<9; 768 769 inc_rs_pending(device); 770 if (drbd_send_ov_request(first_peer_device(device), sector, size)) { 771 dec_rs_pending(device); 772 return 0; 773 } 774 sector += BM_SECT_PER_BIT; 775 } 776 device->ov_position = sector; 777 778 requeue: 779 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 780 if (i == 0 || !stop_sector_reached) 781 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 782 return 1; 783} 784 785int w_ov_finished(struct drbd_work *w, int cancel) 786{ 787 struct drbd_device_work *dw = 788 container_of(w, struct drbd_device_work, w); 789 struct drbd_device *device = dw->device; 790 kfree(dw); 791 ov_out_of_sync_print(device); 792 drbd_resync_finished(device); 793 794 return 0; 795} 796 797static int w_resync_finished(struct drbd_work *w, int cancel) 798{ 799 struct drbd_device_work *dw = 800 container_of(w, struct drbd_device_work, w); 801 struct drbd_device *device = dw->device; 802 kfree(dw); 803 804 drbd_resync_finished(device); 805 806 return 0; 807} 808 809static void ping_peer(struct drbd_device *device) 810{ 811 struct drbd_connection *connection = first_peer_device(device)->connection; 812 813 clear_bit(GOT_PING_ACK, &connection->flags); 814 request_ping(connection); 815 wait_event(connection->ping_wait, 816 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED); 817} 818 819int drbd_resync_finished(struct drbd_device *device) 820{ 821 unsigned long db, dt, dbdt; 822 unsigned long n_oos; 823 union drbd_state os, ns; 824 struct drbd_device_work *dw; 825 char *khelper_cmd = NULL; 826 int verify_done = 0; 827 828 /* Remove all elements from the resync LRU. Since future actions 829 * might set bits in the (main) bitmap, then the entries in the 830 * resync LRU would be wrong. */ 831 if (drbd_rs_del_all(device)) { 832 /* In case this is not possible now, most probably because 833 * there are P_RS_DATA_REPLY Packets lingering on the worker's 834 * queue (or even the read operations for those packets 835 * is not finished by now). Retry in 100ms. */ 836 837 schedule_timeout_interruptible(HZ / 10); 838 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC); 839 if (dw) { 840 dw->w.cb = w_resync_finished; 841 dw->device = device; 842 drbd_queue_work(&first_peer_device(device)->connection->sender_work, 843 &dw->w); 844 return 1; 845 } 846 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n"); 847 } 848 849 dt = (jiffies - device->rs_start - device->rs_paused) / HZ; 850 if (dt <= 0) 851 dt = 1; 852 853 db = device->rs_total; 854 /* adjust for verify start and stop sectors, respective reached position */ 855 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 856 db -= device->ov_left; 857 858 dbdt = Bit2KB(db/dt); 859 device->rs_paused /= HZ; 860 861 if (!get_ldev(device)) 862 goto out; 863 864 ping_peer(device); 865 866 spin_lock_irq(&device->resource->req_lock); 867 os = drbd_read_state(device); 868 869 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 870 871 /* This protects us against multiple calls (that can happen in the presence 872 of application IO), and against connectivity loss just before we arrive here. */ 873 if (os.conn <= C_CONNECTED) 874 goto out_unlock; 875 876 ns = os; 877 ns.conn = C_CONNECTED; 878 879 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 880 verify_done ? "Online verify" : "Resync", 881 dt + device->rs_paused, device->rs_paused, dbdt); 882 883 n_oos = drbd_bm_total_weight(device); 884 885 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 886 if (n_oos) { 887 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n", 888 n_oos, Bit2KB(1)); 889 khelper_cmd = "out-of-sync"; 890 } 891 } else { 892 D_ASSERT(device, (n_oos - device->rs_failed) == 0); 893 894 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 895 khelper_cmd = "after-resync-target"; 896 897 if (device->use_csums && device->rs_total) { 898 const unsigned long s = device->rs_same_csum; 899 const unsigned long t = device->rs_total; 900 const int ratio = 901 (t == 0) ? 0 : 902 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 903 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; " 904 "transferred %luK total %luK\n", 905 ratio, 906 Bit2KB(device->rs_same_csum), 907 Bit2KB(device->rs_total - device->rs_same_csum), 908 Bit2KB(device->rs_total)); 909 } 910 } 911 912 if (device->rs_failed) { 913 drbd_info(device, " %lu failed blocks\n", device->rs_failed); 914 915 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 916 ns.disk = D_INCONSISTENT; 917 ns.pdsk = D_UP_TO_DATE; 918 } else { 919 ns.disk = D_UP_TO_DATE; 920 ns.pdsk = D_INCONSISTENT; 921 } 922 } else { 923 ns.disk = D_UP_TO_DATE; 924 ns.pdsk = D_UP_TO_DATE; 925 926 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 927 if (device->p_uuid) { 928 int i; 929 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 930 _drbd_uuid_set(device, i, device->p_uuid[i]); 931 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]); 932 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]); 933 } else { 934 drbd_err(device, "device->p_uuid is NULL! BUG\n"); 935 } 936 } 937 938 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 939 /* for verify runs, we don't update uuids here, 940 * so there would be nothing to report. */ 941 drbd_uuid_set_bm(device, 0UL); 942 drbd_print_uuids(device, "updated UUIDs"); 943 if (device->p_uuid) { 944 /* Now the two UUID sets are equal, update what we 945 * know of the peer. */ 946 int i; 947 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 948 device->p_uuid[i] = device->ldev->md.uuid[i]; 949 } 950 } 951 } 952 953 _drbd_set_state(device, ns, CS_VERBOSE, NULL); 954out_unlock: 955 spin_unlock_irq(&device->resource->req_lock); 956 put_ldev(device); 957out: 958 device->rs_total = 0; 959 device->rs_failed = 0; 960 device->rs_paused = 0; 961 962 /* reset start sector, if we reached end of device */ 963 if (verify_done && device->ov_left == 0) 964 device->ov_start_sector = 0; 965 966 drbd_md_sync(device); 967 968 if (khelper_cmd) 969 drbd_khelper(device, khelper_cmd); 970 971 return 1; 972} 973 974/* helper */ 975static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req) 976{ 977 if (drbd_peer_req_has_active_page(peer_req)) { 978 /* This might happen if sendpage() has not finished */ 979 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 980 atomic_add(i, &device->pp_in_use_by_net); 981 atomic_sub(i, &device->pp_in_use); 982 spin_lock_irq(&device->resource->req_lock); 983 list_add_tail(&peer_req->w.list, &device->net_ee); 984 spin_unlock_irq(&device->resource->req_lock); 985 wake_up(&drbd_pp_wait); 986 } else 987 drbd_free_peer_req(device, peer_req); 988} 989 990/** 991 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 992 * @device: DRBD device. 993 * @w: work object. 994 * @cancel: The connection will be closed anyways 995 */ 996int w_e_end_data_req(struct drbd_work *w, int cancel) 997{ 998 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 999 struct drbd_peer_device *peer_device = peer_req->peer_device; 1000 struct drbd_device *device = peer_device->device; 1001 int err; 1002 1003 if (unlikely(cancel)) { 1004 drbd_free_peer_req(device, peer_req); 1005 dec_unacked(device); 1006 return 0; 1007 } 1008 1009 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1010 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req); 1011 } else { 1012 if (__ratelimit(&drbd_ratelimit_state)) 1013 drbd_err(device, "Sending NegDReply. sector=%llus.\n", 1014 (unsigned long long)peer_req->i.sector); 1015 1016 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req); 1017 } 1018 1019 dec_unacked(device); 1020 1021 move_to_net_ee_or_free(device, peer_req); 1022 1023 if (unlikely(err)) 1024 drbd_err(device, "drbd_send_block() failed\n"); 1025 return err; 1026} 1027 1028/** 1029 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST 1030 * @w: work object. 1031 * @cancel: The connection will be closed anyways 1032 */ 1033int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 1034{ 1035 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1036 struct drbd_peer_device *peer_device = peer_req->peer_device; 1037 struct drbd_device *device = peer_device->device; 1038 int err; 1039 1040 if (unlikely(cancel)) { 1041 drbd_free_peer_req(device, peer_req); 1042 dec_unacked(device); 1043 return 0; 1044 } 1045 1046 if (get_ldev_if_state(device, D_FAILED)) { 1047 drbd_rs_complete_io(device, peer_req->i.sector); 1048 put_ldev(device); 1049 } 1050 1051 if (device->state.conn == C_AHEAD) { 1052 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req); 1053 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1054 if (likely(device->state.pdsk >= D_INCONSISTENT)) { 1055 inc_rs_pending(device); 1056 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1057 } else { 1058 if (__ratelimit(&drbd_ratelimit_state)) 1059 drbd_err(device, "Not sending RSDataReply, " 1060 "partner DISKLESS!\n"); 1061 err = 0; 1062 } 1063 } else { 1064 if (__ratelimit(&drbd_ratelimit_state)) 1065 drbd_err(device, "Sending NegRSDReply. sector %llus.\n", 1066 (unsigned long long)peer_req->i.sector); 1067 1068 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1069 1070 /* update resync data with failure */ 1071 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size); 1072 } 1073 1074 dec_unacked(device); 1075 1076 move_to_net_ee_or_free(device, peer_req); 1077 1078 if (unlikely(err)) 1079 drbd_err(device, "drbd_send_block() failed\n"); 1080 return err; 1081} 1082 1083int w_e_end_csum_rs_req(struct drbd_work *w, int cancel) 1084{ 1085 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1086 struct drbd_peer_device *peer_device = peer_req->peer_device; 1087 struct drbd_device *device = peer_device->device; 1088 struct digest_info *di; 1089 int digest_size; 1090 void *digest = NULL; 1091 int err, eq = 0; 1092 1093 if (unlikely(cancel)) { 1094 drbd_free_peer_req(device, peer_req); 1095 dec_unacked(device); 1096 return 0; 1097 } 1098 1099 if (get_ldev(device)) { 1100 drbd_rs_complete_io(device, peer_req->i.sector); 1101 put_ldev(device); 1102 } 1103 1104 di = peer_req->digest; 1105 1106 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1107 /* quick hack to try to avoid a race against reconfiguration. 1108 * a real fix would be much more involved, 1109 * introducing more locking mechanisms */ 1110 if (peer_device->connection->csums_tfm) { 1111 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm); 1112 D_ASSERT(device, digest_size == di->digest_size); 1113 digest = kmalloc(digest_size, GFP_NOIO); 1114 } 1115 if (digest) { 1116 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 1117 eq = !memcmp(digest, di->digest, digest_size); 1118 kfree(digest); 1119 } 1120 1121 if (eq) { 1122 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size); 1123 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1124 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT; 1125 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req); 1126 } else { 1127 inc_rs_pending(device); 1128 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */ 1129 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */ 1130 kfree(di); 1131 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1132 } 1133 } else { 1134 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1135 if (__ratelimit(&drbd_ratelimit_state)) 1136 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n"); 1137 } 1138 1139 dec_unacked(device); 1140 move_to_net_ee_or_free(device, peer_req); 1141 1142 if (unlikely(err)) 1143 drbd_err(device, "drbd_send_block/ack() failed\n"); 1144 return err; 1145} 1146 1147int w_e_end_ov_req(struct drbd_work *w, int cancel) 1148{ 1149 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1150 struct drbd_peer_device *peer_device = peer_req->peer_device; 1151 struct drbd_device *device = peer_device->device; 1152 sector_t sector = peer_req->i.sector; 1153 unsigned int size = peer_req->i.size; 1154 int digest_size; 1155 void *digest; 1156 int err = 0; 1157 1158 if (unlikely(cancel)) 1159 goto out; 1160 1161 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm); 1162 digest = kmalloc(digest_size, GFP_NOIO); 1163 if (!digest) { 1164 err = 1; /* terminate the connection in case the allocation failed */ 1165 goto out; 1166 } 1167 1168 if (likely(!(peer_req->flags & EE_WAS_ERROR))) 1169 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1170 else 1171 memset(digest, 0, digest_size); 1172 1173 /* Free e and pages before send. 1174 * In case we block on congestion, we could otherwise run into 1175 * some distributed deadlock, if the other side blocks on 1176 * congestion as well, because our receiver blocks in 1177 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1178 drbd_free_peer_req(device, peer_req); 1179 peer_req = NULL; 1180 inc_rs_pending(device); 1181 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY); 1182 if (err) 1183 dec_rs_pending(device); 1184 kfree(digest); 1185 1186out: 1187 if (peer_req) 1188 drbd_free_peer_req(device, peer_req); 1189 dec_unacked(device); 1190 return err; 1191} 1192 1193void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size) 1194{ 1195 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) { 1196 device->ov_last_oos_size += size>>9; 1197 } else { 1198 device->ov_last_oos_start = sector; 1199 device->ov_last_oos_size = size>>9; 1200 } 1201 drbd_set_out_of_sync(device, sector, size); 1202} 1203 1204int w_e_end_ov_reply(struct drbd_work *w, int cancel) 1205{ 1206 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1207 struct drbd_peer_device *peer_device = peer_req->peer_device; 1208 struct drbd_device *device = peer_device->device; 1209 struct digest_info *di; 1210 void *digest; 1211 sector_t sector = peer_req->i.sector; 1212 unsigned int size = peer_req->i.size; 1213 int digest_size; 1214 int err, eq = 0; 1215 bool stop_sector_reached = false; 1216 1217 if (unlikely(cancel)) { 1218 drbd_free_peer_req(device, peer_req); 1219 dec_unacked(device); 1220 return 0; 1221 } 1222 1223 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1224 * the resync lru has been cleaned up already */ 1225 if (get_ldev(device)) { 1226 drbd_rs_complete_io(device, peer_req->i.sector); 1227 put_ldev(device); 1228 } 1229 1230 di = peer_req->digest; 1231 1232 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1233 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm); 1234 digest = kmalloc(digest_size, GFP_NOIO); 1235 if (digest) { 1236 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1237 1238 D_ASSERT(device, digest_size == di->digest_size); 1239 eq = !memcmp(digest, di->digest, digest_size); 1240 kfree(digest); 1241 } 1242 } 1243 1244 /* Free peer_req and pages before send. 1245 * In case we block on congestion, we could otherwise run into 1246 * some distributed deadlock, if the other side blocks on 1247 * congestion as well, because our receiver blocks in 1248 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1249 drbd_free_peer_req(device, peer_req); 1250 if (!eq) 1251 drbd_ov_out_of_sync_found(device, sector, size); 1252 else 1253 ov_out_of_sync_print(device); 1254 1255 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, 1256 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1257 1258 dec_unacked(device); 1259 1260 --device->ov_left; 1261 1262 /* let's advance progress step marks only for every other megabyte */ 1263 if ((device->ov_left & 0x200) == 0x200) 1264 drbd_advance_rs_marks(device, device->ov_left); 1265 1266 stop_sector_reached = verify_can_do_stop_sector(device) && 1267 (sector + (size>>9)) >= device->ov_stop_sector; 1268 1269 if (device->ov_left == 0 || stop_sector_reached) { 1270 ov_out_of_sync_print(device); 1271 drbd_resync_finished(device); 1272 } 1273 1274 return err; 1275} 1276 1277/* FIXME 1278 * We need to track the number of pending barrier acks, 1279 * and to be able to wait for them. 1280 * See also comment in drbd_adm_attach before drbd_suspend_io. 1281 */ 1282static int drbd_send_barrier(struct drbd_connection *connection) 1283{ 1284 struct p_barrier *p; 1285 struct drbd_socket *sock; 1286 1287 sock = &connection->data; 1288 p = conn_prepare_command(connection, sock); 1289 if (!p) 1290 return -EIO; 1291 p->barrier = connection->send.current_epoch_nr; 1292 p->pad = 0; 1293 connection->send.current_epoch_writes = 0; 1294 1295 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0); 1296} 1297 1298int w_send_write_hint(struct drbd_work *w, int cancel) 1299{ 1300 struct drbd_device *device = 1301 container_of(w, struct drbd_device, unplug_work); 1302 struct drbd_socket *sock; 1303 1304 if (cancel) 1305 return 0; 1306 sock = &first_peer_device(device)->connection->data; 1307 if (!drbd_prepare_command(first_peer_device(device), sock)) 1308 return -EIO; 1309 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0); 1310} 1311 1312static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch) 1313{ 1314 if (!connection->send.seen_any_write_yet) { 1315 connection->send.seen_any_write_yet = true; 1316 connection->send.current_epoch_nr = epoch; 1317 connection->send.current_epoch_writes = 0; 1318 } 1319} 1320 1321static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch) 1322{ 1323 /* re-init if first write on this connection */ 1324 if (!connection->send.seen_any_write_yet) 1325 return; 1326 if (connection->send.current_epoch_nr != epoch) { 1327 if (connection->send.current_epoch_writes) 1328 drbd_send_barrier(connection); 1329 connection->send.current_epoch_nr = epoch; 1330 } 1331} 1332 1333int w_send_out_of_sync(struct drbd_work *w, int cancel) 1334{ 1335 struct drbd_request *req = container_of(w, struct drbd_request, w); 1336 struct drbd_device *device = req->device; 1337 struct drbd_peer_device *const peer_device = first_peer_device(device); 1338 struct drbd_connection *const connection = peer_device->connection; 1339 int err; 1340 1341 if (unlikely(cancel)) { 1342 req_mod(req, SEND_CANCELED); 1343 return 0; 1344 } 1345 req->pre_send_jif = jiffies; 1346 1347 /* this time, no connection->send.current_epoch_writes++; 1348 * If it was sent, it was the closing barrier for the last 1349 * replicated epoch, before we went into AHEAD mode. 1350 * No more barriers will be sent, until we leave AHEAD mode again. */ 1351 maybe_send_barrier(connection, req->epoch); 1352 1353 err = drbd_send_out_of_sync(peer_device, req); 1354 req_mod(req, OOS_HANDED_TO_NETWORK); 1355 1356 return err; 1357} 1358 1359/** 1360 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1361 * @w: work object. 1362 * @cancel: The connection will be closed anyways 1363 */ 1364int w_send_dblock(struct drbd_work *w, int cancel) 1365{ 1366 struct drbd_request *req = container_of(w, struct drbd_request, w); 1367 struct drbd_device *device = req->device; 1368 struct drbd_peer_device *const peer_device = first_peer_device(device); 1369 struct drbd_connection *connection = peer_device->connection; 1370 int err; 1371 1372 if (unlikely(cancel)) { 1373 req_mod(req, SEND_CANCELED); 1374 return 0; 1375 } 1376 req->pre_send_jif = jiffies; 1377 1378 re_init_if_first_write(connection, req->epoch); 1379 maybe_send_barrier(connection, req->epoch); 1380 connection->send.current_epoch_writes++; 1381 1382 err = drbd_send_dblock(peer_device, req); 1383 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1384 1385 return err; 1386} 1387 1388/** 1389 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1390 * @w: work object. 1391 * @cancel: The connection will be closed anyways 1392 */ 1393int w_send_read_req(struct drbd_work *w, int cancel) 1394{ 1395 struct drbd_request *req = container_of(w, struct drbd_request, w); 1396 struct drbd_device *device = req->device; 1397 struct drbd_peer_device *const peer_device = first_peer_device(device); 1398 struct drbd_connection *connection = peer_device->connection; 1399 int err; 1400 1401 if (unlikely(cancel)) { 1402 req_mod(req, SEND_CANCELED); 1403 return 0; 1404 } 1405 req->pre_send_jif = jiffies; 1406 1407 /* Even read requests may close a write epoch, 1408 * if there was any yet. */ 1409 maybe_send_barrier(connection, req->epoch); 1410 1411 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size, 1412 (unsigned long)req); 1413 1414 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1415 1416 return err; 1417} 1418 1419int w_restart_disk_io(struct drbd_work *w, int cancel) 1420{ 1421 struct drbd_request *req = container_of(w, struct drbd_request, w); 1422 struct drbd_device *device = req->device; 1423 1424 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1425 drbd_al_begin_io(device, &req->i); 1426 1427 drbd_req_make_private_bio(req, req->master_bio); 1428 req->private_bio->bi_bdev = device->ldev->backing_bdev; 1429 generic_make_request(req->private_bio); 1430 1431 return 0; 1432} 1433 1434static int _drbd_may_sync_now(struct drbd_device *device) 1435{ 1436 struct drbd_device *odev = device; 1437 int resync_after; 1438 1439 while (1) { 1440 if (!odev->ldev || odev->state.disk == D_DISKLESS) 1441 return 1; 1442 rcu_read_lock(); 1443 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1444 rcu_read_unlock(); 1445 if (resync_after == -1) 1446 return 1; 1447 odev = minor_to_device(resync_after); 1448 if (!odev) 1449 return 1; 1450 if ((odev->state.conn >= C_SYNC_SOURCE && 1451 odev->state.conn <= C_PAUSED_SYNC_T) || 1452 odev->state.aftr_isp || odev->state.peer_isp || 1453 odev->state.user_isp) 1454 return 0; 1455 } 1456} 1457 1458/** 1459 * _drbd_pause_after() - Pause resync on all devices that may not resync now 1460 * @device: DRBD device. 1461 * 1462 * Called from process context only (admin command and after_state_ch). 1463 */ 1464static int _drbd_pause_after(struct drbd_device *device) 1465{ 1466 struct drbd_device *odev; 1467 int i, rv = 0; 1468 1469 rcu_read_lock(); 1470 idr_for_each_entry(&drbd_devices, odev, i) { 1471 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1472 continue; 1473 if (!_drbd_may_sync_now(odev)) 1474 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL) 1475 != SS_NOTHING_TO_DO); 1476 } 1477 rcu_read_unlock(); 1478 1479 return rv; 1480} 1481 1482/** 1483 * _drbd_resume_next() - Resume resync on all devices that may resync now 1484 * @device: DRBD device. 1485 * 1486 * Called from process context only (admin command and worker). 1487 */ 1488static int _drbd_resume_next(struct drbd_device *device) 1489{ 1490 struct drbd_device *odev; 1491 int i, rv = 0; 1492 1493 rcu_read_lock(); 1494 idr_for_each_entry(&drbd_devices, odev, i) { 1495 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1496 continue; 1497 if (odev->state.aftr_isp) { 1498 if (_drbd_may_sync_now(odev)) 1499 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0), 1500 CS_HARD, NULL) 1501 != SS_NOTHING_TO_DO) ; 1502 } 1503 } 1504 rcu_read_unlock(); 1505 return rv; 1506} 1507 1508void resume_next_sg(struct drbd_device *device) 1509{ 1510 write_lock_irq(&global_state_lock); 1511 _drbd_resume_next(device); 1512 write_unlock_irq(&global_state_lock); 1513} 1514 1515void suspend_other_sg(struct drbd_device *device) 1516{ 1517 write_lock_irq(&global_state_lock); 1518 _drbd_pause_after(device); 1519 write_unlock_irq(&global_state_lock); 1520} 1521 1522/* caller must hold global_state_lock */ 1523enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor) 1524{ 1525 struct drbd_device *odev; 1526 int resync_after; 1527 1528 if (o_minor == -1) 1529 return NO_ERROR; 1530 if (o_minor < -1 || o_minor > MINORMASK) 1531 return ERR_RESYNC_AFTER; 1532 1533 /* check for loops */ 1534 odev = minor_to_device(o_minor); 1535 while (1) { 1536 if (odev == device) 1537 return ERR_RESYNC_AFTER_CYCLE; 1538 1539 /* You are free to depend on diskless, non-existing, 1540 * or not yet/no longer existing minors. 1541 * We only reject dependency loops. 1542 * We cannot follow the dependency chain beyond a detached or 1543 * missing minor. 1544 */ 1545 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS) 1546 return NO_ERROR; 1547 1548 rcu_read_lock(); 1549 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1550 rcu_read_unlock(); 1551 /* dependency chain ends here, no cycles. */ 1552 if (resync_after == -1) 1553 return NO_ERROR; 1554 1555 /* follow the dependency chain */ 1556 odev = minor_to_device(resync_after); 1557 } 1558} 1559 1560/* caller must hold global_state_lock */ 1561void drbd_resync_after_changed(struct drbd_device *device) 1562{ 1563 int changes; 1564 1565 do { 1566 changes = _drbd_pause_after(device); 1567 changes |= _drbd_resume_next(device); 1568 } while (changes); 1569} 1570 1571void drbd_rs_controller_reset(struct drbd_device *device) 1572{ 1573 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 1574 struct fifo_buffer *plan; 1575 1576 atomic_set(&device->rs_sect_in, 0); 1577 atomic_set(&device->rs_sect_ev, 0); 1578 device->rs_in_flight = 0; 1579 device->rs_last_events = 1580 (int)part_stat_read(&disk->part0, sectors[0]) + 1581 (int)part_stat_read(&disk->part0, sectors[1]); 1582 1583 /* Updating the RCU protected object in place is necessary since 1584 this function gets called from atomic context. 1585 It is valid since all other updates also lead to an completely 1586 empty fifo */ 1587 rcu_read_lock(); 1588 plan = rcu_dereference(device->rs_plan_s); 1589 plan->total = 0; 1590 fifo_set(plan, 0); 1591 rcu_read_unlock(); 1592} 1593 1594void start_resync_timer_fn(unsigned long data) 1595{ 1596 struct drbd_device *device = (struct drbd_device *) data; 1597 drbd_device_post_work(device, RS_START); 1598} 1599 1600static void do_start_resync(struct drbd_device *device) 1601{ 1602 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { 1603 drbd_warn(device, "postponing start_resync ...\n"); 1604 device->start_resync_timer.expires = jiffies + HZ/10; 1605 add_timer(&device->start_resync_timer); 1606 return; 1607 } 1608 1609 drbd_start_resync(device, C_SYNC_SOURCE); 1610 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); 1611} 1612 1613static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device) 1614{ 1615 bool csums_after_crash_only; 1616 rcu_read_lock(); 1617 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only; 1618 rcu_read_unlock(); 1619 return connection->agreed_pro_version >= 89 && /* supported? */ 1620 connection->csums_tfm && /* configured? */ 1621 (csums_after_crash_only == 0 /* use for each resync? */ 1622 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */ 1623} 1624 1625/** 1626 * drbd_start_resync() - Start the resync process 1627 * @device: DRBD device. 1628 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1629 * 1630 * This function might bring you directly into one of the 1631 * C_PAUSED_SYNC_* states. 1632 */ 1633void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) 1634{ 1635 struct drbd_peer_device *peer_device = first_peer_device(device); 1636 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; 1637 union drbd_state ns; 1638 int r; 1639 1640 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) { 1641 drbd_err(device, "Resync already running!\n"); 1642 return; 1643 } 1644 1645 if (!test_bit(B_RS_H_DONE, &device->flags)) { 1646 if (side == C_SYNC_TARGET) { 1647 /* Since application IO was locked out during C_WF_BITMAP_T and 1648 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1649 we check that we might make the data inconsistent. */ 1650 r = drbd_khelper(device, "before-resync-target"); 1651 r = (r >> 8) & 0xff; 1652 if (r > 0) { 1653 drbd_info(device, "before-resync-target handler returned %d, " 1654 "dropping connection.\n", r); 1655 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 1656 return; 1657 } 1658 } else /* C_SYNC_SOURCE */ { 1659 r = drbd_khelper(device, "before-resync-source"); 1660 r = (r >> 8) & 0xff; 1661 if (r > 0) { 1662 if (r == 3) { 1663 drbd_info(device, "before-resync-source handler returned %d, " 1664 "ignoring. Old userland tools?", r); 1665 } else { 1666 drbd_info(device, "before-resync-source handler returned %d, " 1667 "dropping connection.\n", r); 1668 conn_request_state(connection, 1669 NS(conn, C_DISCONNECTING), CS_HARD); 1670 return; 1671 } 1672 } 1673 } 1674 } 1675 1676 if (current == connection->worker.task) { 1677 /* The worker should not sleep waiting for state_mutex, 1678 that can take long */ 1679 if (!mutex_trylock(device->state_mutex)) { 1680 set_bit(B_RS_H_DONE, &device->flags); 1681 device->start_resync_timer.expires = jiffies + HZ/5; 1682 add_timer(&device->start_resync_timer); 1683 return; 1684 } 1685 } else { 1686 mutex_lock(device->state_mutex); 1687 } 1688 clear_bit(B_RS_H_DONE, &device->flags); 1689 1690 /* req_lock: serialize with drbd_send_and_submit() and others 1691 * global_state_lock: for stable sync-after dependencies */ 1692 spin_lock_irq(&device->resource->req_lock); 1693 write_lock(&global_state_lock); 1694 /* Did some connection breakage or IO error race with us? */ 1695 if (device->state.conn < C_CONNECTED 1696 || !get_ldev_if_state(device, D_NEGOTIATING)) { 1697 write_unlock(&global_state_lock); 1698 spin_unlock_irq(&device->resource->req_lock); 1699 mutex_unlock(device->state_mutex); 1700 return; 1701 } 1702 1703 ns = drbd_read_state(device); 1704 1705 ns.aftr_isp = !_drbd_may_sync_now(device); 1706 1707 ns.conn = side; 1708 1709 if (side == C_SYNC_TARGET) 1710 ns.disk = D_INCONSISTENT; 1711 else /* side == C_SYNC_SOURCE */ 1712 ns.pdsk = D_INCONSISTENT; 1713 1714 r = __drbd_set_state(device, ns, CS_VERBOSE, NULL); 1715 ns = drbd_read_state(device); 1716 1717 if (ns.conn < C_CONNECTED) 1718 r = SS_UNKNOWN_ERROR; 1719 1720 if (r == SS_SUCCESS) { 1721 unsigned long tw = drbd_bm_total_weight(device); 1722 unsigned long now = jiffies; 1723 int i; 1724 1725 device->rs_failed = 0; 1726 device->rs_paused = 0; 1727 device->rs_same_csum = 0; 1728 device->rs_last_sect_ev = 0; 1729 device->rs_total = tw; 1730 device->rs_start = now; 1731 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1732 device->rs_mark_left[i] = tw; 1733 device->rs_mark_time[i] = now; 1734 } 1735 _drbd_pause_after(device); 1736 /* Forget potentially stale cached per resync extent bit-counts. 1737 * Open coded drbd_rs_cancel_all(device), we already have IRQs 1738 * disabled, and know the disk state is ok. */ 1739 spin_lock(&device->al_lock); 1740 lc_reset(device->resync); 1741 device->resync_locked = 0; 1742 device->resync_wenr = LC_FREE; 1743 spin_unlock(&device->al_lock); 1744 } 1745 write_unlock(&global_state_lock); 1746 spin_unlock_irq(&device->resource->req_lock); 1747 1748 if (r == SS_SUCCESS) { 1749 wake_up(&device->al_wait); /* for lc_reset() above */ 1750 /* reset rs_last_bcast when a resync or verify is started, 1751 * to deal with potential jiffies wrap. */ 1752 device->rs_last_bcast = jiffies - HZ; 1753 1754 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1755 drbd_conn_str(ns.conn), 1756 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10), 1757 (unsigned long) device->rs_total); 1758 if (side == C_SYNC_TARGET) { 1759 device->bm_resync_fo = 0; 1760 device->use_csums = use_checksum_based_resync(connection, device); 1761 } else { 1762 device->use_csums = 0; 1763 } 1764 1765 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid 1766 * with w_send_oos, or the sync target will get confused as to 1767 * how much bits to resync. We cannot do that always, because for an 1768 * empty resync and protocol < 95, we need to do it here, as we call 1769 * drbd_resync_finished from here in that case. 1770 * We drbd_gen_and_send_sync_uuid here for protocol < 96, 1771 * and from after_state_ch otherwise. */ 1772 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96) 1773 drbd_gen_and_send_sync_uuid(peer_device); 1774 1775 if (connection->agreed_pro_version < 95 && device->rs_total == 0) { 1776 /* This still has a race (about when exactly the peers 1777 * detect connection loss) that can lead to a full sync 1778 * on next handshake. In 8.3.9 we fixed this with explicit 1779 * resync-finished notifications, but the fix 1780 * introduces a protocol change. Sleeping for some 1781 * time longer than the ping interval + timeout on the 1782 * SyncSource, to give the SyncTarget the chance to 1783 * detect connection loss, then waiting for a ping 1784 * response (implicit in drbd_resync_finished) reduces 1785 * the race considerably, but does not solve it. */ 1786 if (side == C_SYNC_SOURCE) { 1787 struct net_conf *nc; 1788 int timeo; 1789 1790 rcu_read_lock(); 1791 nc = rcu_dereference(connection->net_conf); 1792 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9; 1793 rcu_read_unlock(); 1794 schedule_timeout_interruptible(timeo); 1795 } 1796 drbd_resync_finished(device); 1797 } 1798 1799 drbd_rs_controller_reset(device); 1800 /* ns.conn may already be != device->state.conn, 1801 * we may have been paused in between, or become paused until 1802 * the timer triggers. 1803 * No matter, that is handled in resync_timer_fn() */ 1804 if (ns.conn == C_SYNC_TARGET) 1805 mod_timer(&device->resync_timer, jiffies); 1806 1807 drbd_md_sync(device); 1808 } 1809 put_ldev(device); 1810 mutex_unlock(device->state_mutex); 1811} 1812 1813static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done) 1814{ 1815 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, }; 1816 device->rs_last_bcast = jiffies; 1817 1818 if (!get_ldev(device)) 1819 return; 1820 1821 drbd_bm_write_lazy(device, 0); 1822 if (resync_done && is_sync_state(device->state.conn)) 1823 drbd_resync_finished(device); 1824 1825 drbd_bcast_event(device, &sib); 1826 /* update timestamp, in case it took a while to write out stuff */ 1827 device->rs_last_bcast = jiffies; 1828 put_ldev(device); 1829} 1830 1831static void drbd_ldev_destroy(struct drbd_device *device) 1832{ 1833 lc_destroy(device->resync); 1834 device->resync = NULL; 1835 lc_destroy(device->act_log); 1836 device->act_log = NULL; 1837 1838 __acquire(local); 1839 drbd_free_ldev(device->ldev); 1840 device->ldev = NULL; 1841 __release(local); 1842 1843 clear_bit(GOING_DISKLESS, &device->flags); 1844 wake_up(&device->misc_wait); 1845} 1846 1847static void go_diskless(struct drbd_device *device) 1848{ 1849 D_ASSERT(device, device->state.disk == D_FAILED); 1850 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will 1851 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch 1852 * the protected members anymore, though, so once put_ldev reaches zero 1853 * again, it will be safe to free them. */ 1854 1855 /* Try to write changed bitmap pages, read errors may have just 1856 * set some bits outside the area covered by the activity log. 1857 * 1858 * If we have an IO error during the bitmap writeout, 1859 * we will want a full sync next time, just in case. 1860 * (Do we want a specific meta data flag for this?) 1861 * 1862 * If that does not make it to stable storage either, 1863 * we cannot do anything about that anymore. 1864 * 1865 * We still need to check if both bitmap and ldev are present, we may 1866 * end up here after a failed attach, before ldev was even assigned. 1867 */ 1868 if (device->bitmap && device->ldev) { 1869 /* An interrupted resync or similar is allowed to recounts bits 1870 * while we detach. 1871 * Any modifications would not be expected anymore, though. 1872 */ 1873 if (drbd_bitmap_io_from_worker(device, drbd_bm_write, 1874 "detach", BM_LOCKED_TEST_ALLOWED)) { 1875 if (test_bit(WAS_READ_ERROR, &device->flags)) { 1876 drbd_md_set_flag(device, MDF_FULL_SYNC); 1877 drbd_md_sync(device); 1878 } 1879 } 1880 } 1881 1882 drbd_force_state(device, NS(disk, D_DISKLESS)); 1883} 1884 1885static int do_md_sync(struct drbd_device *device) 1886{ 1887 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n"); 1888 drbd_md_sync(device); 1889 return 0; 1890} 1891 1892/* only called from drbd_worker thread, no locking */ 1893void __update_timing_details( 1894 struct drbd_thread_timing_details *tdp, 1895 unsigned int *cb_nr, 1896 void *cb, 1897 const char *fn, const unsigned int line) 1898{ 1899 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST; 1900 struct drbd_thread_timing_details *td = tdp + i; 1901 1902 td->start_jif = jiffies; 1903 td->cb_addr = cb; 1904 td->caller_fn = fn; 1905 td->line = line; 1906 td->cb_nr = *cb_nr; 1907 1908 i = (i+1) % DRBD_THREAD_DETAILS_HIST; 1909 td = tdp + i; 1910 memset(td, 0, sizeof(*td)); 1911 1912 ++(*cb_nr); 1913} 1914 1915static void do_device_work(struct drbd_device *device, const unsigned long todo) 1916{ 1917 if (test_bit(MD_SYNC, &todo)) 1918 do_md_sync(device); 1919 if (test_bit(RS_DONE, &todo) || 1920 test_bit(RS_PROGRESS, &todo)) 1921 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo)); 1922 if (test_bit(GO_DISKLESS, &todo)) 1923 go_diskless(device); 1924 if (test_bit(DESTROY_DISK, &todo)) 1925 drbd_ldev_destroy(device); 1926 if (test_bit(RS_START, &todo)) 1927 do_start_resync(device); 1928} 1929 1930#define DRBD_DEVICE_WORK_MASK \ 1931 ((1UL << GO_DISKLESS) \ 1932 |(1UL << DESTROY_DISK) \ 1933 |(1UL << MD_SYNC) \ 1934 |(1UL << RS_START) \ 1935 |(1UL << RS_PROGRESS) \ 1936 |(1UL << RS_DONE) \ 1937 ) 1938 1939static unsigned long get_work_bits(unsigned long *flags) 1940{ 1941 unsigned long old, new; 1942 do { 1943 old = *flags; 1944 new = old & ~DRBD_DEVICE_WORK_MASK; 1945 } while (cmpxchg(flags, old, new) != old); 1946 return old & DRBD_DEVICE_WORK_MASK; 1947} 1948 1949static void do_unqueued_work(struct drbd_connection *connection) 1950{ 1951 struct drbd_peer_device *peer_device; 1952 int vnr; 1953 1954 rcu_read_lock(); 1955 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1956 struct drbd_device *device = peer_device->device; 1957 unsigned long todo = get_work_bits(&device->flags); 1958 if (!todo) 1959 continue; 1960 1961 kref_get(&device->kref); 1962 rcu_read_unlock(); 1963 do_device_work(device, todo); 1964 kref_put(&device->kref, drbd_destroy_device); 1965 rcu_read_lock(); 1966 } 1967 rcu_read_unlock(); 1968} 1969 1970static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) 1971{ 1972 spin_lock_irq(&queue->q_lock); 1973 list_splice_tail_init(&queue->q, work_list); 1974 spin_unlock_irq(&queue->q_lock); 1975 return !list_empty(work_list); 1976} 1977 1978static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list) 1979{ 1980 DEFINE_WAIT(wait); 1981 struct net_conf *nc; 1982 int uncork, cork; 1983 1984 dequeue_work_batch(&connection->sender_work, work_list); 1985 if (!list_empty(work_list)) 1986 return; 1987 1988 /* Still nothing to do? 1989 * Maybe we still need to close the current epoch, 1990 * even if no new requests are queued yet. 1991 * 1992 * Also, poke TCP, just in case. 1993 * Then wait for new work (or signal). */ 1994 rcu_read_lock(); 1995 nc = rcu_dereference(connection->net_conf); 1996 uncork = nc ? nc->tcp_cork : 0; 1997 rcu_read_unlock(); 1998 if (uncork) { 1999 mutex_lock(&connection->data.mutex); 2000 if (connection->data.socket) 2001 drbd_tcp_uncork(connection->data.socket); 2002 mutex_unlock(&connection->data.mutex); 2003 } 2004 2005 for (;;) { 2006 int send_barrier; 2007 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE); 2008 spin_lock_irq(&connection->resource->req_lock); 2009 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2010 if (!list_empty(&connection->sender_work.q)) 2011 list_splice_tail_init(&connection->sender_work.q, work_list); 2012 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2013 if (!list_empty(work_list) || signal_pending(current)) { 2014 spin_unlock_irq(&connection->resource->req_lock); 2015 break; 2016 } 2017 2018 /* We found nothing new to do, no to-be-communicated request, 2019 * no other work item. We may still need to close the last 2020 * epoch. Next incoming request epoch will be connection -> 2021 * current transfer log epoch number. If that is different 2022 * from the epoch of the last request we communicated, it is 2023 * safe to send the epoch separating barrier now. 2024 */ 2025 send_barrier = 2026 atomic_read(&connection->current_tle_nr) != 2027 connection->send.current_epoch_nr; 2028 spin_unlock_irq(&connection->resource->req_lock); 2029 2030 if (send_barrier) 2031 maybe_send_barrier(connection, 2032 connection->send.current_epoch_nr + 1); 2033 2034 if (test_bit(DEVICE_WORK_PENDING, &connection->flags)) 2035 break; 2036 2037 /* drbd_send() may have called flush_signals() */ 2038 if (get_t_state(&connection->worker) != RUNNING) 2039 break; 2040 2041 schedule(); 2042 /* may be woken up for other things but new work, too, 2043 * e.g. if the current epoch got closed. 2044 * In which case we send the barrier above. */ 2045 } 2046 finish_wait(&connection->sender_work.q_wait, &wait); 2047 2048 /* someone may have changed the config while we have been waiting above. */ 2049 rcu_read_lock(); 2050 nc = rcu_dereference(connection->net_conf); 2051 cork = nc ? nc->tcp_cork : 0; 2052 rcu_read_unlock(); 2053 mutex_lock(&connection->data.mutex); 2054 if (connection->data.socket) { 2055 if (cork) 2056 drbd_tcp_cork(connection->data.socket); 2057 else if (!uncork) 2058 drbd_tcp_uncork(connection->data.socket); 2059 } 2060 mutex_unlock(&connection->data.mutex); 2061} 2062 2063int drbd_worker(struct drbd_thread *thi) 2064{ 2065 struct drbd_connection *connection = thi->connection; 2066 struct drbd_work *w = NULL; 2067 struct drbd_peer_device *peer_device; 2068 LIST_HEAD(work_list); 2069 int vnr; 2070 2071 while (get_t_state(thi) == RUNNING) { 2072 drbd_thread_current_set_cpu(thi); 2073 2074 if (list_empty(&work_list)) { 2075 update_worker_timing_details(connection, wait_for_work); 2076 wait_for_work(connection, &work_list); 2077 } 2078 2079 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2080 update_worker_timing_details(connection, do_unqueued_work); 2081 do_unqueued_work(connection); 2082 } 2083 2084 if (signal_pending(current)) { 2085 flush_signals(current); 2086 if (get_t_state(thi) == RUNNING) { 2087 drbd_warn(connection, "Worker got an unexpected signal\n"); 2088 continue; 2089 } 2090 break; 2091 } 2092 2093 if (get_t_state(thi) != RUNNING) 2094 break; 2095 2096 if (!list_empty(&work_list)) { 2097 w = list_first_entry(&work_list, struct drbd_work, list); 2098 list_del_init(&w->list); 2099 update_worker_timing_details(connection, w->cb); 2100 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0) 2101 continue; 2102 if (connection->cstate >= C_WF_REPORT_PARAMS) 2103 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 2104 } 2105 } 2106 2107 do { 2108 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2109 update_worker_timing_details(connection, do_unqueued_work); 2110 do_unqueued_work(connection); 2111 } 2112 if (!list_empty(&work_list)) { 2113 w = list_first_entry(&work_list, struct drbd_work, list); 2114 list_del_init(&w->list); 2115 update_worker_timing_details(connection, w->cb); 2116 w->cb(w, 1); 2117 } else 2118 dequeue_work_batch(&connection->sender_work, &work_list); 2119 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags)); 2120 2121 rcu_read_lock(); 2122 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 2123 struct drbd_device *device = peer_device->device; 2124 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE); 2125 kref_get(&device->kref); 2126 rcu_read_unlock(); 2127 drbd_device_cleanup(device); 2128 kref_put(&device->kref, drbd_destroy_device); 2129 rcu_read_lock(); 2130 } 2131 rcu_read_unlock(); 2132 2133 return 0; 2134} 2135