root/fs/dlm/lockspace.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. dlm_control_store
  2. dlm_event_store
  3. dlm_id_show
  4. dlm_id_store
  5. dlm_nodir_show
  6. dlm_nodir_store
  7. dlm_recover_status_show
  8. dlm_recover_nodeid_show
  9. dlm_attr_show
  10. dlm_attr_store
  11. lockspace_kobj_release
  12. do_uevent
  13. dlm_uevent
  14. dlm_lockspace_init
  15. dlm_lockspace_exit
  16. find_ls_to_scan
  17. dlm_scand
  18. dlm_scand_start
  19. dlm_scand_stop
  20. dlm_find_lockspace_global
  21. dlm_find_lockspace_local
  22. dlm_find_lockspace_device
  23. dlm_put_lockspace
  24. remove_lockspace
  25. threads_start
  26. threads_stop
  27. new_lockspace
  28. dlm_new_lockspace
  29. lkb_idr_is_local
  30. lkb_idr_is_any
  31. lkb_idr_free
  32. lockspace_busy
  33. release_lockspace
  34. dlm_release_lockspace
  35. dlm_stop_lockspaces

   1 // SPDX-License-Identifier: GPL-2.0-only
   2 /******************************************************************************
   3 *******************************************************************************
   4 **
   5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   7 **
   8 **
   9 *******************************************************************************
  10 ******************************************************************************/
  11 
  12 #include <linux/module.h>
  13 
  14 #include "dlm_internal.h"
  15 #include "lockspace.h"
  16 #include "member.h"
  17 #include "recoverd.h"
  18 #include "dir.h"
  19 #include "lowcomms.h"
  20 #include "config.h"
  21 #include "memory.h"
  22 #include "lock.h"
  23 #include "recover.h"
  24 #include "requestqueue.h"
  25 #include "user.h"
  26 #include "ast.h"
  27 
  28 static int                      ls_count;
  29 static struct mutex             ls_lock;
  30 static struct list_head         lslist;
  31 static spinlock_t               lslist_lock;
  32 static struct task_struct *     scand_task;
  33 
  34 
  35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36 {
  37         ssize_t ret = len;
  38         int n;
  39         int rc = kstrtoint(buf, 0, &n);
  40 
  41         if (rc)
  42                 return rc;
  43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
  44         if (!ls)
  45                 return -EINVAL;
  46 
  47         switch (n) {
  48         case 0:
  49                 dlm_ls_stop(ls);
  50                 break;
  51         case 1:
  52                 dlm_ls_start(ls);
  53                 break;
  54         default:
  55                 ret = -EINVAL;
  56         }
  57         dlm_put_lockspace(ls);
  58         return ret;
  59 }
  60 
  61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  62 {
  63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
  64 
  65         if (rc)
  66                 return rc;
  67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  68         wake_up(&ls->ls_uevent_wait);
  69         return len;
  70 }
  71 
  72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  73 {
  74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  75 }
  76 
  77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  78 {
  79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
  80 
  81         if (rc)
  82                 return rc;
  83         return len;
  84 }
  85 
  86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
  87 {
  88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
  89 }
  90 
  91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  92 {
  93         int val;
  94         int rc = kstrtoint(buf, 0, &val);
  95 
  96         if (rc)
  97                 return rc;
  98         if (val == 1)
  99                 set_bit(LSFL_NODIR, &ls->ls_flags);
 100         return len;
 101 }
 102 
 103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 104 {
 105         uint32_t status = dlm_recover_status(ls);
 106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
 107 }
 108 
 109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 110 {
 111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 112 }
 113 
 114 struct dlm_attr {
 115         struct attribute attr;
 116         ssize_t (*show)(struct dlm_ls *, char *);
 117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
 118 };
 119 
 120 static struct dlm_attr dlm_attr_control = {
 121         .attr  = {.name = "control", .mode = S_IWUSR},
 122         .store = dlm_control_store
 123 };
 124 
 125 static struct dlm_attr dlm_attr_event = {
 126         .attr  = {.name = "event_done", .mode = S_IWUSR},
 127         .store = dlm_event_store
 128 };
 129 
 130 static struct dlm_attr dlm_attr_id = {
 131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
 132         .show  = dlm_id_show,
 133         .store = dlm_id_store
 134 };
 135 
 136 static struct dlm_attr dlm_attr_nodir = {
 137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
 138         .show  = dlm_nodir_show,
 139         .store = dlm_nodir_store
 140 };
 141 
 142 static struct dlm_attr dlm_attr_recover_status = {
 143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
 144         .show  = dlm_recover_status_show
 145 };
 146 
 147 static struct dlm_attr dlm_attr_recover_nodeid = {
 148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
 149         .show  = dlm_recover_nodeid_show
 150 };
 151 
 152 static struct attribute *dlm_attrs[] = {
 153         &dlm_attr_control.attr,
 154         &dlm_attr_event.attr,
 155         &dlm_attr_id.attr,
 156         &dlm_attr_nodir.attr,
 157         &dlm_attr_recover_status.attr,
 158         &dlm_attr_recover_nodeid.attr,
 159         NULL,
 160 };
 161 ATTRIBUTE_GROUPS(dlm);
 162 
 163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 164                              char *buf)
 165 {
 166         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 167         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 168         return a->show ? a->show(ls, buf) : 0;
 169 }
 170 
 171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 172                               const char *buf, size_t len)
 173 {
 174         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 175         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 176         return a->store ? a->store(ls, buf, len) : len;
 177 }
 178 
 179 static void lockspace_kobj_release(struct kobject *k)
 180 {
 181         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 182         kfree(ls);
 183 }
 184 
 185 static const struct sysfs_ops dlm_attr_ops = {
 186         .show  = dlm_attr_show,
 187         .store = dlm_attr_store,
 188 };
 189 
 190 static struct kobj_type dlm_ktype = {
 191         .default_groups = dlm_groups,
 192         .sysfs_ops     = &dlm_attr_ops,
 193         .release       = lockspace_kobj_release,
 194 };
 195 
 196 static struct kset *dlm_kset;
 197 
 198 static int do_uevent(struct dlm_ls *ls, int in)
 199 {
 200         int error;
 201 
 202         if (in)
 203                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 204         else
 205                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 206 
 207         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 208 
 209         /* dlm_controld will see the uevent, do the necessary group management
 210            and then write to sysfs to wake us */
 211 
 212         error = wait_event_interruptible(ls->ls_uevent_wait,
 213                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 214 
 215         log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
 216 
 217         if (error)
 218                 goto out;
 219 
 220         error = ls->ls_uevent_result;
 221  out:
 222         if (error)
 223                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 224                           error, ls->ls_uevent_result);
 225         return error;
 226 }
 227 
 228 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 229                       struct kobj_uevent_env *env)
 230 {
 231         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 232 
 233         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 234         return 0;
 235 }
 236 
 237 static const struct kset_uevent_ops dlm_uevent_ops = {
 238         .uevent = dlm_uevent,
 239 };
 240 
 241 int __init dlm_lockspace_init(void)
 242 {
 243         ls_count = 0;
 244         mutex_init(&ls_lock);
 245         INIT_LIST_HEAD(&lslist);
 246         spin_lock_init(&lslist_lock);
 247 
 248         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 249         if (!dlm_kset) {
 250                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
 251                 return -ENOMEM;
 252         }
 253         return 0;
 254 }
 255 
 256 void dlm_lockspace_exit(void)
 257 {
 258         kset_unregister(dlm_kset);
 259 }
 260 
 261 static struct dlm_ls *find_ls_to_scan(void)
 262 {
 263         struct dlm_ls *ls;
 264 
 265         spin_lock(&lslist_lock);
 266         list_for_each_entry(ls, &lslist, ls_list) {
 267                 if (time_after_eq(jiffies, ls->ls_scan_time +
 268                                             dlm_config.ci_scan_secs * HZ)) {
 269                         spin_unlock(&lslist_lock);
 270                         return ls;
 271                 }
 272         }
 273         spin_unlock(&lslist_lock);
 274         return NULL;
 275 }
 276 
 277 static int dlm_scand(void *data)
 278 {
 279         struct dlm_ls *ls;
 280 
 281         while (!kthread_should_stop()) {
 282                 ls = find_ls_to_scan();
 283                 if (ls) {
 284                         if (dlm_lock_recovery_try(ls)) {
 285                                 ls->ls_scan_time = jiffies;
 286                                 dlm_scan_rsbs(ls);
 287                                 dlm_scan_timeout(ls);
 288                                 dlm_scan_waiters(ls);
 289                                 dlm_unlock_recovery(ls);
 290                         } else {
 291                                 ls->ls_scan_time += HZ;
 292                         }
 293                         continue;
 294                 }
 295                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 296         }
 297         return 0;
 298 }
 299 
 300 static int dlm_scand_start(void)
 301 {
 302         struct task_struct *p;
 303         int error = 0;
 304 
 305         p = kthread_run(dlm_scand, NULL, "dlm_scand");
 306         if (IS_ERR(p))
 307                 error = PTR_ERR(p);
 308         else
 309                 scand_task = p;
 310         return error;
 311 }
 312 
 313 static void dlm_scand_stop(void)
 314 {
 315         kthread_stop(scand_task);
 316 }
 317 
 318 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 319 {
 320         struct dlm_ls *ls;
 321 
 322         spin_lock(&lslist_lock);
 323 
 324         list_for_each_entry(ls, &lslist, ls_list) {
 325                 if (ls->ls_global_id == id) {
 326                         ls->ls_count++;
 327                         goto out;
 328                 }
 329         }
 330         ls = NULL;
 331  out:
 332         spin_unlock(&lslist_lock);
 333         return ls;
 334 }
 335 
 336 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 337 {
 338         struct dlm_ls *ls;
 339 
 340         spin_lock(&lslist_lock);
 341         list_for_each_entry(ls, &lslist, ls_list) {
 342                 if (ls->ls_local_handle == lockspace) {
 343                         ls->ls_count++;
 344                         goto out;
 345                 }
 346         }
 347         ls = NULL;
 348  out:
 349         spin_unlock(&lslist_lock);
 350         return ls;
 351 }
 352 
 353 struct dlm_ls *dlm_find_lockspace_device(int minor)
 354 {
 355         struct dlm_ls *ls;
 356 
 357         spin_lock(&lslist_lock);
 358         list_for_each_entry(ls, &lslist, ls_list) {
 359                 if (ls->ls_device.minor == minor) {
 360                         ls->ls_count++;
 361                         goto out;
 362                 }
 363         }
 364         ls = NULL;
 365  out:
 366         spin_unlock(&lslist_lock);
 367         return ls;
 368 }
 369 
 370 void dlm_put_lockspace(struct dlm_ls *ls)
 371 {
 372         spin_lock(&lslist_lock);
 373         ls->ls_count--;
 374         spin_unlock(&lslist_lock);
 375 }
 376 
 377 static void remove_lockspace(struct dlm_ls *ls)
 378 {
 379         for (;;) {
 380                 spin_lock(&lslist_lock);
 381                 if (ls->ls_count == 0) {
 382                         WARN_ON(ls->ls_create_count != 0);
 383                         list_del(&ls->ls_list);
 384                         spin_unlock(&lslist_lock);
 385                         return;
 386                 }
 387                 spin_unlock(&lslist_lock);
 388                 ssleep(1);
 389         }
 390 }
 391 
 392 static int threads_start(void)
 393 {
 394         int error;
 395 
 396         error = dlm_scand_start();
 397         if (error) {
 398                 log_print("cannot start dlm_scand thread %d", error);
 399                 goto fail;
 400         }
 401 
 402         /* Thread for sending/receiving messages for all lockspace's */
 403         error = dlm_lowcomms_start();
 404         if (error) {
 405                 log_print("cannot start dlm lowcomms %d", error);
 406                 goto scand_fail;
 407         }
 408 
 409         return 0;
 410 
 411  scand_fail:
 412         dlm_scand_stop();
 413  fail:
 414         return error;
 415 }
 416 
 417 static void threads_stop(void)
 418 {
 419         dlm_scand_stop();
 420         dlm_lowcomms_stop();
 421 }
 422 
 423 static int new_lockspace(const char *name, const char *cluster,
 424                          uint32_t flags, int lvblen,
 425                          const struct dlm_lockspace_ops *ops, void *ops_arg,
 426                          int *ops_result, dlm_lockspace_t **lockspace)
 427 {
 428         struct dlm_ls *ls;
 429         int i, size, error;
 430         int do_unreg = 0;
 431         int namelen = strlen(name);
 432 
 433         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
 434                 return -EINVAL;
 435 
 436         if (!lvblen || (lvblen % 8))
 437                 return -EINVAL;
 438 
 439         if (!try_module_get(THIS_MODULE))
 440                 return -EINVAL;
 441 
 442         if (!dlm_user_daemon_available()) {
 443                 log_print("dlm user daemon not available");
 444                 error = -EUNATCH;
 445                 goto out;
 446         }
 447 
 448         if (ops && ops_result) {
 449                 if (!dlm_config.ci_recover_callbacks)
 450                         *ops_result = -EOPNOTSUPP;
 451                 else
 452                         *ops_result = 0;
 453         }
 454 
 455         if (!cluster)
 456                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
 457                           dlm_config.ci_cluster_name);
 458 
 459         if (dlm_config.ci_recover_callbacks && cluster &&
 460             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 461                 log_print("dlm cluster name '%s' does not match "
 462                           "the application cluster name '%s'",
 463                           dlm_config.ci_cluster_name, cluster);
 464                 error = -EBADR;
 465                 goto out;
 466         }
 467 
 468         error = 0;
 469 
 470         spin_lock(&lslist_lock);
 471         list_for_each_entry(ls, &lslist, ls_list) {
 472                 WARN_ON(ls->ls_create_count <= 0);
 473                 if (ls->ls_namelen != namelen)
 474                         continue;
 475                 if (memcmp(ls->ls_name, name, namelen))
 476                         continue;
 477                 if (flags & DLM_LSFL_NEWEXCL) {
 478                         error = -EEXIST;
 479                         break;
 480                 }
 481                 ls->ls_create_count++;
 482                 *lockspace = ls;
 483                 error = 1;
 484                 break;
 485         }
 486         spin_unlock(&lslist_lock);
 487 
 488         if (error)
 489                 goto out;
 490 
 491         error = -ENOMEM;
 492 
 493         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 494         if (!ls)
 495                 goto out;
 496         memcpy(ls->ls_name, name, namelen);
 497         ls->ls_namelen = namelen;
 498         ls->ls_lvblen = lvblen;
 499         ls->ls_count = 0;
 500         ls->ls_flags = 0;
 501         ls->ls_scan_time = jiffies;
 502 
 503         if (ops && dlm_config.ci_recover_callbacks) {
 504                 ls->ls_ops = ops;
 505                 ls->ls_ops_arg = ops_arg;
 506         }
 507 
 508         if (flags & DLM_LSFL_TIMEWARN)
 509                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 510 
 511         /* ls_exflags are forced to match among nodes, and we don't
 512            need to require all nodes to have some flags set */
 513         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 514                                     DLM_LSFL_NEWEXCL));
 515 
 516         size = dlm_config.ci_rsbtbl_size;
 517         ls->ls_rsbtbl_size = size;
 518 
 519         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
 520         if (!ls->ls_rsbtbl)
 521                 goto out_lsfree;
 522         for (i = 0; i < size; i++) {
 523                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
 524                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
 525                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
 526         }
 527 
 528         spin_lock_init(&ls->ls_remove_spin);
 529 
 530         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 531                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 532                                                  GFP_KERNEL);
 533                 if (!ls->ls_remove_names[i])
 534                         goto out_rsbtbl;
 535         }
 536 
 537         idr_init(&ls->ls_lkbidr);
 538         spin_lock_init(&ls->ls_lkbidr_spin);
 539 
 540         INIT_LIST_HEAD(&ls->ls_waiters);
 541         mutex_init(&ls->ls_waiters_mutex);
 542         INIT_LIST_HEAD(&ls->ls_orphans);
 543         mutex_init(&ls->ls_orphans_mutex);
 544         INIT_LIST_HEAD(&ls->ls_timeout);
 545         mutex_init(&ls->ls_timeout_mutex);
 546 
 547         INIT_LIST_HEAD(&ls->ls_new_rsb);
 548         spin_lock_init(&ls->ls_new_rsb_spin);
 549 
 550         INIT_LIST_HEAD(&ls->ls_nodes);
 551         INIT_LIST_HEAD(&ls->ls_nodes_gone);
 552         ls->ls_num_nodes = 0;
 553         ls->ls_low_nodeid = 0;
 554         ls->ls_total_weight = 0;
 555         ls->ls_node_array = NULL;
 556 
 557         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 558         ls->ls_stub_rsb.res_ls = ls;
 559 
 560         ls->ls_debug_rsb_dentry = NULL;
 561         ls->ls_debug_waiters_dentry = NULL;
 562 
 563         init_waitqueue_head(&ls->ls_uevent_wait);
 564         ls->ls_uevent_result = 0;
 565         init_completion(&ls->ls_members_done);
 566         ls->ls_members_result = -1;
 567 
 568         mutex_init(&ls->ls_cb_mutex);
 569         INIT_LIST_HEAD(&ls->ls_cb_delay);
 570 
 571         ls->ls_recoverd_task = NULL;
 572         mutex_init(&ls->ls_recoverd_active);
 573         spin_lock_init(&ls->ls_recover_lock);
 574         spin_lock_init(&ls->ls_rcom_spin);
 575         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 576         ls->ls_recover_status = 0;
 577         ls->ls_recover_seq = 0;
 578         ls->ls_recover_args = NULL;
 579         init_rwsem(&ls->ls_in_recovery);
 580         init_rwsem(&ls->ls_recv_active);
 581         INIT_LIST_HEAD(&ls->ls_requestqueue);
 582         mutex_init(&ls->ls_requestqueue_mutex);
 583         mutex_init(&ls->ls_clear_proc_locks);
 584 
 585         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 586         if (!ls->ls_recover_buf)
 587                 goto out_lkbidr;
 588 
 589         ls->ls_slot = 0;
 590         ls->ls_num_slots = 0;
 591         ls->ls_slots_size = 0;
 592         ls->ls_slots = NULL;
 593 
 594         INIT_LIST_HEAD(&ls->ls_recover_list);
 595         spin_lock_init(&ls->ls_recover_list_lock);
 596         idr_init(&ls->ls_recover_idr);
 597         spin_lock_init(&ls->ls_recover_idr_lock);
 598         ls->ls_recover_list_count = 0;
 599         ls->ls_local_handle = ls;
 600         init_waitqueue_head(&ls->ls_wait_general);
 601         INIT_LIST_HEAD(&ls->ls_root_list);
 602         init_rwsem(&ls->ls_root_sem);
 603 
 604         spin_lock(&lslist_lock);
 605         ls->ls_create_count = 1;
 606         list_add(&ls->ls_list, &lslist);
 607         spin_unlock(&lslist_lock);
 608 
 609         if (flags & DLM_LSFL_FS) {
 610                 error = dlm_callback_start(ls);
 611                 if (error) {
 612                         log_error(ls, "can't start dlm_callback %d", error);
 613                         goto out_delist;
 614                 }
 615         }
 616 
 617         init_waitqueue_head(&ls->ls_recover_lock_wait);
 618 
 619         /*
 620          * Once started, dlm_recoverd first looks for ls in lslist, then
 621          * initializes ls_in_recovery as locked in "down" mode.  We need
 622          * to wait for the wakeup from dlm_recoverd because in_recovery
 623          * has to start out in down mode.
 624          */
 625 
 626         error = dlm_recoverd_start(ls);
 627         if (error) {
 628                 log_error(ls, "can't start dlm_recoverd %d", error);
 629                 goto out_callback;
 630         }
 631 
 632         wait_event(ls->ls_recover_lock_wait,
 633                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 634 
 635         ls->ls_kobj.kset = dlm_kset;
 636         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 637                                      "%s", ls->ls_name);
 638         if (error)
 639                 goto out_recoverd;
 640         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 641 
 642         /* let kobject handle freeing of ls if there's an error */
 643         do_unreg = 1;
 644 
 645         /* This uevent triggers dlm_controld in userspace to add us to the
 646            group of nodes that are members of this lockspace (managed by the
 647            cluster infrastructure.)  Once it's done that, it tells us who the
 648            current lockspace members are (via configfs) and then tells the
 649            lockspace to start running (via sysfs) in dlm_ls_start(). */
 650 
 651         error = do_uevent(ls, 1);
 652         if (error)
 653                 goto out_recoverd;
 654 
 655         wait_for_completion(&ls->ls_members_done);
 656         error = ls->ls_members_result;
 657         if (error)
 658                 goto out_members;
 659 
 660         dlm_create_debug_file(ls);
 661 
 662         log_rinfo(ls, "join complete");
 663         *lockspace = ls;
 664         return 0;
 665 
 666  out_members:
 667         do_uevent(ls, 0);
 668         dlm_clear_members(ls);
 669         kfree(ls->ls_node_array);
 670  out_recoverd:
 671         dlm_recoverd_stop(ls);
 672  out_callback:
 673         dlm_callback_stop(ls);
 674  out_delist:
 675         spin_lock(&lslist_lock);
 676         list_del(&ls->ls_list);
 677         spin_unlock(&lslist_lock);
 678         idr_destroy(&ls->ls_recover_idr);
 679         kfree(ls->ls_recover_buf);
 680  out_lkbidr:
 681         idr_destroy(&ls->ls_lkbidr);
 682  out_rsbtbl:
 683         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 684                 kfree(ls->ls_remove_names[i]);
 685         vfree(ls->ls_rsbtbl);
 686  out_lsfree:
 687         if (do_unreg)
 688                 kobject_put(&ls->ls_kobj);
 689         else
 690                 kfree(ls);
 691  out:
 692         module_put(THIS_MODULE);
 693         return error;
 694 }
 695 
 696 int dlm_new_lockspace(const char *name, const char *cluster,
 697                       uint32_t flags, int lvblen,
 698                       const struct dlm_lockspace_ops *ops, void *ops_arg,
 699                       int *ops_result, dlm_lockspace_t **lockspace)
 700 {
 701         int error = 0;
 702 
 703         mutex_lock(&ls_lock);
 704         if (!ls_count)
 705                 error = threads_start();
 706         if (error)
 707                 goto out;
 708 
 709         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 710                               ops_result, lockspace);
 711         if (!error)
 712                 ls_count++;
 713         if (error > 0)
 714                 error = 0;
 715         if (!ls_count)
 716                 threads_stop();
 717  out:
 718         mutex_unlock(&ls_lock);
 719         return error;
 720 }
 721 
 722 static int lkb_idr_is_local(int id, void *p, void *data)
 723 {
 724         struct dlm_lkb *lkb = p;
 725 
 726         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
 727 }
 728 
 729 static int lkb_idr_is_any(int id, void *p, void *data)
 730 {
 731         return 1;
 732 }
 733 
 734 static int lkb_idr_free(int id, void *p, void *data)
 735 {
 736         struct dlm_lkb *lkb = p;
 737 
 738         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 739                 dlm_free_lvb(lkb->lkb_lvbptr);
 740 
 741         dlm_free_lkb(lkb);
 742         return 0;
 743 }
 744 
 745 /* NOTE: We check the lkbidr here rather than the resource table.
 746    This is because there may be LKBs queued as ASTs that have been unlinked
 747    from their RSBs and are pending deletion once the AST has been delivered */
 748 
 749 static int lockspace_busy(struct dlm_ls *ls, int force)
 750 {
 751         int rv;
 752 
 753         spin_lock(&ls->ls_lkbidr_spin);
 754         if (force == 0) {
 755                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 756         } else if (force == 1) {
 757                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 758         } else {
 759                 rv = 0;
 760         }
 761         spin_unlock(&ls->ls_lkbidr_spin);
 762         return rv;
 763 }
 764 
 765 static int release_lockspace(struct dlm_ls *ls, int force)
 766 {
 767         struct dlm_rsb *rsb;
 768         struct rb_node *n;
 769         int i, busy, rv;
 770 
 771         busy = lockspace_busy(ls, force);
 772 
 773         spin_lock(&lslist_lock);
 774         if (ls->ls_create_count == 1) {
 775                 if (busy) {
 776                         rv = -EBUSY;
 777                 } else {
 778                         /* remove_lockspace takes ls off lslist */
 779                         ls->ls_create_count = 0;
 780                         rv = 0;
 781                 }
 782         } else if (ls->ls_create_count > 1) {
 783                 rv = --ls->ls_create_count;
 784         } else {
 785                 rv = -EINVAL;
 786         }
 787         spin_unlock(&lslist_lock);
 788 
 789         if (rv) {
 790                 log_debug(ls, "release_lockspace no remove %d", rv);
 791                 return rv;
 792         }
 793 
 794         dlm_device_deregister(ls);
 795 
 796         if (force < 3 && dlm_user_daemon_available())
 797                 do_uevent(ls, 0);
 798 
 799         dlm_recoverd_stop(ls);
 800 
 801         dlm_callback_stop(ls);
 802 
 803         remove_lockspace(ls);
 804 
 805         dlm_delete_debug_file(ls);
 806 
 807         idr_destroy(&ls->ls_recover_idr);
 808         kfree(ls->ls_recover_buf);
 809 
 810         /*
 811          * Free all lkb's in idr
 812          */
 813 
 814         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
 815         idr_destroy(&ls->ls_lkbidr);
 816 
 817         /*
 818          * Free all rsb's on rsbtbl[] lists
 819          */
 820 
 821         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 822                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
 823                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 824                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
 825                         dlm_free_rsb(rsb);
 826                 }
 827 
 828                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
 829                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 830                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
 831                         dlm_free_rsb(rsb);
 832                 }
 833         }
 834 
 835         vfree(ls->ls_rsbtbl);
 836 
 837         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 838                 kfree(ls->ls_remove_names[i]);
 839 
 840         while (!list_empty(&ls->ls_new_rsb)) {
 841                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 842                                        res_hashchain);
 843                 list_del(&rsb->res_hashchain);
 844                 dlm_free_rsb(rsb);
 845         }
 846 
 847         /*
 848          * Free structures on any other lists
 849          */
 850 
 851         dlm_purge_requestqueue(ls);
 852         kfree(ls->ls_recover_args);
 853         dlm_clear_members(ls);
 854         dlm_clear_members_gone(ls);
 855         kfree(ls->ls_node_array);
 856         log_rinfo(ls, "release_lockspace final free");
 857         kobject_put(&ls->ls_kobj);
 858         /* The ls structure will be freed when the kobject is done with */
 859 
 860         module_put(THIS_MODULE);
 861         return 0;
 862 }
 863 
 864 /*
 865  * Called when a system has released all its locks and is not going to use the
 866  * lockspace any longer.  We free everything we're managing for this lockspace.
 867  * Remaining nodes will go through the recovery process as if we'd died.  The
 868  * lockspace must continue to function as usual, participating in recoveries,
 869  * until this returns.
 870  *
 871  * Force has 4 possible values:
  872  * 0 - don't destroy lockspace if it has any LKBs
 873  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 874  * 2 - destroy lockspace regardless of LKBs
 875  * 3 - destroy lockspace as part of a forced shutdown
 876  */
 877 
 878 int dlm_release_lockspace(void *lockspace, int force)
 879 {
 880         struct dlm_ls *ls;
 881         int error;
 882 
 883         ls = dlm_find_lockspace_local(lockspace);
 884         if (!ls)
 885                 return -EINVAL;
 886         dlm_put_lockspace(ls);
 887 
 888         mutex_lock(&ls_lock);
 889         error = release_lockspace(ls, force);
 890         if (!error)
 891                 ls_count--;
 892         if (!ls_count)
 893                 threads_stop();
 894         mutex_unlock(&ls_lock);
 895 
 896         return error;
 897 }
 898 
 899 void dlm_stop_lockspaces(void)
 900 {
 901         struct dlm_ls *ls;
 902         int count;
 903 
 904  restart:
 905         count = 0;
 906         spin_lock(&lslist_lock);
 907         list_for_each_entry(ls, &lslist, ls_list) {
 908                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 909                         count++;
 910                         continue;
 911                 }
 912                 spin_unlock(&lslist_lock);
 913                 log_error(ls, "no userland control daemon, stopping lockspace");
 914                 dlm_ls_stop(ls);
 915                 goto restart;
 916         }
 917         spin_unlock(&lslist_lock);
 918 
 919         if (count)
 920                 log_print("dlm user daemon left %d lockspaces", count);
 921 }
 922 

/* [<][>][^][v][top][bottom][index][help] */