This source file includes the following definitions:
- __dlm_wait_on_lockres_flags
- __dlm_lockres_has_locks
- __dlm_lockres_unused
- __dlm_lockres_calc_usage
- dlm_lockres_calc_usage
- __dlm_do_purge_lockres
- dlm_purge_lockres
- dlm_run_purge_list
- dlm_shuffle_lists
- dlm_kick_thread
- __dlm_dirty_lockres
- dlm_launch_thread
- dlm_complete_thread
- dlm_dirty_list_empty
- dlm_flush_asts
- dlm_thread
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 #include <linux/module.h>
  14 #include <linux/fs.h>
  15 #include <linux/types.h>
  16 #include <linux/highmem.h>
  17 #include <linux/init.h>
  18 #include <linux/sysctl.h>
  19 #include <linux/random.h>
  20 #include <linux/blkdev.h>
  21 #include <linux/socket.h>
  22 #include <linux/inet.h>
  23 #include <linux/timer.h>
  24 #include <linux/kthread.h>
  25 #include <linux/delay.h>
  26 
  27 
  28 #include "../cluster/heartbeat.h"
  29 #include "../cluster/nodemanager.h"
  30 #include "../cluster/tcp.h"
  31 
  32 #include "dlmapi.h"
  33 #include "dlmcommon.h"
  34 #include "dlmdomain.h"
  35 
  36 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
  37 #include "../cluster/masklog.h"
  38 
  39 static int dlm_thread(void *data);
  40 static void dlm_flush_asts(struct dlm_ctxt *dlm);
  41 
  42 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
  43 
  44 
  45 
  46 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
  47 {
  48         DECLARE_WAITQUEUE(wait, current);
  49 
  50         assert_spin_locked(&res->spinlock);
  51 
  52         add_wait_queue(&res->wq, &wait);
  53 repeat:
  54         set_current_state(TASK_UNINTERRUPTIBLE);
  55         if (res->state & flags) {
  56                 spin_unlock(&res->spinlock);
  57                 schedule();
  58                 spin_lock(&res->spinlock);
  59                 goto repeat;
  60         }
  61         remove_wait_queue(&res->wq, &wait);
  62         __set_current_state(TASK_RUNNING);
  63 }
  64 
  65 int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
  66 {
  67         if (list_empty(&res->granted) &&
  68             list_empty(&res->converting) &&
  69             list_empty(&res->blocked))
  70                 return 0;
  71         return 1;
  72 }
  73 
  74 
  75 
  76 
  77 
  78 int __dlm_lockres_unused(struct dlm_lock_resource *res)
  79 {
  80         int bit;
  81 
  82         assert_spin_locked(&res->spinlock);
  83 
  84         if (__dlm_lockres_has_locks(res))
  85                 return 0;
  86 
  87         
  88         if (res->inflight_locks)
  89                 return 0;
  90 
  91         if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
  92                 return 0;
  93 
  94         if (res->state & (DLM_LOCK_RES_RECOVERING|
  95                         DLM_LOCK_RES_RECOVERY_WAITING))
  96                 return 0;
  97 
  98         
  99         bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
 100         if (bit < O2NM_MAX_NODES)
 101                 return 0;
 102 
 103         return 1;
 104 }
 105 
 106 
 107 
 108 
 109 
 110 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 111                               struct dlm_lock_resource *res)
 112 {
 113         assert_spin_locked(&dlm->spinlock);
 114         assert_spin_locked(&res->spinlock);
 115 
 116         if (__dlm_lockres_unused(res)){
 117                 if (list_empty(&res->purge)) {
 118                         mlog(0, "%s: Adding res %.*s to purge list\n",
 119                              dlm->name, res->lockname.len, res->lockname.name);
 120 
 121                         res->last_used = jiffies;
 122                         dlm_lockres_get(res);
 123                         list_add_tail(&res->purge, &dlm->purge_list);
 124                         dlm->purge_count++;
 125                 }
 126         } else if (!list_empty(&res->purge)) {
 127                 mlog(0, "%s: Removing res %.*s from purge list\n",
 128                      dlm->name, res->lockname.len, res->lockname.name);
 129 
 130                 list_del_init(&res->purge);
 131                 dlm_lockres_put(res);
 132                 dlm->purge_count--;
 133         }
 134 }
 135 
 136 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 137                             struct dlm_lock_resource *res)
 138 {
 139         spin_lock(&dlm->spinlock);
 140         spin_lock(&res->spinlock);
 141 
 142         __dlm_lockres_calc_usage(dlm, res);
 143 
 144         spin_unlock(&res->spinlock);
 145         spin_unlock(&dlm->spinlock);
 146 }
 147 
 148 
 149 
 150 
 151 
 152 
 153 
 154 void __dlm_do_purge_lockres(struct dlm_ctxt *dlm,
 155                 struct dlm_lock_resource *res)
 156 {
 157         assert_spin_locked(&dlm->spinlock);
 158         assert_spin_locked(&res->spinlock);
 159 
 160         if (!list_empty(&res->purge)) {
 161                 mlog(0, "%s: Removing res %.*s from purgelist\n",
 162                      dlm->name, res->lockname.len, res->lockname.name);
 163                 list_del_init(&res->purge);
 164                 dlm_lockres_put(res);
 165                 dlm->purge_count--;
 166         }
 167 
 168         if (!__dlm_lockres_unused(res)) {
 169                 mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
 170                      dlm->name, res->lockname.len, res->lockname.name);
 171                 __dlm_print_one_lock_resource(res);
 172                 BUG();
 173         }
 174 
 175         __dlm_unhash_lockres(dlm, res);
 176 
 177         spin_lock(&dlm->track_lock);
 178         if (!list_empty(&res->tracking))
 179                 list_del_init(&res->tracking);
 180         else {
 181                 mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n",
 182                      dlm->name, res->lockname.len, res->lockname.name);
 183                 __dlm_print_one_lock_resource(res);
 184         }
 185         spin_unlock(&dlm->track_lock);
 186 
 187         
 188 
 189 
 190 
 191         res->state &= ~DLM_LOCK_RES_DROPPING_REF;
 192 }
 193 
 194 static void dlm_purge_lockres(struct dlm_ctxt *dlm,
 195                              struct dlm_lock_resource *res)
 196 {
 197         int master;
 198         int ret = 0;
 199 
 200         assert_spin_locked(&dlm->spinlock);
 201         assert_spin_locked(&res->spinlock);
 202 
 203         master = (res->owner == dlm->node_num);
 204 
 205         mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name,
 206              res->lockname.len, res->lockname.name, master);
 207 
 208         if (!master) {
 209                 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
 210                         mlog(ML_NOTICE, "%s: res %.*s already in DLM_LOCK_RES_DROPPING_REF state\n",
 211                                 dlm->name, res->lockname.len, res->lockname.name);
 212                         spin_unlock(&res->spinlock);
 213                         return;
 214                 }
 215 
 216                 res->state |= DLM_LOCK_RES_DROPPING_REF;
 217                 
 218                 spin_unlock(&res->spinlock);
 219                 spin_unlock(&dlm->spinlock);
 220 
 221                 spin_lock(&res->spinlock);
 222                 
 223                 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
 224                 spin_unlock(&res->spinlock);
 225 
 226                 
 227                 ret = dlm_drop_lockres_ref(dlm, res);
 228                 if (ret < 0) {
 229                         if (!dlm_is_host_down(ret))
 230                                 BUG();
 231                 }
 232                 spin_lock(&dlm->spinlock);
 233                 spin_lock(&res->spinlock);
 234         }
 235 
 236         if (!list_empty(&res->purge)) {
 237                 mlog(0, "%s: Removing res %.*s from purgelist, master %d\n",
 238                      dlm->name, res->lockname.len, res->lockname.name, master);
 239                 list_del_init(&res->purge);
 240                 dlm_lockres_put(res);
 241                 dlm->purge_count--;
 242         }
 243 
 244         if (!master && ret == DLM_DEREF_RESPONSE_INPROG) {
 245                 mlog(0, "%s: deref %.*s in progress\n",
 246                         dlm->name, res->lockname.len, res->lockname.name);
 247                 spin_unlock(&res->spinlock);
 248                 return;
 249         }
 250 
 251         if (!__dlm_lockres_unused(res)) {
 252                 mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
 253                      dlm->name, res->lockname.len, res->lockname.name);
 254                 __dlm_print_one_lock_resource(res);
 255                 BUG();
 256         }
 257 
 258         __dlm_unhash_lockres(dlm, res);
 259 
 260         spin_lock(&dlm->track_lock);
 261         if (!list_empty(&res->tracking))
 262                 list_del_init(&res->tracking);
 263         else {
 264                 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
 265                                 res->lockname.len, res->lockname.name);
 266                 __dlm_print_one_lock_resource(res);
 267         }
 268         spin_unlock(&dlm->track_lock);
 269 
 270         
 271 
 272         if (!master) {
 273                 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
 274                 spin_unlock(&res->spinlock);
 275                 wake_up(&res->wq);
 276         } else
 277                 spin_unlock(&res->spinlock);
 278 }
 279 
 280 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
 281                                int purge_now)
 282 {
 283         unsigned int run_max, unused;
 284         unsigned long purge_jiffies;
 285         struct dlm_lock_resource *lockres;
 286 
 287         spin_lock(&dlm->spinlock);
 288         run_max = dlm->purge_count;
 289 
 290         while(run_max && !list_empty(&dlm->purge_list)) {
 291                 run_max--;
 292 
 293                 lockres = list_entry(dlm->purge_list.next,
 294                                      struct dlm_lock_resource, purge);
 295 
 296                 spin_lock(&lockres->spinlock);
 297 
 298                 purge_jiffies = lockres->last_used +
 299                         msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
 300 
 301                 
 302 
 303                 if (!purge_now && time_after(purge_jiffies, jiffies)) {
 304                         
 305 
 306 
 307 
 308                         spin_unlock(&lockres->spinlock);
 309                         break;
 310                 }
 311 
 312                 
 313 
 314 
 315 
 316                 unused = __dlm_lockres_unused(lockres);
 317                 if (!unused ||
 318                     (lockres->state & DLM_LOCK_RES_MIGRATING) ||
 319                     (lockres->inflight_assert_workers != 0)) {
 320                         mlog(0, "%s: res %.*s is in use or being remastered, "
 321                              "used %d, state %d, assert master workers %u\n",
 322                              dlm->name, lockres->lockname.len,
 323                              lockres->lockname.name,
 324                              !unused, lockres->state,
 325                              lockres->inflight_assert_workers);
 326                         list_move_tail(&lockres->purge, &dlm->purge_list);
 327                         spin_unlock(&lockres->spinlock);
 328                         continue;
 329                 }
 330 
 331                 dlm_lockres_get(lockres);
 332 
 333                 dlm_purge_lockres(dlm, lockres);
 334 
 335                 dlm_lockres_put(lockres);
 336 
 337                 
 338                 cond_resched_lock(&dlm->spinlock);
 339         }
 340 
 341         spin_unlock(&dlm->spinlock);
 342 }
 343 
 344 static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
 345                               struct dlm_lock_resource *res)
 346 {
 347         struct dlm_lock *lock, *target;
 348         int can_grant = 1;
 349 
 350         
 351 
 352 
 353 
 354 
 355 
 356         assert_spin_locked(&dlm->ast_lock);
 357         assert_spin_locked(&res->spinlock);
 358         BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
 359                               DLM_LOCK_RES_RECOVERING|
 360                               DLM_LOCK_RES_IN_PROGRESS)));
 361 
 362 converting:
 363         if (list_empty(&res->converting))
 364                 goto blocked;
 365         mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name,
 366              res->lockname.len, res->lockname.name);
 367 
 368         target = list_entry(res->converting.next, struct dlm_lock, list);
 369         if (target->ml.convert_type == LKM_IVMODE) {
 370                 mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n",
 371                      dlm->name, res->lockname.len, res->lockname.name);
 372                 BUG();
 373         }
 374         list_for_each_entry(lock, &res->granted, list) {
 375                 if (lock==target)
 376                         continue;
 377                 if (!dlm_lock_compatible(lock->ml.type,
 378                                          target->ml.convert_type)) {
 379                         can_grant = 0;
 380                         
 381                         if (lock->ml.highest_blocked == LKM_IVMODE) {
 382                                 __dlm_lockres_reserve_ast(res);
 383                                 __dlm_queue_bast(dlm, lock);
 384                         }
 385                         
 386                         if (lock->ml.highest_blocked < target->ml.convert_type)
 387                                 lock->ml.highest_blocked =
 388                                         target->ml.convert_type;
 389                 }
 390         }
 391 
 392         list_for_each_entry(lock, &res->converting, list) {
 393                 if (lock==target)
 394                         continue;
 395                 if (!dlm_lock_compatible(lock->ml.type,
 396                                          target->ml.convert_type)) {
 397                         can_grant = 0;
 398                         if (lock->ml.highest_blocked == LKM_IVMODE) {
 399                                 __dlm_lockres_reserve_ast(res);
 400                                 __dlm_queue_bast(dlm, lock);
 401                         }
 402                         if (lock->ml.highest_blocked < target->ml.convert_type)
 403                                 lock->ml.highest_blocked =
 404                                         target->ml.convert_type;
 405                 }
 406         }
 407 
 408         
 409         if (can_grant) {
 410                 spin_lock(&target->spinlock);
 411                 BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
 412 
 413                 mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type "
 414                      "%d => %d, node %u\n", dlm->name, res->lockname.len,
 415                      res->lockname.name,
 416                      dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
 417                      dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
 418                      target->ml.type,
 419                      target->ml.convert_type, target->ml.node);
 420 
 421                 target->ml.type = target->ml.convert_type;
 422                 target->ml.convert_type = LKM_IVMODE;
 423                 list_move_tail(&target->list, &res->granted);
 424 
 425                 BUG_ON(!target->lksb);
 426                 target->lksb->status = DLM_NORMAL;
 427 
 428                 spin_unlock(&target->spinlock);
 429 
 430                 __dlm_lockres_reserve_ast(res);
 431                 __dlm_queue_ast(dlm, target);
 432                 
 433                 goto converting;
 434         }
 435 
 436 blocked:
 437         if (list_empty(&res->blocked))
 438                 goto leave;
 439         target = list_entry(res->blocked.next, struct dlm_lock, list);
 440 
 441         list_for_each_entry(lock, &res->granted, list) {
 442                 if (lock==target)
 443                         continue;
 444                 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 445                         can_grant = 0;
 446                         if (lock->ml.highest_blocked == LKM_IVMODE) {
 447                                 __dlm_lockres_reserve_ast(res);
 448                                 __dlm_queue_bast(dlm, lock);
 449                         }
 450                         if (lock->ml.highest_blocked < target->ml.type)
 451                                 lock->ml.highest_blocked = target->ml.type;
 452                 }
 453         }
 454 
 455         list_for_each_entry(lock, &res->converting, list) {
 456                 if (lock==target)
 457                         continue;
 458                 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 459                         can_grant = 0;
 460                         if (lock->ml.highest_blocked == LKM_IVMODE) {
 461                                 __dlm_lockres_reserve_ast(res);
 462                                 __dlm_queue_bast(dlm, lock);
 463                         }
 464                         if (lock->ml.highest_blocked < target->ml.type)
 465                                 lock->ml.highest_blocked = target->ml.type;
 466                 }
 467         }
 468 
 469         
 470 
 471         if (can_grant) {
 472                 spin_lock(&target->spinlock);
 473                 BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
 474 
 475                 mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, "
 476                      "node %u\n", dlm->name, res->lockname.len,
 477                      res->lockname.name,
 478                      dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
 479                      dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
 480                      target->ml.type, target->ml.node);
 481 
 482                 
 483                 list_move_tail(&target->list, &res->granted);
 484 
 485                 BUG_ON(!target->lksb);
 486                 target->lksb->status = DLM_NORMAL;
 487 
 488                 spin_unlock(&target->spinlock);
 489 
 490                 __dlm_lockres_reserve_ast(res);
 491                 __dlm_queue_ast(dlm, target);
 492                 
 493                 goto converting;
 494         }
 495 
 496 leave:
 497         return;
 498 }
 499 
 500 
 501 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 502 {
 503         if (res) {
 504                 spin_lock(&dlm->spinlock);
 505                 spin_lock(&res->spinlock);
 506                 __dlm_dirty_lockres(dlm, res);
 507                 spin_unlock(&res->spinlock);
 508                 spin_unlock(&dlm->spinlock);
 509         }
 510         wake_up(&dlm->dlm_thread_wq);
 511 }
 512 
 513 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 514 {
 515         assert_spin_locked(&dlm->spinlock);
 516         assert_spin_locked(&res->spinlock);
 517 
 518         
 519         if (res->owner == dlm->node_num) {
 520                 if (res->state & (DLM_LOCK_RES_MIGRATING |
 521                                   DLM_LOCK_RES_BLOCK_DIRTY))
 522                     return;
 523 
 524                 if (list_empty(&res->dirty)) {
 525                         
 526                         dlm_lockres_get(res);
 527                         list_add_tail(&res->dirty, &dlm->dirty_list);
 528                         res->state |= DLM_LOCK_RES_DIRTY;
 529                 }
 530         }
 531 
 532         mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len,
 533              res->lockname.name);
 534 }
 535 
 536 
 537 
 538 int dlm_launch_thread(struct dlm_ctxt *dlm)
 539 {
 540         mlog(0, "Starting dlm_thread...\n");
 541 
 542         dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm-%s",
 543                         dlm->name);
 544         if (IS_ERR(dlm->dlm_thread_task)) {
 545                 mlog_errno(PTR_ERR(dlm->dlm_thread_task));
 546                 dlm->dlm_thread_task = NULL;
 547                 return -EINVAL;
 548         }
 549 
 550         return 0;
 551 }
 552 
 553 void dlm_complete_thread(struct dlm_ctxt *dlm)
 554 {
 555         if (dlm->dlm_thread_task) {
 556                 mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n");
 557                 kthread_stop(dlm->dlm_thread_task);
 558                 dlm->dlm_thread_task = NULL;
 559         }
 560 }
 561 
 562 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
 563 {
 564         int empty;
 565 
 566         spin_lock(&dlm->spinlock);
 567         empty = list_empty(&dlm->dirty_list);
 568         spin_unlock(&dlm->spinlock);
 569 
 570         return empty;
 571 }
 572 
 573 static void dlm_flush_asts(struct dlm_ctxt *dlm)
 574 {
 575         int ret;
 576         struct dlm_lock *lock;
 577         struct dlm_lock_resource *res;
 578         u8 hi;
 579 
 580         spin_lock(&dlm->ast_lock);
 581         while (!list_empty(&dlm->pending_asts)) {
 582                 lock = list_entry(dlm->pending_asts.next,
 583                                   struct dlm_lock, ast_list);
 584                 
 585                 dlm_lock_get(lock);
 586                 res = lock->lockres;
 587                 mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, "
 588                      "node %u\n", dlm->name, res->lockname.len,
 589                      res->lockname.name,
 590                      dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
 591                      dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 592                      lock->ml.type, lock->ml.node);
 593 
 594                 BUG_ON(!lock->ast_pending);
 595 
 596                 
 597                 list_del_init(&lock->ast_list);
 598                 dlm_lock_put(lock);
 599                 spin_unlock(&dlm->ast_lock);
 600 
 601                 if (lock->ml.node != dlm->node_num) {
 602                         ret = dlm_do_remote_ast(dlm, res, lock);
 603                         if (ret < 0)
 604                                 mlog_errno(ret);
 605                 } else
 606                         dlm_do_local_ast(dlm, res, lock);
 607 
 608                 spin_lock(&dlm->ast_lock);
 609 
 610                 
 611 
 612                 if (!list_empty(&lock->ast_list)) {
 613                         mlog(0, "%s: res %.*s, AST queued while flushing last "
 614                              "one\n", dlm->name, res->lockname.len,
 615                              res->lockname.name);
 616                 } else
 617                         lock->ast_pending = 0;
 618 
 619                 
 620 
 621                 dlm_lock_put(lock);
 622                 dlm_lockres_release_ast(dlm, res);
 623         }
 624 
 625         while (!list_empty(&dlm->pending_basts)) {
 626                 lock = list_entry(dlm->pending_basts.next,
 627                                   struct dlm_lock, bast_list);
 628                 
 629                 dlm_lock_get(lock);
 630                 res = lock->lockres;
 631 
 632                 BUG_ON(!lock->bast_pending);
 633 
 634                 
 635                 spin_lock(&lock->spinlock);
 636                 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
 637                 hi = lock->ml.highest_blocked;
 638                 lock->ml.highest_blocked = LKM_IVMODE;
 639                 spin_unlock(&lock->spinlock);
 640 
 641                 
 642                 list_del_init(&lock->bast_list);
 643                 dlm_lock_put(lock);
 644                 spin_unlock(&dlm->ast_lock);
 645 
 646                 mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, "
 647                      "blocked %d, node %u\n",
 648                      dlm->name, res->lockname.len, res->lockname.name,
 649                      dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
 650                      dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 651                      hi, lock->ml.node);
 652 
 653                 if (lock->ml.node != dlm->node_num) {
 654                         ret = dlm_send_proxy_bast(dlm, res, lock, hi);
 655                         if (ret < 0)
 656                                 mlog_errno(ret);
 657                 } else
 658                         dlm_do_local_bast(dlm, res, lock, hi);
 659 
 660                 spin_lock(&dlm->ast_lock);
 661 
 662                 
 663 
 664                 if (!list_empty(&lock->bast_list)) {
 665                         mlog(0, "%s: res %.*s, BAST queued while flushing last "
 666                              "one\n", dlm->name, res->lockname.len,
 667                              res->lockname.name);
 668                 } else
 669                         lock->bast_pending = 0;
 670 
 671                 
 672 
 673                 dlm_lock_put(lock);
 674                 dlm_lockres_release_ast(dlm, res);
 675         }
 676         wake_up(&dlm->ast_wq);
 677         spin_unlock(&dlm->ast_lock);
 678 }
 679 
 680 
 681 #define DLM_THREAD_TIMEOUT_MS (4 * 1000)
 682 #define DLM_THREAD_MAX_DIRTY  100
 683 #define DLM_THREAD_MAX_ASTS   10
 684 
 685 static int dlm_thread(void *data)
 686 {
 687         struct dlm_lock_resource *res;
 688         struct dlm_ctxt *dlm = data;
 689         unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
 690 
 691         mlog(0, "dlm thread running for %s...\n", dlm->name);
 692 
 693         while (!kthread_should_stop()) {
 694                 int n = DLM_THREAD_MAX_DIRTY;
 695 
 696                 
 697 
 698 
 699 
 700                 dlm_run_purge_list(dlm, dlm_shutting_down(dlm));
 701 
 702                 
 703 
 704 
 705 
 706 
 707 
 708 
 709                 spin_lock(&dlm->spinlock);
 710                 while (!list_empty(&dlm->dirty_list)) {
 711                         int delay = 0;
 712                         res = list_entry(dlm->dirty_list.next,
 713                                          struct dlm_lock_resource, dirty);
 714 
 715                         
 716 
 717                         BUG_ON(!res);
 718                         dlm_lockres_get(res);
 719 
 720                         spin_lock(&res->spinlock);
 721                         
 722                         list_del_init(&res->dirty);
 723                         spin_unlock(&res->spinlock);
 724                         spin_unlock(&dlm->spinlock);
 725                         
 726                         dlm_lockres_put(res);
 727 
 728                         
 729 
 730 
 731                         spin_lock(&dlm->ast_lock);
 732                         spin_lock(&res->spinlock);
 733                         if (res->owner != dlm->node_num) {
 734                                 __dlm_print_one_lock_resource(res);
 735                                 mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d,"
 736                                      " dirty %d\n", dlm->name,
 737                                      !!(res->state & DLM_LOCK_RES_IN_PROGRESS),
 738                                      !!(res->state & DLM_LOCK_RES_MIGRATING),
 739                                      !!(res->state & DLM_LOCK_RES_RECOVERING),
 740                                      !!(res->state & DLM_LOCK_RES_DIRTY));
 741                         }
 742                         BUG_ON(res->owner != dlm->node_num);
 743 
 744                         
 745 
 746 
 747                         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 748                         if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
 749                                           DLM_LOCK_RES_RECOVERING |
 750                                           DLM_LOCK_RES_RECOVERY_WAITING)) {
 751                                 
 752                                 res->state &= ~DLM_LOCK_RES_DIRTY;
 753                                 spin_unlock(&res->spinlock);
 754                                 spin_unlock(&dlm->ast_lock);
 755                                 mlog(0, "%s: res %.*s, inprogress, delay list "
 756                                      "shuffle, state %d\n", dlm->name,
 757                                      res->lockname.len, res->lockname.name,
 758                                      res->state);
 759                                 delay = 1;
 760                                 goto in_progress;
 761                         }
 762 
 763                         
 764 
 765 
 766 
 767 
 768                         
 769                         dlm_shuffle_lists(dlm, res);
 770                         res->state &= ~DLM_LOCK_RES_DIRTY;
 771                         spin_unlock(&res->spinlock);
 772                         spin_unlock(&dlm->ast_lock);
 773 
 774                         dlm_lockres_calc_usage(dlm, res);
 775 
 776 in_progress:
 777 
 778                         spin_lock(&dlm->spinlock);
 779                         
 780 
 781                         if (delay) {
 782                                 spin_lock(&res->spinlock);
 783                                 __dlm_dirty_lockres(dlm, res);
 784                                 spin_unlock(&res->spinlock);
 785                         }
 786                         dlm_lockres_put(res);
 787 
 788                         
 789 
 790                         if (!--n) {
 791                                 mlog(0, "%s: Throttling dlm thread\n",
 792                                      dlm->name);
 793                                 break;
 794                         }
 795                 }
 796 
 797                 spin_unlock(&dlm->spinlock);
 798                 dlm_flush_asts(dlm);
 799 
 800                 
 801                 if (!n) {
 802                         cond_resched();
 803                         continue;
 804                 }
 805 
 806                 wait_event_interruptible_timeout(dlm->dlm_thread_wq,
 807                                                  !dlm_dirty_list_empty(dlm) ||
 808                                                  kthread_should_stop(),
 809                                                  timeout);
 810         }
 811 
 812         mlog(0, "quitting DLM thread\n");
 813         return 0;
 814 }