This source file includes following definitions.
- dlm_control_store
- dlm_event_store
- dlm_id_show
- dlm_id_store
- dlm_nodir_show
- dlm_nodir_store
- dlm_recover_status_show
- dlm_recover_nodeid_show
- dlm_attr_show
- dlm_attr_store
- lockspace_kobj_release
- do_uevent
- dlm_uevent
- dlm_lockspace_init
- dlm_lockspace_exit
- find_ls_to_scan
- dlm_scand
- dlm_scand_start
- dlm_scand_stop
- dlm_find_lockspace_global
- dlm_find_lockspace_local
- dlm_find_lockspace_device
- dlm_put_lockspace
- remove_lockspace
- threads_start
- threads_stop
- new_lockspace
- dlm_new_lockspace
- lkb_idr_is_local
- lkb_idr_is_any
- lkb_idr_free
- lockspace_busy
- release_lockspace
- dlm_release_lockspace
- dlm_stop_lockspaces
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 #include <linux/module.h>
  13 
  14 #include "dlm_internal.h"
  15 #include "lockspace.h"
  16 #include "member.h"
  17 #include "recoverd.h"
  18 #include "dir.h"
  19 #include "lowcomms.h"
  20 #include "config.h"
  21 #include "memory.h"
  22 #include "lock.h"
  23 #include "recover.h"
  24 #include "requestqueue.h"
  25 #include "user.h"
  26 #include "ast.h"
  27 
  28 static int                      ls_count;
  29 static struct mutex             ls_lock;
  30 static struct list_head         lslist;
  31 static spinlock_t               lslist_lock;
  32 static struct task_struct *     scand_task;
  33 
  34 
  35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36 {
  37         ssize_t ret = len;
  38         int n;
  39         int rc = kstrtoint(buf, 0, &n);
  40 
  41         if (rc)
  42                 return rc;
  43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
  44         if (!ls)
  45                 return -EINVAL;
  46 
  47         switch (n) {
  48         case 0:
  49                 dlm_ls_stop(ls);
  50                 break;
  51         case 1:
  52                 dlm_ls_start(ls);
  53                 break;
  54         default:
  55                 ret = -EINVAL;
  56         }
  57         dlm_put_lockspace(ls);
  58         return ret;
  59 }
  60 
  61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  62 {
  63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
  64 
  65         if (rc)
  66                 return rc;
  67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  68         wake_up(&ls->ls_uevent_wait);
  69         return len;
  70 }
  71 
  72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  73 {
  74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  75 }
  76 
  77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  78 {
  79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
  80 
  81         if (rc)
  82                 return rc;
  83         return len;
  84 }
  85 
  86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
  87 {
  88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
  89 }
  90 
  91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  92 {
  93         int val;
  94         int rc = kstrtoint(buf, 0, &val);
  95 
  96         if (rc)
  97                 return rc;
  98         if (val == 1)
  99                 set_bit(LSFL_NODIR, &ls->ls_flags);
 100         return len;
 101 }
 102 
 103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 104 {
 105         uint32_t status = dlm_recover_status(ls);
 106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
 107 }
 108 
 109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 110 {
 111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 112 }
 113 
 114 struct dlm_attr {
 115         struct attribute attr;
 116         ssize_t (*show)(struct dlm_ls *, char *);
 117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
 118 };
 119 
 120 static struct dlm_attr dlm_attr_control = {
 121         .attr  = {.name = "control", .mode = S_IWUSR},
 122         .store = dlm_control_store
 123 };
 124 
 125 static struct dlm_attr dlm_attr_event = {
 126         .attr  = {.name = "event_done", .mode = S_IWUSR},
 127         .store = dlm_event_store
 128 };
 129 
 130 static struct dlm_attr dlm_attr_id = {
 131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
 132         .show  = dlm_id_show,
 133         .store = dlm_id_store
 134 };
 135 
 136 static struct dlm_attr dlm_attr_nodir = {
 137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
 138         .show  = dlm_nodir_show,
 139         .store = dlm_nodir_store
 140 };
 141 
 142 static struct dlm_attr dlm_attr_recover_status = {
 143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
 144         .show  = dlm_recover_status_show
 145 };
 146 
 147 static struct dlm_attr dlm_attr_recover_nodeid = {
 148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
 149         .show  = dlm_recover_nodeid_show
 150 };
 151 
 152 static struct attribute *dlm_attrs[] = {
 153         &dlm_attr_control.attr,
 154         &dlm_attr_event.attr,
 155         &dlm_attr_id.attr,
 156         &dlm_attr_nodir.attr,
 157         &dlm_attr_recover_status.attr,
 158         &dlm_attr_recover_nodeid.attr,
 159         NULL,
 160 };
 161 ATTRIBUTE_GROUPS(dlm);
 162 
 163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 164                              char *buf)
 165 {
 166         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 167         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 168         return a->show ? a->show(ls, buf) : 0;
 169 }
 170 
 171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 172                               const char *buf, size_t len)
 173 {
 174         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 175         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 176         return a->store ? a->store(ls, buf, len) : len;
 177 }
 178 
 179 static void lockspace_kobj_release(struct kobject *k)
 180 {
 181         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 182         kfree(ls);
 183 }
 184 
 185 static const struct sysfs_ops dlm_attr_ops = {
 186         .show  = dlm_attr_show,
 187         .store = dlm_attr_store,
 188 };
 189 
 190 static struct kobj_type dlm_ktype = {
 191         .default_groups = dlm_groups,
 192         .sysfs_ops     = &dlm_attr_ops,
 193         .release       = lockspace_kobj_release,
 194 };
 195 
 196 static struct kset *dlm_kset;
 197 
 198 static int do_uevent(struct dlm_ls *ls, int in)
 199 {
 200         int error;
 201 
 202         if (in)
 203                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 204         else
 205                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 206 
 207         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 208 
 209         
 210 
 211 
 212         error = wait_event_interruptible(ls->ls_uevent_wait,
 213                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 214 
 215         log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
 216 
 217         if (error)
 218                 goto out;
 219 
 220         error = ls->ls_uevent_result;
 221  out:
 222         if (error)
 223                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 224                           error, ls->ls_uevent_result);
 225         return error;
 226 }
 227 
 228 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 229                       struct kobj_uevent_env *env)
 230 {
 231         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 232 
 233         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 234         return 0;
 235 }
 236 
 237 static const struct kset_uevent_ops dlm_uevent_ops = {
 238         .uevent = dlm_uevent,
 239 };
 240 
 241 int __init dlm_lockspace_init(void)
 242 {
 243         ls_count = 0;
 244         mutex_init(&ls_lock);
 245         INIT_LIST_HEAD(&lslist);
 246         spin_lock_init(&lslist_lock);
 247 
 248         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 249         if (!dlm_kset) {
 250                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
 251                 return -ENOMEM;
 252         }
 253         return 0;
 254 }
 255 
 256 void dlm_lockspace_exit(void)
 257 {
 258         kset_unregister(dlm_kset);
 259 }
 260 
 261 static struct dlm_ls *find_ls_to_scan(void)
 262 {
 263         struct dlm_ls *ls;
 264 
 265         spin_lock(&lslist_lock);
 266         list_for_each_entry(ls, &lslist, ls_list) {
 267                 if (time_after_eq(jiffies, ls->ls_scan_time +
 268                                             dlm_config.ci_scan_secs * HZ)) {
 269                         spin_unlock(&lslist_lock);
 270                         return ls;
 271                 }
 272         }
 273         spin_unlock(&lslist_lock);
 274         return NULL;
 275 }
 276 
 277 static int dlm_scand(void *data)
 278 {
 279         struct dlm_ls *ls;
 280 
 281         while (!kthread_should_stop()) {
 282                 ls = find_ls_to_scan();
 283                 if (ls) {
 284                         if (dlm_lock_recovery_try(ls)) {
 285                                 ls->ls_scan_time = jiffies;
 286                                 dlm_scan_rsbs(ls);
 287                                 dlm_scan_timeout(ls);
 288                                 dlm_scan_waiters(ls);
 289                                 dlm_unlock_recovery(ls);
 290                         } else {
 291                                 ls->ls_scan_time += HZ;
 292                         }
 293                         continue;
 294                 }
 295                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 296         }
 297         return 0;
 298 }
 299 
 300 static int dlm_scand_start(void)
 301 {
 302         struct task_struct *p;
 303         int error = 0;
 304 
 305         p = kthread_run(dlm_scand, NULL, "dlm_scand");
 306         if (IS_ERR(p))
 307                 error = PTR_ERR(p);
 308         else
 309                 scand_task = p;
 310         return error;
 311 }
 312 
 313 static void dlm_scand_stop(void)
 314 {
 315         kthread_stop(scand_task);
 316 }
 317 
 318 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 319 {
 320         struct dlm_ls *ls;
 321 
 322         spin_lock(&lslist_lock);
 323 
 324         list_for_each_entry(ls, &lslist, ls_list) {
 325                 if (ls->ls_global_id == id) {
 326                         ls->ls_count++;
 327                         goto out;
 328                 }
 329         }
 330         ls = NULL;
 331  out:
 332         spin_unlock(&lslist_lock);
 333         return ls;
 334 }
 335 
 336 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 337 {
 338         struct dlm_ls *ls;
 339 
 340         spin_lock(&lslist_lock);
 341         list_for_each_entry(ls, &lslist, ls_list) {
 342                 if (ls->ls_local_handle == lockspace) {
 343                         ls->ls_count++;
 344                         goto out;
 345                 }
 346         }
 347         ls = NULL;
 348  out:
 349         spin_unlock(&lslist_lock);
 350         return ls;
 351 }
 352 
 353 struct dlm_ls *dlm_find_lockspace_device(int minor)
 354 {
 355         struct dlm_ls *ls;
 356 
 357         spin_lock(&lslist_lock);
 358         list_for_each_entry(ls, &lslist, ls_list) {
 359                 if (ls->ls_device.minor == minor) {
 360                         ls->ls_count++;
 361                         goto out;
 362                 }
 363         }
 364         ls = NULL;
 365  out:
 366         spin_unlock(&lslist_lock);
 367         return ls;
 368 }
 369 
 370 void dlm_put_lockspace(struct dlm_ls *ls)
 371 {
 372         spin_lock(&lslist_lock);
 373         ls->ls_count--;
 374         spin_unlock(&lslist_lock);
 375 }
 376 
 377 static void remove_lockspace(struct dlm_ls *ls)
 378 {
 379         for (;;) {
 380                 spin_lock(&lslist_lock);
 381                 if (ls->ls_count == 0) {
 382                         WARN_ON(ls->ls_create_count != 0);
 383                         list_del(&ls->ls_list);
 384                         spin_unlock(&lslist_lock);
 385                         return;
 386                 }
 387                 spin_unlock(&lslist_lock);
 388                 ssleep(1);
 389         }
 390 }
 391 
 392 static int threads_start(void)
 393 {
 394         int error;
 395 
 396         error = dlm_scand_start();
 397         if (error) {
 398                 log_print("cannot start dlm_scand thread %d", error);
 399                 goto fail;
 400         }
 401 
 402         
 403         error = dlm_lowcomms_start();
 404         if (error) {
 405                 log_print("cannot start dlm lowcomms %d", error);
 406                 goto scand_fail;
 407         }
 408 
 409         return 0;
 410 
 411  scand_fail:
 412         dlm_scand_stop();
 413  fail:
 414         return error;
 415 }
 416 
 417 static void threads_stop(void)
 418 {
 419         dlm_scand_stop();
 420         dlm_lowcomms_stop();
 421 }
 422 
 423 static int new_lockspace(const char *name, const char *cluster,
 424                          uint32_t flags, int lvblen,
 425                          const struct dlm_lockspace_ops *ops, void *ops_arg,
 426                          int *ops_result, dlm_lockspace_t **lockspace)
 427 {
 428         struct dlm_ls *ls;
 429         int i, size, error;
 430         int do_unreg = 0;
 431         int namelen = strlen(name);
 432 
 433         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
 434                 return -EINVAL;
 435 
 436         if (!lvblen || (lvblen % 8))
 437                 return -EINVAL;
 438 
 439         if (!try_module_get(THIS_MODULE))
 440                 return -EINVAL;
 441 
 442         if (!dlm_user_daemon_available()) {
 443                 log_print("dlm user daemon not available");
 444                 error = -EUNATCH;
 445                 goto out;
 446         }
 447 
 448         if (ops && ops_result) {
 449                 if (!dlm_config.ci_recover_callbacks)
 450                         *ops_result = -EOPNOTSUPP;
 451                 else
 452                         *ops_result = 0;
 453         }
 454 
 455         if (!cluster)
 456                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
 457                           dlm_config.ci_cluster_name);
 458 
 459         if (dlm_config.ci_recover_callbacks && cluster &&
 460             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 461                 log_print("dlm cluster name '%s' does not match "
 462                           "the application cluster name '%s'",
 463                           dlm_config.ci_cluster_name, cluster);
 464                 error = -EBADR;
 465                 goto out;
 466         }
 467 
 468         error = 0;
 469 
 470         spin_lock(&lslist_lock);
 471         list_for_each_entry(ls, &lslist, ls_list) {
 472                 WARN_ON(ls->ls_create_count <= 0);
 473                 if (ls->ls_namelen != namelen)
 474                         continue;
 475                 if (memcmp(ls->ls_name, name, namelen))
 476                         continue;
 477                 if (flags & DLM_LSFL_NEWEXCL) {
 478                         error = -EEXIST;
 479                         break;
 480                 }
 481                 ls->ls_create_count++;
 482                 *lockspace = ls;
 483                 error = 1;
 484                 break;
 485         }
 486         spin_unlock(&lslist_lock);
 487 
 488         if (error)
 489                 goto out;
 490 
 491         error = -ENOMEM;
 492 
 493         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 494         if (!ls)
 495                 goto out;
 496         memcpy(ls->ls_name, name, namelen);
 497         ls->ls_namelen = namelen;
 498         ls->ls_lvblen = lvblen;
 499         ls->ls_count = 0;
 500         ls->ls_flags = 0;
 501         ls->ls_scan_time = jiffies;
 502 
 503         if (ops && dlm_config.ci_recover_callbacks) {
 504                 ls->ls_ops = ops;
 505                 ls->ls_ops_arg = ops_arg;
 506         }
 507 
 508         if (flags & DLM_LSFL_TIMEWARN)
 509                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 510 
 511         
 512 
 513         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 514                                     DLM_LSFL_NEWEXCL));
 515 
 516         size = dlm_config.ci_rsbtbl_size;
 517         ls->ls_rsbtbl_size = size;
 518 
 519         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
 520         if (!ls->ls_rsbtbl)
 521                 goto out_lsfree;
 522         for (i = 0; i < size; i++) {
 523                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
 524                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
 525                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
 526         }
 527 
 528         spin_lock_init(&ls->ls_remove_spin);
 529 
 530         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 531                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 532                                                  GFP_KERNEL);
 533                 if (!ls->ls_remove_names[i])
 534                         goto out_rsbtbl;
 535         }
 536 
 537         idr_init(&ls->ls_lkbidr);
 538         spin_lock_init(&ls->ls_lkbidr_spin);
 539 
 540         INIT_LIST_HEAD(&ls->ls_waiters);
 541         mutex_init(&ls->ls_waiters_mutex);
 542         INIT_LIST_HEAD(&ls->ls_orphans);
 543         mutex_init(&ls->ls_orphans_mutex);
 544         INIT_LIST_HEAD(&ls->ls_timeout);
 545         mutex_init(&ls->ls_timeout_mutex);
 546 
 547         INIT_LIST_HEAD(&ls->ls_new_rsb);
 548         spin_lock_init(&ls->ls_new_rsb_spin);
 549 
 550         INIT_LIST_HEAD(&ls->ls_nodes);
 551         INIT_LIST_HEAD(&ls->ls_nodes_gone);
 552         ls->ls_num_nodes = 0;
 553         ls->ls_low_nodeid = 0;
 554         ls->ls_total_weight = 0;
 555         ls->ls_node_array = NULL;
 556 
 557         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 558         ls->ls_stub_rsb.res_ls = ls;
 559 
 560         ls->ls_debug_rsb_dentry = NULL;
 561         ls->ls_debug_waiters_dentry = NULL;
 562 
 563         init_waitqueue_head(&ls->ls_uevent_wait);
 564         ls->ls_uevent_result = 0;
 565         init_completion(&ls->ls_members_done);
 566         ls->ls_members_result = -1;
 567 
 568         mutex_init(&ls->ls_cb_mutex);
 569         INIT_LIST_HEAD(&ls->ls_cb_delay);
 570 
 571         ls->ls_recoverd_task = NULL;
 572         mutex_init(&ls->ls_recoverd_active);
 573         spin_lock_init(&ls->ls_recover_lock);
 574         spin_lock_init(&ls->ls_rcom_spin);
 575         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 576         ls->ls_recover_status = 0;
 577         ls->ls_recover_seq = 0;
 578         ls->ls_recover_args = NULL;
 579         init_rwsem(&ls->ls_in_recovery);
 580         init_rwsem(&ls->ls_recv_active);
 581         INIT_LIST_HEAD(&ls->ls_requestqueue);
 582         mutex_init(&ls->ls_requestqueue_mutex);
 583         mutex_init(&ls->ls_clear_proc_locks);
 584 
 585         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 586         if (!ls->ls_recover_buf)
 587                 goto out_lkbidr;
 588 
 589         ls->ls_slot = 0;
 590         ls->ls_num_slots = 0;
 591         ls->ls_slots_size = 0;
 592         ls->ls_slots = NULL;
 593 
 594         INIT_LIST_HEAD(&ls->ls_recover_list);
 595         spin_lock_init(&ls->ls_recover_list_lock);
 596         idr_init(&ls->ls_recover_idr);
 597         spin_lock_init(&ls->ls_recover_idr_lock);
 598         ls->ls_recover_list_count = 0;
 599         ls->ls_local_handle = ls;
 600         init_waitqueue_head(&ls->ls_wait_general);
 601         INIT_LIST_HEAD(&ls->ls_root_list);
 602         init_rwsem(&ls->ls_root_sem);
 603 
 604         spin_lock(&lslist_lock);
 605         ls->ls_create_count = 1;
 606         list_add(&ls->ls_list, &lslist);
 607         spin_unlock(&lslist_lock);
 608 
 609         if (flags & DLM_LSFL_FS) {
 610                 error = dlm_callback_start(ls);
 611                 if (error) {
 612                         log_error(ls, "can't start dlm_callback %d", error);
 613                         goto out_delist;
 614                 }
 615         }
 616 
 617         init_waitqueue_head(&ls->ls_recover_lock_wait);
 618 
 619         
 620 
 621 
 622 
 623 
 624 
 625 
 626         error = dlm_recoverd_start(ls);
 627         if (error) {
 628                 log_error(ls, "can't start dlm_recoverd %d", error);
 629                 goto out_callback;
 630         }
 631 
 632         wait_event(ls->ls_recover_lock_wait,
 633                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 634 
 635         ls->ls_kobj.kset = dlm_kset;
 636         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 637                                      "%s", ls->ls_name);
 638         if (error)
 639                 goto out_recoverd;
 640         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 641 
 642         
 643         do_unreg = 1;
 644 
 645         
 646 
 647 
 648 
 649 
 650 
 651         error = do_uevent(ls, 1);
 652         if (error)
 653                 goto out_recoverd;
 654 
 655         wait_for_completion(&ls->ls_members_done);
 656         error = ls->ls_members_result;
 657         if (error)
 658                 goto out_members;
 659 
 660         dlm_create_debug_file(ls);
 661 
 662         log_rinfo(ls, "join complete");
 663         *lockspace = ls;
 664         return 0;
 665 
 666  out_members:
 667         do_uevent(ls, 0);
 668         dlm_clear_members(ls);
 669         kfree(ls->ls_node_array);
 670  out_recoverd:
 671         dlm_recoverd_stop(ls);
 672  out_callback:
 673         dlm_callback_stop(ls);
 674  out_delist:
 675         spin_lock(&lslist_lock);
 676         list_del(&ls->ls_list);
 677         spin_unlock(&lslist_lock);
 678         idr_destroy(&ls->ls_recover_idr);
 679         kfree(ls->ls_recover_buf);
 680  out_lkbidr:
 681         idr_destroy(&ls->ls_lkbidr);
 682  out_rsbtbl:
 683         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 684                 kfree(ls->ls_remove_names[i]);
 685         vfree(ls->ls_rsbtbl);
 686  out_lsfree:
 687         if (do_unreg)
 688                 kobject_put(&ls->ls_kobj);
 689         else
 690                 kfree(ls);
 691  out:
 692         module_put(THIS_MODULE);
 693         return error;
 694 }
 695 
 696 int dlm_new_lockspace(const char *name, const char *cluster,
 697                       uint32_t flags, int lvblen,
 698                       const struct dlm_lockspace_ops *ops, void *ops_arg,
 699                       int *ops_result, dlm_lockspace_t **lockspace)
 700 {
 701         int error = 0;
 702 
 703         mutex_lock(&ls_lock);
 704         if (!ls_count)
 705                 error = threads_start();
 706         if (error)
 707                 goto out;
 708 
 709         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 710                               ops_result, lockspace);
 711         if (!error)
 712                 ls_count++;
 713         if (error > 0)
 714                 error = 0;
 715         if (!ls_count)
 716                 threads_stop();
 717  out:
 718         mutex_unlock(&ls_lock);
 719         return error;
 720 }
 721 
 722 static int lkb_idr_is_local(int id, void *p, void *data)
 723 {
 724         struct dlm_lkb *lkb = p;
 725 
 726         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
 727 }
 728 
 729 static int lkb_idr_is_any(int id, void *p, void *data)
 730 {
 731         return 1;
 732 }
 733 
 734 static int lkb_idr_free(int id, void *p, void *data)
 735 {
 736         struct dlm_lkb *lkb = p;
 737 
 738         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 739                 dlm_free_lvb(lkb->lkb_lvbptr);
 740 
 741         dlm_free_lkb(lkb);
 742         return 0;
 743 }
 744 
 745 
 746 
 747 
 748 
 749 static int lockspace_busy(struct dlm_ls *ls, int force)
 750 {
 751         int rv;
 752 
 753         spin_lock(&ls->ls_lkbidr_spin);
 754         if (force == 0) {
 755                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 756         } else if (force == 1) {
 757                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 758         } else {
 759                 rv = 0;
 760         }
 761         spin_unlock(&ls->ls_lkbidr_spin);
 762         return rv;
 763 }
 764 
 765 static int release_lockspace(struct dlm_ls *ls, int force)
 766 {
 767         struct dlm_rsb *rsb;
 768         struct rb_node *n;
 769         int i, busy, rv;
 770 
 771         busy = lockspace_busy(ls, force);
 772 
 773         spin_lock(&lslist_lock);
 774         if (ls->ls_create_count == 1) {
 775                 if (busy) {
 776                         rv = -EBUSY;
 777                 } else {
 778                         
 779                         ls->ls_create_count = 0;
 780                         rv = 0;
 781                 }
 782         } else if (ls->ls_create_count > 1) {
 783                 rv = --ls->ls_create_count;
 784         } else {
 785                 rv = -EINVAL;
 786         }
 787         spin_unlock(&lslist_lock);
 788 
 789         if (rv) {
 790                 log_debug(ls, "release_lockspace no remove %d", rv);
 791                 return rv;
 792         }
 793 
 794         dlm_device_deregister(ls);
 795 
 796         if (force < 3 && dlm_user_daemon_available())
 797                 do_uevent(ls, 0);
 798 
 799         dlm_recoverd_stop(ls);
 800 
 801         dlm_callback_stop(ls);
 802 
 803         remove_lockspace(ls);
 804 
 805         dlm_delete_debug_file(ls);
 806 
 807         idr_destroy(&ls->ls_recover_idr);
 808         kfree(ls->ls_recover_buf);
 809 
 810         
 811 
 812 
 813 
 814         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
 815         idr_destroy(&ls->ls_lkbidr);
 816 
 817         
 818 
 819 
 820 
 821         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 822                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
 823                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 824                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
 825                         dlm_free_rsb(rsb);
 826                 }
 827 
 828                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
 829                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 830                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
 831                         dlm_free_rsb(rsb);
 832                 }
 833         }
 834 
 835         vfree(ls->ls_rsbtbl);
 836 
 837         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 838                 kfree(ls->ls_remove_names[i]);
 839 
 840         while (!list_empty(&ls->ls_new_rsb)) {
 841                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 842                                        res_hashchain);
 843                 list_del(&rsb->res_hashchain);
 844                 dlm_free_rsb(rsb);
 845         }
 846 
 847         
 848 
 849 
 850 
 851         dlm_purge_requestqueue(ls);
 852         kfree(ls->ls_recover_args);
 853         dlm_clear_members(ls);
 854         dlm_clear_members_gone(ls);
 855         kfree(ls->ls_node_array);
 856         log_rinfo(ls, "release_lockspace final free");
 857         kobject_put(&ls->ls_kobj);
 858         
 859 
 860         module_put(THIS_MODULE);
 861         return 0;
 862 }
 863 
 864 
 865 
 866 
 867 
 868 
 869 
 870 
 871 
 872 
 873 
 874 
 875 
 876 
 877 
 878 int dlm_release_lockspace(void *lockspace, int force)
 879 {
 880         struct dlm_ls *ls;
 881         int error;
 882 
 883         ls = dlm_find_lockspace_local(lockspace);
 884         if (!ls)
 885                 return -EINVAL;
 886         dlm_put_lockspace(ls);
 887 
 888         mutex_lock(&ls_lock);
 889         error = release_lockspace(ls, force);
 890         if (!error)
 891                 ls_count--;
 892         if (!ls_count)
 893                 threads_stop();
 894         mutex_unlock(&ls_lock);
 895 
 896         return error;
 897 }
 898 
 899 void dlm_stop_lockspaces(void)
 900 {
 901         struct dlm_ls *ls;
 902         int count;
 903 
 904  restart:
 905         count = 0;
 906         spin_lock(&lslist_lock);
 907         list_for_each_entry(ls, &lslist, ls_list) {
 908                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 909                         count++;
 910                         continue;
 911                 }
 912                 spin_unlock(&lslist_lock);
 913                 log_error(ls, "no userland control daemon, stopping lockspace");
 914                 dlm_ls_stop(ls);
 915                 goto restart;
 916         }
 917         spin_unlock(&lslist_lock);
 918 
 919         if (count)
 920                 log_print("dlm user daemon left %d lockspaces", count);
 921 }
 922