1/* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * journal.c 5 * 6 * Defines functions of journalling api 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26#include <linux/fs.h> 27#include <linux/types.h> 28#include <linux/slab.h> 29#include <linux/highmem.h> 30#include <linux/kthread.h> 31#include <linux/time.h> 32#include <linux/random.h> 33#include <linux/delay.h> 34 35#include <cluster/masklog.h> 36 37#include "ocfs2.h" 38 39#include "alloc.h" 40#include "blockcheck.h" 41#include "dir.h" 42#include "dlmglue.h" 43#include "extent_map.h" 44#include "heartbeat.h" 45#include "inode.h" 46#include "journal.h" 47#include "localalloc.h" 48#include "slot_map.h" 49#include "super.h" 50#include "sysfile.h" 51#include "uptodate.h" 52#include "quota.h" 53#include "file.h" 54#include "namei.h" 55 56#include "buffer_head_io.h" 57#include "ocfs2_trace.h" 58 59DEFINE_SPINLOCK(trans_inc_lock); 60 61#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000 62 63static int ocfs2_force_read_journal(struct inode *inode); 64static int ocfs2_recover_node(struct ocfs2_super *osb, 65 int node_num, int slot_num); 66static int __ocfs2_recovery_thread(void *arg); 67static int ocfs2_commit_cache(struct ocfs2_super *osb); 68static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota); 69static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb, 70 int dirty, int replayed); 71static int ocfs2_trylock_journal(struct ocfs2_super *osb, 72 int slot_num); 73static int ocfs2_recover_orphans(struct ocfs2_super *osb, 74 int slot, 75 enum ocfs2_orphan_reco_type orphan_reco_type); 76static int ocfs2_commit_thread(void *arg); 77static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, 78 int slot_num, 79 struct ocfs2_dinode *la_dinode, 80 struct ocfs2_dinode *tl_dinode, 81 struct ocfs2_quota_recovery *qrec, 82 enum ocfs2_orphan_reco_type orphan_reco_type); 83 84static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb) 85{ 86 return __ocfs2_wait_on_mount(osb, 0); 87} 88 89static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb) 90{ 91 return __ocfs2_wait_on_mount(osb, 1); 92} 93 94/* 95 * This replay_map is to track online/offline slots, so we could recover 96 * offline slots during recovery and mount 97 */ 98 99enum ocfs2_replay_state { 100 REPLAY_UNNEEDED = 0, /* Replay is not needed, so ignore this map */ 101 REPLAY_NEEDED, /* Replay slots marked in rm_replay_slots */ 102 REPLAY_DONE /* Replay was already queued */ 103}; 104 105struct ocfs2_replay_map { 106 unsigned int rm_slots; 107 enum ocfs2_replay_state rm_state; 108 unsigned char rm_replay_slots[0]; 109}; 110 111void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state) 112{ 113 if (!osb->replay_map) 114 return; 115 116 /* If we've already queued the replay, we don't have any more to do */ 117 if (osb->replay_map->rm_state == REPLAY_DONE) 118 return; 119 120 osb->replay_map->rm_state = state; 121} 122 123int ocfs2_compute_replay_slots(struct ocfs2_super *osb) 124{ 125 struct ocfs2_replay_map *replay_map; 126 int i, node_num; 127 128 /* If replay map is already set, we don't do it again */ 129 if (osb->replay_map) 130 return 0; 131 132 replay_map = kzalloc(sizeof(struct ocfs2_replay_map) + 133 (osb->max_slots * sizeof(char)), GFP_KERNEL); 134 135 if (!replay_map) { 136 mlog_errno(-ENOMEM); 137 return -ENOMEM; 138 } 139 140 spin_lock(&osb->osb_lock); 141 142 replay_map->rm_slots = osb->max_slots; 143 replay_map->rm_state = REPLAY_UNNEEDED; 144 145 /* set rm_replay_slots for offline slot(s) */ 146 for (i = 0; i < replay_map->rm_slots; i++) { 147 if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT) 148 replay_map->rm_replay_slots[i] = 1; 149 } 150 151 osb->replay_map = replay_map; 152 spin_unlock(&osb->osb_lock); 153 return 0; 154} 155 156void ocfs2_queue_replay_slots(struct ocfs2_super *osb, 157 enum ocfs2_orphan_reco_type orphan_reco_type) 158{ 159 struct ocfs2_replay_map *replay_map = osb->replay_map; 160 int i; 161 162 if (!replay_map) 163 return; 164 165 if (replay_map->rm_state != REPLAY_NEEDED) 166 return; 167 168 for (i = 0; i < replay_map->rm_slots; i++) 169 if (replay_map->rm_replay_slots[i]) 170 ocfs2_queue_recovery_completion(osb->journal, i, NULL, 171 NULL, NULL, 172 orphan_reco_type); 173 replay_map->rm_state = REPLAY_DONE; 174} 175 176void ocfs2_free_replay_slots(struct ocfs2_super *osb) 177{ 178 struct ocfs2_replay_map *replay_map = osb->replay_map; 179 180 if (!osb->replay_map) 181 return; 182 183 kfree(replay_map); 184 osb->replay_map = NULL; 185} 186 187int ocfs2_recovery_init(struct ocfs2_super *osb) 188{ 189 struct ocfs2_recovery_map *rm; 190 191 mutex_init(&osb->recovery_lock); 192 osb->disable_recovery = 0; 193 osb->recovery_thread_task = NULL; 194 init_waitqueue_head(&osb->recovery_event); 195 196 rm = kzalloc(sizeof(struct ocfs2_recovery_map) + 197 osb->max_slots * sizeof(unsigned int), 198 GFP_KERNEL); 199 if (!rm) { 200 mlog_errno(-ENOMEM); 201 return -ENOMEM; 202 } 203 204 rm->rm_entries = (unsigned int *)((char *)rm + 205 sizeof(struct ocfs2_recovery_map)); 206 osb->recovery_map = rm; 207 208 return 0; 209} 210 211/* we can't grab the goofy sem lock from inside wait_event, so we use 212 * memory barriers to make sure that we'll see the null task before 213 * being woken up */ 214static int ocfs2_recovery_thread_running(struct ocfs2_super *osb) 215{ 216 mb(); 217 return osb->recovery_thread_task != NULL; 218} 219 220void ocfs2_recovery_exit(struct ocfs2_super *osb) 221{ 222 struct ocfs2_recovery_map *rm; 223 224 /* disable any new recovery threads and wait for any currently 225 * running ones to exit. Do this before setting the vol_state. */ 226 mutex_lock(&osb->recovery_lock); 227 osb->disable_recovery = 1; 228 mutex_unlock(&osb->recovery_lock); 229 wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb)); 230 231 /* At this point, we know that no more recovery threads can be 232 * launched, so wait for any recovery completion work to 233 * complete. */ 234 flush_workqueue(ocfs2_wq); 235 236 /* 237 * Now that recovery is shut down, and the osb is about to be 238 * freed, the osb_lock is not taken here. 239 */ 240 rm = osb->recovery_map; 241 /* XXX: Should we bug if there are dirty entries? */ 242 243 kfree(rm); 244} 245 246static int __ocfs2_recovery_map_test(struct ocfs2_super *osb, 247 unsigned int node_num) 248{ 249 int i; 250 struct ocfs2_recovery_map *rm = osb->recovery_map; 251 252 assert_spin_locked(&osb->osb_lock); 253 254 for (i = 0; i < rm->rm_used; i++) { 255 if (rm->rm_entries[i] == node_num) 256 return 1; 257 } 258 259 return 0; 260} 261 262/* Behaves like test-and-set. Returns the previous value */ 263static int ocfs2_recovery_map_set(struct ocfs2_super *osb, 264 unsigned int node_num) 265{ 266 struct ocfs2_recovery_map *rm = osb->recovery_map; 267 268 spin_lock(&osb->osb_lock); 269 if (__ocfs2_recovery_map_test(osb, node_num)) { 270 spin_unlock(&osb->osb_lock); 271 return 1; 272 } 273 274 /* XXX: Can this be exploited? Not from o2dlm... */ 275 BUG_ON(rm->rm_used >= osb->max_slots); 276 277 rm->rm_entries[rm->rm_used] = node_num; 278 rm->rm_used++; 279 spin_unlock(&osb->osb_lock); 280 281 return 0; 282} 283 284static void ocfs2_recovery_map_clear(struct ocfs2_super *osb, 285 unsigned int node_num) 286{ 287 int i; 288 struct ocfs2_recovery_map *rm = osb->recovery_map; 289 290 spin_lock(&osb->osb_lock); 291 292 for (i = 0; i < rm->rm_used; i++) { 293 if (rm->rm_entries[i] == node_num) 294 break; 295 } 296 297 if (i < rm->rm_used) { 298 /* XXX: be careful with the pointer math */ 299 memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]), 300 (rm->rm_used - i - 1) * sizeof(unsigned int)); 301 rm->rm_used--; 302 } 303 304 spin_unlock(&osb->osb_lock); 305} 306 307static int ocfs2_commit_cache(struct ocfs2_super *osb) 308{ 309 int status = 0; 310 unsigned int flushed; 311 struct ocfs2_journal *journal = NULL; 312 313 journal = osb->journal; 314 315 /* Flush all pending commits and checkpoint the journal. */ 316 down_write(&journal->j_trans_barrier); 317 318 flushed = atomic_read(&journal->j_num_trans); 319 trace_ocfs2_commit_cache_begin(flushed); 320 if (flushed == 0) { 321 up_write(&journal->j_trans_barrier); 322 goto finally; 323 } 324 325 jbd2_journal_lock_updates(journal->j_journal); 326 status = jbd2_journal_flush(journal->j_journal); 327 jbd2_journal_unlock_updates(journal->j_journal); 328 if (status < 0) { 329 up_write(&journal->j_trans_barrier); 330 mlog_errno(status); 331 goto finally; 332 } 333 334 ocfs2_inc_trans_id(journal); 335 336 flushed = atomic_read(&journal->j_num_trans); 337 atomic_set(&journal->j_num_trans, 0); 338 up_write(&journal->j_trans_barrier); 339 340 trace_ocfs2_commit_cache_end(journal->j_trans_id, flushed); 341 342 ocfs2_wake_downconvert_thread(osb); 343 wake_up(&journal->j_checkpointed); 344finally: 345 return status; 346} 347 348handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs) 349{ 350 journal_t *journal = osb->journal->j_journal; 351 handle_t *handle; 352 353 BUG_ON(!osb || !osb->journal->j_journal); 354 355 if (ocfs2_is_hard_readonly(osb)) 356 return ERR_PTR(-EROFS); 357 358 BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE); 359 BUG_ON(max_buffs <= 0); 360 361 /* Nested transaction? Just return the handle... */ 362 if (journal_current_handle()) 363 return jbd2_journal_start(journal, max_buffs); 364 365 sb_start_intwrite(osb->sb); 366 367 down_read(&osb->journal->j_trans_barrier); 368 369 handle = jbd2_journal_start(journal, max_buffs); 370 if (IS_ERR(handle)) { 371 up_read(&osb->journal->j_trans_barrier); 372 sb_end_intwrite(osb->sb); 373 374 mlog_errno(PTR_ERR(handle)); 375 376 if (is_journal_aborted(journal)) { 377 ocfs2_abort(osb->sb, "Detected aborted journal"); 378 handle = ERR_PTR(-EROFS); 379 } 380 } else { 381 if (!ocfs2_mount_local(osb)) 382 atomic_inc(&(osb->journal->j_num_trans)); 383 } 384 385 return handle; 386} 387 388int ocfs2_commit_trans(struct ocfs2_super *osb, 389 handle_t *handle) 390{ 391 int ret, nested; 392 struct ocfs2_journal *journal = osb->journal; 393 394 BUG_ON(!handle); 395 396 nested = handle->h_ref > 1; 397 ret = jbd2_journal_stop(handle); 398 if (ret < 0) 399 mlog_errno(ret); 400 401 if (!nested) { 402 up_read(&journal->j_trans_barrier); 403 sb_end_intwrite(osb->sb); 404 } 405 406 return ret; 407} 408 409/* 410 * 'nblocks' is what you want to add to the current transaction. 411 * 412 * This might call jbd2_journal_restart() which will commit dirty buffers 413 * and then restart the transaction. Before calling 414 * ocfs2_extend_trans(), any changed blocks should have been 415 * dirtied. After calling it, all blocks which need to be changed must 416 * go through another set of journal_access/journal_dirty calls. 417 * 418 * WARNING: This will not release any semaphores or disk locks taken 419 * during the transaction, so make sure they were taken *before* 420 * start_trans or we'll have ordering deadlocks. 421 * 422 * WARNING2: Note that we do *not* drop j_trans_barrier here. This is 423 * good because transaction ids haven't yet been recorded on the 424 * cluster locks associated with this handle. 425 */ 426int ocfs2_extend_trans(handle_t *handle, int nblocks) 427{ 428 int status, old_nblocks; 429 430 BUG_ON(!handle); 431 BUG_ON(nblocks < 0); 432 433 if (!nblocks) 434 return 0; 435 436 old_nblocks = handle->h_buffer_credits; 437 438 trace_ocfs2_extend_trans(old_nblocks, nblocks); 439 440#ifdef CONFIG_OCFS2_DEBUG_FS 441 status = 1; 442#else 443 status = jbd2_journal_extend(handle, nblocks); 444 if (status < 0) { 445 mlog_errno(status); 446 goto bail; 447 } 448#endif 449 450 if (status > 0) { 451 trace_ocfs2_extend_trans_restart(old_nblocks + nblocks); 452 status = jbd2_journal_restart(handle, 453 old_nblocks + nblocks); 454 if (status < 0) { 455 mlog_errno(status); 456 goto bail; 457 } 458 } 459 460 status = 0; 461bail: 462 return status; 463} 464 465/* 466 * If we have fewer than thresh credits, extend by OCFS2_MAX_TRANS_DATA. 467 * If that fails, restart the transaction & regain write access for the 468 * buffer head which is used for metadata modifications. 469 * Taken from Ext4: extend_or_restart_transaction() 470 */ 471int ocfs2_allocate_extend_trans(handle_t *handle, int thresh) 472{ 473 int status, old_nblks; 474 475 BUG_ON(!handle); 476 477 old_nblks = handle->h_buffer_credits; 478 trace_ocfs2_allocate_extend_trans(old_nblks, thresh); 479 480 if (old_nblks < thresh) 481 return 0; 482 483 status = jbd2_journal_extend(handle, OCFS2_MAX_TRANS_DATA); 484 if (status < 0) { 485 mlog_errno(status); 486 goto bail; 487 } 488 489 if (status > 0) { 490 status = jbd2_journal_restart(handle, OCFS2_MAX_TRANS_DATA); 491 if (status < 0) 492 mlog_errno(status); 493 } 494 495bail: 496 return status; 497} 498 499 500struct ocfs2_triggers { 501 struct jbd2_buffer_trigger_type ot_triggers; 502 int ot_offset; 503}; 504 505static inline struct ocfs2_triggers *to_ocfs2_trigger(struct jbd2_buffer_trigger_type *triggers) 506{ 507 return container_of(triggers, struct ocfs2_triggers, ot_triggers); 508} 509 510static void ocfs2_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, 511 struct buffer_head *bh, 512 void *data, size_t size) 513{ 514 struct ocfs2_triggers *ot = to_ocfs2_trigger(triggers); 515 516 /* 517 * We aren't guaranteed to have the superblock here, so we 518 * must unconditionally compute the ecc data. 519 * __ocfs2_journal_access() will only set the triggers if 520 * metaecc is enabled. 521 */ 522 ocfs2_block_check_compute(data, size, data + ot->ot_offset); 523} 524 525/* 526 * Quota blocks have their own trigger because the struct ocfs2_block_check 527 * offset depends on the blocksize. 528 */ 529static void ocfs2_dq_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, 530 struct buffer_head *bh, 531 void *data, size_t size) 532{ 533 struct ocfs2_disk_dqtrailer *dqt = 534 ocfs2_block_dqtrailer(size, data); 535 536 /* 537 * We aren't guaranteed to have the superblock here, so we 538 * must unconditionally compute the ecc data. 539 * __ocfs2_journal_access() will only set the triggers if 540 * metaecc is enabled. 541 */ 542 ocfs2_block_check_compute(data, size, &dqt->dq_check); 543} 544 545/* 546 * Directory blocks also have their own trigger because the 547 * struct ocfs2_block_check offset depends on the blocksize. 548 */ 549static void ocfs2_db_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, 550 struct buffer_head *bh, 551 void *data, size_t size) 552{ 553 struct ocfs2_dir_block_trailer *trailer = 554 ocfs2_dir_trailer_from_size(size, data); 555 556 /* 557 * We aren't guaranteed to have the superblock here, so we 558 * must unconditionally compute the ecc data. 559 * __ocfs2_journal_access() will only set the triggers if 560 * metaecc is enabled. 561 */ 562 ocfs2_block_check_compute(data, size, &trailer->db_check); 563} 564 565static void ocfs2_abort_trigger(struct jbd2_buffer_trigger_type *triggers, 566 struct buffer_head *bh) 567{ 568 mlog(ML_ERROR, 569 "ocfs2_abort_trigger called by JBD2. bh = 0x%lx, " 570 "bh->b_blocknr = %llu\n", 571 (unsigned long)bh, 572 (unsigned long long)bh->b_blocknr); 573 574 /* We aren't guaranteed to have the superblock here - but if we 575 * don't, it'll just crash. */ 576 ocfs2_error(bh->b_assoc_map->host->i_sb, 577 "JBD2 has aborted our journal, ocfs2 cannot continue\n"); 578} 579 580static struct ocfs2_triggers di_triggers = { 581 .ot_triggers = { 582 .t_frozen = ocfs2_frozen_trigger, 583 .t_abort = ocfs2_abort_trigger, 584 }, 585 .ot_offset = offsetof(struct ocfs2_dinode, i_check), 586}; 587 588static struct ocfs2_triggers eb_triggers = { 589 .ot_triggers = { 590 .t_frozen = ocfs2_frozen_trigger, 591 .t_abort = ocfs2_abort_trigger, 592 }, 593 .ot_offset = offsetof(struct ocfs2_extent_block, h_check), 594}; 595 596static struct ocfs2_triggers rb_triggers = { 597 .ot_triggers = { 598 .t_frozen = ocfs2_frozen_trigger, 599 .t_abort = ocfs2_abort_trigger, 600 }, 601 .ot_offset = offsetof(struct ocfs2_refcount_block, rf_check), 602}; 603 604static struct ocfs2_triggers gd_triggers = { 605 .ot_triggers = { 606 .t_frozen = ocfs2_frozen_trigger, 607 .t_abort = ocfs2_abort_trigger, 608 }, 609 .ot_offset = offsetof(struct ocfs2_group_desc, bg_check), 610}; 611 612static struct ocfs2_triggers db_triggers = { 613 .ot_triggers = { 614 .t_frozen = ocfs2_db_frozen_trigger, 615 .t_abort = ocfs2_abort_trigger, 616 }, 617}; 618 619static struct ocfs2_triggers xb_triggers = { 620 .ot_triggers = { 621 .t_frozen = ocfs2_frozen_trigger, 622 .t_abort = ocfs2_abort_trigger, 623 }, 624 .ot_offset = offsetof(struct ocfs2_xattr_block, xb_check), 625}; 626 627static struct ocfs2_triggers dq_triggers = { 628 .ot_triggers = { 629 .t_frozen = ocfs2_dq_frozen_trigger, 630 .t_abort = ocfs2_abort_trigger, 631 }, 632}; 633 634static struct ocfs2_triggers dr_triggers = { 635 .ot_triggers = { 636 .t_frozen = ocfs2_frozen_trigger, 637 .t_abort = ocfs2_abort_trigger, 638 }, 639 .ot_offset = offsetof(struct ocfs2_dx_root_block, dr_check), 640}; 641 642static struct ocfs2_triggers dl_triggers = { 643 .ot_triggers = { 644 .t_frozen = ocfs2_frozen_trigger, 645 .t_abort = ocfs2_abort_trigger, 646 }, 647 .ot_offset = offsetof(struct ocfs2_dx_leaf, dl_check), 648}; 649 650static int __ocfs2_journal_access(handle_t *handle, 651 struct ocfs2_caching_info *ci, 652 struct buffer_head *bh, 653 struct ocfs2_triggers *triggers, 654 int type) 655{ 656 int status; 657 struct ocfs2_super *osb = 658 OCFS2_SB(ocfs2_metadata_cache_get_super(ci)); 659 660 BUG_ON(!ci || !ci->ci_ops); 661 BUG_ON(!handle); 662 BUG_ON(!bh); 663 664 trace_ocfs2_journal_access( 665 (unsigned long long)ocfs2_metadata_cache_owner(ci), 666 (unsigned long long)bh->b_blocknr, type, bh->b_size); 667 668 /* we can safely remove this assertion after testing. */ 669 if (!buffer_uptodate(bh)) { 670 mlog(ML_ERROR, "giving me a buffer that's not uptodate!\n"); 671 mlog(ML_ERROR, "b_blocknr=%llu\n", 672 (unsigned long long)bh->b_blocknr); 673 BUG(); 674 } 675 676 /* Set the current transaction information on the ci so 677 * that the locking code knows whether it can drop it's locks 678 * on this ci or not. We're protected from the commit 679 * thread updating the current transaction id until 680 * ocfs2_commit_trans() because ocfs2_start_trans() took 681 * j_trans_barrier for us. */ 682 ocfs2_set_ci_lock_trans(osb->journal, ci); 683 684 ocfs2_metadata_cache_io_lock(ci); 685 switch (type) { 686 case OCFS2_JOURNAL_ACCESS_CREATE: 687 case OCFS2_JOURNAL_ACCESS_WRITE: 688 status = jbd2_journal_get_write_access(handle, bh); 689 break; 690 691 case OCFS2_JOURNAL_ACCESS_UNDO: 692 status = jbd2_journal_get_undo_access(handle, bh); 693 break; 694 695 default: 696 status = -EINVAL; 697 mlog(ML_ERROR, "Unknown access type!\n"); 698 } 699 if (!status && ocfs2_meta_ecc(osb) && triggers) 700 jbd2_journal_set_triggers(bh, &triggers->ot_triggers); 701 ocfs2_metadata_cache_io_unlock(ci); 702 703 if (status < 0) 704 mlog(ML_ERROR, "Error %d getting %d access to buffer!\n", 705 status, type); 706 707 return status; 708} 709 710int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci, 711 struct buffer_head *bh, int type) 712{ 713 return __ocfs2_journal_access(handle, ci, bh, &di_triggers, type); 714} 715 716int ocfs2_journal_access_eb(handle_t *handle, struct ocfs2_caching_info *ci, 717 struct buffer_head *bh, int type) 718{ 719 return __ocfs2_journal_access(handle, ci, bh, &eb_triggers, type); 720} 721 722int ocfs2_journal_access_rb(handle_t *handle, struct ocfs2_caching_info *ci, 723 struct buffer_head *bh, int type) 724{ 725 return __ocfs2_journal_access(handle, ci, bh, &rb_triggers, 726 type); 727} 728 729int ocfs2_journal_access_gd(handle_t *handle, struct ocfs2_caching_info *ci, 730 struct buffer_head *bh, int type) 731{ 732 return __ocfs2_journal_access(handle, ci, bh, &gd_triggers, type); 733} 734 735int ocfs2_journal_access_db(handle_t *handle, struct ocfs2_caching_info *ci, 736 struct buffer_head *bh, int type) 737{ 738 return __ocfs2_journal_access(handle, ci, bh, &db_triggers, type); 739} 740 741int ocfs2_journal_access_xb(handle_t *handle, struct ocfs2_caching_info *ci, 742 struct buffer_head *bh, int type) 743{ 744 return __ocfs2_journal_access(handle, ci, bh, &xb_triggers, type); 745} 746 747int ocfs2_journal_access_dq(handle_t *handle, struct ocfs2_caching_info *ci, 748 struct buffer_head *bh, int type) 749{ 750 return __ocfs2_journal_access(handle, ci, bh, &dq_triggers, type); 751} 752 753int ocfs2_journal_access_dr(handle_t *handle, struct ocfs2_caching_info *ci, 754 struct buffer_head *bh, int type) 755{ 756 return __ocfs2_journal_access(handle, ci, bh, &dr_triggers, type); 757} 758 759int ocfs2_journal_access_dl(handle_t *handle, struct ocfs2_caching_info *ci, 760 struct buffer_head *bh, int type) 761{ 762 return __ocfs2_journal_access(handle, ci, bh, &dl_triggers, type); 763} 764 765int ocfs2_journal_access(handle_t *handle, struct ocfs2_caching_info *ci, 766 struct buffer_head *bh, int type) 767{ 768 return __ocfs2_journal_access(handle, ci, bh, NULL, type); 769} 770 771void ocfs2_journal_dirty(handle_t *handle, struct buffer_head *bh) 772{ 773 int status; 774 775 trace_ocfs2_journal_dirty((unsigned long long)bh->b_blocknr); 776 777 status = jbd2_journal_dirty_metadata(handle, bh); 778 BUG_ON(status); 779} 780 781#define OCFS2_DEFAULT_COMMIT_INTERVAL (HZ * JBD2_DEFAULT_MAX_COMMIT_AGE) 782 783void ocfs2_set_journal_params(struct ocfs2_super *osb) 784{ 785 journal_t *journal = osb->journal->j_journal; 786 unsigned long commit_interval = OCFS2_DEFAULT_COMMIT_INTERVAL; 787 788 if (osb->osb_commit_interval) 789 commit_interval = osb->osb_commit_interval; 790 791 write_lock(&journal->j_state_lock); 792 journal->j_commit_interval = commit_interval; 793 if (osb->s_mount_opt & OCFS2_MOUNT_BARRIER) 794 journal->j_flags |= JBD2_BARRIER; 795 else 796 journal->j_flags &= ~JBD2_BARRIER; 797 write_unlock(&journal->j_state_lock); 798} 799 800int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty) 801{ 802 int status = -1; 803 struct inode *inode = NULL; /* the journal inode */ 804 journal_t *j_journal = NULL; 805 struct ocfs2_dinode *di = NULL; 806 struct buffer_head *bh = NULL; 807 struct ocfs2_super *osb; 808 int inode_lock = 0; 809 810 BUG_ON(!journal); 811 812 osb = journal->j_osb; 813 814 /* already have the inode for our journal */ 815 inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, 816 osb->slot_num); 817 if (inode == NULL) { 818 status = -EACCES; 819 mlog_errno(status); 820 goto done; 821 } 822 if (is_bad_inode(inode)) { 823 mlog(ML_ERROR, "access error (bad inode)\n"); 824 iput(inode); 825 inode = NULL; 826 status = -EACCES; 827 goto done; 828 } 829 830 SET_INODE_JOURNAL(inode); 831 OCFS2_I(inode)->ip_open_count++; 832 833 /* Skip recovery waits here - journal inode metadata never 834 * changes in a live cluster so it can be considered an 835 * exception to the rule. */ 836 status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY); 837 if (status < 0) { 838 if (status != -ERESTARTSYS) 839 mlog(ML_ERROR, "Could not get lock on journal!\n"); 840 goto done; 841 } 842 843 inode_lock = 1; 844 di = (struct ocfs2_dinode *)bh->b_data; 845 846 if (i_size_read(inode) < OCFS2_MIN_JOURNAL_SIZE) { 847 mlog(ML_ERROR, "Journal file size (%lld) is too small!\n", 848 i_size_read(inode)); 849 status = -EINVAL; 850 goto done; 851 } 852 853 trace_ocfs2_journal_init(i_size_read(inode), 854 (unsigned long long)inode->i_blocks, 855 OCFS2_I(inode)->ip_clusters); 856 857 /* call the kernels journal init function now */ 858 j_journal = jbd2_journal_init_inode(inode); 859 if (j_journal == NULL) { 860 mlog(ML_ERROR, "Linux journal layer error\n"); 861 status = -EINVAL; 862 goto done; 863 } 864 865 trace_ocfs2_journal_init_maxlen(j_journal->j_maxlen); 866 867 *dirty = (le32_to_cpu(di->id1.journal1.ij_flags) & 868 OCFS2_JOURNAL_DIRTY_FL); 869 870 journal->j_journal = j_journal; 871 journal->j_inode = inode; 872 journal->j_bh = bh; 873 874 ocfs2_set_journal_params(osb); 875 876 journal->j_state = OCFS2_JOURNAL_LOADED; 877 878 status = 0; 879done: 880 if (status < 0) { 881 if (inode_lock) 882 ocfs2_inode_unlock(inode, 1); 883 brelse(bh); 884 if (inode) { 885 OCFS2_I(inode)->ip_open_count--; 886 iput(inode); 887 } 888 } 889 890 return status; 891} 892 893static void ocfs2_bump_recovery_generation(struct ocfs2_dinode *di) 894{ 895 le32_add_cpu(&(di->id1.journal1.ij_recovery_generation), 1); 896} 897 898static u32 ocfs2_get_recovery_generation(struct ocfs2_dinode *di) 899{ 900 return le32_to_cpu(di->id1.journal1.ij_recovery_generation); 901} 902 903static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb, 904 int dirty, int replayed) 905{ 906 int status; 907 unsigned int flags; 908 struct ocfs2_journal *journal = osb->journal; 909 struct buffer_head *bh = journal->j_bh; 910 struct ocfs2_dinode *fe; 911 912 fe = (struct ocfs2_dinode *)bh->b_data; 913 914 /* The journal bh on the osb always comes from ocfs2_journal_init() 915 * and was validated there inside ocfs2_inode_lock_full(). It's a 916 * code bug if we mess it up. */ 917 BUG_ON(!OCFS2_IS_VALID_DINODE(fe)); 918 919 flags = le32_to_cpu(fe->id1.journal1.ij_flags); 920 if (dirty) 921 flags |= OCFS2_JOURNAL_DIRTY_FL; 922 else 923 flags &= ~OCFS2_JOURNAL_DIRTY_FL; 924 fe->id1.journal1.ij_flags = cpu_to_le32(flags); 925 926 if (replayed) 927 ocfs2_bump_recovery_generation(fe); 928 929 ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check); 930 status = ocfs2_write_block(osb, bh, INODE_CACHE(journal->j_inode)); 931 if (status < 0) 932 mlog_errno(status); 933 934 return status; 935} 936 937/* 938 * If the journal has been kmalloc'd it needs to be freed after this 939 * call. 940 */ 941void ocfs2_journal_shutdown(struct ocfs2_super *osb) 942{ 943 struct ocfs2_journal *journal = NULL; 944 int status = 0; 945 struct inode *inode = NULL; 946 int num_running_trans = 0; 947 948 BUG_ON(!osb); 949 950 journal = osb->journal; 951 if (!journal) 952 goto done; 953 954 inode = journal->j_inode; 955 956 if (journal->j_state != OCFS2_JOURNAL_LOADED) 957 goto done; 958 959 /* need to inc inode use count - jbd2_journal_destroy will iput. */ 960 if (!igrab(inode)) 961 BUG(); 962 963 num_running_trans = atomic_read(&(osb->journal->j_num_trans)); 964 trace_ocfs2_journal_shutdown(num_running_trans); 965 966 /* Do a commit_cache here. It will flush our journal, *and* 967 * release any locks that are still held. 968 * set the SHUTDOWN flag and release the trans lock. 969 * the commit thread will take the trans lock for us below. */ 970 journal->j_state = OCFS2_JOURNAL_IN_SHUTDOWN; 971 972 /* The OCFS2_JOURNAL_IN_SHUTDOWN will signal to commit_cache to not 973 * drop the trans_lock (which we want to hold until we 974 * completely destroy the journal. */ 975 if (osb->commit_task) { 976 /* Wait for the commit thread */ 977 trace_ocfs2_journal_shutdown_wait(osb->commit_task); 978 kthread_stop(osb->commit_task); 979 osb->commit_task = NULL; 980 } 981 982 BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0); 983 984 if (ocfs2_mount_local(osb)) { 985 jbd2_journal_lock_updates(journal->j_journal); 986 status = jbd2_journal_flush(journal->j_journal); 987 jbd2_journal_unlock_updates(journal->j_journal); 988 if (status < 0) 989 mlog_errno(status); 990 } 991 992 if (status == 0) { 993 /* 994 * Do not toggle if flush was unsuccessful otherwise 995 * will leave dirty metadata in a "clean" journal 996 */ 997 status = ocfs2_journal_toggle_dirty(osb, 0, 0); 998 if (status < 0) 999 mlog_errno(status); 1000 } 1001 1002 /* Shutdown the kernel journal system */ 1003 jbd2_journal_destroy(journal->j_journal); 1004 journal->j_journal = NULL; 1005 1006 OCFS2_I(inode)->ip_open_count--; 1007 1008 /* unlock our journal */ 1009 ocfs2_inode_unlock(inode, 1); 1010 1011 brelse(journal->j_bh); 1012 journal->j_bh = NULL; 1013 1014 journal->j_state = OCFS2_JOURNAL_FREE; 1015 1016// up_write(&journal->j_trans_barrier); 1017done: 1018 if (inode) 1019 iput(inode); 1020} 1021 1022static void ocfs2_clear_journal_error(struct super_block *sb, 1023 journal_t *journal, 1024 int slot) 1025{ 1026 int olderr; 1027 1028 olderr = jbd2_journal_errno(journal); 1029 if (olderr) { 1030 mlog(ML_ERROR, "File system error %d recorded in " 1031 "journal %u.\n", olderr, slot); 1032 mlog(ML_ERROR, "File system on device %s needs checking.\n", 1033 sb->s_id); 1034 1035 jbd2_journal_ack_err(journal); 1036 jbd2_journal_clear_err(journal); 1037 } 1038} 1039 1040int ocfs2_journal_load(struct ocfs2_journal *journal, int local, int replayed) 1041{ 1042 int status = 0; 1043 struct ocfs2_super *osb; 1044 1045 BUG_ON(!journal); 1046 1047 osb = journal->j_osb; 1048 1049 status = jbd2_journal_load(journal->j_journal); 1050 if (status < 0) { 1051 mlog(ML_ERROR, "Failed to load journal!\n"); 1052 goto done; 1053 } 1054 1055 ocfs2_clear_journal_error(osb->sb, journal->j_journal, osb->slot_num); 1056 1057 status = ocfs2_journal_toggle_dirty(osb, 1, replayed); 1058 if (status < 0) { 1059 mlog_errno(status); 1060 goto done; 1061 } 1062 1063 /* Launch the commit thread */ 1064 if (!local) { 1065 osb->commit_task = kthread_run(ocfs2_commit_thread, osb, 1066 "ocfs2cmt"); 1067 if (IS_ERR(osb->commit_task)) { 1068 status = PTR_ERR(osb->commit_task); 1069 osb->commit_task = NULL; 1070 mlog(ML_ERROR, "unable to launch ocfs2commit thread, " 1071 "error=%d", status); 1072 goto done; 1073 } 1074 } else 1075 osb->commit_task = NULL; 1076 1077done: 1078 return status; 1079} 1080 1081 1082/* 'full' flag tells us whether we clear out all blocks or if we just 1083 * mark the journal clean */ 1084int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full) 1085{ 1086 int status; 1087 1088 BUG_ON(!journal); 1089 1090 status = jbd2_journal_wipe(journal->j_journal, full); 1091 if (status < 0) { 1092 mlog_errno(status); 1093 goto bail; 1094 } 1095 1096 status = ocfs2_journal_toggle_dirty(journal->j_osb, 0, 0); 1097 if (status < 0) 1098 mlog_errno(status); 1099 1100bail: 1101 return status; 1102} 1103 1104static int ocfs2_recovery_completed(struct ocfs2_super *osb) 1105{ 1106 int empty; 1107 struct ocfs2_recovery_map *rm = osb->recovery_map; 1108 1109 spin_lock(&osb->osb_lock); 1110 empty = (rm->rm_used == 0); 1111 spin_unlock(&osb->osb_lock); 1112 1113 return empty; 1114} 1115 1116void ocfs2_wait_for_recovery(struct ocfs2_super *osb) 1117{ 1118 wait_event(osb->recovery_event, ocfs2_recovery_completed(osb)); 1119} 1120 1121/* 1122 * JBD Might read a cached version of another nodes journal file. We 1123 * don't want this as this file changes often and we get no 1124 * notification on those changes. The only way to be sure that we've 1125 * got the most up to date version of those blocks then is to force 1126 * read them off disk. Just searching through the buffer cache won't 1127 * work as there may be pages backing this file which are still marked 1128 * up to date. We know things can't change on this file underneath us 1129 * as we have the lock by now :) 1130 */ 1131static int ocfs2_force_read_journal(struct inode *inode) 1132{ 1133 int status = 0; 1134 int i; 1135 u64 v_blkno, p_blkno, p_blocks, num_blocks; 1136#define CONCURRENT_JOURNAL_FILL 32ULL 1137 struct buffer_head *bhs[CONCURRENT_JOURNAL_FILL]; 1138 1139 memset(bhs, 0, sizeof(struct buffer_head *) * CONCURRENT_JOURNAL_FILL); 1140 1141 num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, i_size_read(inode)); 1142 v_blkno = 0; 1143 while (v_blkno < num_blocks) { 1144 status = ocfs2_extent_map_get_blocks(inode, v_blkno, 1145 &p_blkno, &p_blocks, NULL); 1146 if (status < 0) { 1147 mlog_errno(status); 1148 goto bail; 1149 } 1150 1151 if (p_blocks > CONCURRENT_JOURNAL_FILL) 1152 p_blocks = CONCURRENT_JOURNAL_FILL; 1153 1154 /* We are reading journal data which should not 1155 * be put in the uptodate cache */ 1156 status = ocfs2_read_blocks_sync(OCFS2_SB(inode->i_sb), 1157 p_blkno, p_blocks, bhs); 1158 if (status < 0) { 1159 mlog_errno(status); 1160 goto bail; 1161 } 1162 1163 for(i = 0; i < p_blocks; i++) { 1164 brelse(bhs[i]); 1165 bhs[i] = NULL; 1166 } 1167 1168 v_blkno += p_blocks; 1169 } 1170 1171bail: 1172 for(i = 0; i < CONCURRENT_JOURNAL_FILL; i++) 1173 brelse(bhs[i]); 1174 return status; 1175} 1176 1177struct ocfs2_la_recovery_item { 1178 struct list_head lri_list; 1179 int lri_slot; 1180 struct ocfs2_dinode *lri_la_dinode; 1181 struct ocfs2_dinode *lri_tl_dinode; 1182 struct ocfs2_quota_recovery *lri_qrec; 1183 enum ocfs2_orphan_reco_type lri_orphan_reco_type; 1184}; 1185 1186/* Does the second half of the recovery process. By this point, the 1187 * node is marked clean and can actually be considered recovered, 1188 * hence it's no longer in the recovery map, but there's still some 1189 * cleanup we can do which shouldn't happen within the recovery thread 1190 * as locking in that context becomes very difficult if we are to take 1191 * recovering nodes into account. 1192 * 1193 * NOTE: This function can and will sleep on recovery of other nodes 1194 * during cluster locking, just like any other ocfs2 process. 1195 */ 1196void ocfs2_complete_recovery(struct work_struct *work) 1197{ 1198 int ret = 0; 1199 struct ocfs2_journal *journal = 1200 container_of(work, struct ocfs2_journal, j_recovery_work); 1201 struct ocfs2_super *osb = journal->j_osb; 1202 struct ocfs2_dinode *la_dinode, *tl_dinode; 1203 struct ocfs2_la_recovery_item *item, *n; 1204 struct ocfs2_quota_recovery *qrec; 1205 enum ocfs2_orphan_reco_type orphan_reco_type; 1206 LIST_HEAD(tmp_la_list); 1207 1208 trace_ocfs2_complete_recovery( 1209 (unsigned long long)OCFS2_I(journal->j_inode)->ip_blkno); 1210 1211 spin_lock(&journal->j_lock); 1212 list_splice_init(&journal->j_la_cleanups, &tmp_la_list); 1213 spin_unlock(&journal->j_lock); 1214 1215 list_for_each_entry_safe(item, n, &tmp_la_list, lri_list) { 1216 list_del_init(&item->lri_list); 1217 1218 ocfs2_wait_on_quotas(osb); 1219 1220 la_dinode = item->lri_la_dinode; 1221 tl_dinode = item->lri_tl_dinode; 1222 qrec = item->lri_qrec; 1223 orphan_reco_type = item->lri_orphan_reco_type; 1224 1225 trace_ocfs2_complete_recovery_slot(item->lri_slot, 1226 la_dinode ? le64_to_cpu(la_dinode->i_blkno) : 0, 1227 tl_dinode ? le64_to_cpu(tl_dinode->i_blkno) : 0, 1228 qrec); 1229 1230 if (la_dinode) { 1231 ret = ocfs2_complete_local_alloc_recovery(osb, 1232 la_dinode); 1233 if (ret < 0) 1234 mlog_errno(ret); 1235 1236 kfree(la_dinode); 1237 } 1238 1239 if (tl_dinode) { 1240 ret = ocfs2_complete_truncate_log_recovery(osb, 1241 tl_dinode); 1242 if (ret < 0) 1243 mlog_errno(ret); 1244 1245 kfree(tl_dinode); 1246 } 1247 1248 ret = ocfs2_recover_orphans(osb, item->lri_slot, 1249 orphan_reco_type); 1250 if (ret < 0) 1251 mlog_errno(ret); 1252 1253 if (qrec) { 1254 ret = ocfs2_finish_quota_recovery(osb, qrec, 1255 item->lri_slot); 1256 if (ret < 0) 1257 mlog_errno(ret); 1258 /* Recovery info is already freed now */ 1259 } 1260 1261 kfree(item); 1262 } 1263 1264 trace_ocfs2_complete_recovery_end(ret); 1265} 1266 1267/* NOTE: This function always eats your references to la_dinode and 1268 * tl_dinode, either manually on error, or by passing them to 1269 * ocfs2_complete_recovery */ 1270static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, 1271 int slot_num, 1272 struct ocfs2_dinode *la_dinode, 1273 struct ocfs2_dinode *tl_dinode, 1274 struct ocfs2_quota_recovery *qrec, 1275 enum ocfs2_orphan_reco_type orphan_reco_type) 1276{ 1277 struct ocfs2_la_recovery_item *item; 1278 1279 item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_NOFS); 1280 if (!item) { 1281 /* Though we wish to avoid it, we are in fact safe in 1282 * skipping local alloc cleanup as fsck.ocfs2 is more 1283 * than capable of reclaiming unused space. */ 1284 kfree(la_dinode); 1285 kfree(tl_dinode); 1286 1287 if (qrec) 1288 ocfs2_free_quota_recovery(qrec); 1289 1290 mlog_errno(-ENOMEM); 1291 return; 1292 } 1293 1294 INIT_LIST_HEAD(&item->lri_list); 1295 item->lri_la_dinode = la_dinode; 1296 item->lri_slot = slot_num; 1297 item->lri_tl_dinode = tl_dinode; 1298 item->lri_qrec = qrec; 1299 item->lri_orphan_reco_type = orphan_reco_type; 1300 1301 spin_lock(&journal->j_lock); 1302 list_add_tail(&item->lri_list, &journal->j_la_cleanups); 1303 queue_work(ocfs2_wq, &journal->j_recovery_work); 1304 spin_unlock(&journal->j_lock); 1305} 1306 1307/* Called by the mount code to queue recovery the last part of 1308 * recovery for it's own and offline slot(s). */ 1309void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) 1310{ 1311 struct ocfs2_journal *journal = osb->journal; 1312 1313 if (ocfs2_is_hard_readonly(osb)) 1314 return; 1315 1316 /* No need to queue up our truncate_log as regular cleanup will catch 1317 * that */ 1318 ocfs2_queue_recovery_completion(journal, osb->slot_num, 1319 osb->local_alloc_copy, NULL, NULL, 1320 ORPHAN_NEED_TRUNCATE); 1321 ocfs2_schedule_truncate_log_flush(osb, 0); 1322 1323 osb->local_alloc_copy = NULL; 1324 osb->dirty = 0; 1325 1326 /* queue to recover orphan slots for all offline slots */ 1327 ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); 1328 ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE); 1329 ocfs2_free_replay_slots(osb); 1330} 1331 1332void ocfs2_complete_quota_recovery(struct ocfs2_super *osb) 1333{ 1334 if (osb->quota_rec) { 1335 ocfs2_queue_recovery_completion(osb->journal, 1336 osb->slot_num, 1337 NULL, 1338 NULL, 1339 osb->quota_rec, 1340 ORPHAN_NEED_TRUNCATE); 1341 osb->quota_rec = NULL; 1342 } 1343} 1344 1345static int __ocfs2_recovery_thread(void *arg) 1346{ 1347 int status, node_num, slot_num; 1348 struct ocfs2_super *osb = arg; 1349 struct ocfs2_recovery_map *rm = osb->recovery_map; 1350 int *rm_quota = NULL; 1351 int rm_quota_used = 0, i; 1352 struct ocfs2_quota_recovery *qrec; 1353 1354 status = ocfs2_wait_on_mount(osb); 1355 if (status < 0) { 1356 goto bail; 1357 } 1358 1359 rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS); 1360 if (!rm_quota) { 1361 status = -ENOMEM; 1362 goto bail; 1363 } 1364restart: 1365 status = ocfs2_super_lock(osb, 1); 1366 if (status < 0) { 1367 mlog_errno(status); 1368 goto bail; 1369 } 1370 1371 status = ocfs2_compute_replay_slots(osb); 1372 if (status < 0) 1373 mlog_errno(status); 1374 1375 /* queue recovery for our own slot */ 1376 ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, 1377 NULL, NULL, ORPHAN_NO_NEED_TRUNCATE); 1378 1379 spin_lock(&osb->osb_lock); 1380 while (rm->rm_used) { 1381 /* It's always safe to remove entry zero, as we won't 1382 * clear it until ocfs2_recover_node() has succeeded. */ 1383 node_num = rm->rm_entries[0]; 1384 spin_unlock(&osb->osb_lock); 1385 slot_num = ocfs2_node_num_to_slot(osb, node_num); 1386 trace_ocfs2_recovery_thread_node(node_num, slot_num); 1387 if (slot_num == -ENOENT) { 1388 status = 0; 1389 goto skip_recovery; 1390 } 1391 1392 /* It is a bit subtle with quota recovery. We cannot do it 1393 * immediately because we have to obtain cluster locks from 1394 * quota files and we also don't want to just skip it because 1395 * then quota usage would be out of sync until some node takes 1396 * the slot. So we remember which nodes need quota recovery 1397 * and when everything else is done, we recover quotas. */ 1398 for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++); 1399 if (i == rm_quota_used) 1400 rm_quota[rm_quota_used++] = slot_num; 1401 1402 status = ocfs2_recover_node(osb, node_num, slot_num); 1403skip_recovery: 1404 if (!status) { 1405 ocfs2_recovery_map_clear(osb, node_num); 1406 } else { 1407 mlog(ML_ERROR, 1408 "Error %d recovering node %d on device (%u,%u)!\n", 1409 status, node_num, 1410 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); 1411 mlog(ML_ERROR, "Volume requires unmount.\n"); 1412 } 1413 1414 spin_lock(&osb->osb_lock); 1415 } 1416 spin_unlock(&osb->osb_lock); 1417 trace_ocfs2_recovery_thread_end(status); 1418 1419 /* Refresh all journal recovery generations from disk */ 1420 status = ocfs2_check_journals_nolocks(osb); 1421 status = (status == -EROFS) ? 0 : status; 1422 if (status < 0) 1423 mlog_errno(status); 1424 1425 /* Now it is right time to recover quotas... We have to do this under 1426 * superblock lock so that no one can start using the slot (and crash) 1427 * before we recover it */ 1428 for (i = 0; i < rm_quota_used; i++) { 1429 qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]); 1430 if (IS_ERR(qrec)) { 1431 status = PTR_ERR(qrec); 1432 mlog_errno(status); 1433 continue; 1434 } 1435 ocfs2_queue_recovery_completion(osb->journal, rm_quota[i], 1436 NULL, NULL, qrec, 1437 ORPHAN_NEED_TRUNCATE); 1438 } 1439 1440 ocfs2_super_unlock(osb, 1); 1441 1442 /* queue recovery for offline slots */ 1443 ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE); 1444 1445bail: 1446 mutex_lock(&osb->recovery_lock); 1447 if (!status && !ocfs2_recovery_completed(osb)) { 1448 mutex_unlock(&osb->recovery_lock); 1449 goto restart; 1450 } 1451 1452 ocfs2_free_replay_slots(osb); 1453 osb->recovery_thread_task = NULL; 1454 mb(); /* sync with ocfs2_recovery_thread_running */ 1455 wake_up(&osb->recovery_event); 1456 1457 mutex_unlock(&osb->recovery_lock); 1458 1459 kfree(rm_quota); 1460 1461 /* no one is callint kthread_stop() for us so the kthread() api 1462 * requires that we call do_exit(). And it isn't exported, but 1463 * complete_and_exit() seems to be a minimal wrapper around it. */ 1464 complete_and_exit(NULL, status); 1465} 1466 1467void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num) 1468{ 1469 mutex_lock(&osb->recovery_lock); 1470 1471 trace_ocfs2_recovery_thread(node_num, osb->node_num, 1472 osb->disable_recovery, osb->recovery_thread_task, 1473 osb->disable_recovery ? 1474 -1 : ocfs2_recovery_map_set(osb, node_num)); 1475 1476 if (osb->disable_recovery) 1477 goto out; 1478 1479 if (osb->recovery_thread_task) 1480 goto out; 1481 1482 osb->recovery_thread_task = kthread_run(__ocfs2_recovery_thread, osb, 1483 "ocfs2rec"); 1484 if (IS_ERR(osb->recovery_thread_task)) { 1485 mlog_errno((int)PTR_ERR(osb->recovery_thread_task)); 1486 osb->recovery_thread_task = NULL; 1487 } 1488 1489out: 1490 mutex_unlock(&osb->recovery_lock); 1491 wake_up(&osb->recovery_event); 1492} 1493 1494static int ocfs2_read_journal_inode(struct ocfs2_super *osb, 1495 int slot_num, 1496 struct buffer_head **bh, 1497 struct inode **ret_inode) 1498{ 1499 int status = -EACCES; 1500 struct inode *inode = NULL; 1501 1502 BUG_ON(slot_num >= osb->max_slots); 1503 1504 inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, 1505 slot_num); 1506 if (!inode || is_bad_inode(inode)) { 1507 mlog_errno(status); 1508 goto bail; 1509 } 1510 SET_INODE_JOURNAL(inode); 1511 1512 status = ocfs2_read_inode_block_full(inode, bh, OCFS2_BH_IGNORE_CACHE); 1513 if (status < 0) { 1514 mlog_errno(status); 1515 goto bail; 1516 } 1517 1518 status = 0; 1519 1520bail: 1521 if (inode) { 1522 if (status || !ret_inode) 1523 iput(inode); 1524 else 1525 *ret_inode = inode; 1526 } 1527 return status; 1528} 1529 1530/* Does the actual journal replay and marks the journal inode as 1531 * clean. Will only replay if the journal inode is marked dirty. */ 1532static int ocfs2_replay_journal(struct ocfs2_super *osb, 1533 int node_num, 1534 int slot_num) 1535{ 1536 int status; 1537 int got_lock = 0; 1538 unsigned int flags; 1539 struct inode *inode = NULL; 1540 struct ocfs2_dinode *fe; 1541 journal_t *journal = NULL; 1542 struct buffer_head *bh = NULL; 1543 u32 slot_reco_gen; 1544 1545 status = ocfs2_read_journal_inode(osb, slot_num, &bh, &inode); 1546 if (status) { 1547 mlog_errno(status); 1548 goto done; 1549 } 1550 1551 fe = (struct ocfs2_dinode *)bh->b_data; 1552 slot_reco_gen = ocfs2_get_recovery_generation(fe); 1553 brelse(bh); 1554 bh = NULL; 1555 1556 /* 1557 * As the fs recovery is asynchronous, there is a small chance that 1558 * another node mounted (and recovered) the slot before the recovery 1559 * thread could get the lock. To handle that, we dirty read the journal 1560 * inode for that slot to get the recovery generation. If it is 1561 * different than what we expected, the slot has been recovered. 1562 * If not, it needs recovery. 1563 */ 1564 if (osb->slot_recovery_generations[slot_num] != slot_reco_gen) { 1565 trace_ocfs2_replay_journal_recovered(slot_num, 1566 osb->slot_recovery_generations[slot_num], slot_reco_gen); 1567 osb->slot_recovery_generations[slot_num] = slot_reco_gen; 1568 status = -EBUSY; 1569 goto done; 1570 } 1571 1572 /* Continue with recovery as the journal has not yet been recovered */ 1573 1574 status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY); 1575 if (status < 0) { 1576 trace_ocfs2_replay_journal_lock_err(status); 1577 if (status != -ERESTARTSYS) 1578 mlog(ML_ERROR, "Could not lock journal!\n"); 1579 goto done; 1580 } 1581 got_lock = 1; 1582 1583 fe = (struct ocfs2_dinode *) bh->b_data; 1584 1585 flags = le32_to_cpu(fe->id1.journal1.ij_flags); 1586 slot_reco_gen = ocfs2_get_recovery_generation(fe); 1587 1588 if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) { 1589 trace_ocfs2_replay_journal_skip(node_num); 1590 /* Refresh recovery generation for the slot */ 1591 osb->slot_recovery_generations[slot_num] = slot_reco_gen; 1592 goto done; 1593 } 1594 1595 /* we need to run complete recovery for offline orphan slots */ 1596 ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); 1597 1598 printk(KERN_NOTICE "ocfs2: Begin replay journal (node %d, slot %d) on "\ 1599 "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev), 1600 MINOR(osb->sb->s_dev)); 1601 1602 OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters); 1603 1604 status = ocfs2_force_read_journal(inode); 1605 if (status < 0) { 1606 mlog_errno(status); 1607 goto done; 1608 } 1609 1610 journal = jbd2_journal_init_inode(inode); 1611 if (journal == NULL) { 1612 mlog(ML_ERROR, "Linux journal layer error\n"); 1613 status = -EIO; 1614 goto done; 1615 } 1616 1617 status = jbd2_journal_load(journal); 1618 if (status < 0) { 1619 mlog_errno(status); 1620 if (!igrab(inode)) 1621 BUG(); 1622 jbd2_journal_destroy(journal); 1623 goto done; 1624 } 1625 1626 ocfs2_clear_journal_error(osb->sb, journal, slot_num); 1627 1628 /* wipe the journal */ 1629 jbd2_journal_lock_updates(journal); 1630 status = jbd2_journal_flush(journal); 1631 jbd2_journal_unlock_updates(journal); 1632 if (status < 0) 1633 mlog_errno(status); 1634 1635 /* This will mark the node clean */ 1636 flags = le32_to_cpu(fe->id1.journal1.ij_flags); 1637 flags &= ~OCFS2_JOURNAL_DIRTY_FL; 1638 fe->id1.journal1.ij_flags = cpu_to_le32(flags); 1639 1640 /* Increment recovery generation to indicate successful recovery */ 1641 ocfs2_bump_recovery_generation(fe); 1642 osb->slot_recovery_generations[slot_num] = 1643 ocfs2_get_recovery_generation(fe); 1644 1645 ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check); 1646 status = ocfs2_write_block(osb, bh, INODE_CACHE(inode)); 1647 if (status < 0) 1648 mlog_errno(status); 1649 1650 if (!igrab(inode)) 1651 BUG(); 1652 1653 jbd2_journal_destroy(journal); 1654 1655 printk(KERN_NOTICE "ocfs2: End replay journal (node %d, slot %d) on "\ 1656 "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev), 1657 MINOR(osb->sb->s_dev)); 1658done: 1659 /* drop the lock on this nodes journal */ 1660 if (got_lock) 1661 ocfs2_inode_unlock(inode, 1); 1662 1663 if (inode) 1664 iput(inode); 1665 1666 brelse(bh); 1667 1668 return status; 1669} 1670 1671/* 1672 * Do the most important parts of node recovery: 1673 * - Replay it's journal 1674 * - Stamp a clean local allocator file 1675 * - Stamp a clean truncate log 1676 * - Mark the node clean 1677 * 1678 * If this function completes without error, a node in OCFS2 can be 1679 * said to have been safely recovered. As a result, failure during the 1680 * second part of a nodes recovery process (local alloc recovery) is 1681 * far less concerning. 1682 */ 1683static int ocfs2_recover_node(struct ocfs2_super *osb, 1684 int node_num, int slot_num) 1685{ 1686 int status = 0; 1687 struct ocfs2_dinode *la_copy = NULL; 1688 struct ocfs2_dinode *tl_copy = NULL; 1689 1690 trace_ocfs2_recover_node(node_num, slot_num, osb->node_num); 1691 1692 /* Should not ever be called to recover ourselves -- in that 1693 * case we should've called ocfs2_journal_load instead. */ 1694 BUG_ON(osb->node_num == node_num); 1695 1696 status = ocfs2_replay_journal(osb, node_num, slot_num); 1697 if (status < 0) { 1698 if (status == -EBUSY) { 1699 trace_ocfs2_recover_node_skip(slot_num, node_num); 1700 status = 0; 1701 goto done; 1702 } 1703 mlog_errno(status); 1704 goto done; 1705 } 1706 1707 /* Stamp a clean local alloc file AFTER recovering the journal... */ 1708 status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy); 1709 if (status < 0) { 1710 mlog_errno(status); 1711 goto done; 1712 } 1713 1714 /* An error from begin_truncate_log_recovery is not 1715 * serious enough to warrant halting the rest of 1716 * recovery. */ 1717 status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy); 1718 if (status < 0) 1719 mlog_errno(status); 1720 1721 /* Likewise, this would be a strange but ultimately not so 1722 * harmful place to get an error... */ 1723 status = ocfs2_clear_slot(osb, slot_num); 1724 if (status < 0) 1725 mlog_errno(status); 1726 1727 /* This will kfree the memory pointed to by la_copy and tl_copy */ 1728 ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy, 1729 tl_copy, NULL, ORPHAN_NEED_TRUNCATE); 1730 1731 status = 0; 1732done: 1733 1734 return status; 1735} 1736 1737/* Test node liveness by trylocking his journal. If we get the lock, 1738 * we drop it here. Return 0 if we got the lock, -EAGAIN if node is 1739 * still alive (we couldn't get the lock) and < 0 on error. */ 1740static int ocfs2_trylock_journal(struct ocfs2_super *osb, 1741 int slot_num) 1742{ 1743 int status, flags; 1744 struct inode *inode = NULL; 1745 1746 inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, 1747 slot_num); 1748 if (inode == NULL) { 1749 mlog(ML_ERROR, "access error\n"); 1750 status = -EACCES; 1751 goto bail; 1752 } 1753 if (is_bad_inode(inode)) { 1754 mlog(ML_ERROR, "access error (bad inode)\n"); 1755 iput(inode); 1756 inode = NULL; 1757 status = -EACCES; 1758 goto bail; 1759 } 1760 SET_INODE_JOURNAL(inode); 1761 1762 flags = OCFS2_META_LOCK_RECOVERY | OCFS2_META_LOCK_NOQUEUE; 1763 status = ocfs2_inode_lock_full(inode, NULL, 1, flags); 1764 if (status < 0) { 1765 if (status != -EAGAIN) 1766 mlog_errno(status); 1767 goto bail; 1768 } 1769 1770 ocfs2_inode_unlock(inode, 1); 1771bail: 1772 if (inode) 1773 iput(inode); 1774 1775 return status; 1776} 1777 1778/* Call this underneath ocfs2_super_lock. It also assumes that the 1779 * slot info struct has been updated from disk. */ 1780int ocfs2_mark_dead_nodes(struct ocfs2_super *osb) 1781{ 1782 unsigned int node_num; 1783 int status, i; 1784 u32 gen; 1785 struct buffer_head *bh = NULL; 1786 struct ocfs2_dinode *di; 1787 1788 /* This is called with the super block cluster lock, so we 1789 * know that the slot map can't change underneath us. */ 1790 1791 for (i = 0; i < osb->max_slots; i++) { 1792 /* Read journal inode to get the recovery generation */ 1793 status = ocfs2_read_journal_inode(osb, i, &bh, NULL); 1794 if (status) { 1795 mlog_errno(status); 1796 goto bail; 1797 } 1798 di = (struct ocfs2_dinode *)bh->b_data; 1799 gen = ocfs2_get_recovery_generation(di); 1800 brelse(bh); 1801 bh = NULL; 1802 1803 spin_lock(&osb->osb_lock); 1804 osb->slot_recovery_generations[i] = gen; 1805 1806 trace_ocfs2_mark_dead_nodes(i, 1807 osb->slot_recovery_generations[i]); 1808 1809 if (i == osb->slot_num) { 1810 spin_unlock(&osb->osb_lock); 1811 continue; 1812 } 1813 1814 status = ocfs2_slot_to_node_num_locked(osb, i, &node_num); 1815 if (status == -ENOENT) { 1816 spin_unlock(&osb->osb_lock); 1817 continue; 1818 } 1819 1820 if (__ocfs2_recovery_map_test(osb, node_num)) { 1821 spin_unlock(&osb->osb_lock); 1822 continue; 1823 } 1824 spin_unlock(&osb->osb_lock); 1825 1826 /* Ok, we have a slot occupied by another node which 1827 * is not in the recovery map. We trylock his journal 1828 * file here to test if he's alive. */ 1829 status = ocfs2_trylock_journal(osb, i); 1830 if (!status) { 1831 /* Since we're called from mount, we know that 1832 * the recovery thread can't race us on 1833 * setting / checking the recovery bits. */ 1834 ocfs2_recovery_thread(osb, node_num); 1835 } else if ((status < 0) && (status != -EAGAIN)) { 1836 mlog_errno(status); 1837 goto bail; 1838 } 1839 } 1840 1841 status = 0; 1842bail: 1843 return status; 1844} 1845 1846/* 1847 * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some 1848 * randomness to the timeout to minimize multple nodes firing the timer at the 1849 * same time. 1850 */ 1851static inline unsigned long ocfs2_orphan_scan_timeout(void) 1852{ 1853 unsigned long time; 1854 1855 get_random_bytes(&time, sizeof(time)); 1856 time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000); 1857 return msecs_to_jiffies(time); 1858} 1859 1860/* 1861 * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for 1862 * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This 1863 * is done to catch any orphans that are left over in orphan directories. 1864 * 1865 * It scans all slots, even ones that are in use. It does so to handle the 1866 * case described below: 1867 * 1868 * Node 1 has an inode it was using. The dentry went away due to memory 1869 * pressure. Node 1 closes the inode, but it's on the free list. The node 1870 * has the open lock. 1871 * Node 2 unlinks the inode. It grabs the dentry lock to notify others, 1872 * but node 1 has no dentry and doesn't get the message. It trylocks the 1873 * open lock, sees that another node has a PR, and does nothing. 1874 * Later node 2 runs its orphan dir. It igets the inode, trylocks the 1875 * open lock, sees the PR still, and does nothing. 1876 * Basically, we have to trigger an orphan iput on node 1. The only way 1877 * for this to happen is if node 1 runs node 2's orphan dir. 1878 * 1879 * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT 1880 * seconds. It gets an EX lock on os_lockres and checks sequence number 1881 * stored in LVB. If the sequence number has changed, it means some other 1882 * node has done the scan. This node skips the scan and tracks the 1883 * sequence number. If the sequence number didn't change, it means a scan 1884 * hasn't happened. The node queues a scan and increments the 1885 * sequence number in the LVB. 1886 */ 1887void ocfs2_queue_orphan_scan(struct ocfs2_super *osb) 1888{ 1889 struct ocfs2_orphan_scan *os; 1890 int status, i; 1891 u32 seqno = 0; 1892 1893 os = &osb->osb_orphan_scan; 1894 1895 if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE) 1896 goto out; 1897 1898 trace_ocfs2_queue_orphan_scan_begin(os->os_count, os->os_seqno, 1899 atomic_read(&os->os_state)); 1900 1901 status = ocfs2_orphan_scan_lock(osb, &seqno); 1902 if (status < 0) { 1903 if (status != -EAGAIN) 1904 mlog_errno(status); 1905 goto out; 1906 } 1907 1908 /* Do no queue the tasks if the volume is being umounted */ 1909 if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE) 1910 goto unlock; 1911 1912 if (os->os_seqno != seqno) { 1913 os->os_seqno = seqno; 1914 goto unlock; 1915 } 1916 1917 for (i = 0; i < osb->max_slots; i++) 1918 ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL, 1919 NULL, ORPHAN_NO_NEED_TRUNCATE); 1920 /* 1921 * We queued a recovery on orphan slots, increment the sequence 1922 * number and update LVB so other node will skip the scan for a while 1923 */ 1924 seqno++; 1925 os->os_count++; 1926 os->os_scantime = CURRENT_TIME; 1927unlock: 1928 ocfs2_orphan_scan_unlock(osb, seqno); 1929out: 1930 trace_ocfs2_queue_orphan_scan_end(os->os_count, os->os_seqno, 1931 atomic_read(&os->os_state)); 1932 return; 1933} 1934 1935/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */ 1936void ocfs2_orphan_scan_work(struct work_struct *work) 1937{ 1938 struct ocfs2_orphan_scan *os; 1939 struct ocfs2_super *osb; 1940 1941 os = container_of(work, struct ocfs2_orphan_scan, 1942 os_orphan_scan_work.work); 1943 osb = os->os_osb; 1944 1945 mutex_lock(&os->os_lock); 1946 ocfs2_queue_orphan_scan(osb); 1947 if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) 1948 queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work, 1949 ocfs2_orphan_scan_timeout()); 1950 mutex_unlock(&os->os_lock); 1951} 1952 1953void ocfs2_orphan_scan_stop(struct ocfs2_super *osb) 1954{ 1955 struct ocfs2_orphan_scan *os; 1956 1957 os = &osb->osb_orphan_scan; 1958 if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) { 1959 atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE); 1960 mutex_lock(&os->os_lock); 1961 cancel_delayed_work(&os->os_orphan_scan_work); 1962 mutex_unlock(&os->os_lock); 1963 } 1964} 1965 1966void ocfs2_orphan_scan_init(struct ocfs2_super *osb) 1967{ 1968 struct ocfs2_orphan_scan *os; 1969 1970 os = &osb->osb_orphan_scan; 1971 os->os_osb = osb; 1972 os->os_count = 0; 1973 os->os_seqno = 0; 1974 mutex_init(&os->os_lock); 1975 INIT_DELAYED_WORK(&os->os_orphan_scan_work, ocfs2_orphan_scan_work); 1976} 1977 1978void ocfs2_orphan_scan_start(struct ocfs2_super *osb) 1979{ 1980 struct ocfs2_orphan_scan *os; 1981 1982 os = &osb->osb_orphan_scan; 1983 os->os_scantime = CURRENT_TIME; 1984 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1985 atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE); 1986 else { 1987 atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE); 1988 queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work, 1989 ocfs2_orphan_scan_timeout()); 1990 } 1991} 1992 1993struct ocfs2_orphan_filldir_priv { 1994 struct dir_context ctx; 1995 struct inode *head; 1996 struct ocfs2_super *osb; 1997}; 1998 1999static int ocfs2_orphan_filldir(struct dir_context *ctx, const char *name, 2000 int name_len, loff_t pos, u64 ino, 2001 unsigned type) 2002{ 2003 struct ocfs2_orphan_filldir_priv *p = 2004 container_of(ctx, struct ocfs2_orphan_filldir_priv, ctx); 2005 struct inode *iter; 2006 2007 if (name_len == 1 && !strncmp(".", name, 1)) 2008 return 0; 2009 if (name_len == 2 && !strncmp("..", name, 2)) 2010 return 0; 2011 2012 /* Skip bad inodes so that recovery can continue */ 2013 iter = ocfs2_iget(p->osb, ino, 2014 OCFS2_FI_FLAG_ORPHAN_RECOVERY, 0); 2015 if (IS_ERR(iter)) 2016 return 0; 2017 2018 /* Skip inodes which are already added to recover list, since dio may 2019 * happen concurrently with unlink/rename */ 2020 if (OCFS2_I(iter)->ip_next_orphan) { 2021 iput(iter); 2022 return 0; 2023 } 2024 2025 trace_ocfs2_orphan_filldir((unsigned long long)OCFS2_I(iter)->ip_blkno); 2026 /* No locking is required for the next_orphan queue as there 2027 * is only ever a single process doing orphan recovery. */ 2028 OCFS2_I(iter)->ip_next_orphan = p->head; 2029 p->head = iter; 2030 2031 return 0; 2032} 2033 2034static int ocfs2_queue_orphans(struct ocfs2_super *osb, 2035 int slot, 2036 struct inode **head) 2037{ 2038 int status; 2039 struct inode *orphan_dir_inode = NULL; 2040 struct ocfs2_orphan_filldir_priv priv = { 2041 .ctx.actor = ocfs2_orphan_filldir, 2042 .osb = osb, 2043 .head = *head 2044 }; 2045 2046 orphan_dir_inode = ocfs2_get_system_file_inode(osb, 2047 ORPHAN_DIR_SYSTEM_INODE, 2048 slot); 2049 if (!orphan_dir_inode) { 2050 status = -ENOENT; 2051 mlog_errno(status); 2052 return status; 2053 } 2054 2055 mutex_lock(&orphan_dir_inode->i_mutex); 2056 status = ocfs2_inode_lock(orphan_dir_inode, NULL, 0); 2057 if (status < 0) { 2058 mlog_errno(status); 2059 goto out; 2060 } 2061 2062 status = ocfs2_dir_foreach(orphan_dir_inode, &priv.ctx); 2063 if (status) { 2064 mlog_errno(status); 2065 goto out_cluster; 2066 } 2067 2068 *head = priv.head; 2069 2070out_cluster: 2071 ocfs2_inode_unlock(orphan_dir_inode, 0); 2072out: 2073 mutex_unlock(&orphan_dir_inode->i_mutex); 2074 iput(orphan_dir_inode); 2075 return status; 2076} 2077 2078static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb, 2079 int slot) 2080{ 2081 int ret; 2082 2083 spin_lock(&osb->osb_lock); 2084 ret = !osb->osb_orphan_wipes[slot]; 2085 spin_unlock(&osb->osb_lock); 2086 return ret; 2087} 2088 2089static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb, 2090 int slot) 2091{ 2092 spin_lock(&osb->osb_lock); 2093 /* Mark ourselves such that new processes in delete_inode() 2094 * know to quit early. */ 2095 ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot); 2096 while (osb->osb_orphan_wipes[slot]) { 2097 /* If any processes are already in the middle of an 2098 * orphan wipe on this dir, then we need to wait for 2099 * them. */ 2100 spin_unlock(&osb->osb_lock); 2101 wait_event_interruptible(osb->osb_wipe_event, 2102 ocfs2_orphan_recovery_can_continue(osb, slot)); 2103 spin_lock(&osb->osb_lock); 2104 } 2105 spin_unlock(&osb->osb_lock); 2106} 2107 2108static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb, 2109 int slot) 2110{ 2111 ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot); 2112} 2113 2114/* 2115 * Orphan recovery. Each mounted node has it's own orphan dir which we 2116 * must run during recovery. Our strategy here is to build a list of 2117 * the inodes in the orphan dir and iget/iput them. The VFS does 2118 * (most) of the rest of the work. 2119 * 2120 * Orphan recovery can happen at any time, not just mount so we have a 2121 * couple of extra considerations. 2122 * 2123 * - We grab as many inodes as we can under the orphan dir lock - 2124 * doing iget() outside the orphan dir risks getting a reference on 2125 * an invalid inode. 2126 * - We must be sure not to deadlock with other processes on the 2127 * system wanting to run delete_inode(). This can happen when they go 2128 * to lock the orphan dir and the orphan recovery process attempts to 2129 * iget() inside the orphan dir lock. This can be avoided by 2130 * advertising our state to ocfs2_delete_inode(). 2131 */ 2132static int ocfs2_recover_orphans(struct ocfs2_super *osb, 2133 int slot, 2134 enum ocfs2_orphan_reco_type orphan_reco_type) 2135{ 2136 int ret = 0; 2137 struct inode *inode = NULL; 2138 struct inode *iter; 2139 struct ocfs2_inode_info *oi; 2140 2141 trace_ocfs2_recover_orphans(slot); 2142 2143 ocfs2_mark_recovering_orphan_dir(osb, slot); 2144 ret = ocfs2_queue_orphans(osb, slot, &inode); 2145 ocfs2_clear_recovering_orphan_dir(osb, slot); 2146 2147 /* Error here should be noted, but we want to continue with as 2148 * many queued inodes as we've got. */ 2149 if (ret) 2150 mlog_errno(ret); 2151 2152 while (inode) { 2153 oi = OCFS2_I(inode); 2154 trace_ocfs2_recover_orphans_iput( 2155 (unsigned long long)oi->ip_blkno); 2156 2157 iter = oi->ip_next_orphan; 2158 oi->ip_next_orphan = NULL; 2159 2160 /* 2161 * We need to take and drop the inode lock to 2162 * force read inode from disk. 2163 */ 2164 ret = ocfs2_inode_lock(inode, NULL, 0); 2165 if (ret) { 2166 mlog_errno(ret); 2167 goto next; 2168 } 2169 ocfs2_inode_unlock(inode, 0); 2170 2171 if (inode->i_nlink == 0) { 2172 spin_lock(&oi->ip_lock); 2173 /* Set the proper information to get us going into 2174 * ocfs2_delete_inode. */ 2175 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 2176 spin_unlock(&oi->ip_lock); 2177 } else if (orphan_reco_type == ORPHAN_NEED_TRUNCATE) { 2178 struct buffer_head *di_bh = NULL; 2179 2180 ret = ocfs2_rw_lock(inode, 1); 2181 if (ret) { 2182 mlog_errno(ret); 2183 goto next; 2184 } 2185 2186 ret = ocfs2_inode_lock(inode, &di_bh, 1); 2187 if (ret < 0) { 2188 ocfs2_rw_unlock(inode, 1); 2189 mlog_errno(ret); 2190 goto next; 2191 } 2192 2193 ret = ocfs2_truncate_file(inode, di_bh, 2194 i_size_read(inode)); 2195 ocfs2_inode_unlock(inode, 1); 2196 ocfs2_rw_unlock(inode, 1); 2197 brelse(di_bh); 2198 if (ret < 0) { 2199 if (ret != -ENOSPC) 2200 mlog_errno(ret); 2201 goto next; 2202 } 2203 2204 ret = ocfs2_del_inode_from_orphan(osb, inode, 0, 0); 2205 if (ret) 2206 mlog_errno(ret); 2207 2208 wake_up(&OCFS2_I(inode)->append_dio_wq); 2209 } /* else if ORPHAN_NO_NEED_TRUNCATE, do nothing */ 2210 2211next: 2212 iput(inode); 2213 2214 inode = iter; 2215 } 2216 2217 return ret; 2218} 2219 2220static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota) 2221{ 2222 /* This check is good because ocfs2 will wait on our recovery 2223 * thread before changing it to something other than MOUNTED 2224 * or DISABLED. */ 2225 wait_event(osb->osb_mount_event, 2226 (!quota && atomic_read(&osb->vol_state) == VOLUME_MOUNTED) || 2227 atomic_read(&osb->vol_state) == VOLUME_MOUNTED_QUOTAS || 2228 atomic_read(&osb->vol_state) == VOLUME_DISABLED); 2229 2230 /* If there's an error on mount, then we may never get to the 2231 * MOUNTED flag, but this is set right before 2232 * dismount_volume() so we can trust it. */ 2233 if (atomic_read(&osb->vol_state) == VOLUME_DISABLED) { 2234 trace_ocfs2_wait_on_mount(VOLUME_DISABLED); 2235 mlog(0, "mount error, exiting!\n"); 2236 return -EBUSY; 2237 } 2238 2239 return 0; 2240} 2241 2242static int ocfs2_commit_thread(void *arg) 2243{ 2244 int status; 2245 struct ocfs2_super *osb = arg; 2246 struct ocfs2_journal *journal = osb->journal; 2247 2248 /* we can trust j_num_trans here because _should_stop() is only set in 2249 * shutdown and nobody other than ourselves should be able to start 2250 * transactions. committing on shutdown might take a few iterations 2251 * as final transactions put deleted inodes on the list */ 2252 while (!(kthread_should_stop() && 2253 atomic_read(&journal->j_num_trans) == 0)) { 2254 2255 wait_event_interruptible(osb->checkpoint_event, 2256 atomic_read(&journal->j_num_trans) 2257 || kthread_should_stop()); 2258 2259 status = ocfs2_commit_cache(osb); 2260 if (status < 0) { 2261 static unsigned long abort_warn_time; 2262 2263 /* Warn about this once per minute */ 2264 if (printk_timed_ratelimit(&abort_warn_time, 60*HZ)) 2265 mlog(ML_ERROR, "status = %d, journal is " 2266 "already aborted.\n", status); 2267 /* 2268 * After ocfs2_commit_cache() fails, j_num_trans has a 2269 * non-zero value. Sleep here to avoid a busy-wait 2270 * loop. 2271 */ 2272 msleep_interruptible(1000); 2273 } 2274 2275 if (kthread_should_stop() && atomic_read(&journal->j_num_trans)){ 2276 mlog(ML_KTHREAD, 2277 "commit_thread: %u transactions pending on " 2278 "shutdown\n", 2279 atomic_read(&journal->j_num_trans)); 2280 } 2281 } 2282 2283 return 0; 2284} 2285 2286/* Reads all the journal inodes without taking any cluster locks. Used 2287 * for hard readonly access to determine whether any journal requires 2288 * recovery. Also used to refresh the recovery generation numbers after 2289 * a journal has been recovered by another node. 2290 */ 2291int ocfs2_check_journals_nolocks(struct ocfs2_super *osb) 2292{ 2293 int ret = 0; 2294 unsigned int slot; 2295 struct buffer_head *di_bh = NULL; 2296 struct ocfs2_dinode *di; 2297 int journal_dirty = 0; 2298 2299 for(slot = 0; slot < osb->max_slots; slot++) { 2300 ret = ocfs2_read_journal_inode(osb, slot, &di_bh, NULL); 2301 if (ret) { 2302 mlog_errno(ret); 2303 goto out; 2304 } 2305 2306 di = (struct ocfs2_dinode *) di_bh->b_data; 2307 2308 osb->slot_recovery_generations[slot] = 2309 ocfs2_get_recovery_generation(di); 2310 2311 if (le32_to_cpu(di->id1.journal1.ij_flags) & 2312 OCFS2_JOURNAL_DIRTY_FL) 2313 journal_dirty = 1; 2314 2315 brelse(di_bh); 2316 di_bh = NULL; 2317 } 2318 2319out: 2320 if (journal_dirty) 2321 ret = -EROFS; 2322 return ret; 2323} 2324