/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"

#include "buffer_head_io.h"

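/*
 * A mask waiter lets a task sleep until a lockres' l_flags, masked
 * by mw_mask, reaches mw_goal. Waiters are queued on the lockres'
 * l_mask_waiters list and completed from lockres_set_flags().
 */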
struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	ktime_t			mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx, iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB		0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
	.set_lvb	= ocfs2_set_qinfo_lvb,
	.get_osb	= ocfs2_get_qinfo_osb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
	.check_downconvert = ocfs2_check_refcount_downconvert,
	.downconvert_worker = ocfs2_refcount_convert_worker,
	.flags		= 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres,
					int level)
{
	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
		     _err, _func, (_lockres)->l_name);					\
	else										\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,	\
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));		\
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
				        struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);


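/*
 * Lock names are built as: one lock type character, the
 * OCFS2_LOCK_ID_PAD string, sixteen hex digits of block number and
 * eight hex digits of generation, for OCFS2_LOCK_ID_MAX_LEN - 1
 * characters in total. For example, a metadata lock on block 0x64
 * with generation 0x1234 gets a name of the form
 * "M<pad>000000000000006400001234".
 */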
static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);
}

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
	res->l_lock_refresh = 0;
	memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
	memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
				    struct ocfs2_mask_waiter *mw, int ret)
{
	u32 usec;
	ktime_t kt;
	struct ocfs2_lock_stats *stats;

	if (level == LKM_PRMODE)
		stats = &res->l_lock_prmode;
	else if (level == LKM_EXMODE)
		stats = &res->l_lock_exmode;
	else
		return;

	kt = ktime_sub(ktime_get(), mw->mw_lock_start);
	usec = ktime_to_us(kt);

	stats->ls_gets++;
	stats->ls_total += ktime_to_ns(kt);
	/* overflow */
	if (unlikely(stats->ls_gets == 0)) {
		stats->ls_gets++;
		stats->ls_total = ktime_to_ns(kt);
	}

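	/* ls_total is kept in nanoseconds; ls_max tracks the single
	 * longest wait in microseconds. */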
	if (stats->ls_max < usec)
		stats->ls_max = usec;

	if (ret)
		stats->ls_fail++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
	lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
	mw->mw_lock_start = ktime_get();
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
			   int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = DLM_LOCK_IV;
	res->l_requested     = DLM_LOCK_IV;
	res->l_blocking      = DLM_LOCK_IV;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

	ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (type != OCFS2_LOCK_TYPE_OPEN)
		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
				 &lockdep_keys[type], 0);
	else
		res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch (type) {
		case OCFS2_LOCK_TYPE_RW:
			ops = &ocfs2_inode_rw_lops;
			break;
		case OCFS2_LOCK_TYPE_META:
			ops = &ocfs2_inode_inode_lops;
			break;
		case OCFS2_LOCK_TYPE_OPEN:
			ops = &ocfs2_inode_open_lops;
			break;
		default:
			mlog_bug_on_msg(1, "type: %d\n", type);
			ops = NULL; /* thanks, gcc */
			break;
	}

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_mem_dqinfo *info = lockres->l_priv;

	return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_file_private *fp = lockres->l_priv;

	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once())
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));
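
	/*
	 * The name is now: one type character, sixteen hex digits of
	 * the parent block number, a NUL separator, then the inode
	 * block number as a raw big-endian u64, with the remainder of
	 * the buffer zeroed.
	 */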

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
					 struct ocfs2_super *osb)
{
	/* nfs_sync lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
				   &ocfs2_nfs_sync_lops, osb);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
					    struct ocfs2_super *osb)
{
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
				   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
			      struct ocfs2_file_private *fp)
{
	struct inode *inode = fp->fp_file->f_mapping->host;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
			      inode->i_generation, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
				   fp);
	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
			       struct ocfs2_mem_dqinfo *info)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
			      0, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
				   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
				  struct ocfs2_super *osb, u64 ref_blkno,
				  unsigned int generation)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
			      generation, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
				   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	BUG_ON(!lockres);

	switch (level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	BUG_ON(!lockres);

	switch (level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
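/*
 * Quick reference for the mapping below: EX -> NL, PR -> PR, and
 * anything else (i.e. NL) -> EX.
 */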
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

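/*
 * Install the new flag state and complete any mask waiter whose
 * masked view of l_flags has reached its goal.
 */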
static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}

static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}

static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* A convert from RO to EX doesn't really need anything as our
	 * information is already up to date. A convert from NL to
	 * *anything*, however, should mark us as needing an
	 * update. */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;

	/*
	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
	 * downconverting the lock before the upconvert has fully completed.
	 * Do not prevent the dc thread from downconverting if NONBLOCK lock
	 * had already returned.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
		lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
	else
		lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > DLM_LOCK_NL &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;

	assert_spin_locked(&lockres->l_lock);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
	     needs_downconvert);

	if (needs_downconvert)
		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
	mlog(0, "needs_downconvert = %d\n", needs_downconvert);
	return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
 *     clear PENDING			 ocfs2_unblock_lock()
 *					  take_l_lock
 *					  !BUSY
 *					  ocfs2_prepare_downconvert()
 *					   set BUSY
 *					   set PENDING
 *					  drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *			<window>
 *					  ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here.  The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING.  Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

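/*
 * Typical pairing, as in ocfs2_lock_create() and
 * __ocfs2_cluster_lock() below: with l_lock held, set
 * OCFS2_LOCK_BUSY and gen = lockres_set_pending(); drop l_lock and
 * call ocfs2_dlm_lock(); then call lockres_clear_pending(lockres,
 * gen, osb) once it returns.
 */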
static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}

static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
	     "type %s\n", lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
	     "level %d => %d\n", lockres->l_name, lockres->l_action,
	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

	switch (lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
		     "flags 0x%lx, unlock: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock?  Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here.  We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	unsigned long flags;

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
	     lockres->l_name, lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (error) {
		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
		     "unlock_action %d\n", error, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch (lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		/* Downconvert thread may have requeued this lock, we
		 * need to wake it. */
		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = DLM_LOCK_IV;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * This is the filesystem locking protocol.  It provides the lock handling
 * hooks for the underlying DLM.  It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.  When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero.  If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased.  If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast		= ocfs2_locking_ast,
	.lp_blocking_ast	= ocfs2_blocking_ast,
	.lp_unlock_ast		= ocfs2_unlock_ast,
};

void ocfs2_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags)
{
	int ret = 0;
	unsigned long flags;
	unsigned int gen;

	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	gen = lockres_set_pending(lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn,
			     level,
			     &lockres->l_lksb,
			     dlm_flags,
			     lockres->l_name,
			     OCFS2_LOCK_ID_MAX_LEN - 1);
	lockres_clear_pending(lockres, gen, osb);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
	return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
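/*
 * Example: if l_blocking is EX we will drop to NL, so only a wanted
 * level of NL is compatible; if l_blocking is PR we will drop to PR,
 * so wanted levels of PR and NL are compatible.
 */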
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
	ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	reinit_completion(&mw->mw_complete);
	return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	int ret = 0;

	assert_spin_locked(&lockres->l_lock);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}

	return ret;
}

static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = __lockres_remove_mask_waiter(lockres, mw);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
					     struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = wait_for_completion_interruptible(&mw->mw_complete);
	if (ret)
		lockres_remove_mask_waiter(lockres, mw);
	else
		ret = mw->mw_status;
	/* Re-arm the completion in case we want to wait on it again */
	reinit_completion(&mw->mw_complete);
	return ret;
}

static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;
	int dlm_locked = 0;
	int kick_dc = 0;

	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
		mlog_errno(-EINVAL);
		return -EINVAL;
	}

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto unlock;
	}

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
		/*
		 * We've upconverted. If the lock now has a level we can
		 * work with, we take it. If, however, the lock is not at the
		 * required level, we go thru the full cycle. One way this could
		 * happen is if a process requesting an upconvert to PR is
		 * closely followed by another requesting upconvert to an EX.
		 * If the process requesting EX lands here, we want it to
		 * continue attempting to upconvert and let the process
		 * requesting PR take the lock.
		 * If multiple processes request upconvert to PR, the first one
		 * here will take the lock. The others will have to go thru the
		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
		 * downconvert request.
		 */
		if (level <= lockres->l_level)
			goto update_holders;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}
		dlm_locked = 1;

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

update_holders:
	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	/* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
	kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
	if (kick_dc)
		ocfs2_wake_downconvert_thread(osb);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		spin_lock_irqsave(&lockres->l_lock, flags);
		if (__lockres_remove_mask_waiter(lockres, &mw)) {
			if (dlm_locked)
				lockres_or_flags(lockres,
					OCFS2_LOCK_NONBLOCK_FINISHED);
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = -EAGAIN;
		} else {
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			goto again;
		}
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
	}
#endif
	return ret;
}

static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres,
				     int level,
				     u32 lkm_flags,
				     int arg_flags)
{
	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
				    0, _RET_IP_);
}

static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level,
				   unsigned long caller_ip)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (lockres->l_lockdep_map.key != NULL)
		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
#endif
}

static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned long flags;
	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: We don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	return ret;
}

int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		return 0;

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
				    0);
	if (status < 0)
		mlog_errno(status);

	return status;
}

void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
}

1717/*
1718 * ocfs2_open_lock always takes a PR mode lock.
1719 */
1720int ocfs2_open_lock(struct inode *inode)
1721{
1722	int status = 0;
1723	struct ocfs2_lock_res *lockres;
1724	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1725
1726	BUG_ON(!inode);
1727
1728	mlog(0, "inode %llu take PRMODE open lock\n",
1729	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1730
1731	if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
1732		goto out;
1733
1734	lockres = &OCFS2_I(inode)->ip_open_lockres;
1735
1736	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1737				    DLM_LOCK_PR, 0, 0);
1738	if (status < 0)
1739		mlog_errno(status);
1740
1741out:
1742	return status;
1743}
1744
1745int ocfs2_try_open_lock(struct inode *inode, int write)
1746{
1747	int status = 0, level;
1748	struct ocfs2_lock_res *lockres;
1749	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1750
1751	BUG_ON(!inode);
1752
1753	mlog(0, "inode %llu try to take %s open lock\n",
1754	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1755	     write ? "EXMODE" : "PRMODE");
1756
1757	if (ocfs2_is_hard_readonly(osb)) {
1758		if (write)
1759			status = -EROFS;
1760		goto out;
1761	}
1762
1763	if (ocfs2_mount_local(osb))
1764		goto out;
1765
1766	lockres = &OCFS2_I(inode)->ip_open_lockres;
1767
1768	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1769
1770	/*
1771	 * The file system may already be holding a PRMODE/EXMODE open lock.
1772	 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1773	 * other nodes and the -EAGAIN will indicate to the caller that
1774	 * this inode is still in use.
1775	 */
1776	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1777				    level, DLM_LKF_NOQUEUE, 0);
1778
1779out:
1780	return status;
1781}
1782
1783/*
1784 * ocfs2_open_unlock unlocks PR and EX mode open locks.
1785 */
1786void ocfs2_open_unlock(struct inode *inode)
1787{
1788	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1789	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1790
1791	mlog(0, "inode %llu drop open lock\n",
1792	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1793
1794	if (ocfs2_mount_local(osb))
1795		goto out;
1796
1797	if (lockres->l_ro_holders)
1798		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1799				     DLM_LOCK_PR);
1800	if (lockres->l_ex_holders)
1801		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1802				     DLM_LOCK_EX);
1803
1804out:
1805	return;
1806}
1807
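/*
 * Called when a signal interrupts a pending flock() request. Cancel
 * any in-flight convert, then return -ERESTARTSYS unless the lock
 * was granted anyway, in which case report success.
 */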
1808static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1809				     int level)
1810{
1811	int ret;
1812	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1813	unsigned long flags;
1814	struct ocfs2_mask_waiter mw;
1815
1816	ocfs2_init_mask_waiter(&mw);
1817
1818retry_cancel:
1819	spin_lock_irqsave(&lockres->l_lock, flags);
1820	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1821		ret = ocfs2_prepare_cancel_convert(osb, lockres);
1822		if (ret) {
1823			spin_unlock_irqrestore(&lockres->l_lock, flags);
1824			ret = ocfs2_cancel_convert(osb, lockres);
1825			if (ret < 0) {
1826				mlog_errno(ret);
1827				goto out;
1828			}
1829			goto retry_cancel;
1830		}
1831		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1832		spin_unlock_irqrestore(&lockres->l_lock, flags);
1833
1834		ocfs2_wait_for_mask(&mw);
1835		goto retry_cancel;
1836	}
1837
1838	ret = -ERESTARTSYS;
1839	/*
1840	 * We may still have gotten the lock, in which case there's no
1841	 * point to restarting the syscall.
1842	 */
1843	if (lockres->l_level == level)
1844		ret = 0;
1845
1846	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1847	     lockres->l_flags, lockres->l_level, lockres->l_action);
1848
1849	spin_unlock_irqrestore(&lockres->l_lock, flags);
1850
1851out:
1852	return ret;
1853}
1854
1855/*
1856 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1857 * flock() calls. The locking approach this requires is sufficiently
1858 * different from all other cluster lock types that we implement a
1859 * separate path to the "low-level" dlm calls. In particular:
1860 *
1861 * - No optimization of lock levels is done - we take exactly
1862 *   what's been requested.
1863 *
1864 * - No lock caching is employed. We immediately downconvert to
1865 *   no-lock at unlock time (this also means flock locks never go
1866 *   on the blocking list).
1867 *
1868 * - Since userspace can trivially deadlock itself with flock, we make
1869 *   sure to allow cancellation of a misbehaving application's flock()
1870 *   request.
1871 *
1872 * - Access to any flock lockres doesn't require concurrency, so we
1873 *   can simplify the code by requiring the caller to guarantee
1874 *   serialization of dlmglue flock calls.
1875 */
1876int ocfs2_file_lock(struct file *file, int ex, int trylock)
1877{
1878	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1879	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1880	unsigned long flags;
1881	struct ocfs2_file_private *fp = file->private_data;
1882	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1883	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1884	struct ocfs2_mask_waiter mw;
1885
1886	ocfs2_init_mask_waiter(&mw);
1887
1888	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1889	    (lockres->l_level > DLM_LOCK_NL)) {
1890		mlog(ML_ERROR,
1891		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1892		     "level: %u\n", lockres->l_name, lockres->l_flags,
1893		     lockres->l_level);
1894		return -EINVAL;
1895	}
1896
1897	spin_lock_irqsave(&lockres->l_lock, flags);
1898	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1899		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1900		spin_unlock_irqrestore(&lockres->l_lock, flags);
1901
1902		/*
1903		 * Get the lock at NLMODE to start - that way we
1904		 * can cancel the upconvert request if need be.
1905		 */
1906		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1907		if (ret < 0) {
1908			mlog_errno(ret);
1909			goto out;
1910		}
1911
1912		ret = ocfs2_wait_for_mask(&mw);
1913		if (ret) {
1914			mlog_errno(ret);
1915			goto out;
1916		}
1917		spin_lock_irqsave(&lockres->l_lock, flags);
1918	}
1919
1920	lockres->l_action = OCFS2_AST_CONVERT;
1921	lkm_flags |= DLM_LKF_CONVERT;
1922	lockres->l_requested = level;
1923	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1924
1925	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1926	spin_unlock_irqrestore(&lockres->l_lock, flags);
1927
1928	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1929			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
1930	if (ret) {
1931		if (!trylock || (ret != -EAGAIN)) {
1932			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1933			ret = -EINVAL;
1934		}
1935
1936		ocfs2_recover_from_dlm_error(lockres, 1);
1937		lockres_remove_mask_waiter(lockres, &mw);
1938		goto out;
1939	}
1940
1941	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1942	if (ret == -ERESTARTSYS) {
1943		/*
1944		 * Userspace can cause deadlock itself with
1945		 * flock(). Current behavior locally is to allow the
1946		 * deadlock, but abort the system call if a signal is
1947		 * received. We follow this example, otherwise a
1948		 * poorly written program could sit in kernel until
1949		 * reboot.
1950		 *
1951		 * Handling this is a bit more complicated for Ocfs2
1952		 * though. We can't exit this function with an
1953		 * outstanding lock request, so a cancel convert is
1954		 * required. We intentionally overwrite 'ret' - if the
1955		 * cancel fails and the lock was granted, it's easier
1956		 * to just bubble success back up to the user.
1957		 */
1958		ret = ocfs2_flock_handle_signal(lockres, level);
1959	} else if (!ret && (level > lockres->l_level)) {
1960		/* Trylock failed asynchronously */
1961		BUG_ON(!trylock);
1962		ret = -EAGAIN;
1963	}
1964
1965out:
1966
1967	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1968	     lockres->l_name, ex, trylock, ret);
1969	return ret;
1970}
1971
1972void ocfs2_file_unlock(struct file *file)
1973{
1974	int ret;
1975	unsigned int gen;
1976	unsigned long flags;
1977	struct ocfs2_file_private *fp = file->private_data;
1978	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1979	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1980	struct ocfs2_mask_waiter mw;
1981
1982	ocfs2_init_mask_waiter(&mw);
1983
1984	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
1985		return;
1986
1987	if (lockres->l_level == DLM_LOCK_NL)
1988		return;
1989
1990	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
1991	     lockres->l_name, lockres->l_flags, lockres->l_level,
1992	     lockres->l_action);
1993
1994	spin_lock_irqsave(&lockres->l_lock, flags);
1995	/*
1996	 * Fake a blocking ast for the downconvert code.
1997	 */
1998	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1999	lockres->l_blocking = DLM_LOCK_EX;
2000
2001	gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2002	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2003	spin_unlock_irqrestore(&lockres->l_lock, flags);
2004
2005	ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2006	if (ret) {
2007		mlog_errno(ret);
2008		return;
2009	}
2010
2011	ret = ocfs2_wait_for_mask(&mw);
2012	if (ret)
2013		mlog_errno(ret);
2014}
2015
2016static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2017					struct ocfs2_lock_res *lockres)
2018{
2019	int kick = 0;
2020
2021	/* If we know that another node is waiting on our lock, kick
2022	 * the downconvert thread pre-emptively when we reach a release
2023	 * condition. */
2024	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2025		switch (lockres->l_blocking) {
2026		case DLM_LOCK_EX:
2027			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2028				kick = 1;
2029			break;
2030		case DLM_LOCK_PR:
2031			if (!lockres->l_ex_holders)
2032				kick = 1;
2033			break;
2034		default:
2035			BUG();
2036		}
2037	}
2038
2039	if (kick)
2040		ocfs2_wake_downconvert_thread(osb);
2041}
2042
2043#define OCFS2_SEC_BITS   34
2044#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
2045#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
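/*
 * Resulting layout of the packed 64-bit value:
 *
 *  63                          30 29                           0
 * +------------------------------+------------------------------+
 * |       tv_sec (34 bits)       |       tv_nsec (30 bits)      |
 * +------------------------------+------------------------------+
 */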
2046
2047/* The LVB only has room for 64 bits of time, so we pack the seconds
2048 * into the high OCFS2_SEC_BITS bits and the nanoseconds below them. */
2049static u64 ocfs2_pack_timespec(struct timespec *spec)
2050{
2051	u64 res;
2052	u64 sec = spec->tv_sec;
2053	u32 nsec = spec->tv_nsec;
2054
2055	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2056
2057	return res;
2058}
2059
2060/* Call this with the lockres locked. I am reasonably sure we don't
2061 * need ip_lock in this function as anyone who would be changing those
2062 * values is supposed to be blocked in ocfs2_inode_lock right now. */
2063static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2064{
2065	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2066	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2067	struct ocfs2_meta_lvb *lvb;
2068
2069	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2070
2071	/*
2072	 * Invalidate the LVB of a deleted inode - this way other
2073	 * nodes are forced to go to disk and discover the new inode
2074	 * status.
2075	 */
2076	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2077		lvb->lvb_version = 0;
2078		goto out;
2079	}
2080
2081	lvb->lvb_version   = OCFS2_LVB_VERSION;
2082	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
2083	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2084	lvb->lvb_iuid      = cpu_to_be32(i_uid_read(inode));
2085	lvb->lvb_igid      = cpu_to_be32(i_gid_read(inode));
2086	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
2087	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
2088	lvb->lvb_iatime_packed  =
2089		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
2090	lvb->lvb_ictime_packed =
2091		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
2092	lvb->lvb_imtime_packed =
2093		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
2094	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
2095	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2096	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2097
2098out:
2099	mlog_meta_lvb(0, lockres);
2100}
2101
2102static void ocfs2_unpack_timespec(struct timespec *spec,
2103				  u64 packed_time)
2104{
2105	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2106	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2107}
2108
2109static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
2110{
2111	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2112	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2113	struct ocfs2_meta_lvb *lvb;
2114
2115	mlog_meta_lvb(0, lockres);
2116
2117	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2118
2119	/* We're safe here without the lockres lock... */
2120	spin_lock(&oi->ip_lock);
2121	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2122	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2123
2124	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2125	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2126	ocfs2_set_inode_flags(inode);
2127
2128	/* fast-symlinks are a special case */
2129	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2130		inode->i_blocks = 0;
2131	else
2132		inode->i_blocks = ocfs2_inode_sector_count(inode);
2133
2134	i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid));
2135	i_gid_write(inode, be32_to_cpu(lvb->lvb_igid));
2136	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2137	set_nlink(inode, be16_to_cpu(lvb->lvb_inlink));
2138	ocfs2_unpack_timespec(&inode->i_atime,
2139			      be64_to_cpu(lvb->lvb_iatime_packed));
2140	ocfs2_unpack_timespec(&inode->i_mtime,
2141			      be64_to_cpu(lvb->lvb_imtime_packed));
2142	ocfs2_unpack_timespec(&inode->i_ctime,
2143			      be64_to_cpu(lvb->lvb_ictime_packed));
2144	spin_unlock(&oi->ip_lock);
2145}
2146
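/*
 * Only trust the LVB if the DLM says it's valid, it carries the LVB
 * version we understand, and its generation matches the in-memory
 * inode (guarding against a reused inode number).
 */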
2147static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2148					      struct ocfs2_lock_res *lockres)
2149{
2150	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2151
2152	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2153	    && lvb->lvb_version == OCFS2_LVB_VERSION
2154	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2155		return 1;
2156	return 0;
2157}
2158
2159/* Determine whether a lock resource needs to be refreshed, and
2160 * arbitrate who gets to refresh it.
2161 *
2162 *   0 means no refresh needed.
2163 *
2164 *   > 0 means you need to refresh this and you MUST call
2165 *   ocfs2_complete_lock_res_refresh afterwards. */
2166static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2167{
2168	unsigned long flags;
2169	int status = 0;
2170
2171refresh_check:
2172	spin_lock_irqsave(&lockres->l_lock, flags);
2173	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2174		spin_unlock_irqrestore(&lockres->l_lock, flags);
2175		goto bail;
2176	}
2177
2178	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2179		spin_unlock_irqrestore(&lockres->l_lock, flags);
2180
2181		ocfs2_wait_on_refreshing_lock(lockres);
2182		goto refresh_check;
2183	}
2184
2185	/* Ok, I'll be the one to refresh this lock. */
2186	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2187	spin_unlock_irqrestore(&lockres->l_lock, flags);
2188
2189	status = 1;
2190bail:
2191	mlog(0, "status %d\n", status);
2192	return status;
2193}
2194
2195/* If status is nonzero, I'll mark it as not being in refresh
2196 * anymore, but I won't clear the needs refresh flag. */
2197static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2198						   int status)
2199{
2200	unsigned long flags;
2201
2202	spin_lock_irqsave(&lockres->l_lock, flags);
2203	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2204	if (!status)
2205		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2206	spin_unlock_irqrestore(&lockres->l_lock, flags);
2207
2208	wake_up(&lockres->l_event);
2209}
2210
2211/* may or may not return a bh if it went to disk. */
2212static int ocfs2_inode_lock_update(struct inode *inode,
2213				  struct buffer_head **bh)
2214{
2215	int status = 0;
2216	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2217	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2218	struct ocfs2_dinode *fe;
2219	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2220
2221	if (ocfs2_mount_local(osb))
2222		goto bail;
2223
2224	spin_lock(&oi->ip_lock);
2225	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2226		mlog(0, "Orphaned inode %llu was deleted while we "
2227		     "were waiting on a lock. ip_flags = 0x%x\n",
2228		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
2229		spin_unlock(&oi->ip_lock);
2230		status = -ENOENT;
2231		goto bail;
2232	}
2233	spin_unlock(&oi->ip_lock);
2234
2235	if (!ocfs2_should_refresh_lock_res(lockres))
2236		goto bail;
2237
2238	/* This will discard any caching information we might have had
2239	 * for the inode metadata. */
2240	ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2241
2242	ocfs2_extent_map_trunc(inode, 0);
2243
2244	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2245		mlog(0, "Trusting LVB on inode %llu\n",
2246		     (unsigned long long)oi->ip_blkno);
2247		ocfs2_refresh_inode_from_lvb(inode);
2248	} else {
2249		/* Boo, we have to go to disk. */
2250		/* read bh, cast, ocfs2_refresh_inode */
2251		status = ocfs2_read_inode_block(inode, bh);
2252		if (status < 0) {
2253			mlog_errno(status);
2254			goto bail_refresh;
2255		}
2256		fe = (struct ocfs2_dinode *) (*bh)->b_data;
2257
2258		/* This is a good chance to make sure we're not
2259		 * locking an invalid object.  ocfs2_read_inode_block()
2260		 * already checked that the inode block is sane.
2261		 *
2262		 * We bug on a stale inode here because we checked
2263		 * above whether it was wiped from disk. The wiping
2264		 * node provides a guarantee that we receive that
2265		 * message and can mark the inode before dropping any
2266		 * locks associated with it. */
2267		mlog_bug_on_msg(inode->i_generation !=
2268				le32_to_cpu(fe->i_generation),
2269				"Invalid dinode %llu disk generation: %u "
2270				"inode->i_generation: %u\n",
2271				(unsigned long long)oi->ip_blkno,
2272				le32_to_cpu(fe->i_generation),
2273				inode->i_generation);
2274		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2275				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2276				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
2277				(unsigned long long)oi->ip_blkno,
2278				(unsigned long long)le64_to_cpu(fe->i_dtime),
2279				le32_to_cpu(fe->i_flags));
2280
2281		ocfs2_refresh_inode(inode, fe);
2282		ocfs2_track_lock_refresh(lockres);
2283	}
2284
2285	status = 0;
2286bail_refresh:
2287	ocfs2_complete_lock_res_refresh(lockres, status);
2288bail:
2289	return status;
2290}
2291
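/*
 * Hand back an inode buffer head: reuse the one the lock update read
 * for us (taking an extra reference), or read it from disk ourselves.
 */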
2292static int ocfs2_assign_bh(struct inode *inode,
2293			   struct buffer_head **ret_bh,
2294			   struct buffer_head *passed_bh)
2295{
2296	int status;
2297
2298	if (passed_bh) {
2299		/* Ok, the update went to disk for us, use the
2300		 * returned bh. */
2301		*ret_bh = passed_bh;
2302		get_bh(*ret_bh);
2303
2304		return 0;
2305	}
2306
2307	status = ocfs2_read_inode_block(inode, ret_bh);
2308	if (status < 0)
2309		mlog_errno(status);
2310
2311	return status;
2312}
2313
2314/*
2315 * returns < 0 error if the callback will never be called, otherwise
2316 * the result of the lock will be communicated via the callback.
2317 */
2318int ocfs2_inode_lock_full_nested(struct inode *inode,
2319				 struct buffer_head **ret_bh,
2320				 int ex,
2321				 int arg_flags,
2322				 int subclass)
2323{
2324	int status, level, acquired;
2325	u32 dlm_flags;
2326	struct ocfs2_lock_res *lockres = NULL;
2327	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2328	struct buffer_head *local_bh = NULL;
2329
2330	BUG_ON(!inode);
2331
2332	mlog(0, "inode %llu, take %s META lock\n",
2333	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2334	     ex ? "EXMODE" : "PRMODE");
2335
2336	status = 0;
2337	acquired = 0;
2338	/* We'll allow faking a readonly metadata lock for
2339	 * rodevices. */
2340	if (ocfs2_is_hard_readonly(osb)) {
2341		if (ex)
2342			status = -EROFS;
2343		goto getbh;
2344	}
2345
2346	if (ocfs2_mount_local(osb))
2347		goto local;
2348
2349	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2350		ocfs2_wait_for_recovery(osb);
2351
2352	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2353	level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2354	dlm_flags = 0;
2355	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2356		dlm_flags |= DLM_LKF_NOQUEUE;
2357
2358	status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2359				      arg_flags, subclass, _RET_IP_);
2360	if (status < 0) {
2361		if (status != -EAGAIN)
2362			mlog_errno(status);
2363		goto bail;
2364	}
2365
2366	/* Notify the error cleanup path to drop the cluster lock. */
2367	acquired = 1;
2368
2369	/* We wait twice because a node may have died while we were in
2370	 * the lower dlm layers. The second time though, we've
2371	 * committed to owning this lock so we don't allow signals to
2372	 * abort the operation. */
2373	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2374		ocfs2_wait_for_recovery(osb);
2375
2376local:
2377	/*
2378	 * We only see this flag if we're being called from
2379	 * ocfs2_read_locked_inode(). It means we're locking an inode
2380	 * which hasn't been populated yet, so clear the refresh flag
2381	 * and let the caller handle it.
2382	 */
2383	if (inode->i_state & I_NEW) {
2384		status = 0;
2385		if (lockres)
2386			ocfs2_complete_lock_res_refresh(lockres, 0);
2387		goto bail;
2388	}
2389
2390	/* This is fun. The caller may want a bh back, or it may
2391	 * not. ocfs2_inode_lock_update definitely wants one in, but
2392	 * may or may not read one, depending on what's in the
2393	 * LVB. The result of all of this is that we've *only* gone to
2394	 * disk if we have to, so the complexity is worthwhile. */
2395	status = ocfs2_inode_lock_update(inode, &local_bh);
2396	if (status < 0) {
2397		if (status != -ENOENT)
2398			mlog_errno(status);
2399		goto bail;
2400	}
2401getbh:
2402	if (ret_bh) {
2403		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2404		if (status < 0) {
2405			mlog_errno(status);
2406			goto bail;
2407		}
2408	}
2409
2410bail:
2411	if (status < 0) {
2412		if (ret_bh && (*ret_bh)) {
2413			brelse(*ret_bh);
2414			*ret_bh = NULL;
2415		}
2416		if (acquired)
2417			ocfs2_inode_unlock(inode, ex);
2418	}
2419
2420	brelse(local_bh);
2422
2423	return status;
2424}
2425
2426/*
2427 * This is working around a lock inversion between tasks acquiring DLM
2428 * locks while holding a page lock and the downconvert thread which
2429 * blocks dlm lock acquiry while acquiring page locks.
2430 *
2431 * ** These _with_page variants are only intended to be called from aop
2432 * methods that hold page locks and return a very specific *positive* error
2433 * code that aop methods pass up to the VFS -- test for errors with != 0. **
2434 *
2435 * The DLM is called such that it returns -EAGAIN if it would have
2436 * blocked waiting for the downconvert thread.  In that case we unlock
2437 * our page so the downconvert thread can make progress.  Once we've
2438 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2439 * that called us can bubble that back up into the VFS who will then
2440 * immediately retry the aop call.
2441 *
2442 * We do a blocking lock and immediate unlock before returning, though, so that
2443 * the lock has a great chance of being cached on this node by the time the VFS
2444 * calls back to retry the aop.  This has the potential to livelock as nodes
2445 * ping locks back and forth, but that's a risk we're willing to take to avoid
2446 * the lock inversion simply.
2447 */
2448int ocfs2_inode_lock_with_page(struct inode *inode,
2449			      struct buffer_head **ret_bh,
2450			      int ex,
2451			      struct page *page)
2452{
2453	int ret;
2454
2455	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2456	if (ret == -EAGAIN) {
2457		unlock_page(page);
2458		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2459			ocfs2_inode_unlock(inode, ex);
2460		ret = AOP_TRUNCATED_PAGE;
2461	}
2462
2463	return ret;
2464}
2465
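/*
 * Lock an inode for an atime update: start at PR and upgrade to EX
 * only if atime actually needs updating. *level records which level
 * (0 = PR, 1 = EX) the caller must eventually drop.
 */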
2466int ocfs2_inode_lock_atime(struct inode *inode,
2467			  struct vfsmount *vfsmnt,
2468			  int *level)
2469{
2470	int ret;
2471
2472	ret = ocfs2_inode_lock(inode, NULL, 0);
2473	if (ret < 0) {
2474		mlog_errno(ret);
2475		return ret;
2476	}
2477
2478	/*
2479	 * If we should update atime, we will get the EX lock,
2480	 * otherwise we just get the PR lock.
2481	 */
2482	if (ocfs2_should_update_atime(inode, vfsmnt)) {
2483		struct buffer_head *bh = NULL;
2484
2485		ocfs2_inode_unlock(inode, 0);
2486		ret = ocfs2_inode_lock(inode, &bh, 1);
2487		if (ret < 0) {
2488			mlog_errno(ret);
2489			return ret;
2490		}
2491		*level = 1;
2492		if (ocfs2_should_update_atime(inode, vfsmnt))
2493			ocfs2_update_inode_atime(inode, bh);
2494		brelse(bh);
2496	} else
2497		*level = 0;
2498
2499	return ret;
2500}
2501
2502void ocfs2_inode_unlock(struct inode *inode,
2503		       int ex)
2504{
2505	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2506	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2507	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2508
2509	mlog(0, "inode %llu drop %s META lock\n",
2510	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2511	     ex ? "EXMODE" : "PRMODE");
2512
2513	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2514	    !ocfs2_mount_local(osb))
2515		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2516}
2517
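/*
 * Take the global orphan scan lock at EX. On success, *seqno holds
 * the scan sequence number from the LVB if it's trustable, otherwise
 * the next local sequence number.
 */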
2518int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2519{
2520	struct ocfs2_lock_res *lockres;
2521	struct ocfs2_orphan_scan_lvb *lvb;
2522	int status = 0;
2523
2524	if (ocfs2_is_hard_readonly(osb))
2525		return -EROFS;
2526
2527	if (ocfs2_mount_local(osb))
2528		return 0;
2529
2530	lockres = &osb->osb_orphan_scan.os_lockres;
2531	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2532	if (status < 0)
2533		return status;
2534
2535	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2536	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2537	    lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2538		*seqno = be32_to_cpu(lvb->lvb_os_seqno);
2539	else
2540		*seqno = osb->osb_orphan_scan.os_seqno + 1;
2541
2542	return status;
2543}
2544
2545void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2546{
2547	struct ocfs2_lock_res *lockres;
2548	struct ocfs2_orphan_scan_lvb *lvb;
2549
2550	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2551		lockres = &osb->osb_orphan_scan.os_lockres;
2552		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2553		lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2554		lvb->lvb_os_seqno = cpu_to_be32(seqno);
2555		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2556	}
2557}
2558
2559int ocfs2_super_lock(struct ocfs2_super *osb,
2560		     int ex)
2561{
2562	int status = 0;
2563	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2564	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2565
2566	if (ocfs2_is_hard_readonly(osb))
2567		return -EROFS;
2568
2569	if (ocfs2_mount_local(osb))
2570		goto bail;
2571
2572	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2573	if (status < 0) {
2574		mlog_errno(status);
2575		goto bail;
2576	}
2577
2578	/* The super block lock path is really in the best position to
2579	 * know when resources covered by the lock need to be
2580	 * refreshed, so we do it here. Of course, making sense of
2581	 * everything is up to the caller :) */
2582	status = ocfs2_should_refresh_lock_res(lockres);
2583	if (status) {
2584		status = ocfs2_refresh_slot_info(osb);
2585
2586		ocfs2_complete_lock_res_refresh(lockres, status);
2587
2588		if (status < 0) {
2589			ocfs2_cluster_unlock(osb, lockres, level);
2590			mlog_errno(status);
2591		}
2592		ocfs2_track_lock_refresh(lockres);
2593	}
2594bail:
2595	return status;
2596}
2597
2598void ocfs2_super_unlock(struct ocfs2_super *osb,
2599			int ex)
2600{
2601	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2602	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2603
2604	if (!ocfs2_mount_local(osb))
2605		ocfs2_cluster_unlock(osb, lockres, level);
2606}
2607
2608int ocfs2_rename_lock(struct ocfs2_super *osb)
2609{
2610	int status;
2611	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2612
2613	if (ocfs2_is_hard_readonly(osb))
2614		return -EROFS;
2615
2616	if (ocfs2_mount_local(osb))
2617		return 0;
2618
2619	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2620	if (status < 0)
2621		mlog_errno(status);
2622
2623	return status;
2624}
2625
2626void ocfs2_rename_unlock(struct ocfs2_super *osb)
2627{
2628	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2629
2630	if (!ocfs2_mount_local(osb))
2631		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2632}
2633
2634int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2635{
2636	int status;
2637	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2638
2639	if (ocfs2_is_hard_readonly(osb))
2640		return -EROFS;
2641
2642	if (ocfs2_mount_local(osb))
2643		return 0;
2644
2645	status = ocfs2_cluster_lock(osb, lockres, ex ? DLM_LOCK_EX : DLM_LOCK_PR,
2646				    0, 0);
2647	if (status < 0)
2648		mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2649
2650	return status;
2651}
2652
2653void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2654{
2655	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2656
2657	if (!ocfs2_mount_local(osb))
2658		ocfs2_cluster_unlock(osb, lockres,
2659				     ex ? DLM_LOCK_EX : DLM_LOCK_PR);
2660}
2661
2662int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2663{
2664	int ret;
2665	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2666	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2667	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2668
2669	BUG_ON(!dl);
2670
2671	if (ocfs2_is_hard_readonly(osb)) {
2672		if (ex)
2673			return -EROFS;
2674		return 0;
2675	}
2676
2677	if (ocfs2_mount_local(osb))
2678		return 0;
2679
2680	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2681	if (ret < 0)
2682		mlog_errno(ret);
2683
2684	return ret;
2685}
2686
2687void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2688{
2689	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2690	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2691	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2692
2693	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
2694		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2695}
2696
2697/* Reference counting of the dlm debug structure. We want this because
2698 * open references on the debug inodes can live on past a dismount, so
2699 * we can't rely on the ocfs2_super to always exist. */
2700static void ocfs2_dlm_debug_free(struct kref *kref)
2701{
2702	struct ocfs2_dlm_debug *dlm_debug;
2703
2704	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2705
2706	kfree(dlm_debug);
2707}
2708
2709void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2710{
2711	if (dlm_debug)
2712		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2713}
2714
2715static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2716{
2717	kref_get(&debug->d_refcnt);
2718}
2719
2720struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2721{
2722	struct ocfs2_dlm_debug *dlm_debug;
2723
2724	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2725	if (!dlm_debug) {
2726		mlog_errno(-ENOMEM);
2727		goto out;
2728	}
2729
2730	kref_init(&dlm_debug->d_refcnt);
2731	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2732	dlm_debug->d_locking_state = NULL;
2733out:
2734	return dlm_debug;
2735}
2736
2737/* Access to this is arbitrated for us via seq_file->sem. */
2738struct ocfs2_dlm_seq_priv {
2739	struct ocfs2_dlm_debug *p_dlm_debug;
2740	struct ocfs2_lock_res p_iter_res;
2741	struct ocfs2_lock_res p_tmp_res;
2742};
2743
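/*
 * Walk forward from 'start' to the next real lockres on the tracking
 * list, skipping dummy iteration markers (l_ops == NULL). Returns
 * NULL once the walk reaches the list head again.
 */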
2744static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2745						 struct ocfs2_dlm_seq_priv *priv)
2746{
2747	struct ocfs2_lock_res *iter, *ret = NULL;
2748	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2749
2750	assert_spin_locked(&ocfs2_dlm_tracking_lock);
2751
2752	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2753		/* discover the head of the list */
2754		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2755			mlog(0, "End of list found, %p\n", ret);
2756			break;
2757		}
2758
2759		/* We track our "dummy" iteration lockres by a NULL
2760		 * l_ops field. */
2761		if (iter->l_ops != NULL) {
2762			ret = iter;
2763			break;
2764		}
2765	}
2766
2767	return ret;
2768}
2769
2770static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2771{
2772	struct ocfs2_dlm_seq_priv *priv = m->private;
2773	struct ocfs2_lock_res *iter;
2774
2775	spin_lock(&ocfs2_dlm_tracking_lock);
2776	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2777	if (iter) {
2778		/* Since a lockres has the lifetime of its container
2779		 * (which can be an inode, an ocfs2_super, etc.) we want
2780		 * to copy it out to a temporary lockres while still
2781		 * under the spinlock. Obviously after this we can't
2782		 * trust any pointers on the copy returned, but that's
2783		 * ok as the information we want isn't typically held
2784		 * in them. */
2785		priv->p_tmp_res = *iter;
2786		iter = &priv->p_tmp_res;
2787	}
2788	spin_unlock(&ocfs2_dlm_tracking_lock);
2789
2790	return iter;
2791}
2792
2793static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2794{
2795}
2796
2797static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2798{
2799	struct ocfs2_dlm_seq_priv *priv = m->private;
2800	struct ocfs2_lock_res *iter = v;
2801	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2802
2803	spin_lock(&ocfs2_dlm_tracking_lock);
2804	iter = ocfs2_dlm_next_res(iter, priv);
2805	list_del_init(&dummy->l_debug_list);
2806	if (iter) {
2807		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2808		priv->p_tmp_res = *iter;
2809		iter = &priv->p_tmp_res;
2810	}
2811	spin_unlock(&ocfs2_dlm_tracking_lock);
2812
2813	return iter;
2814}
2815
2816/*
2817 * Version is used by debugfs.ocfs2 to determine the format being used
2818 *
2819 * New in version 2
2820 *	- Lock stats printed
2821 * New in version 3
2822 *	- Max time in lock stats is in usecs (instead of nsecs)
2823 */
2824#define OCFS2_DLM_DEBUG_STR_VERSION 3
2825static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2826{
2827	int i;
2828	char *lvb;
2829	struct ocfs2_lock_res *lockres = v;
2830
2831	if (!lockres)
2832		return -EINVAL;
2833
2834	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2835
2836	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2837		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2838			   lockres->l_name,
2839			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2840	else
2841		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2842
2843	seq_printf(m, "%d\t"
2844		   "0x%lx\t"
2845		   "0x%x\t"
2846		   "0x%x\t"
2847		   "%u\t"
2848		   "%u\t"
2849		   "%d\t"
2850		   "%d\t",
2851		   lockres->l_level,
2852		   lockres->l_flags,
2853		   lockres->l_action,
2854		   lockres->l_unlock_action,
2855		   lockres->l_ro_holders,
2856		   lockres->l_ex_holders,
2857		   lockres->l_requested,
2858		   lockres->l_blocking);
2859
2860	/* Dump the raw LVB */
2861	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2862	for (i = 0; i < DLM_LVB_LEN; i++)
2863		seq_printf(m, "0x%x\t", lvb[i]);
2864
2865#ifdef CONFIG_OCFS2_FS_STATS
2866# define lock_num_prmode(_l)		((_l)->l_lock_prmode.ls_gets)
2867# define lock_num_exmode(_l)		((_l)->l_lock_exmode.ls_gets)
2868# define lock_num_prmode_failed(_l)	((_l)->l_lock_prmode.ls_fail)
2869# define lock_num_exmode_failed(_l)	((_l)->l_lock_exmode.ls_fail)
2870# define lock_total_prmode(_l)		((_l)->l_lock_prmode.ls_total)
2871# define lock_total_exmode(_l)		((_l)->l_lock_exmode.ls_total)
2872# define lock_max_prmode(_l)		((_l)->l_lock_prmode.ls_max)
2873# define lock_max_exmode(_l)		((_l)->l_lock_exmode.ls_max)
2874# define lock_refresh(_l)		((_l)->l_lock_refresh)
2875#else
2876# define lock_num_prmode(_l)		(0)
2877# define lock_num_exmode(_l)		(0)
2878# define lock_num_prmode_failed(_l)	(0)
2879# define lock_num_exmode_failed(_l)	(0)
2880# define lock_total_prmode(_l)		(0ULL)
2881# define lock_total_exmode(_l)		(0ULL)
2882# define lock_max_prmode(_l)		(0)
2883# define lock_max_exmode(_l)		(0)
2884# define lock_refresh(_l)		(0)
2885#endif
2886	/* The following seq_print was added in version 2 of this output */
2887	seq_printf(m, "%u\t"
2888		   "%u\t"
2889		   "%u\t"
2890		   "%u\t"
2891		   "%llu\t"
2892		   "%llu\t"
2893		   "%u\t"
2894		   "%u\t"
2895		   "%u\t",
2896		   lock_num_prmode(lockres),
2897		   lock_num_exmode(lockres),
2898		   lock_num_prmode_failed(lockres),
2899		   lock_num_exmode_failed(lockres),
2900		   lock_total_prmode(lockres),
2901		   lock_total_exmode(lockres),
2902		   lock_max_prmode(lockres),
2903		   lock_max_exmode(lockres),
2904		   lock_refresh(lockres));
2905
2906	/* End the line */
2907	seq_printf(m, "\n");
2908	return 0;
2909}
2910
2911static const struct seq_operations ocfs2_dlm_seq_ops = {
2912	.start =	ocfs2_dlm_seq_start,
2913	.stop =		ocfs2_dlm_seq_stop,
2914	.next =		ocfs2_dlm_seq_next,
2915	.show =		ocfs2_dlm_seq_show,
2916};
2917
2918static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2919{
2920	struct seq_file *seq = file->private_data;
2921	struct ocfs2_dlm_seq_priv *priv = seq->private;
2922	struct ocfs2_lock_res *res = &priv->p_iter_res;
2923
2924	ocfs2_remove_lockres_tracking(res);
2925	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2926	return seq_release_private(inode, file);
2927}
2928
2929static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2930{
2931	struct ocfs2_dlm_seq_priv *priv;
2932	struct ocfs2_super *osb;
2933
2934	priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv));
2935	if (!priv) {
2936		mlog_errno(-ENOMEM);
2937		return -ENOMEM;
2938	}
2939
2940	osb = inode->i_private;
2941	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2942	priv->p_dlm_debug = osb->osb_dlm_debug;
2943	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2944
2945	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2946				   priv->p_dlm_debug);
2947
2948	return 0;
2949}
2950
2951static const struct file_operations ocfs2_dlm_debug_fops = {
2952	.open =		ocfs2_dlm_debug_open,
2953	.release =	ocfs2_dlm_debug_release,
2954	.read =		seq_read,
2955	.llseek =	seq_lseek,
2956};
2957
2958static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2959{
2960	int ret = 0;
2961	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2962
2963	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2964							 S_IFREG|S_IRUSR,
2965							 osb->osb_debug_root,
2966							 osb,
2967							 &ocfs2_dlm_debug_fops);
2968	if (!dlm_debug->d_locking_state) {
2969		ret = -EINVAL;
2970		mlog(ML_ERROR,
2971		     "Unable to create locking state debugfs file.\n");
2972		goto out;
2973	}
2974
2975	ocfs2_get_dlm_debug(dlm_debug);
2976out:
2977	return ret;
2978}
2979
2980static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2981{
2982	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2983
2984	if (dlm_debug) {
2985		debugfs_remove(dlm_debug->d_locking_state);
2986		ocfs2_put_dlm_debug(dlm_debug);
2987	}
2988}
2989
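/*
 * Join the cluster: set up the locking_state debugfs file, start the
 * downconvert thread, connect to the DLM domain (named by the volume
 * uuid) and look up our node number. Local mounts skip all of that
 * and only initialize the global lock resources.
 */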
2990int ocfs2_dlm_init(struct ocfs2_super *osb)
2991{
2992	int status = 0;
2993	struct ocfs2_cluster_connection *conn = NULL;
2994
2995	if (ocfs2_mount_local(osb)) {
2996		osb->node_num = 0;
2997		goto local;
2998	}
2999
3000	status = ocfs2_dlm_init_debug(osb);
3001	if (status < 0) {
3002		mlog_errno(status);
3003		goto bail;
3004	}
3005
3006	/* launch downconvert thread */
3007	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
3008	if (IS_ERR(osb->dc_task)) {
3009		status = PTR_ERR(osb->dc_task);
3010		osb->dc_task = NULL;
3011		mlog_errno(status);
3012		goto bail;
3013	}
3014
3015	/* for now, uuid == domain */
3016	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
3017				       osb->osb_cluster_name,
3018				       strlen(osb->osb_cluster_name),
3019				       osb->uuid_str,
3020				       strlen(osb->uuid_str),
3021				       &lproto, ocfs2_do_node_down, osb,
3022				       &conn);
3023	if (status) {
3024		mlog_errno(status);
3025		goto bail;
3026	}
3027
3028	status = ocfs2_cluster_this_node(conn, &osb->node_num);
3029	if (status < 0) {
3030		mlog_errno(status);
3031		mlog(ML_ERROR,
3032		     "could not find this host's node number\n");
3033		ocfs2_cluster_disconnect(conn, 0);
3034		goto bail;
3035	}
3036
3037local:
3038	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3039	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3040	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
3041	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3042
3043	osb->cconn = conn;
3044
3045	status = 0;
3046bail:
3047	if (status < 0) {
3048		ocfs2_dlm_shutdown_debug(osb);
3049		if (osb->dc_task)
3050			kthread_stop(osb->dc_task);
3051	}
3052
3053	return status;
3054}
3055
3056void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3057			int hangup_pending)
3058{
3059	ocfs2_drop_osb_locks(osb);
3060
3061	/*
3062	 * Now that we have dropped all locks and ocfs2_dismount_volume()
3063	 * has disabled recovery, the DLM won't be talking to us.  It's
3064	 * safe to tear things down before disconnecting the cluster.
3065	 */
3066
3067	if (osb->dc_task) {
3068		kthread_stop(osb->dc_task);
3069		osb->dc_task = NULL;
3070	}
3071
3072	ocfs2_lock_res_free(&osb->osb_super_lockres);
3073	ocfs2_lock_res_free(&osb->osb_rename_lockres);
3074	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3075	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3076
3077	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3078	osb->cconn = NULL;
3079
3080	ocfs2_dlm_shutdown_debug(osb);
3081}
3082
3083static int ocfs2_drop_lock(struct ocfs2_super *osb,
3084			   struct ocfs2_lock_res *lockres)
3085{
3086	int ret;
3087	unsigned long flags;
3088	u32 lkm_flags = 0;
3089
3090	/* We didn't get anywhere near actually using this lockres. */
3091	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3092		goto out;
3093
3094	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3095		lkm_flags |= DLM_LKF_VALBLK;
3096
3097	spin_lock_irqsave(&lockres->l_lock, flags);
3098
3099	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3100			"lockres %s, flags 0x%lx\n",
3101			lockres->l_name, lockres->l_flags);
3102
3103	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3104		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3105		     "%u, unlock_action = %u\n",
3106		     lockres->l_name, lockres->l_flags, lockres->l_action,
3107		     lockres->l_unlock_action);
3108
3109		spin_unlock_irqrestore(&lockres->l_lock, flags);
3110
3111		/* XXX: Today we just wait on any busy
3112		 * locks... Perhaps we need to cancel converts in the
3113		 * future? */
3114		ocfs2_wait_on_busy_lock(lockres);
3115
3116		spin_lock_irqsave(&lockres->l_lock, flags);
3117	}
3118
3119	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3120		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3121		    lockres->l_level == DLM_LOCK_EX &&
3122		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3123			lockres->l_ops->set_lvb(lockres);
3124	}
3125
3126	if (lockres->l_flags & OCFS2_LOCK_BUSY)
3127		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3128		     lockres->l_name);
3129	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3130		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3131
3132	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3133		spin_unlock_irqrestore(&lockres->l_lock, flags);
3134		goto out;
3135	}
3136
3137	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3138
3139	/* make sure we never get here while waiting for an ast to
3140	 * fire. */
3141	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3142
3143	/* is this necessary? */
3144	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3145	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3146	spin_unlock_irqrestore(&lockres->l_lock, flags);
3147
3148	mlog(0, "lock %s\n", lockres->l_name);
3149
3150	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3151	if (ret) {
3152		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3153		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3154		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3155		BUG();
3156	}
3157	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3158	     lockres->l_name);
3159
3160	ocfs2_wait_on_busy_lock(lockres);
3161out:
3162	return 0;
3163}
3164
3165static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3166				       struct ocfs2_lock_res *lockres);
3167
3168/* Mark the lockres as being dropped. It will no longer be
3169 * queued if blocking, but we still may have to wait on it
3170 * being dequeued from the downconvert thread before we can consider
3171 * it safe to drop.
3172 *
3173 * You can *not* attempt to call cluster_lock on this lockres anymore. */
3174void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
3175				struct ocfs2_lock_res *lockres)
3176{
3177	int status;
3178	struct ocfs2_mask_waiter mw;
3179	unsigned long flags, flags2;
3180
3181	ocfs2_init_mask_waiter(&mw);
3182
3183	spin_lock_irqsave(&lockres->l_lock, flags);
3184	lockres->l_flags |= OCFS2_LOCK_FREEING;
3185	if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
3186		/*
3187		 * We know the downconvert is queued but not in progress
3188		 * because we are the downconvert thread and are processing
3189		 * a different lock. So we can just remove the lock from the
3190		 * queue. This is not only an optimization but also a way
3191		 * to avoid the following deadlock:
3192		 *   ocfs2_dentry_post_unlock()
3193		 *     ocfs2_dentry_lock_put()
3194		 *       ocfs2_drop_dentry_lock()
3195		 *         iput()
3196		 *           ocfs2_evict_inode()
3197		 *             ocfs2_clear_inode()
3198		 *               ocfs2_mark_lockres_freeing()
3199		 *                 ... blocks waiting for OCFS2_LOCK_QUEUED
3200		 *                 since we are the downconvert thread which
3201		 *                 should clear the flag.
3202		 */
3203		spin_unlock_irqrestore(&lockres->l_lock, flags);
3204		spin_lock_irqsave(&osb->dc_task_lock, flags2);
3205		list_del_init(&lockres->l_blocked_list);
3206		osb->blocked_lock_count--;
3207		spin_unlock_irqrestore(&osb->dc_task_lock, flags2);
3208		/*
3209		 * Warn if we recurse into another post_unlock call.  Strictly
3210		 * speaking it isn't a problem but we need to be careful if
3211		 * that happens (stack overflow, deadlocks, ...) so warn if
3212		 * ocfs2 grows a path for which this can happen.
3213		 */
3214		WARN_ON_ONCE(lockres->l_ops->post_unlock);
3215		/* Since the lock is freeing we don't do much in the fn below */
3216		ocfs2_process_blocked_lock(osb, lockres);
3217		return;
3218	}
3219	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3220		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3221		spin_unlock_irqrestore(&lockres->l_lock, flags);
3222
3223		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3224
3225		status = ocfs2_wait_for_mask(&mw);
3226		if (status)
3227			mlog_errno(status);
3228
3229		spin_lock_irqsave(&lockres->l_lock, flags);
3230	}
3231	spin_unlock_irqrestore(&lockres->l_lock, flags);
3232}
3233
3234void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3235			       struct ocfs2_lock_res *lockres)
3236{
3237	int ret;
3238
3239	ocfs2_mark_lockres_freeing(osb, lockres);
3240	ret = ocfs2_drop_lock(osb, lockres);
3241	if (ret)
3242		mlog_errno(ret);
3243}
3244
3245static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3246{
3247	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3248	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3249	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3250	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3251}
3252
3253int ocfs2_drop_inode_locks(struct inode *inode)
3254{
3255	int status, err;
3256
3257	/* No need to call ocfs2_mark_lockres_freeing here -
3258	 * ocfs2_clear_inode has done it for us. */
3259
3260	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3261			      &OCFS2_I(inode)->ip_open_lockres);
3262	if (err < 0)
3263		mlog_errno(err);
3264
3265	status = err;
3266
3267	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3268			      &OCFS2_I(inode)->ip_inode_lockres);
3269	if (err < 0)
3270		mlog_errno(err);
3271	if (err < 0 && !status)
3272		status = err;
3273
3274	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3275			      &OCFS2_I(inode)->ip_rw_lockres);
3276	if (err < 0)
3277		mlog_errno(err);
3278	if (err < 0 && !status)
3279		status = err;
3280
3281	return status;
3282}
3283
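/*
 * Set up a downconvert to new_level. Called with lockres->l_lock
 * held; marks the lockres busy and returns the pending generation
 * that must later be handed to ocfs2_downconvert_lock().
 */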
3284static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3285					      int new_level)
3286{
3287	assert_spin_locked(&lockres->l_lock);
3288
3289	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3290
3291	if (lockres->l_level <= new_level) {
3292		mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3293		     "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3294		     "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3295		     new_level, list_empty(&lockres->l_blocked_list),
3296		     list_empty(&lockres->l_mask_waiters), lockres->l_type,
3297		     lockres->l_flags, lockres->l_ro_holders,
3298		     lockres->l_ex_holders, lockres->l_action,
3299		     lockres->l_unlock_action, lockres->l_requested,
3300		     lockres->l_blocking, lockres->l_pending_gen);
3301		BUG();
3302	}
3303
3304	mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3305	     lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3306
3307	lockres->l_action = OCFS2_AST_DOWNCONVERT;
3308	lockres->l_requested = new_level;
3309	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3310	return lockres_set_pending(lockres);
3311}
3312
3313static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3314				  struct ocfs2_lock_res *lockres,
3315				  int new_level,
3316				  int lvb,
3317				  unsigned int generation)
3318{
3319	int ret;
3320	u32 dlm_flags = DLM_LKF_CONVERT;
3321
3322	mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3323	     lockres->l_level, new_level);
3324
3325	if (lvb)
3326		dlm_flags |= DLM_LKF_VALBLK;
3327
3328	ret = ocfs2_dlm_lock(osb->cconn,
3329			     new_level,
3330			     &lockres->l_lksb,
3331			     dlm_flags,
3332			     lockres->l_name,
3333			     OCFS2_LOCK_ID_MAX_LEN - 1);
3334	lockres_clear_pending(lockres, generation, osb);
3335	if (ret) {
3336		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3337		ocfs2_recover_from_dlm_error(lockres, 1);
3338		goto bail;
3339	}
3340
3341	ret = 0;
3342bail:
3343	return ret;
3344}
3345
3346/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3347static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3348				        struct ocfs2_lock_res *lockres)
3349{
3350	assert_spin_locked(&lockres->l_lock);
3351
3352	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3353		/* If we're already trying to cancel a lock conversion
3354		 * then just drop the spinlock and allow the caller to
3355		 * requeue this lock. */
3356		mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3357		return 0;
3358	}
3359
3360	/* were we in a convert when we got the bast fire? */
3361	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3362	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
3363	/* set things up for the unlockast to know to just
3364	 * clear out the ast_action and unset busy, etc. */
3365	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3366
3367	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3368			"lock %s, invalid flags: 0x%lx\n",
3369			lockres->l_name, lockres->l_flags);
3370
3371	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3372
3373	return 1;
3374}
3375
3376static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3377				struct ocfs2_lock_res *lockres)
3378{
3379	int ret;
3380
3381	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3382			       DLM_LKF_CANCEL);
3383	if (ret) {
3384		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3385		ocfs2_recover_from_dlm_error(lockres, 0);
3386	}
3387
3388	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3389
3390	return ret;
3391}
3392
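/*
 * Heart of the downconvert thread: decide whether a blocked lockres
 * can be downconverted now, needs an in-flight convert cancelled
 * first, or must be requeued. The decision is reported back through
 * 'ctl'.
 */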
3393static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3394			      struct ocfs2_lock_res *lockres,
3395			      struct ocfs2_unblock_ctl *ctl)
3396{
3397	unsigned long flags;
3398	int blocking;
3399	int new_level;
3400	int level;
3401	int ret = 0;
3402	int set_lvb = 0;
3403	unsigned int gen;
3404
3405	spin_lock_irqsave(&lockres->l_lock, flags);
3406
3407recheck:
3408	/*
3409	 * Is it still blocking? If not, we have no more work to do.
3410	 */
3411	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3412		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3413		spin_unlock_irqrestore(&lockres->l_lock, flags);
3414		ret = 0;
3415		goto leave;
3416	}
3417
3418	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3419		/* XXX
3420		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3421		 * exists entirely for one reason - another thread has set
3422		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3423		 *
3424		 * If we do ocfs2_cancel_convert() before the other thread
3425		 * calls dlm_lock(), our cancel will do nothing.  We will
3426		 * get no ast, and we will have no way of knowing the
3427		 * cancel failed.  Meanwhile, the other thread will call
3428		 * into dlm_lock() and wait...forever.
3429		 *
3430		 * Why forever?  Because another node has asked for the
3431		 * lock first; that's why we're here in unblock_lock().
3432		 *
3433		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3434		 * set, we just requeue the unblock.  Only when the other
3435		 * thread has called dlm_lock() and cleared PENDING will
3436		 * we then cancel their request.
3437		 *
3438		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3439		 * at the same time they set OCFS2_LOCK_BUSY.  They must
3440		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3441		 */
3442		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3443			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3444			     lockres->l_name);
3445			goto leave_requeue;
3446		}
3447
3448		ctl->requeue = 1;
3449		ret = ocfs2_prepare_cancel_convert(osb, lockres);
3450		spin_unlock_irqrestore(&lockres->l_lock, flags);
3451		if (ret) {
3452			ret = ocfs2_cancel_convert(osb, lockres);
3453			if (ret < 0)
3454				mlog_errno(ret);
3455		}
3456		goto leave;
3457	}
3458
3459	/*
3460	 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3461	 * set when the ast is received for an upconvert just before the
3462	 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3463	 * on the heels of the ast, we want to delay the downconvert just
3464	 * enough to allow the up requestor to do its task. Because this
3465	 * lock is in the blocked queue, the lock will be downconverted
3466	 * as soon as the requestor is done with the lock.
3467	 */
	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
		goto leave_requeue;

	/*
	 * How can we block and yet be at NL?  We were trying to upconvert
	 * from NL and got canceled.  The code comes back here, and now
	 * we notice and clear BLOCKING.
	 */
	if (lockres->l_level == DLM_LOCK_NL) {
		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
		     lockres->l_name, lockres->l_ex_holders,
		     lockres->l_ro_holders);
		goto leave_requeue;
	}

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == DLM_LOCK_PR &&
	    lockres->l_ex_holders) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
		     lockres->l_name, lockres->l_ex_holders);
		goto leave_requeue;
	}

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The metadata unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	level = lockres->l_level;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
		     lockres->l_name);
		goto leave;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
		     "Recheck\n", lockres->l_name, blocking,
		     lockres->l_blocking, level, lockres->l_level);
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == DLM_LOCK_EX)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk writing back stale
		 * data. When we don't set it, there's also no need to
		 * clear out the lvb, as its value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	gen = ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
				     gen);

leave:
	if (ret)
		mlog_errno(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	return 0;
}

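/*
 * Editor's summary of the worker below: before another node takes an
 * incompatible lock on a regular file, unmap and write back its dirty
 * pages (and drop them entirely for an EX request). Directories only
 * bump ip_dir_lock_gen so cached directory state gets revalidated.
 */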
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking)
{
	struct inode *inode;
	struct address_space *mapping;
	struct ocfs2_inode_info *oi;

	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	if (S_ISDIR(inode->i_mode)) {
		oi = OCFS2_I(inode);
		oi->ip_dir_lock_gen++;
		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
		goto out;
	}

	if (!S_ISREG(inode->i_mode))
		goto out;

	/*
	 * We need this before the filemap_fdatawrite() so that it can
	 * transfer the dirty bit from the PTE to the
	 * page. Unfortunately this means that even for EX->PR
	 * downconverts, we'll lose our mappings and have to build
	 * them up again.
	 */
	unmap_mapping_range(mapping, 0, 0, 0);

	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!\n",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == DLM_LOCK_EX) {
		truncate_inode_pages(mapping, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
		 * truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < EXMODE because we want to keep
		 * them around in that case. */
		filemap_fdatawait(mapping);
	}

out:
	return UNBLOCK_CONTINUE;
}

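/*
 * Editor's summary: a cached object's lock may only leave EX once the
 * journal has checkpointed all changes covered by that lock; otherwise
 * another node could read stale metadata from disk. When the cache
 * isn't fully checkpointed yet, kick off a checkpoint and return 0 so
 * the caller requeues the downconvert.
 */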
static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
				 struct ocfs2_lock_res *lockres,
				 int new_level)
{
	int checkpointed = ocfs2_ci_fully_checkpointed(ci);

	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);

	if (checkpointed)
		return 1;

	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
	return 0;
}

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
}

static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	__ocfs2_stuff_meta_lvb(inode);
}

/*
 * Does the final reference drop on our dentry lock. Right now this
 * happens in the downconvert thread, but we could choose to simplify the
 * dlmglue API and push these off to the ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	ocfs2_dentry_lock_put(osb, dl);
}

/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems:
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there are no more dentries anyway.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == DLM_LOCK_PR)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure, however, that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		if (S_ISDIR(dl->dl_inode->i_mode))
			shrink_dcache_parent(dentry);

		mlog(0, "d_delete(%pd);\n", dentry);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}

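/*
 * Editor's note: refcount tree locks have the same checkpoint
 * requirement as inode metadata locks - don't drop below EX until the
 * journal has checkpointed the tree's dirty metadata.
 */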
static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level)
{
	struct ocfs2_refcount_tree *tree =
				ocfs2_lock_res_refcount_tree(lockres);

	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
}

static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking)
{
	struct ocfs2_refcount_tree *tree =
				ocfs2_lock_res_refcount_tree(lockres);

	ocfs2_metadata_cache_purge(&tree->rf_ci);

	return UNBLOCK_CONTINUE;
}

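/*
 * Editor's note: pack the cluster-wide quota state into the lock value
 * block so other nodes can pick it up without a disk read. The fields
 * are stored big-endian (cpu_to_be32), matching the be32_to_cpu()
 * unpacking in ocfs2_refresh_qinfo() below.
 */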
static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_qinfo_lvb *lvb;
	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
					    oinfo->dqi_gi.dqi_type);

	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
}

void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
{
	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;

	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, level);
}

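/*
 * Editor's summary: refresh our cached quota info. If the LVB is valid
 * and carries a version we understand, decode it; otherwise fall back
 * to reading the global quota file from disk.
 */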
static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
{
	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
					    oinfo->dqi_gi.dqi_type);
	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	struct buffer_head *bh = NULL;
	struct ocfs2_global_disk_dqinfo *gdinfo;
	int status = 0;

	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
		oinfo->dqi_gi.dqi_free_entry =
					be32_to_cpu(lvb->lvb_free_entry);
	} else {
		status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
						     oinfo->dqi_giblk, &bh);
		if (status) {
			mlog_errno(status);
			goto bail;
		}
		gdinfo = (struct ocfs2_global_disk_dqinfo *)
					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
		oinfo->dqi_gi.dqi_free_entry =
					le32_to_cpu(gdinfo->dqi_free_entry);
		brelse(bh);
		ocfs2_track_lock_refresh(lockres);
	}

bail:
	return status;
}

/* Lock quota info. This function expects at least a shared lock on the
 * quota file so that we can safely refresh the quota info from disk. */
int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
{
	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	int status = 0;

	/* On RO devices, locking really isn't needed... */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}
	if (ocfs2_mount_local(osb))
		goto bail;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;
	/* OK, we have the lock but we need to refresh the quota info */
	status = ocfs2_refresh_qinfo(oinfo);
	if (status)
		ocfs2_qinfo_unlock(oinfo, ex);
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	return status;
}

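/*
 * Editor's sketch of a hypothetical caller (illustrative only) pairing
 * this with the unlock below:
 *
 *	ret = ocfs2_refcount_lock(ref_tree, 1);		(1 => EX)
 *	if (!ret) {
 *		... modify the refcount tree ...
 *		ocfs2_refcount_unlock(ref_tree, 1);
 *	}
 */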
int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
	int status;
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
	struct ocfs2_super *osb = lockres->l_priv;

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		return 0;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0)
		mlog_errno(status);

	return status;
}

void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
	struct ocfs2_super *osb = lockres->l_priv;

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, level);
}

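/*
 * Editor's summary of the flow below: try to unblock the lockres,
 * requeue it if more work remains, clear OCFS2_LOCK_QUEUED otherwise,
 * and fire the optional ->post_unlock() callback once the spinlock has
 * been dropped.
 */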
static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);

	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the downconvert thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = ocfs2_unblock_lock(osb, lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue)
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);
}

static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	unsigned long flags;

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
}

static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
{
	unsigned long processed;
	unsigned long flags;
	struct ocfs2_lock_res *lockres;

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	/* grab this early so we know to try again if a state change and
	 * wakeup happen part-way through our work */
	osb->dc_work_sequence = osb->dc_wake_sequence;

	processed = osb->blocked_lock_count;
	/*
	 * blocked lock processing in this loop might call iput which can
	 * remove items off osb->blocked_lock_list. Downconvert up to
	 * 'processed' number of locks, but stop short if we had some
	 * removed in ocfs2_mark_lockres_freeing when downconverting.
	 */
	while (processed && !list_empty(&osb->blocked_lock_list)) {
		lockres = list_entry(osb->blocked_lock_list.next,
				     struct ocfs2_lock_res, l_blocked_list);
		list_del_init(&lockres->l_blocked_list);
		osb->blocked_lock_count--;
		spin_unlock_irqrestore(&osb->dc_task_lock, flags);

		BUG_ON(!processed);
		processed--;

		ocfs2_process_blocked_lock(osb, lockres);

		spin_lock_irqsave(&osb->dc_task_lock, flags);
	}
	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
}

static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
{
	int empty = 0;
	unsigned long flags;

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	if (list_empty(&osb->blocked_lock_list))
		empty = 1;

	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
	return empty;
}

static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
{
	int should_wake = 0;
	unsigned long flags;

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	if (osb->dc_work_sequence != osb->dc_wake_sequence)
		should_wake = 1;
	spin_unlock_irqrestore(&osb->dc_task_lock, flags);

	return should_wake;
}

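/*
 * Editor's summary: main loop of the downconvert thread. Sleep until
 * woken (or asked to stop), then process the blocked-lock list. We only
 * exit once kthread_should_stop() is true *and* the list is empty, so
 * queued work is never abandoned at unmount.
 */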
static int ocfs2_downconvert_thread(void *arg)
{
	int status = 0;
	struct ocfs2_super *osb = arg;

	/* only quit once we've been asked to stop and there is no more
	 * work available */
	while (!(kthread_should_stop() &&
		ocfs2_downconvert_thread_lists_empty(osb))) {

		wait_event_interruptible(osb->dc_event,
					 ocfs2_downconvert_thread_should_wake(osb) ||
					 kthread_should_stop());

		mlog(0, "downconvert_thread: awoken\n");

		ocfs2_downconvert_thread_do_work(osb);
	}

	osb->dc_task = NULL;
	return status;
}

void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&osb->dc_task_lock, flags);
	/* make sure the downconvert thread gets a swipe at whatever changes
	 * the caller may have made to the lock state */
	osb->dc_wake_sequence++;
	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
	wake_up(&osb->dc_event);
}
