root/drivers/md/bcache/journal.c


DEFINITIONS

This source file includes the following definitions.
  1. journal_read_endio
  2. journal_read_bucket
  3. bch_journal_read
  4. bch_journal_mark
  5. is_discard_enabled
  6. bch_journal_replay
  7. btree_flush_write
  8. journal_discard_endio
  9. journal_discard_work
  10. do_journal_discard
  11. journal_reclaim
  12. bch_journal_next
  13. journal_write_endio
  14. journal_write_done
  15. journal_write_unlock
  16. journal_write_unlocked
  17. journal_write
  18. journal_try_write
  19. journal_wait_for_write
  20. journal_write_work
  21. bch_journal
  22. bch_journal_meta
  23. bch_journal_free
  24. bch_journal_alloc

   1 // SPDX-License-Identifier: GPL-2.0
   2 /*
   3  * bcache journalling code, for btree insertions
   4  *
   5  * Copyright 2012 Google, Inc.
   6  */
   7 
   8 #include "bcache.h"
   9 #include "btree.h"
  10 #include "debug.h"
  11 #include "extents.h"
  12 
  13 #include <trace/events/bcache.h>
  14 
  15 /*
  16  * Journal replay/recovery:
  17  *
  18  * This code is all driven from run_cache_set(); we first read the journal
  19  * entries, do some other stuff, then we mark all the keys in the journal
  20  * entries (same as garbage collection would), then we replay them - reinserting
  21  * them into the cache in precisely the same order as they appear in the
  22  * journal.
  23  *
  24  * We only journal keys that go in leaf nodes, which simplifies things quite a
  25  * bit.
  26  */
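
/*
 * Illustrative sketch, not part of the original file: roughly how the
 * sequence described above might be driven from run_cache_set() (in
 * super.c). example_journal_recovery() is a hypothetical name; only
 * bch_journal_read(), bch_journal_mark() and bch_journal_replay() below
 * are real, and the real caller interleaves other recovery work
 * (btree check, initial GC, ...) between these steps.
 */
#if 0	/* example only, never compiled */
static int example_journal_recovery(struct cache_set *c)
{
	LIST_HEAD(journal);
	int ret;

	ret = bch_journal_read(c, &journal);	/* 1. read jsets from disk */
	if (ret)
		return ret;

	/* ... btree check and other recovery steps happen in between ... */

	bch_journal_mark(c, &journal);		/* 2. mark keys, pin buckets */

	return bch_journal_replay(c, &journal);	/* 3. reinsert keys in order */
}
#endif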
  27 
  28 static void journal_read_endio(struct bio *bio)
  29 {
  30         struct closure *cl = bio->bi_private;
  31 
  32         closure_put(cl);
  33 }
  34 
  35 static int journal_read_bucket(struct cache *ca, struct list_head *list,
  36                                unsigned int bucket_index)
  37 {
  38         struct journal_device *ja = &ca->journal;
  39         struct bio *bio = &ja->bio;
  40 
  41         struct journal_replay *i;
  42         struct jset *j, *data = ca->set->journal.w[0].data;
  43         struct closure cl;
  44         unsigned int len, left, offset = 0;
  45         int ret = 0;
  46         sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);
  47 
  48         closure_init_stack(&cl);
  49 
  50         pr_debug("reading %u", bucket_index);
  51 
  52         while (offset < ca->sb.bucket_size) {
  53 reread:         left = ca->sb.bucket_size - offset;
  54                 len = min_t(unsigned int, left, PAGE_SECTORS << JSET_BITS);
  55 
  56                 bio_reset(bio);
  57                 bio->bi_iter.bi_sector  = bucket + offset;
  58                 bio_set_dev(bio, ca->bdev);
  59                 bio->bi_iter.bi_size    = len << 9;
  60 
  61                 bio->bi_end_io  = journal_read_endio;
  62                 bio->bi_private = &cl;
  63                 bio_set_op_attrs(bio, REQ_OP_READ, 0);
  64                 bch_bio_map(bio, data);
  65 
  66                 closure_bio_submit(ca->set, bio, &cl);
  67                 closure_sync(&cl);
  68 
  69                 /* This function could be simpler now since we no longer write
  70                  * journal entries that overlap bucket boundaries; this means
  71                  * the start of a bucket will always have a valid journal entry
  72                  * if it has any journal entries at all.
  73                  */
  74 
  75                 j = data;
  76                 while (len) {
  77                         struct list_head *where;
  78                         size_t blocks, bytes = set_bytes(j);
  79 
  80                         if (j->magic != jset_magic(&ca->sb)) {
  81                                 pr_debug("%u: bad magic", bucket_index);
  82                                 return ret;
  83                         }
  84 
  85                         if (bytes > left << 9 ||
  86                             bytes > PAGE_SIZE << JSET_BITS) {
  87                                 pr_info("%u: too big, %zu bytes, offset %u",
  88                                         bucket_index, bytes, offset);
  89                                 return ret;
  90                         }
  91 
  92                         if (bytes > len << 9)
  93                                 goto reread;
  94 
  95                         if (j->csum != csum_set(j)) {
  96                                 pr_info("%u: bad csum, %zu bytes, offset %u",
  97                                         bucket_index, bytes, offset);
  98                                 return ret;
  99                         }
 100 
 101                         blocks = set_blocks(j, block_bytes(ca->set));
 102 
 103                         /*
  104                          * Nodes in 'list' are in linearly increasing order of
  105                          * i->j.seq; the node at the head has the smallest (oldest)
  106                          * journal seq, and the node at the tail has the biggest
  107                          * (latest) journal seq.
 108                          */
 109 
 110                         /*
  111                          * Check from the oldest jset for last_seq. If
  112                          * i->j.seq < j->last_seq, it means the oldest jset
  113                          * in the list is expired and useless; remove it from
  114                          * the list. Otherwise, j is a candidate jset for
  115                          * the following checks.
 116                          */
 117                         while (!list_empty(list)) {
 118                                 i = list_first_entry(list,
 119                                         struct journal_replay, list);
 120                                 if (i->j.seq >= j->last_seq)
 121                                         break;
 122                                 list_del(&i->list);
 123                                 kfree(i);
 124                         }
 125 
 126                         /* iterate list in reverse order (from latest jset) */
 127                         list_for_each_entry_reverse(i, list, list) {
 128                                 if (j->seq == i->j.seq)
 129                                         goto next_set;
 130 
 131                                 /*
 132                                  * if j->seq is less than any i->j.last_seq
  133                                  * in the list, j is an expired and useless jset.
 134                                  */
 135                                 if (j->seq < i->j.last_seq)
 136                                         goto next_set;
 137 
 138                                 /*
  139                                  * 'where' points to the first jset in the list
  140                                  * which is older than j.
 141                                  */
 142                                 if (j->seq > i->j.seq) {
 143                                         where = &i->list;
 144                                         goto add;
 145                                 }
 146                         }
 147 
 148                         where = list;
 149 add:
 150                         i = kmalloc(offsetof(struct journal_replay, j) +
 151                                     bytes, GFP_KERNEL);
 152                         if (!i)
 153                                 return -ENOMEM;
 154                         memcpy(&i->j, j, bytes);
  155                         /* Insert i right after the node 'where' points to */
 156                         list_add(&i->list, where);
 157                         ret = 1;
 158 
 159                         if (j->seq > ja->seq[bucket_index])
 160                                 ja->seq[bucket_index] = j->seq;
 161 next_set:
 162                         offset  += blocks * ca->sb.block_size;
 163                         len     -= blocks * ca->sb.block_size;
 164                         j = ((void *) j) + blocks * block_bytes(ca);
 165                 }
 166         }
 167 
 168         return ret;
 169 }
 170 
 171 int bch_journal_read(struct cache_set *c, struct list_head *list)
 172 {
 173 #define read_bucket(b)                                                  \
 174         ({                                                              \
 175                 ret = journal_read_bucket(ca, list, b);                 \
 176                 __set_bit(b, bitmap);                                   \
 177                 if (ret < 0)                                            \
 178                         return ret;                                     \
 179                 ret;                                                    \
 180         })
 181 
 182         struct cache *ca;
 183         unsigned int iter;
 184         int ret = 0;
 185 
 186         for_each_cache(ca, c, iter) {
 187                 struct journal_device *ja = &ca->journal;
 188                 DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS);
 189                 unsigned int i, l, r, m;
 190                 uint64_t seq;
 191 
 192                 bitmap_zero(bitmap, SB_JOURNAL_BUCKETS);
 193                 pr_debug("%u journal buckets", ca->sb.njournal_buckets);
 194 
 195                 /*
 196                  * Read journal buckets ordered by golden ratio hash to quickly
 197                  * find a sequence of buckets with valid journal entries
 198                  */
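                /*
                 * Illustration: with, say, 256 journal buckets,
                 * 2654435769 % 256 == 185, so the probe order below is
                 * l = (185 * i) % 256 = 0, 185, 114, 43, 228, ... which
                 * scatters the probes across the whole bucket range
                 * instead of scanning it front to back.
                 */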
 199                 for (i = 0; i < ca->sb.njournal_buckets; i++) {
 200                         /*
  201                          * We must try index l == 0 (i.e. i == 0) first for
  202                          * correctness, because the journal buckets form a
  203                          * circular buffer which might have wrapped around
 204                          */
 205                         l = (i * 2654435769U) % ca->sb.njournal_buckets;
 206 
 207                         if (test_bit(l, bitmap))
 208                                 break;
 209 
 210                         if (read_bucket(l))
 211                                 goto bsearch;
 212                 }
 213 
 214                 /*
 215                  * If that fails, check all the buckets we haven't checked
 216                  * already
 217                  */
 218                 pr_debug("falling back to linear search");
 219 
 220                 for (l = find_first_zero_bit(bitmap, ca->sb.njournal_buckets);
 221                      l < ca->sb.njournal_buckets;
 222                      l = find_next_zero_bit(bitmap, ca->sb.njournal_buckets,
 223                                             l + 1))
 224                         if (read_bucket(l))
 225                                 goto bsearch;
 226 
 227                 /* no journal entries on this device? */
 228                 if (l == ca->sb.njournal_buckets)
 229                         continue;
 230 bsearch:
 231                 BUG_ON(list_empty(list));
 232 
 233                 /* Binary search */
 234                 m = l;
 235                 r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1);
 236                 pr_debug("starting binary search, l %u r %u", l, r);
 237 
 238                 while (l + 1 < r) {
 239                         seq = list_entry(list->prev, struct journal_replay,
 240                                          list)->j.seq;
 241 
 242                         m = (l + r) >> 1;
 243                         read_bucket(m);
 244 
 245                         if (seq != list_entry(list->prev, struct journal_replay,
 246                                               list)->j.seq)
 247                                 l = m;
 248                         else
 249                                 r = m;
 250                 }
 251 
 252                 /*
 253                  * Read buckets in reverse order until we stop finding more
 254                  * journal entries
 255                  */
 256                 pr_debug("finishing up: m %u njournal_buckets %u",
 257                          m, ca->sb.njournal_buckets);
 258                 l = m;
 259 
 260                 while (1) {
 261                         if (!l--)
 262                                 l = ca->sb.njournal_buckets - 1;
 263 
 264                         if (l == m)
 265                                 break;
 266 
 267                         if (test_bit(l, bitmap))
 268                                 continue;
 269 
 270                         if (!read_bucket(l))
 271                                 break;
 272                 }
 273 
 274                 seq = 0;
 275 
 276                 for (i = 0; i < ca->sb.njournal_buckets; i++)
 277                         if (ja->seq[i] > seq) {
 278                                 seq = ja->seq[i];
 279                                 /*
 280                                  * When journal_reclaim() goes to allocate for
 281                                  * the first time, it'll use the bucket after
 282                                  * ja->cur_idx
 283                                  */
 284                                 ja->cur_idx = i;
 285                                 ja->last_idx = ja->discard_idx = (i + 1) %
 286                                         ca->sb.njournal_buckets;
 287 
 288                         }
 289         }
 290 
 291         if (!list_empty(list))
 292                 c->journal.seq = list_entry(list->prev,
 293                                             struct journal_replay,
 294                                             list)->j.seq;
 295 
 296         return 0;
 297 #undef read_bucket
 298 }
 299 
 300 void bch_journal_mark(struct cache_set *c, struct list_head *list)
 301 {
 302         atomic_t p = { 0 };
 303         struct bkey *k;
 304         struct journal_replay *i;
 305         struct journal *j = &c->journal;
 306         uint64_t last = j->seq;
 307 
 308         /*
 309          * journal.pin should never fill up - we never write a journal
 310          * entry when it would fill up. But if for some reason it does, we
 311          * iterate over the list in reverse order so that we can just skip that
 312          * refcount instead of bugging.
 313          */
 314 
 315         list_for_each_entry_reverse(i, list, list) {
 316                 BUG_ON(last < i->j.seq);
 317                 i->pin = NULL;
 318 
 319                 while (last-- != i->j.seq)
 320                         if (fifo_free(&j->pin) > 1) {
 321                                 fifo_push_front(&j->pin, p);
 322                                 atomic_set(&fifo_front(&j->pin), 0);
 323                         }
 324 
 325                 if (fifo_free(&j->pin) > 1) {
 326                         fifo_push_front(&j->pin, p);
 327                         i->pin = &fifo_front(&j->pin);
 328                         atomic_set(i->pin, 1);
 329                 }
 330 
 331                 for (k = i->j.start;
 332                      k < bset_bkey_last(&i->j);
 333                      k = bkey_next(k))
 334                         if (!__bch_extent_invalid(c, k)) {
 335                                 unsigned int j;
 336 
 337                                 for (j = 0; j < KEY_PTRS(k); j++)
 338                                         if (ptr_available(c, k, j))
 339                                                 atomic_inc(&PTR_BUCKET(c, k, j)->pin);
 340 
 341                                 bch_initial_mark_key(c, 0, k);
 342                         }
 343         }
 344 }
 345 
 346 static bool is_discard_enabled(struct cache_set *s)
 347 {
 348         struct cache *ca;
 349         unsigned int i;
 350 
 351         for_each_cache(ca, s, i)
 352                 if (ca->discard)
 353                         return true;
 354 
 355         return false;
 356 }
 357 
 358 int bch_journal_replay(struct cache_set *s, struct list_head *list)
 359 {
 360         int ret = 0, keys = 0, entries = 0;
 361         struct bkey *k;
 362         struct journal_replay *i =
 363                 list_entry(list->prev, struct journal_replay, list);
 364 
 365         uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
 366         struct keylist keylist;
 367 
 368         list_for_each_entry(i, list, list) {
 369                 BUG_ON(i->pin && atomic_read(i->pin) != 1);
 370 
 371                 if (n != i->j.seq) {
 372                         if (n == start && is_discard_enabled(s))
 373                                 pr_info("bcache: journal entries %llu-%llu may be discarded! (replaying %llu-%llu)",
 374                                         n, i->j.seq - 1, start, end);
 375                         else {
 376                                 pr_err("bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)",
 377                                         n, i->j.seq - 1, start, end);
 378                                 ret = -EIO;
 379                                 goto err;
 380                         }
 381                 }
 382 
 383                 for (k = i->j.start;
 384                      k < bset_bkey_last(&i->j);
 385                      k = bkey_next(k)) {
 386                         trace_bcache_journal_replay_key(k);
 387 
 388                         bch_keylist_init_single(&keylist, k);
 389 
 390                         ret = bch_btree_insert(s, &keylist, i->pin, NULL);
 391                         if (ret)
 392                                 goto err;
 393 
 394                         BUG_ON(!bch_keylist_empty(&keylist));
 395                         keys++;
 396 
 397                         cond_resched();
 398                 }
 399 
 400                 if (i->pin)
 401                         atomic_dec(i->pin);
 402                 n = i->j.seq + 1;
 403                 entries++;
 404         }
 405 
 406         pr_info("journal replay done, %i keys in %i entries, seq %llu",
 407                 keys, entries, end);
 408 err:
 409         while (!list_empty(list)) {
 410                 i = list_first_entry(list, struct journal_replay, list);
 411                 list_del(&i->list);
 412                 kfree(i);
 413         }
 414 
 415         return ret;
 416 }
 417 
 418 /* Journalling */
 419 
 420 #define nr_to_fifo_front(p, front_p, mask)      (((p) - (front_p)) & (mask))
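
/*
 * 'p' and 'front_p' are atomic_t pointers into the journal pin fifo,
 * whose size is a power of two, and 'mask' is that size minus one.
 * For example, with an 8-slot fifo (mask == 7), front slot 6 and a
 * btree node pinning slot 1, the result is (1 - 6) & 7 == 3: that node
 * references the journal entry three positions newer than the oldest
 * one. btree_flush_write() below only selects nodes for which this
 * distance is 0.
 */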
 421 
 422 static void btree_flush_write(struct cache_set *c)
 423 {
 424         struct btree *b, *t, *btree_nodes[BTREE_FLUSH_NR];
 425         unsigned int i, nr;
 426         int ref_nr;
 427         atomic_t *fifo_front_p, *now_fifo_front_p;
 428         size_t mask;
 429 
 430         if (c->journal.btree_flushing)
 431                 return;
 432 
 433         spin_lock(&c->journal.flush_write_lock);
 434         if (c->journal.btree_flushing) {
 435                 spin_unlock(&c->journal.flush_write_lock);
 436                 return;
 437         }
 438         c->journal.btree_flushing = true;
 439         spin_unlock(&c->journal.flush_write_lock);
 440 
 441         /* get the oldest journal entry and check its refcount */
 442         spin_lock(&c->journal.lock);
 443         fifo_front_p = &fifo_front(&c->journal.pin);
 444         ref_nr = atomic_read(fifo_front_p);
 445         if (ref_nr <= 0) {
 446                 /*
 447                  * do nothing if no btree node references
 448                  * the oldest journal entry
 449                  */
 450                 spin_unlock(&c->journal.lock);
 451                 goto out;
 452         }
 453         spin_unlock(&c->journal.lock);
 454 
 455         mask = c->journal.pin.mask;
 456         nr = 0;
 457         atomic_long_inc(&c->flush_write);
 458         memset(btree_nodes, 0, sizeof(btree_nodes));
 459 
 460         mutex_lock(&c->bucket_lock);
 461         list_for_each_entry_safe_reverse(b, t, &c->btree_cache, list) {
 462                 /*
 463                  * It is safe to get now_fifo_front_p without holding
 464                  * c->journal.lock here, because we don't need to know
 465                  * the exactly accurate value, just check whether the
  466                  * the exact value, just to check whether the
  467                  * front pointer of c->journal.pin has changed.
 468                 now_fifo_front_p = &fifo_front(&c->journal.pin);
 469                 /*
  470                  * If the oldest journal entry is reclaimed and the front
  471                  * pointer of c->journal.pin changes, there is no need to
  472                  * scan c->btree_cache any further; just quit the loop and
  473                  * flush out what we have collected already.
 474                  */
 475                 if (now_fifo_front_p != fifo_front_p)
 476                         break;
 477                 /*
  478                  * Quit this loop if all matching btree nodes have been
  479                  * scanned and recorded in btree_nodes[] already.
 480                  */
 481                 ref_nr = atomic_read(fifo_front_p);
 482                 if (nr >= ref_nr)
 483                         break;
 484 
 485                 if (btree_node_journal_flush(b))
 486                         pr_err("BUG: flush_write bit should not be set here!");
 487 
 488                 mutex_lock(&b->write_lock);
 489 
 490                 if (!btree_node_dirty(b)) {
 491                         mutex_unlock(&b->write_lock);
 492                         continue;
 493                 }
 494 
 495                 if (!btree_current_write(b)->journal) {
 496                         mutex_unlock(&b->write_lock);
 497                         continue;
 498                 }
 499 
 500                 /*
 501                  * Only select the btree node which exactly references
 502                  * the oldest journal entry.
 503                  *
  504                  * If the journal entry pointed to by fifo_front_p is
 505                  * reclaimed in parallel, don't worry:
 506                  * - the list_for_each_xxx loop will quit when checking
 507                  *   next now_fifo_front_p.
 508                  * - If there are matched nodes recorded in btree_nodes[],
 509                  *   they are clean now (this is why and how the oldest
 510                  *   journal entry can be reclaimed). These selected nodes
  511                  *   will be ignored and skipped in the following for-loop.
 512                  */
 513                 if (nr_to_fifo_front(btree_current_write(b)->journal,
 514                                      fifo_front_p,
 515                                      mask) != 0) {
 516                         mutex_unlock(&b->write_lock);
 517                         continue;
 518                 }
 519 
 520                 set_btree_node_journal_flush(b);
 521 
 522                 mutex_unlock(&b->write_lock);
 523 
 524                 btree_nodes[nr++] = b;
 525                 /*
  526                  * To avoid holding c->bucket_lock for too long, only
  527                  * scan for at most BTREE_FLUSH_NR matching btree nodes.
  528                  * If more btree nodes reference the oldest journal
  529                  * entry, try to flush them the next time
  530                  * btree_flush_write() is called.
 531                  */
 532                 if (nr == BTREE_FLUSH_NR)
 533                         break;
 534         }
 535         mutex_unlock(&c->bucket_lock);
 536 
 537         for (i = 0; i < nr; i++) {
 538                 b = btree_nodes[i];
 539                 if (!b) {
 540                         pr_err("BUG: btree_nodes[%d] is NULL", i);
 541                         continue;
 542                 }
 543 
 544                 /* safe to check without holding b->write_lock */
 545                 if (!btree_node_journal_flush(b)) {
 546                         pr_err("BUG: bnode %p: journal_flush bit cleaned", b);
 547                         continue;
 548                 }
 549 
 550                 mutex_lock(&b->write_lock);
 551                 if (!btree_current_write(b)->journal) {
 552                         clear_bit(BTREE_NODE_journal_flush, &b->flags);
 553                         mutex_unlock(&b->write_lock);
 554                         pr_debug("bnode %p: written by others", b);
 555                         continue;
 556                 }
 557 
 558                 if (!btree_node_dirty(b)) {
 559                         clear_bit(BTREE_NODE_journal_flush, &b->flags);
 560                         mutex_unlock(&b->write_lock);
 561                         pr_debug("bnode %p: dirty bit cleaned by others", b);
 562                         continue;
 563                 }
 564 
 565                 __bch_btree_node_write(b, NULL);
 566                 clear_bit(BTREE_NODE_journal_flush, &b->flags);
 567                 mutex_unlock(&b->write_lock);
 568         }
 569 
 570 out:
 571         spin_lock(&c->journal.flush_write_lock);
 572         c->journal.btree_flushing = false;
 573         spin_unlock(&c->journal.flush_write_lock);
 574 }
 575 
 576 #define last_seq(j)     ((j)->seq - fifo_used(&(j)->pin) + 1)
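
/*
 * The pin fifo holds one refcount per journal entry that is still open,
 * oldest at the front. For example, if (j)->seq == 100 and
 * fifo_used(&(j)->pin) == 4, the fifo covers entries 97..100 and
 * last_seq(j) == 97, the oldest entry whose keys may not yet all be in
 * the btree.
 */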
 577 
 578 static void journal_discard_endio(struct bio *bio)
 579 {
 580         struct journal_device *ja =
 581                 container_of(bio, struct journal_device, discard_bio);
 582         struct cache *ca = container_of(ja, struct cache, journal);
 583 
 584         atomic_set(&ja->discard_in_flight, DISCARD_DONE);
 585 
 586         closure_wake_up(&ca->set->journal.wait);
 587         closure_put(&ca->set->cl);
 588 }
 589 
 590 static void journal_discard_work(struct work_struct *work)
 591 {
 592         struct journal_device *ja =
 593                 container_of(work, struct journal_device, discard_work);
 594 
 595         submit_bio(&ja->discard_bio);
 596 }
 597 
 598 static void do_journal_discard(struct cache *ca)
 599 {
 600         struct journal_device *ja = &ca->journal;
 601         struct bio *bio = &ja->discard_bio;
 602 
 603         if (!ca->discard) {
 604                 ja->discard_idx = ja->last_idx;
 605                 return;
 606         }
 607 
 608         switch (atomic_read(&ja->discard_in_flight)) {
 609         case DISCARD_IN_FLIGHT:
 610                 return;
 611 
 612         case DISCARD_DONE:
 613                 ja->discard_idx = (ja->discard_idx + 1) %
 614                         ca->sb.njournal_buckets;
 615 
 616                 atomic_set(&ja->discard_in_flight, DISCARD_READY);
 617                 /* fallthrough */
 618 
 619         case DISCARD_READY:
 620                 if (ja->discard_idx == ja->last_idx)
 621                         return;
 622 
 623                 atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT);
 624 
 625                 bio_init(bio, bio->bi_inline_vecs, 1);
 626                 bio_set_op_attrs(bio, REQ_OP_DISCARD, 0);
 627                 bio->bi_iter.bi_sector  = bucket_to_sector(ca->set,
 628                                                 ca->sb.d[ja->discard_idx]);
 629                 bio_set_dev(bio, ca->bdev);
 630                 bio->bi_iter.bi_size    = bucket_bytes(ca);
 631                 bio->bi_end_io          = journal_discard_endio;
 632 
 633                 closure_get(&ca->set->cl);
 634                 INIT_WORK(&ja->discard_work, journal_discard_work);
 635                 queue_work(bch_journal_wq, &ja->discard_work);
 636         }
 637 }
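
/*
 * ja->discard_in_flight is a small state machine that discards, one
 * bucket at a time, the journal buckets journal_reclaim() has already
 * retired (those behind ja->last_idx):
 *
 *   DISCARD_READY      nothing outstanding; if discard_idx has not yet
 *                      caught up with last_idx, submit a discard for
 *                      bucket discard_idx and go to DISCARD_IN_FLIGHT.
 *   DISCARD_IN_FLIGHT  a discard bio is outstanding; do nothing.
 *   DISCARD_DONE       set by journal_discard_endio(); advance
 *                      discard_idx and fall through to DISCARD_READY.
 */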
 638 
 639 static void journal_reclaim(struct cache_set *c)
 640 {
 641         struct bkey *k = &c->journal.key;
 642         struct cache *ca;
 643         uint64_t last_seq;
 644         unsigned int iter, n = 0;
 645         atomic_t p __maybe_unused;
 646 
 647         atomic_long_inc(&c->reclaim);
 648 
 649         while (!atomic_read(&fifo_front(&c->journal.pin)))
 650                 fifo_pop(&c->journal.pin, p);
 651 
 652         last_seq = last_seq(&c->journal);
 653 
 654         /* Update last_idx */
 655 
 656         for_each_cache(ca, c, iter) {
 657                 struct journal_device *ja = &ca->journal;
 658 
 659                 while (ja->last_idx != ja->cur_idx &&
 660                        ja->seq[ja->last_idx] < last_seq)
 661                         ja->last_idx = (ja->last_idx + 1) %
 662                                 ca->sb.njournal_buckets;
 663         }
 664 
 665         for_each_cache(ca, c, iter)
 666                 do_journal_discard(ca);
 667 
 668         if (c->journal.blocks_free)
 669                 goto out;
 670 
 671         /*
 672          * Allocate:
 673          * XXX: Sort by free journal space
 674          */
 675 
 676         for_each_cache(ca, c, iter) {
 677                 struct journal_device *ja = &ca->journal;
 678                 unsigned int next = (ja->cur_idx + 1) % ca->sb.njournal_buckets;
 679 
 680                 /* No space available on this device */
 681                 if (next == ja->discard_idx)
 682                         continue;
 683 
 684                 ja->cur_idx = next;
 685                 k->ptr[n++] = MAKE_PTR(0,
 686                                   bucket_to_sector(c, ca->sb.d[ja->cur_idx]),
 687                                   ca->sb.nr_this_dev);
 688                 atomic_long_inc(&c->reclaimed_journal_buckets);
 689         }
 690 
 691         if (n) {
 692                 bkey_init(k);
 693                 SET_KEY_PTRS(k, n);
 694                 c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
 695         }
 696 out:
 697         if (!journal_full(&c->journal))
 698                 __closure_wake_up(&c->journal.wait);
 699 }
 700 
 701 void bch_journal_next(struct journal *j)
 702 {
 703         atomic_t p = { 1 };
 704 
 705         j->cur = (j->cur == j->w)
 706                 ? &j->w[1]
 707                 : &j->w[0];
 708 
 709         /*
 710          * The fifo_push() needs to happen at the same time as j->seq is
 711          * incremented for last_seq() to be calculated correctly
 712          */
 713         BUG_ON(!fifo_push(&j->pin, p));
 714         atomic_set(&fifo_back(&j->pin), 1);
 715 
 716         j->cur->data->seq       = ++j->seq;
 717         j->cur->dirty           = false;
 718         j->cur->need_write      = false;
 719         j->cur->data->keys      = 0;
 720 
 721         if (fifo_full(&j->pin))
 722                 pr_debug("journal_pin full (%zu)", fifo_used(&j->pin));
 723 }
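
/*
 * The journal double-buffers its jsets in j->w[0] and j->w[1]: j->cur
 * is the buffer currently accumulating keys, while the other one may
 * still be in flight to disk. bch_journal_next() flips j->cur and opens
 * a new journal entry, bumping j->seq and pushing a fresh refcount onto
 * the pin fifo for the new entry.
 */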
 724 
 725 static void journal_write_endio(struct bio *bio)
 726 {
 727         struct journal_write *w = bio->bi_private;
 728 
 729         cache_set_err_on(bio->bi_status, w->c, "journal io error");
 730         closure_put(&w->c->journal.io);
 731 }
 732 
 733 static void journal_write(struct closure *cl);
 734 
 735 static void journal_write_done(struct closure *cl)
 736 {
 737         struct journal *j = container_of(cl, struct journal, io);
 738         struct journal_write *w = (j->cur == j->w)
 739                 ? &j->w[1]
 740                 : &j->w[0];
 741 
 742         __closure_wake_up(&w->wait);
 743         continue_at_nobarrier(cl, journal_write, bch_journal_wq);
 744 }
 745 
 746 static void journal_write_unlock(struct closure *cl)
 747         __releases(&c->journal.lock)
 748 {
 749         struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 750 
 751         c->journal.io_in_flight = 0;
 752         spin_unlock(&c->journal.lock);
 753 }
 754 
 755 static void journal_write_unlocked(struct closure *cl)
 756         __releases(c->journal.lock)
 757 {
 758         struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 759         struct cache *ca;
 760         struct journal_write *w = c->journal.cur;
 761         struct bkey *k = &c->journal.key;
 762         unsigned int i, sectors = set_blocks(w->data, block_bytes(c)) *
 763                 c->sb.block_size;
 764 
 765         struct bio *bio;
 766         struct bio_list list;
 767 
 768         bio_list_init(&list);
 769 
 770         if (!w->need_write) {
 771                 closure_return_with_destructor(cl, journal_write_unlock);
 772                 return;
 773         } else if (journal_full(&c->journal)) {
 774                 journal_reclaim(c);
 775                 spin_unlock(&c->journal.lock);
 776 
 777                 btree_flush_write(c);
 778                 continue_at(cl, journal_write, bch_journal_wq);
 779                 return;
 780         }
 781 
 782         c->journal.blocks_free -= set_blocks(w->data, block_bytes(c));
 783 
 784         w->data->btree_level = c->root->level;
 785 
 786         bkey_copy(&w->data->btree_root, &c->root->key);
 787         bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket);
 788 
 789         for_each_cache(ca, c, i)
 790                 w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];
 791 
 792         w->data->magic          = jset_magic(&c->sb);
 793         w->data->version        = BCACHE_JSET_VERSION;
 794         w->data->last_seq       = last_seq(&c->journal);
 795         w->data->csum           = csum_set(w->data);
 796 
 797         for (i = 0; i < KEY_PTRS(k); i++) {
 798                 ca = PTR_CACHE(c, k, i);
 799                 bio = &ca->journal.bio;
 800 
 801                 atomic_long_add(sectors, &ca->meta_sectors_written);
 802 
 803                 bio_reset(bio);
 804                 bio->bi_iter.bi_sector  = PTR_OFFSET(k, i);
 805                 bio_set_dev(bio, ca->bdev);
 806                 bio->bi_iter.bi_size = sectors << 9;
 807 
 808                 bio->bi_end_io  = journal_write_endio;
 809                 bio->bi_private = w;
 810                 bio_set_op_attrs(bio, REQ_OP_WRITE,
 811                                  REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
 812                 bch_bio_map(bio, w->data);
 813 
 814                 trace_bcache_journal_write(bio, w->data->keys);
 815                 bio_list_add(&list, bio);
 816 
 817                 SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors);
 818 
 819                 ca->journal.seq[ca->journal.cur_idx] = w->data->seq;
 820         }
 821 
  822         /* If KEY_PTRS(k) == 0, this jset would simply be lost */
 823         BUG_ON(i == 0);
 824 
 825         atomic_dec_bug(&fifo_back(&c->journal.pin));
 826         bch_journal_next(&c->journal);
 827         journal_reclaim(c);
 828 
 829         spin_unlock(&c->journal.lock);
 830 
 831         while ((bio = bio_list_pop(&list)))
 832                 closure_bio_submit(c, bio, cl);
 833 
 834         continue_at(cl, journal_write_done, NULL);
 835 }
 836 
 837 static void journal_write(struct closure *cl)
 838 {
 839         struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 840 
 841         spin_lock(&c->journal.lock);
 842         journal_write_unlocked(cl);
 843 }
 844 
 845 static void journal_try_write(struct cache_set *c)
 846         __releases(c->journal.lock)
 847 {
 848         struct closure *cl = &c->journal.io;
 849         struct journal_write *w = c->journal.cur;
 850 
 851         w->need_write = true;
 852 
 853         if (!c->journal.io_in_flight) {
 854                 c->journal.io_in_flight = 1;
 855                 closure_call(cl, journal_write_unlocked, NULL, &c->cl);
 856         } else {
 857                 spin_unlock(&c->journal.lock);
 858         }
 859 }
 860 
 861 static struct journal_write *journal_wait_for_write(struct cache_set *c,
 862                                                     unsigned int nkeys)
 863         __acquires(&c->journal.lock)
 864 {
 865         size_t sectors;
 866         struct closure cl;
 867         bool wait = false;
 868 
 869         closure_init_stack(&cl);
 870 
 871         spin_lock(&c->journal.lock);
 872 
 873         while (1) {
 874                 struct journal_write *w = c->journal.cur;
 875 
 876                 sectors = __set_blocks(w->data, w->data->keys + nkeys,
 877                                        block_bytes(c)) * c->sb.block_size;
 878 
 879                 if (sectors <= min_t(size_t,
 880                                      c->journal.blocks_free * c->sb.block_size,
 881                                      PAGE_SECTORS << JSET_BITS))
 882                         return w;
 883 
 884                 if (wait)
 885                         closure_wait(&c->journal.wait, &cl);
 886 
 887                 if (!journal_full(&c->journal)) {
 888                         if (wait)
 889                                 trace_bcache_journal_entry_full(c);
 890 
 891                         /*
 892                          * XXX: If we were inserting so many keys that they
 893                          * won't fit in an _empty_ journal write, we'll
 894                          * deadlock. For now, handle this in
 895                          * bch_keylist_realloc() - but something to think about.
 896                          */
 897                         BUG_ON(!w->data->keys);
 898 
 899                         journal_try_write(c); /* unlocks */
 900                 } else {
 901                         if (wait)
 902                                 trace_bcache_journal_full(c);
 903 
 904                         journal_reclaim(c);
 905                         spin_unlock(&c->journal.lock);
 906 
 907                         btree_flush_write(c);
 908                 }
 909 
 910                 closure_sync(&cl);
 911                 spin_lock(&c->journal.lock);
 912                 wait = true;
 913         }
 914 }
 915 
 916 static void journal_write_work(struct work_struct *work)
 917 {
 918         struct cache_set *c = container_of(to_delayed_work(work),
 919                                            struct cache_set,
 920                                            journal.work);
 921         spin_lock(&c->journal.lock);
 922         if (c->journal.cur->dirty)
 923                 journal_try_write(c);
 924         else
 925                 spin_unlock(&c->journal.lock);
 926 }
 927 
 928 /*
 929  * Entry point to the journalling code - bio_insert() and btree_invalidate()
 930  * pass bch_journal() a list of keys to be journalled, and then
 931  * bch_journal() hands those same keys off to btree_insert_async()
 932  */
 933 
 934 atomic_t *bch_journal(struct cache_set *c,
 935                       struct keylist *keys,
 936                       struct closure *parent)
 937 {
 938         struct journal_write *w;
 939         atomic_t *ret;
 940 
  941         /* No journalling if CACHE_SET_IO_DISABLE is already set */
 942         if (unlikely(test_bit(CACHE_SET_IO_DISABLE, &c->flags)))
 943                 return NULL;
 944 
 945         if (!CACHE_SYNC(&c->sb))
 946                 return NULL;
 947 
 948         w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
 949 
 950         memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys));
 951         w->data->keys += bch_keylist_nkeys(keys);
 952 
 953         ret = &fifo_back(&c->journal.pin);
 954         atomic_inc(ret);
 955 
 956         if (parent) {
 957                 closure_wait(&w->wait, parent);
 958                 journal_try_write(c);
 959         } else if (!w->dirty) {
 960                 w->dirty = true;
 961                 schedule_delayed_work(&c->journal.work,
 962                                       msecs_to_jiffies(c->journal_delay_ms));
 963                 spin_unlock(&c->journal.lock);
 964         } else {
 965                 spin_unlock(&c->journal.lock);
 966         }
 967 
 968 
 969         return ret;
 970 }
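
/*
 * Illustrative sketch, not part of the original file: the typical
 * caller pattern for bch_journal(), loosely modelled on the data insert
 * path. example_insert() is a hypothetical name; bch_journal(),
 * bch_btree_insert() and atomic_dec_bug() are real. The returned
 * atomic_t is the refcount (pin) of the current journal entry; the
 * caller drops it once the keys are safely in the btree (compare the
 * atomic_dec(i->pin) in bch_journal_replay() and the atomic_dec_bug()
 * in bch_journal_meta() below), so journal_reclaim() can eventually
 * reuse the bucket.
 */
#if 0	/* example only, never compiled */
static int example_insert(struct cache_set *c, struct keylist *keys,
			  struct closure *parent)
{
	atomic_t *journal_ref;
	int ret;

	journal_ref = bch_journal(c, keys, parent);	/* may return NULL */
	ret = bch_btree_insert(c, keys, journal_ref, NULL);
	if (journal_ref)
		atomic_dec_bug(journal_ref);	/* release the journal pin */

	return ret;
}
#endif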
 971 
 972 void bch_journal_meta(struct cache_set *c, struct closure *cl)
 973 {
 974         struct keylist keys;
 975         atomic_t *ref;
 976 
 977         bch_keylist_init(&keys);
 978 
 979         ref = bch_journal(c, &keys, cl);
 980         if (ref)
 981                 atomic_dec_bug(ref);
 982 }
 983 
 984 void bch_journal_free(struct cache_set *c)
 985 {
 986         free_pages((unsigned long) c->journal.w[1].data, JSET_BITS);
 987         free_pages((unsigned long) c->journal.w[0].data, JSET_BITS);
 988         free_fifo(&c->journal.pin);
 989 }
 990 
 991 int bch_journal_alloc(struct cache_set *c)
 992 {
 993         struct journal *j = &c->journal;
 994 
 995         spin_lock_init(&j->lock);
 996         spin_lock_init(&j->flush_write_lock);
 997         INIT_DELAYED_WORK(&j->work, journal_write_work);
 998 
 999         c->journal_delay_ms = 100;
1000 
1001         j->w[0].c = c;
1002         j->w[1].c = c;
1003 
1004         if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
1005             !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) ||
1006             !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)))
1007                 return -ENOMEM;
1008 
1009         return 0;
1010 }
