/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmlock.c
 *
 * underlying calls for lock creation
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"

#include "dlmconvert.h"

#define MLOG_MASK_PREFIX ML_DLM
#include "cluster/masklog.h"

static struct kmem_cache *dlm_lock_cache;

static DEFINE_SPINLOCK(dlm_cookie_lock);
static u64 dlm_next_cookie = 1;

static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
					       struct dlm_lock_resource *res,
					       struct dlm_lock *lock, int flags);
static void dlm_init_lock(struct dlm_lock *newlock, int type,
			  u8 node, u64 cookie);
static void dlm_lock_release(struct kref *kref);
static void dlm_lock_detach_lockres(struct dlm_lock *lock);

int dlm_init_lock_cache(void)
{
	dlm_lock_cache = kmem_cache_create("o2dlm_lock",
					   sizeof(struct dlm_lock),
					   0, SLAB_HWCACHE_ALIGN, NULL);
	if (dlm_lock_cache == NULL)
		return -ENOMEM;
	return 0;
}

void dlm_destroy_lock_cache(void)
{
	if (dlm_lock_cache)
		kmem_cache_destroy(dlm_lock_cache);
}
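
/*
 * Note: these two are paired with module init/exit; at the time of
 * writing they are called from dlm_init()/dlm_exit() in dlmdomain.c,
 * before any domain can hand out locks and after all domains are gone.
 */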

/* Tell us whether we can grant a new lock request.
 * locking:
 *   caller needs:  res->spinlock
 *   taken:         none
 *   held on exit:  none
 * returns: 1 if the lock can be granted, 0 otherwise.
 */
static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
				  struct dlm_lock *lock)
{
	struct dlm_lock *tmplock;

	list_for_each_entry(tmplock, &res->granted, list) {
		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
			return 0;
	}

	list_for_each_entry(tmplock, &res->converting, list) {
		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
			return 0;
		if (!dlm_lock_compatible(tmplock->ml.convert_type,
					 lock->ml.type))
			return 0;
	}

	return 1;
}

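/*
 * For reference, dlm_lock_compatible() (dlmcommon.h) encodes the usual
 * DLM mode matrix for the modes dlmlock() accepts: NLMODE is compatible
 * with everything, PRMODE with NLMODE and PRMODE, and EXMODE only with
 * NLMODE.  A new request is granted only if it is compatible with every
 * granted lock and with both the current and requested mode of every
 * pending convert.
 */
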
/* performs lock creation at the lockres master site
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_NOTQUEUED
 */
static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      struct dlm_lock *lock, int flags)
{
	int call_ast = 0, kick_thread = 0;
	enum dlm_status status = DLM_NORMAL;

	mlog(0, "type=%d\n", lock->ml.type);

	spin_lock(&res->spinlock);
	/* if called from dlm_create_lock_handler, need to
	 * ensure it will not sleep in dlm_wait_on_lockres */
	status = __dlm_lockres_state_to_status(res);
	if (status != DLM_NORMAL &&
	    lock->ml.node != dlm->node_num) {
		/* erf.  state changed after lock was dropped. */
		spin_unlock(&res->spinlock);
		dlm_error(status);
		return status;
	}
	__dlm_wait_on_lockres(res);
	__dlm_lockres_reserve_ast(res);

	if (dlm_can_grant_new_lock(res, lock)) {
		mlog(0, "I can grant this lock right away\n");
		/* got it right away */
		lock->lksb->status = DLM_NORMAL;
		status = DLM_NORMAL;
		dlm_lock_get(lock);
		list_add_tail(&lock->list, &res->granted);

		/* for the recovery lock, we can't allow the ast
		 * to be queued since the dlmthread is already
		 * frozen.  but the recovery lock is always locked
		 * with LKM_NOQUEUE so we do not need the ast in
		 * this special case */
		if (!dlm_is_recovery_lock(res->lockname.name,
					  res->lockname.len)) {
			kick_thread = 1;
			call_ast = 1;
		} else {
			mlog(0, "%s: returning DLM_NORMAL to "
			     "node %u for reco lock\n", dlm->name,
			     lock->ml.node);
		}
	} else {
		/* for NOQUEUE request, unless we get the
		 * lock right away, return DLM_NOTQUEUED */
		if (flags & LKM_NOQUEUE) {
			status = DLM_NOTQUEUED;
			if (dlm_is_recovery_lock(res->lockname.name,
						 res->lockname.len)) {
				mlog(0, "%s: returning NOTQUEUED to "
				     "node %u for reco lock\n", dlm->name,
				     lock->ml.node);
			}
		} else {
			status = DLM_NORMAL;
			dlm_lock_get(lock);
			list_add_tail(&lock->list, &res->blocked);
			kick_thread = 1;
		}
	}

	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

	/* either queue the ast or release it */
	if (call_ast)
		dlm_queue_ast(dlm, lock);
	else
		dlm_lockres_release_ast(dlm, res);

	dlm_lockres_calc_usage(dlm, res);
	if (kick_thread)
		dlm_kick_thread(dlm, res);

	return status;
}

void dlm_revert_pending_lock(struct dlm_lock_resource *res,
			     struct dlm_lock *lock)
{
	/* remove from local queue if it failed */
	list_del_init(&lock->list);
	lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
}


/*
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_DENIED, DLM_RECOVERING, or net status
 */
static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      struct dlm_lock *lock, int flags)
{
	enum dlm_status status = DLM_DENIED;
	int lockres_changed = 1;

	mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n",
	     lock->ml.type, res->lockname.len,
	     res->lockname.name, flags);

	/*
	 * Wait if resource is getting recovered, remastered, etc.
	 * If the resource was remastered and new owner is self, then exit.
	 */
	spin_lock(&res->spinlock);
	__dlm_wait_on_lockres(res);
	if (res->owner == dlm->node_num) {
		spin_unlock(&res->spinlock);
		return DLM_RECOVERING;
	}
	res->state |= DLM_LOCK_RES_IN_PROGRESS;

	/* add lock to local (secondary) queue */
	dlm_lock_get(lock);
	list_add_tail(&lock->list, &res->blocked);
	lock->lock_pending = 1;
	spin_unlock(&res->spinlock);

	/* spec seems to say that you will get DLM_NORMAL when the lock
	 * has been queued, meaning we need to wait for a reply here. */
	status = dlm_send_remote_lock_request(dlm, res, lock, flags);

	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	lock->lock_pending = 0;
	if (status != DLM_NORMAL) {
		if (status == DLM_RECOVERING &&
		    dlm_is_recovery_lock(res->lockname.name,
					 res->lockname.len)) {
			/* recovery lock was mastered by dead node.
			 * we need to have calc_usage shoot down this
			 * lockres and completely remaster it. */
			mlog(0, "%s: recovery lock was owned by "
			     "dead node %u, remaster it now.\n",
			     dlm->name, res->owner);
		} else if (status != DLM_NOTQUEUED) {
			/*
			 * DO NOT call calc_usage, as this would unhash
			 * the remote lockres before we ever get to use
			 * it.  treat as if we never made any change to
			 * the lockres.
			 */
			lockres_changed = 0;
			dlm_error(status);
		}
		dlm_revert_pending_lock(res, lock);
		dlm_lock_put(lock);
	} else if (dlm_is_recovery_lock(res->lockname.name,
					res->lockname.len)) {
		/* special case for the $RECOVERY lock.
		 * there will never be an AST delivered to put
		 * this lock on the proper secondary queue
		 * (granted), so do it manually. */
		mlog(0, "%s: $RECOVERY lock for this node (%u) is "
		     "mastered by %u; got lock, manually granting (no ast)\n",
		     dlm->name, dlm->node_num, res->owner);
		list_move_tail(&lock->list, &res->granted);
	}
	spin_unlock(&res->spinlock);

	if (lockres_changed)
		dlm_lockres_calc_usage(dlm, res);

	wake_up(&res->wq);
	return status;
}
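
/*
 * Note: DLM_LOCK_RES_IN_PROGRESS serializes operations on the lockres;
 * __dlm_wait_on_lockres() blocks while IN_PROGRESS, RECOVERING or
 * MIGRATING is set, so at most one remote request is outstanding per
 * lockres.  lock->lock_pending marks the window in which the request
 * is on the wire, so recovery can recognize a lock whose response may
 * never arrive.
 */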


/* for remote lock creation.
 * locking:
 *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
 *   taken:         none
 *   held on exit:  none
 * returns: DLM_NOLOCKMGR, or net status
 */
static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
					       struct dlm_lock_resource *res,
					       struct dlm_lock *lock, int flags)
{
	struct dlm_create_lock create;
	int tmpret, status = 0;
	enum dlm_status ret;

	memset(&create, 0, sizeof(create));
	create.node_idx = dlm->node_num;
	create.requested_type = lock->ml.type;
	create.cookie = lock->ml.cookie;
	create.namelen = res->lockname.len;
	create.flags = cpu_to_be32(flags);
	memcpy(create.name, res->lockname.name, create.namelen);

	tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
				    sizeof(create), res->owner, &status);
	if (tmpret >= 0) {
		ret = status;
		if (ret == DLM_REJECTED) {
			mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
			     "owned by node %u. That node is coming back up "
			     "currently.\n", dlm->name, create.namelen,
			     create.name, res->owner);
			dlm_print_one_lock_resource(res);
			BUG();
		}
	} else {
		mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
		     "node %u\n", dlm->name, create.namelen, create.name,
		     tmpret, res->owner);
		if (dlm_is_host_down(tmpret))
			ret = DLM_RECOVERING;
		else
			ret = dlm_err_to_dlm_status(tmpret);
	}

	return ret;
}

void dlm_lock_get(struct dlm_lock *lock)
{
	kref_get(&lock->lock_refs);
}

void dlm_lock_put(struct dlm_lock *lock)
{
	kref_put(&lock->lock_refs, dlm_lock_release);
}

static void dlm_lock_release(struct kref *kref)
{
	struct dlm_lock *lock;

	lock = container_of(kref, struct dlm_lock, lock_refs);

	BUG_ON(!list_empty(&lock->list));
	BUG_ON(!list_empty(&lock->ast_list));
	BUG_ON(!list_empty(&lock->bast_list));
	BUG_ON(lock->ast_pending);
	BUG_ON(lock->bast_pending);

	dlm_lock_detach_lockres(lock);

	if (lock->lksb_kernel_allocated) {
		mlog(0, "freeing kernel-allocated lksb\n");
		kfree(lock->lksb);
	}
	kmem_cache_free(dlm_lock_cache, lock);
}
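
/*
 * Lifecycle note: each queue insertion takes its own reference
 * (dlm_lock_get() before list_add_tail() above), so a lock can only
 * reach dlm_lock_release() once it is off every list; the release then
 * returns the lockres reference taken in dlm_lock_attach_lockres().
 */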

/* associate a lock with its lockres, getting a ref on the lockres */
void dlm_lock_attach_lockres(struct dlm_lock *lock,
			     struct dlm_lock_resource *res)
{
	dlm_lockres_get(res);
	lock->lockres = res;
}

/* drop ref on lockres, if there is still one associated with lock */
static void dlm_lock_detach_lockres(struct dlm_lock *lock)
{
	struct dlm_lock_resource *res;

	res = lock->lockres;
	if (res) {
		lock->lockres = NULL;
		mlog(0, "removing lock's lockres reference\n");
		dlm_lockres_put(res);
	}
}

static void dlm_init_lock(struct dlm_lock *newlock, int type,
			  u8 node, u64 cookie)
{
	INIT_LIST_HEAD(&newlock->list);
	INIT_LIST_HEAD(&newlock->ast_list);
	INIT_LIST_HEAD(&newlock->bast_list);
	spin_lock_init(&newlock->spinlock);
	newlock->ml.type = type;
	newlock->ml.convert_type = LKM_IVMODE;
	newlock->ml.highest_blocked = LKM_IVMODE;
	newlock->ml.node = node;
	newlock->ml.pad1 = 0;
	newlock->ml.list = 0;
	newlock->ml.flags = 0;
	newlock->ast = NULL;
	newlock->bast = NULL;
	newlock->astdata = NULL;
	newlock->ml.cookie = cpu_to_be64(cookie);
	newlock->ast_pending = 0;
	newlock->bast_pending = 0;
	newlock->convert_pending = 0;
	newlock->lock_pending = 0;
	newlock->unlock_pending = 0;
	newlock->cancel_pending = 0;
	newlock->lksb_kernel_allocated = 0;

	kref_init(&newlock->lock_refs);
}

struct dlm_lock *dlm_new_lock(int type, u8 node, u64 cookie,
			      struct dlm_lockstatus *lksb)
{
	struct dlm_lock *lock;
	int kernel_allocated = 0;

	lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
	if (!lock)
		return NULL;

	if (!lksb) {
		/* zero memory only if kernel-allocated */
		lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
		if (!lksb) {
			kmem_cache_free(dlm_lock_cache, lock);
			return NULL;
		}
		kernel_allocated = 1;
	}

	dlm_init_lock(lock, type, node, cookie);
	if (kernel_allocated)
		lock->lksb_kernel_allocated = 1;
	lock->lksb = lksb;
	lksb->lockid = lock;
	return lock;
}

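/*
 * A lock returned here is not yet attached to anything: callers pair it
 * with dlm_lock_attach_lockres() and then queue it on the lockres under
 * res->spinlock, as dlm_create_lock_handler() and dlmlock() below do.
 */
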
/* handler for lock creation net message
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED,
 *          DLM_REJECTED, DLM_IVBUFLEN
 */
int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
			    void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	struct dlm_lock *newlock = NULL;
	struct dlm_lockstatus *lksb = NULL;
	enum dlm_status status = DLM_NORMAL;
	char *name;
	unsigned int namelen;

	BUG_ON(!dlm);

	if (!dlm_grab(dlm))
		return DLM_REJECTED;

	name = create->name;
	namelen = create->namelen;
	status = DLM_REJECTED;
	if (!dlm_domain_fully_joined(dlm)) {
		mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
		     "sending a create_lock message for lock %.*s!\n",
		     dlm->name, create->node_idx, namelen, name);
		dlm_error(status);
		goto leave;
	}

	status = DLM_IVBUFLEN;
	if (namelen > DLM_LOCKID_NAME_MAX) {
		dlm_error(status);
		goto leave;
	}

	status = DLM_SYSERR;
	newlock = dlm_new_lock(create->requested_type,
			       create->node_idx,
			       be64_to_cpu(create->cookie), NULL);
	if (!newlock) {
		dlm_error(status);
		goto leave;
	}

	lksb = newlock->lksb;

	if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
		lksb->flags |= DLM_LKSB_GET_LVB;
		mlog(0, "set DLM_LKSB_GET_LVB flag\n");
	}

	status = DLM_IVLOCKID;
	res = dlm_lookup_lockres(dlm, name, namelen);
	if (!res) {
		dlm_error(status);
		goto leave;
	}

	spin_lock(&res->spinlock);
	status = __dlm_lockres_state_to_status(res);
	spin_unlock(&res->spinlock);

	if (status != DLM_NORMAL) {
		mlog(0, "lockres recovering/migrating/in-progress\n");
		goto leave;
	}

	dlm_lock_attach_lockres(newlock, res);

	status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
leave:
	if (status != DLM_NORMAL && newlock)
		dlm_lock_put(newlock);

	if (res)
		dlm_lockres_put(res);

	dlm_put(dlm);

	return status;
}

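/*
 * The return value above travels back over the wire as the o2net status
 * word, where dlm_send_remote_lock_request() reads it into `status`.
 * (This handler is registered for DLM_CREATE_LOCK_MSG during domain
 * setup; see dlmdomain.c.)
 */
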
/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
{
	u64 tmpnode = node_num;

	/* shift single byte of node num into top 8 bits */
	tmpnode <<= 56;

	spin_lock(&dlm_cookie_lock);
	*cookie = (dlm_next_cookie | tmpnode);
	if (++dlm_next_cookie & 0xff00000000000000ull) {
		mlog(0, "This node's cookie will now wrap!\n");
		dlm_next_cookie = 1;
	}
	spin_unlock(&dlm_cookie_lock);
}

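/*
 * Cookie layout, for example: node 3 issuing its 5th cookie packs to
 * 0x0300000000000005 -- node number in the top byte, per-node counter
 * in the low 56 bits -- which keeps cookies unique across the domain.
 */

/*
 * A sketch of a typical call (hypothetical caller; my_ast/my_bast/
 * my_data are placeholders, real users live in dlmglue.c):
 *
 *	struct dlm_lockstatus lksb;
 *	enum dlm_status st;
 *
 *	memset(&lksb, 0, sizeof(lksb));
 *	st = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_VALBLK,
 *		     name, namelen, my_ast, my_data, my_bast);
 *
 * If st != DLM_NORMAL, nothing was queued.  On success, my_ast(my_data)
 * fires once the lock is granted, with the final result in lksb.status.
 */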
enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
			struct dlm_lockstatus *lksb, int flags,
			const char *name, int namelen, dlm_astlockfunc_t *ast,
			void *data, dlm_bastlockfunc_t *bast)
{
	enum dlm_status status;
	struct dlm_lock_resource *res = NULL;
	struct dlm_lock *lock = NULL;
	int convert = 0, recovery = 0;

	/* yes this function is a mess.
	 * TODO: clean this up.  lots of common code in the
	 *       lock and convert paths, especially in the retry blocks */
	if (!lksb) {
		dlm_error(DLM_BADARGS);
		return DLM_BADARGS;
	}

	status = DLM_BADPARAM;
	if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
		dlm_error(status);
		goto error;
	}

	if (flags & ~LKM_VALID_FLAGS) {
		dlm_error(status);
		goto error;
	}

	convert = (flags & LKM_CONVERT);
	recovery = (flags & LKM_RECOVERY);

	if (recovery &&
	    (!dlm_is_recovery_lock(name, namelen) || convert)) {
		dlm_error(status);
		goto error;
	}
	if (convert && (flags & LKM_LOCAL)) {
		mlog(ML_ERROR, "strange LOCAL convert request!\n");
		goto error;
	}

	if (convert) {
		/* CONVERT request */

		/* if converting, must pass in a valid dlm_lock */
		lock = lksb->lockid;
		if (!lock) {
			mlog(ML_ERROR, "NULL lock pointer in convert "
			     "request\n");
			goto error;
		}

		res = lock->lockres;
		if (!res) {
			mlog(ML_ERROR, "NULL lockres pointer in convert "
			     "request\n");
			goto error;
		}
		dlm_lockres_get(res);

		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
		 * static after the original lock call.  convert requests will
		 * ensure that everything is the same, or return DLM_BADARGS.
		 * this means that DLM_DENIED_NOASTS will never be returned.
		 */
		if (lock->lksb != lksb || lock->ast != ast ||
		    lock->bast != bast || lock->astdata != data) {
			status = DLM_BADARGS;
			mlog(ML_ERROR, "new args:  lksb=%p, ast=%p, bast=%p, "
			     "astdata=%p\n", lksb, ast, bast, data);
			mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
			     "astdata=%p\n", lock->lksb, lock->ast,
			     lock->bast, lock->astdata);
			goto error;
		}
retry_convert:
		dlm_wait_for_recovery(dlm);

		if (res->owner == dlm->node_num)
			status = dlmconvert_master(dlm, res, lock, flags, mode);
		else
			status = dlmconvert_remote(dlm, res, lock, flags, mode);
		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
		    status == DLM_FORWARD) {
			/* sleep briefly, then retry; the recovery or
			 * migration should complete quickly enough that
			 * a short wait is all that is needed */
			mlog(0, "retrying convert with migration/recovery/"
			     "in-progress\n");
			msleep(100);
			goto retry_convert;
		}
	} else {
		u64 tmpcookie;

		/* LOCK request */
		status = DLM_BADARGS;
		if (!name) {
			dlm_error(status);
			goto error;
		}

		status = DLM_IVBUFLEN;
		if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
			dlm_error(status);
			goto error;
		}

		dlm_get_next_cookie(dlm->node_num, &tmpcookie);
		/* allocation failure is a system error, as in the
		 * create handler, not a buffer-length problem */
		status = DLM_SYSERR;
		lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
		if (!lock) {
			dlm_error(status);
			goto error;
		}

		if (!recovery)
			dlm_wait_for_recovery(dlm);

		/* find or create the lock resource */
		res = dlm_get_lock_resource(dlm, name, namelen, flags);
		if (!res) {
			status = DLM_IVLOCKID;
			dlm_error(status);
			goto error;
		}

		mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
		mlog(0, "creating lock: lock=%p res=%p\n", lock, res);

		dlm_lock_attach_lockres(lock, res);
		lock->ast = ast;
		lock->bast = bast;
		lock->astdata = data;

retry_lock:
		if (flags & LKM_VALBLK) {
			mlog(0, "LKM_VALBLK passed by caller\n");

			/* LVB requests for non PR, PW or EX locks are
			 * ignored. */
			if (mode < LKM_PRMODE) {
				flags &= ~LKM_VALBLK;
			} else {
				flags |= LKM_GET_LVB;
				lock->lksb->flags |= DLM_LKSB_GET_LVB;
			}
		}

		if (res->owner == dlm->node_num)
			status = dlmlock_master(dlm, res, lock, flags);
		else
			status = dlmlock_remote(dlm, res, lock, flags);

		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
		    status == DLM_FORWARD) {
			msleep(100);
			if (recovery) {
				if (status != DLM_RECOVERING)
					goto retry_lock;
				/* wait to see the node go down, then
				 * drop down and allow the lockres to
				 * get cleaned up.  need to remaster. */
				dlm_wait_for_node_death(dlm, res->owner,
						DLM_NODE_DEATH_WAIT_MAX);
			} else {
				dlm_wait_for_recovery(dlm);
				goto retry_lock;
			}
		}

		/* Inflight taken in dlm_get_lock_resource() is dropped here */
		spin_lock(&res->spinlock);
		dlm_lockres_drop_inflight_ref(dlm, res);
		spin_unlock(&res->spinlock);

		dlm_lockres_calc_usage(dlm, res);
		dlm_kick_thread(dlm, res);

		if (status != DLM_NORMAL) {
			lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
			if (status != DLM_NOTQUEUED)
				dlm_error(status);
			goto error;
		}
	}

error:
	if (status != DLM_NORMAL) {
		if (lock && !convert)
			dlm_lock_put(lock);
		/* this is kind of unnecessary */
		lksb->status = status;
	}

	/* put lockres ref from the convert path
	 * or from dlm_get_lock_resource */
	if (res)
		dlm_lockres_put(res);

	return status;
}
EXPORT_SYMBOL_GPL(dlmlock);
