root/fs/ocfs2/dlmfs/userdlm.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. user_lksb_to_lock_res
  2. user_check_wait_flag
  3. user_wait_on_busy_lock
  4. user_wait_on_blocked_lock
  5. cluster_connection_from_user_lockres
  6. user_dlm_inode_from_user_lockres
  7. user_recover_from_dlm_error
  8. user_highest_compat_lock_level
  9. user_ast
  10. user_dlm_grab_inode_ref
  11. __user_dlm_queue_lockres
  12. __user_dlm_cond_queue_lockres
  13. user_bast
  14. user_unlock_ast
  15. user_dlm_drop_inode_ref
  16. user_dlm_unblock_lock
  17. user_dlm_inc_holders
  18. user_may_continue_on_blocked_lock
  19. user_dlm_cluster_lock
  20. user_dlm_dec_holders
  21. user_dlm_cluster_unlock
  22. user_dlm_write_lvb
  23. user_dlm_read_lvb
  24. user_dlm_lock_res_init
  25. user_dlm_destroy_lock
  26. user_dlm_recovery_handler_noop
  27. user_dlm_set_locking_protocol
  28. user_dlm_register
  29. user_dlm_unregister

   1 // SPDX-License-Identifier: GPL-2.0-or-later
   2 /* -*- mode: c; c-basic-offset: 8; -*-
   3  * vim: noexpandtab sw=8 ts=8 sts=0:
   4  *
   5  * userdlm.c
   6  *
   7  * Code which implements the kernel side of a minimal userspace
   8  * interface to our DLM.
   9  *
  10  * Many of the functions here are pared down versions of dlmglue.c
  11  * functions.
  12  *
  13  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
  14  */
  15 
  16 #include <linux/signal.h>
  17 #include <linux/sched/signal.h>
  18 
  19 #include <linux/module.h>
  20 #include <linux/fs.h>
  21 #include <linux/types.h>
  22 #include <linux/crc32.h>
  23 
  24 #include "../ocfs2_lockingver.h"
  25 #include "../stackglue.h"
  26 #include "userdlm.h"
  27 
  28 #define MLOG_MASK_PREFIX ML_DLMFS
  29 #include "../cluster/masklog.h"
  30 
  31 
  32 static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
  33 {
  34         return container_of(lksb, struct user_lock_res, l_lksb);
  35 }
  36 
  37 static inline int user_check_wait_flag(struct user_lock_res *lockres,
  38                                        int flag)
  39 {
  40         int ret;
  41 
  42         spin_lock(&lockres->l_lock);
  43         ret = lockres->l_flags & flag;
  44         spin_unlock(&lockres->l_lock);
  45 
  46         return ret;
  47 }
  48 
  49 static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
  50 
  51 {
  52         wait_event(lockres->l_event,
  53                    !user_check_wait_flag(lockres, USER_LOCK_BUSY));
  54 }
  55 
  56 static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
  57 
  58 {
  59         wait_event(lockres->l_event,
  60                    !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
  61 }
  62 
  63 /* I heart container_of... */
  64 static inline struct ocfs2_cluster_connection *
  65 cluster_connection_from_user_lockres(struct user_lock_res *lockres)
  66 {
  67         struct dlmfs_inode_private *ip;
  68 
  69         ip = container_of(lockres,
  70                           struct dlmfs_inode_private,
  71                           ip_lockres);
  72         return ip->ip_conn;
  73 }
  74 
  75 static struct inode *
  76 user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
  77 {
  78         struct dlmfs_inode_private *ip;
  79 
  80         ip = container_of(lockres,
  81                           struct dlmfs_inode_private,
  82                           ip_lockres);
  83         return &ip->ip_vfs_inode;
  84 }
  85 
  86 static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
  87 {
  88         spin_lock(&lockres->l_lock);
  89         lockres->l_flags &= ~USER_LOCK_BUSY;
  90         spin_unlock(&lockres->l_lock);
  91 }
  92 
  93 #define user_log_dlm_error(_func, _stat, _lockres) do {                 \
  94         mlog(ML_ERROR, "Dlm error %d while calling %s on "              \
  95                 "resource %.*s\n", _stat, _func,                        \
  96                 _lockres->l_namelen, _lockres->l_name);                 \
  97 } while (0)
  98 
  99 /* WARNING: This function lives in a world where the only three lock
 100  * levels are EX, PR, and NL. It *will* have to be adjusted when more
 101  * lock types are added. */
 102 static inline int user_highest_compat_lock_level(int level)
 103 {
 104         int new_level = DLM_LOCK_EX;
 105 
 106         if (level == DLM_LOCK_EX)
 107                 new_level = DLM_LOCK_NL;
 108         else if (level == DLM_LOCK_PR)
 109                 new_level = DLM_LOCK_PR;
 110         return new_level;
 111 }
 112 
 113 static void user_ast(struct ocfs2_dlm_lksb *lksb)
 114 {
 115         struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
 116         int status;
 117 
 118         mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
 119              lockres->l_namelen, lockres->l_name, lockres->l_level,
 120              lockres->l_requested);
 121 
 122         spin_lock(&lockres->l_lock);
 123 
 124         status = ocfs2_dlm_lock_status(&lockres->l_lksb);
 125         if (status) {
 126                 mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
 127                      status, lockres->l_namelen, lockres->l_name);
 128                 spin_unlock(&lockres->l_lock);
 129                 return;
 130         }
 131 
 132         mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
 133                         "Lockres %.*s, requested ivmode. flags 0x%x\n",
 134                         lockres->l_namelen, lockres->l_name, lockres->l_flags);
 135 
 136         /* we're downconverting. */
 137         if (lockres->l_requested < lockres->l_level) {
 138                 if (lockres->l_requested <=
 139                     user_highest_compat_lock_level(lockres->l_blocking)) {
 140                         lockres->l_blocking = DLM_LOCK_NL;
 141                         lockres->l_flags &= ~USER_LOCK_BLOCKED;
 142                 }
 143         }
 144 
 145         lockres->l_level = lockres->l_requested;
 146         lockres->l_requested = DLM_LOCK_IV;
 147         lockres->l_flags |= USER_LOCK_ATTACHED;
 148         lockres->l_flags &= ~USER_LOCK_BUSY;
 149 
 150         spin_unlock(&lockres->l_lock);
 151 
 152         wake_up(&lockres->l_event);
 153 }
 154 
 155 static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
 156 {
 157         struct inode *inode;
 158         inode = user_dlm_inode_from_user_lockres(lockres);
 159         if (!igrab(inode))
 160                 BUG();
 161 }
 162 
 163 static void user_dlm_unblock_lock(struct work_struct *work);
 164 
 165 static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
 166 {
 167         if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
 168                 user_dlm_grab_inode_ref(lockres);
 169 
 170                 INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);
 171 
 172                 queue_work(user_dlm_worker, &lockres->l_work);
 173                 lockres->l_flags |= USER_LOCK_QUEUED;
 174         }
 175 }
 176 
 177 static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
 178 {
 179         int queue = 0;
 180 
 181         if (!(lockres->l_flags & USER_LOCK_BLOCKED))
 182                 return;
 183 
 184         switch (lockres->l_blocking) {
 185         case DLM_LOCK_EX:
 186                 if (!lockres->l_ex_holders && !lockres->l_ro_holders)
 187                         queue = 1;
 188                 break;
 189         case DLM_LOCK_PR:
 190                 if (!lockres->l_ex_holders)
 191                         queue = 1;
 192                 break;
 193         default:
 194                 BUG();
 195         }
 196 
 197         if (queue)
 198                 __user_dlm_queue_lockres(lockres);
 199 }
 200 
 201 static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
 202 {
 203         struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
 204 
 205         mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
 206              lockres->l_namelen, lockres->l_name, level, lockres->l_level);
 207 
 208         spin_lock(&lockres->l_lock);
 209         lockres->l_flags |= USER_LOCK_BLOCKED;
 210         if (level > lockres->l_blocking)
 211                 lockres->l_blocking = level;
 212 
 213         __user_dlm_queue_lockres(lockres);
 214         spin_unlock(&lockres->l_lock);
 215 
 216         wake_up(&lockres->l_event);
 217 }
 218 
 219 static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
 220 {
 221         struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
 222 
 223         mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
 224              lockres->l_namelen, lockres->l_name, lockres->l_flags);
 225 
 226         if (status)
 227                 mlog(ML_ERROR, "dlm returns status %d\n", status);
 228 
 229         spin_lock(&lockres->l_lock);
 230         /* The teardown flag gets set early during the unlock process,
 231          * so test the cancel flag to make sure that this ast isn't
 232          * for a concurrent cancel. */
 233         if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
 234             && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
 235                 lockres->l_level = DLM_LOCK_IV;
 236         } else if (status == DLM_CANCELGRANT) {
 237                 /* We tried to cancel a convert request, but it was
 238                  * already granted. Don't clear the busy flag - the
 239                  * ast should've done this already. */
 240                 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
 241                 lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
 242                 goto out_noclear;
 243         } else {
 244                 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
 245                 /* Cancel succeeded, we want to re-queue */
 246                 lockres->l_requested = DLM_LOCK_IV; /* cancel an
 247                                                     * upconvert
 248                                                     * request. */
 249                 lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
 250                 /* we want the unblock thread to look at it again
 251                  * now. */
 252                 if (lockres->l_flags & USER_LOCK_BLOCKED)
 253                         __user_dlm_queue_lockres(lockres);
 254         }
 255 
 256         lockres->l_flags &= ~USER_LOCK_BUSY;
 257 out_noclear:
 258         spin_unlock(&lockres->l_lock);
 259 
 260         wake_up(&lockres->l_event);
 261 }
 262 
 263 /*
 264  * This is the userdlmfs locking protocol version.
 265  *
 266  * See fs/ocfs2/dlmglue.c for more details on locking versions.
 267  */
 268 static struct ocfs2_locking_protocol user_dlm_lproto = {
 269         .lp_max_version = {
 270                 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
 271                 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
 272         },
 273         .lp_lock_ast            = user_ast,
 274         .lp_blocking_ast        = user_bast,
 275         .lp_unlock_ast          = user_unlock_ast,
 276 };
 277 
 278 static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
 279 {
 280         struct inode *inode;
 281         inode = user_dlm_inode_from_user_lockres(lockres);
 282         iput(inode);
 283 }
 284 
 285 static void user_dlm_unblock_lock(struct work_struct *work)
 286 {
 287         int new_level, status;
 288         struct user_lock_res *lockres =
 289                 container_of(work, struct user_lock_res, l_work);
 290         struct ocfs2_cluster_connection *conn =
 291                 cluster_connection_from_user_lockres(lockres);
 292 
 293         mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);
 294 
 295         spin_lock(&lockres->l_lock);
 296 
 297         mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
 298                         "Lockres %.*s, flags 0x%x\n",
 299                         lockres->l_namelen, lockres->l_name, lockres->l_flags);
 300 
 301         /* notice that we don't clear USER_LOCK_BLOCKED here. If it's
 302          * set, we want user_ast clear it. */
 303         lockres->l_flags &= ~USER_LOCK_QUEUED;
 304 
 305         /* It's valid to get here and no longer be blocked - if we get
 306          * several basts in a row, we might be queued by the first
 307          * one, the unblock thread might run and clear the queued
 308          * flag, and finally we might get another bast which re-queues
 309          * us before our ast for the downconvert is called. */
 310         if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
 311                 mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
 312                      lockres->l_namelen, lockres->l_name);
 313                 spin_unlock(&lockres->l_lock);
 314                 goto drop_ref;
 315         }
 316 
 317         if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
 318                 mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
 319                      lockres->l_namelen, lockres->l_name);
 320                 spin_unlock(&lockres->l_lock);
 321                 goto drop_ref;
 322         }
 323 
 324         if (lockres->l_flags & USER_LOCK_BUSY) {
 325                 if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
 326                         mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
 327                              lockres->l_namelen, lockres->l_name);
 328                         spin_unlock(&lockres->l_lock);
 329                         goto drop_ref;
 330                 }
 331 
 332                 lockres->l_flags |= USER_LOCK_IN_CANCEL;
 333                 spin_unlock(&lockres->l_lock);
 334 
 335                 status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
 336                                           DLM_LKF_CANCEL);
 337                 if (status)
 338                         user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
 339                 goto drop_ref;
 340         }
 341 
 342         /* If there are still incompat holders, we can exit safely
 343          * without worrying about re-queueing this lock as that will
 344          * happen on the last call to user_cluster_unlock. */
 345         if ((lockres->l_blocking == DLM_LOCK_EX)
 346             && (lockres->l_ex_holders || lockres->l_ro_holders)) {
 347                 spin_unlock(&lockres->l_lock);
 348                 mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
 349                      lockres->l_namelen, lockres->l_name,
 350                      lockres->l_ex_holders, lockres->l_ro_holders);
 351                 goto drop_ref;
 352         }
 353 
 354         if ((lockres->l_blocking == DLM_LOCK_PR)
 355             && lockres->l_ex_holders) {
 356                 spin_unlock(&lockres->l_lock);
 357                 mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
 358                      lockres->l_namelen, lockres->l_name,
 359                      lockres->l_ex_holders);
 360                 goto drop_ref;
 361         }
 362 
 363         /* yay, we can downconvert now. */
 364         new_level = user_highest_compat_lock_level(lockres->l_blocking);
 365         lockres->l_requested = new_level;
 366         lockres->l_flags |= USER_LOCK_BUSY;
 367         mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
 368              lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
 369         spin_unlock(&lockres->l_lock);
 370 
 371         /* need lock downconvert request now... */
 372         status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
 373                                 DLM_LKF_CONVERT|DLM_LKF_VALBLK,
 374                                 lockres->l_name,
 375                                 lockres->l_namelen);
 376         if (status) {
 377                 user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
 378                 user_recover_from_dlm_error(lockres);
 379         }
 380 
 381 drop_ref:
 382         user_dlm_drop_inode_ref(lockres);
 383 }
 384 
 385 static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
 386                                         int level)
 387 {
 388         switch(level) {
 389         case DLM_LOCK_EX:
 390                 lockres->l_ex_holders++;
 391                 break;
 392         case DLM_LOCK_PR:
 393                 lockres->l_ro_holders++;
 394                 break;
 395         default:
 396                 BUG();
 397         }
 398 }
 399 
 400 /* predict what lock level we'll be dropping down to on behalf
 401  * of another node, and return true if the currently wanted
 402  * level will be compatible with it. */
 403 static inline int
 404 user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
 405                                   int wanted)
 406 {
 407         BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));
 408 
 409         return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
 410 }
 411 
 412 int user_dlm_cluster_lock(struct user_lock_res *lockres,
 413                           int level,
 414                           int lkm_flags)
 415 {
 416         int status, local_flags;
 417         struct ocfs2_cluster_connection *conn =
 418                 cluster_connection_from_user_lockres(lockres);
 419 
 420         if (level != DLM_LOCK_EX &&
 421             level != DLM_LOCK_PR) {
 422                 mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
 423                      lockres->l_namelen, lockres->l_name);
 424                 status = -EINVAL;
 425                 goto bail;
 426         }
 427 
 428         mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
 429              lockres->l_namelen, lockres->l_name, level, lkm_flags);
 430 
 431 again:
 432         if (signal_pending(current)) {
 433                 status = -ERESTARTSYS;
 434                 goto bail;
 435         }
 436 
 437         spin_lock(&lockres->l_lock);
 438 
 439         /* We only compare against the currently granted level
 440          * here. If the lock is blocked waiting on a downconvert,
 441          * we'll get caught below. */
 442         if ((lockres->l_flags & USER_LOCK_BUSY) &&
 443             (level > lockres->l_level)) {
 444                 /* is someone sitting in dlm_lock? If so, wait on
 445                  * them. */
 446                 spin_unlock(&lockres->l_lock);
 447 
 448                 user_wait_on_busy_lock(lockres);
 449                 goto again;
 450         }
 451 
 452         if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
 453             (!user_may_continue_on_blocked_lock(lockres, level))) {
 454                 /* is the lock is currently blocked on behalf of
 455                  * another node */
 456                 spin_unlock(&lockres->l_lock);
 457 
 458                 user_wait_on_blocked_lock(lockres);
 459                 goto again;
 460         }
 461 
 462         if (level > lockres->l_level) {
 463                 local_flags = lkm_flags | DLM_LKF_VALBLK;
 464                 if (lockres->l_level != DLM_LOCK_IV)
 465                         local_flags |= DLM_LKF_CONVERT;
 466 
 467                 lockres->l_requested = level;
 468                 lockres->l_flags |= USER_LOCK_BUSY;
 469                 spin_unlock(&lockres->l_lock);
 470 
 471                 BUG_ON(level == DLM_LOCK_IV);
 472                 BUG_ON(level == DLM_LOCK_NL);
 473 
 474                 /* call dlm_lock to upgrade lock now */
 475                 status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
 476                                         local_flags, lockres->l_name,
 477                                         lockres->l_namelen);
 478                 if (status) {
 479                         if ((lkm_flags & DLM_LKF_NOQUEUE) &&
 480                             (status != -EAGAIN))
 481                                 user_log_dlm_error("ocfs2_dlm_lock",
 482                                                    status, lockres);
 483                         user_recover_from_dlm_error(lockres);
 484                         goto bail;
 485                 }
 486 
 487                 user_wait_on_busy_lock(lockres);
 488                 goto again;
 489         }
 490 
 491         user_dlm_inc_holders(lockres, level);
 492         spin_unlock(&lockres->l_lock);
 493 
 494         status = 0;
 495 bail:
 496         return status;
 497 }
 498 
 499 static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
 500                                         int level)
 501 {
 502         switch(level) {
 503         case DLM_LOCK_EX:
 504                 BUG_ON(!lockres->l_ex_holders);
 505                 lockres->l_ex_holders--;
 506                 break;
 507         case DLM_LOCK_PR:
 508                 BUG_ON(!lockres->l_ro_holders);
 509                 lockres->l_ro_holders--;
 510                 break;
 511         default:
 512                 BUG();
 513         }
 514 }
 515 
 516 void user_dlm_cluster_unlock(struct user_lock_res *lockres,
 517                              int level)
 518 {
 519         if (level != DLM_LOCK_EX &&
 520             level != DLM_LOCK_PR) {
 521                 mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
 522                      lockres->l_namelen, lockres->l_name);
 523                 return;
 524         }
 525 
 526         spin_lock(&lockres->l_lock);
 527         user_dlm_dec_holders(lockres, level);
 528         __user_dlm_cond_queue_lockres(lockres);
 529         spin_unlock(&lockres->l_lock);
 530 }
 531 
 532 void user_dlm_write_lvb(struct inode *inode,
 533                         const char *val,
 534                         unsigned int len)
 535 {
 536         struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
 537         char *lvb;
 538 
 539         BUG_ON(len > DLM_LVB_LEN);
 540 
 541         spin_lock(&lockres->l_lock);
 542 
 543         BUG_ON(lockres->l_level < DLM_LOCK_EX);
 544         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 545         memcpy(lvb, val, len);
 546 
 547         spin_unlock(&lockres->l_lock);
 548 }
 549 
 550 ssize_t user_dlm_read_lvb(struct inode *inode,
 551                           char *val,
 552                           unsigned int len)
 553 {
 554         struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
 555         char *lvb;
 556         ssize_t ret = len;
 557 
 558         BUG_ON(len > DLM_LVB_LEN);
 559 
 560         spin_lock(&lockres->l_lock);
 561 
 562         BUG_ON(lockres->l_level < DLM_LOCK_PR);
 563         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
 564                 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 565                 memcpy(val, lvb, len);
 566         } else
 567                 ret = 0;
 568 
 569         spin_unlock(&lockres->l_lock);
 570         return ret;
 571 }
 572 
 573 void user_dlm_lock_res_init(struct user_lock_res *lockres,
 574                             struct dentry *dentry)
 575 {
 576         memset(lockres, 0, sizeof(*lockres));
 577 
 578         spin_lock_init(&lockres->l_lock);
 579         init_waitqueue_head(&lockres->l_event);
 580         lockres->l_level = DLM_LOCK_IV;
 581         lockres->l_requested = DLM_LOCK_IV;
 582         lockres->l_blocking = DLM_LOCK_IV;
 583 
 584         /* should have been checked before getting here. */
 585         BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);
 586 
 587         memcpy(lockres->l_name,
 588                dentry->d_name.name,
 589                dentry->d_name.len);
 590         lockres->l_namelen = dentry->d_name.len;
 591 }
 592 
 593 int user_dlm_destroy_lock(struct user_lock_res *lockres)
 594 {
 595         int status = -EBUSY;
 596         struct ocfs2_cluster_connection *conn =
 597                 cluster_connection_from_user_lockres(lockres);
 598 
 599         mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);
 600 
 601         spin_lock(&lockres->l_lock);
 602         if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
 603                 spin_unlock(&lockres->l_lock);
 604                 return 0;
 605         }
 606 
 607         lockres->l_flags |= USER_LOCK_IN_TEARDOWN;
 608 
 609         while (lockres->l_flags & USER_LOCK_BUSY) {
 610                 spin_unlock(&lockres->l_lock);
 611 
 612                 user_wait_on_busy_lock(lockres);
 613 
 614                 spin_lock(&lockres->l_lock);
 615         }
 616 
 617         if (lockres->l_ro_holders || lockres->l_ex_holders) {
 618                 spin_unlock(&lockres->l_lock);
 619                 goto bail;
 620         }
 621 
 622         status = 0;
 623         if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
 624                 spin_unlock(&lockres->l_lock);
 625                 goto bail;
 626         }
 627 
 628         lockres->l_flags &= ~USER_LOCK_ATTACHED;
 629         lockres->l_flags |= USER_LOCK_BUSY;
 630         spin_unlock(&lockres->l_lock);
 631 
 632         status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
 633         if (status) {
 634                 user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
 635                 goto bail;
 636         }
 637 
 638         user_wait_on_busy_lock(lockres);
 639 
 640         status = 0;
 641 bail:
 642         return status;
 643 }
 644 
 645 static void user_dlm_recovery_handler_noop(int node_num,
 646                                            void *recovery_data)
 647 {
 648         /* We ignore recovery events */
 649         return;
 650 }
 651 
 652 void user_dlm_set_locking_protocol(void)
 653 {
 654         ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
 655 }
 656 
 657 struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
 658 {
 659         int rc;
 660         struct ocfs2_cluster_connection *conn;
 661 
 662         rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
 663                                             &user_dlm_lproto,
 664                                             user_dlm_recovery_handler_noop,
 665                                             NULL, &conn);
 666         if (rc)
 667                 mlog_errno(rc);
 668 
 669         return rc ? ERR_PTR(rc) : conn;
 670 }
 671 
 672 void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
 673 {
 674         ocfs2_cluster_disconnect(conn, 0);
 675 }

/* [<][>][^][v][top][bottom][index][help] */