1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lock.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41 
42 #define DEBUG_SUBSYSTEM S_LDLM
43 
44 #include "../../include/linux/libcfs/libcfs.h"
45 #include "../include/lustre_intent.h"
46 #include "../include/obd_class.h"
47 #include "ldlm_internal.h"
48 
49 /* lock types */
50 char *ldlm_lockname[] = {
51 	[0]		= "--",
52 	[LCK_EX]	= "EX",
53 	[LCK_PW]	= "PW",
54 	[LCK_PR]	= "PR",
55 	[LCK_CW]	= "CW",
56 	[LCK_CR]	= "CR",
57 	[LCK_NL]	= "NL",
58 	[LCK_GROUP]	= "GROUP",
59 	[LCK_COS]	= "COS",
60 };
61 EXPORT_SYMBOL(ldlm_lockname);
62 
63 char *ldlm_typename[] = {
64 	[LDLM_PLAIN]	= "PLN",
65 	[LDLM_EXTENT]	= "EXT",
66 	[LDLM_FLOCK]	= "FLK",
67 	[LDLM_IBITS]	= "IBT",
68 };
69 EXPORT_SYMBOL(ldlm_typename);
70 
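/*
 * Conversion tables for lock policy data, indexed by lock type.  The wire18
 * and wire21 variants differ only in how flock policy data is converted;
 * ldlm_convert_policy_to_local() picks one based on whether the client
 * connected with OBD_CONNECT_FULL20.
 */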
71 static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
72 	[LDLM_PLAIN - LDLM_MIN_TYPE]	= ldlm_plain_policy_wire_to_local,
73 	[LDLM_EXTENT - LDLM_MIN_TYPE]	= ldlm_extent_policy_wire_to_local,
74 	[LDLM_FLOCK - LDLM_MIN_TYPE]	= ldlm_flock_policy_wire18_to_local,
75 	[LDLM_IBITS - LDLM_MIN_TYPE]	= ldlm_ibits_policy_wire_to_local,
76 };
77 
78 static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
79 	[LDLM_PLAIN - LDLM_MIN_TYPE]	= ldlm_plain_policy_wire_to_local,
80 	[LDLM_EXTENT - LDLM_MIN_TYPE]	= ldlm_extent_policy_wire_to_local,
81 	[LDLM_FLOCK - LDLM_MIN_TYPE]	= ldlm_flock_policy_wire21_to_local,
82 	[LDLM_IBITS - LDLM_MIN_TYPE]	= ldlm_ibits_policy_wire_to_local,
83 };
84 
85 static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
86 	[LDLM_PLAIN - LDLM_MIN_TYPE]	= ldlm_plain_policy_local_to_wire,
87 	[LDLM_EXTENT - LDLM_MIN_TYPE]	= ldlm_extent_policy_local_to_wire,
88 	[LDLM_FLOCK - LDLM_MIN_TYPE]	= ldlm_flock_policy_local_to_wire,
89 	[LDLM_IBITS - LDLM_MIN_TYPE]	= ldlm_ibits_policy_local_to_wire,
90 };
91 
92 /**
93  * Converts lock policy from local format to on the wire lock_desc format
94  */
95 void ldlm_convert_policy_to_wire(ldlm_type_t type,
96 				 const ldlm_policy_data_t *lpolicy,
97 				 ldlm_wire_policy_data_t *wpolicy)
98 {
99 	ldlm_policy_local_to_wire_t convert;
100 
101 	convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];
102 
103 	convert(lpolicy, wpolicy);
104 }
105 
106 /**
107  * Converts lock policy from on the wire lock_desc format to local format
108  */
109 void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
110 				  const ldlm_wire_policy_data_t *wpolicy,
111 				  ldlm_policy_data_t *lpolicy)
112 {
113 	ldlm_policy_wire_to_local_t convert;
114 	int new_client;
115 
116 	/* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
117 	new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
118 	if (new_client)
119 		convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
120 	else
121 		convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];
122 
123 	convert(wpolicy, lpolicy);
124 }
125 
126 char *ldlm_it2str(int it)
127 {
128 	switch (it) {
129 	case IT_OPEN:
130 		return "open";
131 	case IT_CREAT:
132 		return "creat";
133 	case (IT_OPEN | IT_CREAT):
134 		return "open|creat";
135 	case IT_READDIR:
136 		return "readdir";
137 	case IT_GETATTR:
138 		return "getattr";
139 	case IT_LOOKUP:
140 		return "lookup";
141 	case IT_UNLINK:
142 		return "unlink";
143 	case IT_GETXATTR:
144 		return "getxattr";
145 	case IT_LAYOUT:
146 		return "layout";
147 	default:
148 		CERROR("Unknown intent %d\n", it);
149 		return "UNKNOWN";
150 	}
151 }
152 EXPORT_SYMBOL(ldlm_it2str);
153 
154 
155 void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
156 {
157 	ns->ns_policy = arg;
158 }
159 EXPORT_SYMBOL(ldlm_register_intent);
160 
161 /*
162  * REFCOUNTED LOCK OBJECTS
163  */
164 
165 
166 /**
167  * Get a reference on a lock.
168  *
169  * Lock refcounts, during creation:
170  *   - one special one for allocation, dec'd only once in destroy
171  *   - one for being a lock that's in-use
172  *   - one for the addref associated with a new lock
173  */
174 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
175 {
176 	atomic_inc(&lock->l_refc);
177 	return lock;
178 }
179 EXPORT_SYMBOL(ldlm_lock_get);
180 
181 /**
182  * Release lock reference.
183  *
184  * Also frees the lock if it was last reference.
185  */
186 void ldlm_lock_put(struct ldlm_lock *lock)
187 {
188 	LASSERT(lock->l_resource != LP_POISON);
189 	LASSERT(atomic_read(&lock->l_refc) > 0);
190 	if (atomic_dec_and_test(&lock->l_refc)) {
191 		struct ldlm_resource *res;
192 
193 		LDLM_DEBUG(lock,
194 			   "final lock_put on destroyed lock, freeing it.");
195 
196 		res = lock->l_resource;
197 		LASSERT(lock->l_flags & LDLM_FL_DESTROYED);
198 		LASSERT(list_empty(&lock->l_res_link));
199 		LASSERT(list_empty(&lock->l_pending_chain));
200 
201 		lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
202 				     LDLM_NSS_LOCKS);
203 		lu_ref_del(&res->lr_reference, "lock", lock);
204 		ldlm_resource_putref(res);
205 		lock->l_resource = NULL;
206 		if (lock->l_export) {
207 			class_export_lock_put(lock->l_export, lock);
208 			lock->l_export = NULL;
209 		}
210 
211 		if (lock->l_lvb_data != NULL)
212 			OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);
213 
214 		ldlm_interval_free(ldlm_interval_detach(lock));
215 		lu_ref_fini(&lock->l_reference);
216 		OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
217 	}
218 }
219 EXPORT_SYMBOL(ldlm_lock_put);
220 
221 /**
222  * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
223  */
224 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
225 {
226 	int rc = 0;
227 
228 	if (!list_empty(&lock->l_lru)) {
229 		struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
230 
231 		LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
232 		list_del_init(&lock->l_lru);
233 		LASSERT(ns->ns_nr_unused > 0);
234 		ns->ns_nr_unused--;
235 		rc = 1;
236 	}
237 	return rc;
238 }
239 
240 /**
241  * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
242  */
243 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
244 {
245 	struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
246 	int rc;
247 
248 	if (lock->l_flags & LDLM_FL_NS_SRV) {
249 		LASSERT(list_empty(&lock->l_lru));
250 		return 0;
251 	}
252 
253 	spin_lock(&ns->ns_lock);
254 	rc = ldlm_lock_remove_from_lru_nolock(lock);
255 	spin_unlock(&ns->ns_lock);
256 	return rc;
257 }
258 
259 /**
260  * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
261  */
262 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
263 {
264 	struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
265 
266 	lock->l_last_used = cfs_time_current();
267 	LASSERT(list_empty(&lock->l_lru));
268 	LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
269 	list_add_tail(&lock->l_lru, &ns->ns_unused_list);
270 	if (lock->l_flags & LDLM_FL_SKIPPED)
271 		lock->l_flags &= ~LDLM_FL_SKIPPED;
272 	LASSERT(ns->ns_nr_unused >= 0);
273 	ns->ns_nr_unused++;
274 }
275 
276 /**
277  * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
278  * first.
279  */
280 void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
281 {
282 	struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
283 
284 	spin_lock(&ns->ns_lock);
285 	ldlm_lock_add_to_lru_nolock(lock);
286 	spin_unlock(&ns->ns_lock);
287 }
288 
289 /**
290  * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
291  * the LRU. Performs necessary LRU locking
292  */
293 void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
294 {
295 	struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
296 
297 	if (lock->l_flags & LDLM_FL_NS_SRV) {
298 		LASSERT(list_empty(&lock->l_lru));
299 		return;
300 	}
301 
302 	spin_lock(&ns->ns_lock);
303 	if (!list_empty(&lock->l_lru)) {
304 		ldlm_lock_remove_from_lru_nolock(lock);
305 		ldlm_lock_add_to_lru_nolock(lock);
306 	}
307 	spin_unlock(&ns->ns_lock);
308 }
309 
310 /**
311  * Helper to destroy a locked lock.
312  *
313  * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock
314  * Must be called with l_lock and lr_lock held.
315  *
316  * Does not actually free the lock data, but rather marks the lock as
317  * destroyed by setting the LDLM_FL_DESTROYED flag.  Destroys the
318  * handle->lock association too, so that the lock can no longer be found
319  * and removes the lock from LRU list.  Actual lock freeing occurs when
320  * last lock reference goes away.
321  *
322  * Original comment (of some historical value):
323  * This used to have a 'strict' flag, which recovery would use to mark an
324  * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
325  * shall explain why it's gone: with the new hash table scheme, once you call
326  * ldlm_lock_destroy, you can never drop your final references on this lock.
327  * Because it's not in the hash table anymore.  -phil
328  */
329 int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
330 {
331 	if (lock->l_readers || lock->l_writers) {
332 		LDLM_ERROR(lock, "lock still has references");
333 		LBUG();
334 	}
335 
336 	if (!list_empty(&lock->l_res_link)) {
337 		LDLM_ERROR(lock, "lock still on resource");
338 		LBUG();
339 	}
340 
341 	if (lock->l_flags & LDLM_FL_DESTROYED) {
342 		LASSERT(list_empty(&lock->l_lru));
343 		return 0;
344 	}
345 	lock->l_flags |= LDLM_FL_DESTROYED;
346 
347 	if (lock->l_export && lock->l_export->exp_lock_hash) {
348 		/* NB: it's safe to call cfs_hash_del() even if the lock isn't
349 		 * in exp_lock_hash. */
350 		/* In the function below, .hs_keycmp resolves to
351 		 * ldlm_export_lock_keycmp() */
352 		/* coverity[overrun-buffer-val] */
353 		cfs_hash_del(lock->l_export->exp_lock_hash,
354 			     &lock->l_remote_handle, &lock->l_exp_hash);
355 	}
356 
357 	ldlm_lock_remove_from_lru(lock);
358 	class_handle_unhash(&lock->l_handle);
359 
360 #if 0
361 	/* Wake anyone waiting for this lock */
362 	/* FIXME: I should probably add yet another flag, instead of using
363 	 * l_export to only call this on clients */
364 	if (lock->l_export)
365 		class_export_put(lock->l_export);
366 	lock->l_export = NULL;
367 	if (lock->l_export && lock->l_completion_ast)
368 		lock->l_completion_ast(lock, 0);
369 #endif
370 	return 1;
371 }
372 
373 /**
374  * Destroys a LDLM lock \a lock. Performs necessary locking first.
375  */
376 void ldlm_lock_destroy(struct ldlm_lock *lock)
377 {
378 	int first;
379 
380 	lock_res_and_lock(lock);
381 	first = ldlm_lock_destroy_internal(lock);
382 	unlock_res_and_lock(lock);
383 
384 	/* drop reference from hashtable only for first destroy */
385 	if (first) {
386 		lu_ref_del(&lock->l_reference, "hash", lock);
387 		LDLM_LOCK_RELEASE(lock);
388 	}
389 }
390 
391 /**
392  * Destroys a LDLM lock \a lock that is already locked.
393  */
394 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
395 {
396 	int first;
397 
398 	first = ldlm_lock_destroy_internal(lock);
399 	/* drop reference from hashtable only for first destroy */
400 	if (first) {
401 		lu_ref_del(&lock->l_reference, "hash", lock);
402 		LDLM_LOCK_RELEASE(lock);
403 	}
404 }
405 
406 /* this is called by portals_handle2object with the handle lock taken */
407 static void lock_handle_addref(void *lock)
408 {
409 	LDLM_LOCK_GET((struct ldlm_lock *)lock);
410 }
411 
412 static void lock_handle_free(void *lock, int size)
413 {
414 	LASSERT(size == sizeof(struct ldlm_lock));
415 	OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
416 }
417 
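/*
 * Handle operations registered for every lock by ldlm_lock_new() via
 * class_handle_hash(): lock_handle_addref() takes a reference when a handle
 * is resolved back to a lock, and lock_handle_free() returns the memory to
 * ldlm_lock_slab.
 */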
418 struct portals_handle_ops lock_handle_ops = {
419 	.hop_addref = lock_handle_addref,
420 	.hop_free   = lock_handle_free,
421 };
422 
423 /**
424  *
425  * Allocate and initialize new lock structure.
426  *
427  * usage: pass in a resource on which you have done ldlm_resource_get
428  *	new lock will take over the refcount.
429  * returns: lock with refcount 2 - one for current caller and one for remote
430  */
431 static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
432 {
433 	struct ldlm_lock *lock;
434 
435 	if (resource == NULL)
436 		LBUG();
437 
438 	OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
439 	if (lock == NULL)
440 		return NULL;
441 
442 	spin_lock_init(&lock->l_lock);
443 	lock->l_resource = resource;
444 	lu_ref_add(&resource->lr_reference, "lock", lock);
445 
446 	atomic_set(&lock->l_refc, 2);
447 	INIT_LIST_HEAD(&lock->l_res_link);
448 	INIT_LIST_HEAD(&lock->l_lru);
449 	INIT_LIST_HEAD(&lock->l_pending_chain);
450 	INIT_LIST_HEAD(&lock->l_bl_ast);
451 	INIT_LIST_HEAD(&lock->l_cp_ast);
452 	INIT_LIST_HEAD(&lock->l_rk_ast);
453 	init_waitqueue_head(&lock->l_waitq);
454 	lock->l_blocking_lock = NULL;
455 	INIT_LIST_HEAD(&lock->l_sl_mode);
456 	INIT_LIST_HEAD(&lock->l_sl_policy);
457 	INIT_HLIST_NODE(&lock->l_exp_hash);
458 	INIT_HLIST_NODE(&lock->l_exp_flock_hash);
459 
460 	lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
461 			     LDLM_NSS_LOCKS);
462 	INIT_LIST_HEAD(&lock->l_handle.h_link);
463 	class_handle_hash(&lock->l_handle, &lock_handle_ops);
464 
465 	lu_ref_init(&lock->l_reference);
466 	lu_ref_add(&lock->l_reference, "hash", lock);
467 	lock->l_callback_timeout = 0;
468 
469 #if LUSTRE_TRACKS_LOCK_EXP_REFS
470 	INIT_LIST_HEAD(&lock->l_exp_refs_link);
471 	lock->l_exp_refs_nr = 0;
472 	lock->l_exp_refs_target = NULL;
473 #endif
474 	INIT_LIST_HEAD(&lock->l_exp_list);
475 
476 	return lock;
477 }
478 
479 /**
480  * Moves LDLM lock \a lock to another resource.
481  * This is used on client when server returns some other lock than requested
482  * (typically as a result of intent operation)
483  */
484 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
485 			      const struct ldlm_res_id *new_resid)
486 {
487 	struct ldlm_resource *oldres = lock->l_resource;
488 	struct ldlm_resource *newres;
489 	int type;
490 
491 	LASSERT(ns_is_client(ns));
492 
493 	lock_res_and_lock(lock);
494 	if (memcmp(new_resid, &lock->l_resource->lr_name,
495 		   sizeof(lock->l_resource->lr_name)) == 0) {
496 		/* Nothing to do */
497 		unlock_res_and_lock(lock);
498 		return 0;
499 	}
500 
501 	LASSERT(new_resid->name[0] != 0);
502 
503 	/* This function assumes that the lock isn't on any lists */
504 	LASSERT(list_empty(&lock->l_res_link));
505 
506 	type = oldres->lr_type;
507 	unlock_res_and_lock(lock);
508 
509 	newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
510 	if (newres == NULL)
511 		return -ENOMEM;
512 
513 	lu_ref_add(&newres->lr_reference, "lock", lock);
514 	/*
515 	 * To flip the lock from the old to the new resource, lock, oldres and
516 	 * newres have to be locked. Resource spin-locks are nested within
517 	 * lock->l_lock, and are taken in the memory address order to avoid
518 	 * dead-locks.
519 	 */
520 	spin_lock(&lock->l_lock);
521 	oldres = lock->l_resource;
522 	if (oldres < newres) {
523 		lock_res(oldres);
524 		lock_res_nested(newres, LRT_NEW);
525 	} else {
526 		lock_res(newres);
527 		lock_res_nested(oldres, LRT_NEW);
528 	}
529 	LASSERT(memcmp(new_resid, &oldres->lr_name,
530 		       sizeof(oldres->lr_name)) != 0);
531 	lock->l_resource = newres;
532 	unlock_res(oldres);
533 	unlock_res_and_lock(lock);
534 
535 	/* ...and the flowers are still standing! */
536 	lu_ref_del(&oldres->lr_reference, "lock", lock);
537 	ldlm_resource_putref(oldres);
538 
539 	return 0;
540 }
541 EXPORT_SYMBOL(ldlm_lock_change_resource);
542 
543 /** \defgroup ldlm_handles LDLM HANDLES
544  * Ways to get hold of locks without any addresses.
545  * @{
546  */
547 
548 /**
549  * Fills in handle for LDLM lock \a lock into supplied \a lockh
550  * Does not take any references.
551  */
552 void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
553 {
554 	lockh->cookie = lock->l_handle.h_cookie;
555 }
556 EXPORT_SYMBOL(ldlm_lock2handle);
557 
558 /**
559  * Obtain a lock reference by handle.
560  *
561  * If \a flags is nonzero: atomically get the lock and set the flags.
562  *	      Returns NULL if any of the flags are already set.
563  */
564 struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
565 				     __u64 flags)
566 {
567 	struct ldlm_lock *lock;
568 
569 	LASSERT(handle);
570 
571 	lock = class_handle2object(handle->cookie);
572 	if (lock == NULL)
573 		return NULL;
574 
575 	/* It's unlikely but possible that someone marked the lock as
576 	 * destroyed after we did handle2object on it */
577 	if (flags == 0 && ((lock->l_flags & LDLM_FL_DESTROYED) == 0)) {
578 		lu_ref_add(&lock->l_reference, "handle", current);
579 		return lock;
580 	}
581 
582 	lock_res_and_lock(lock);
583 
584 	LASSERT(lock->l_resource != NULL);
585 
586 	lu_ref_add_atomic(&lock->l_reference, "handle", current);
587 	if (unlikely(lock->l_flags & LDLM_FL_DESTROYED)) {
588 		unlock_res_and_lock(lock);
589 		CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
590 		LDLM_LOCK_PUT(lock);
591 		return NULL;
592 	}
593 
594 	if (flags && (lock->l_flags & flags)) {
595 		unlock_res_and_lock(lock);
596 		LDLM_LOCK_PUT(lock);
597 		return NULL;
598 	}
599 
600 	if (flags)
601 		lock->l_flags |= flags;
602 
603 	unlock_res_and_lock(lock);
604 	return lock;
605 }
606 EXPORT_SYMBOL(__ldlm_handle2lock);
607 /** @} ldlm_handles */
608 
609 /**
610  * Fill in "on the wire" representation for given LDLM lock into supplied
611  * lock descriptor \a desc structure.
612  */
613 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
614 {
615 	ldlm_res2desc(lock->l_resource, &desc->l_resource);
616 	desc->l_req_mode = lock->l_req_mode;
617 	desc->l_granted_mode = lock->l_granted_mode;
618 	ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
619 				    &lock->l_policy_data,
620 				    &desc->l_policy_data);
621 }
622 EXPORT_SYMBOL(ldlm_lock2desc);
623 
624 /**
625  * Add a lock to list of conflicting locks to send AST to.
626  *
627  * Only add if we have not sent a blocking AST to the lock yet.
628  */
629 void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
630 			   struct list_head *work_list)
631 {
632 	if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
633 		LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
634 		lock->l_flags |= LDLM_FL_AST_SENT;
635 		/* If the enqueuing client said so, tell the AST recipient to
636 		 * discard dirty data, rather than writing back. */
637 		if (new->l_flags & LDLM_FL_AST_DISCARD_DATA)
638 			lock->l_flags |= LDLM_FL_DISCARD_DATA;
639 		LASSERT(list_empty(&lock->l_bl_ast));
640 		list_add(&lock->l_bl_ast, work_list);
641 		LDLM_LOCK_GET(lock);
642 		LASSERT(lock->l_blocking_lock == NULL);
643 		lock->l_blocking_lock = LDLM_LOCK_GET(new);
644 	}
645 }
646 
647 /**
648  * Add a lock to list of just granted locks to send completion AST to.
649  */
650 void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
651 {
652 	if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
653 		lock->l_flags |= LDLM_FL_CP_REQD;
654 		LDLM_DEBUG(lock, "lock granted; sending completion AST.");
655 		LASSERT(list_empty(&lock->l_cp_ast));
656 		list_add(&lock->l_cp_ast, work_list);
657 		LDLM_LOCK_GET(lock);
658 	}
659 }
660 
661 /**
662  * Aggregator function to add AST work items into a list. Determines
663  * what sort of an AST work needs to be done and calls the proper
664  * adding function.
665  * Must be called with lr_lock held.
666  */
667 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
668 			    struct list_head *work_list)
669 {
670 	check_res_locked(lock->l_resource);
671 	if (new)
672 		ldlm_add_bl_work_item(lock, new, work_list);
673 	else
674 		ldlm_add_cp_work_item(lock, work_list);
675 }
676 
677 /**
678  * Add specified reader/writer reference to LDLM lock with handle \a lockh.
679  * r/w reference type is determined by \a mode
680  * Calls ldlm_lock_addref_internal.
681  */
682 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
683 {
684 	struct ldlm_lock *lock;
685 
686 	lock = ldlm_handle2lock(lockh);
687 	LASSERT(lock != NULL);
688 	ldlm_lock_addref_internal(lock, mode);
689 	LDLM_LOCK_PUT(lock);
690 }
691 EXPORT_SYMBOL(ldlm_lock_addref);
692 
693 /**
694  * Helper function.
695  * Add specified reader/writer reference to LDLM lock \a lock.
696  * r/w reference type is determined by \a mode
697  * Removes lock from LRU if it is there.
698  * Assumes the LDLM lock is already locked.
699  */
700 void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
701 {
702 	ldlm_lock_remove_from_lru(lock);
703 	if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
704 		lock->l_readers++;
705 		lu_ref_add_atomic(&lock->l_reference, "reader", lock);
706 	}
707 	if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
708 		lock->l_writers++;
709 		lu_ref_add_atomic(&lock->l_reference, "writer", lock);
710 	}
711 	LDLM_LOCK_GET(lock);
712 	lu_ref_add_atomic(&lock->l_reference, "user", lock);
713 	LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
714 }
715 
716 /**
717  * Attempts to add reader/writer reference to a lock with handle \a lockh, and
718  * fails if lock is already LDLM_FL_CBPENDING or destroyed.
719  *
720  * \retval 0 success, lock was addref-ed
721  *
722  * \retval -EAGAIN lock is being canceled.
723  */
724 int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
725 {
726 	struct ldlm_lock *lock;
727 	int	       result;
728 
729 	result = -EAGAIN;
730 	lock = ldlm_handle2lock(lockh);
731 	if (lock != NULL) {
732 		lock_res_and_lock(lock);
733 		if (lock->l_readers != 0 || lock->l_writers != 0 ||
734 		    !(lock->l_flags & LDLM_FL_CBPENDING)) {
735 			ldlm_lock_addref_internal_nolock(lock, mode);
736 			result = 0;
737 		}
738 		unlock_res_and_lock(lock);
739 		LDLM_LOCK_PUT(lock);
740 	}
741 	return result;
742 }
743 EXPORT_SYMBOL(ldlm_lock_addref_try);
744 
745 /**
746  * Add specified reader/writer reference to LDLM lock \a lock.
747  * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
748  * Only called for local locks.
749  */
750 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
751 {
752 	lock_res_and_lock(lock);
753 	ldlm_lock_addref_internal_nolock(lock, mode);
754 	unlock_res_and_lock(lock);
755 }
756 
757 /**
758  * Removes reader/writer reference for LDLM lock \a lock.
759  * Assumes LDLM lock is already locked.
760  * only called in ldlm_flock_destroy and for local locks.
761  * Does NOT add the lock to the LRU if no r/w references are left, in order to
762  * accommodate flock locks that cannot be placed in the LRU.
763  */
764 void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
765 {
766 	LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
767 	if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
768 		LASSERT(lock->l_readers > 0);
769 		lu_ref_del(&lock->l_reference, "reader", lock);
770 		lock->l_readers--;
771 	}
772 	if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
773 		LASSERT(lock->l_writers > 0);
774 		lu_ref_del(&lock->l_reference, "writer", lock);
775 		lock->l_writers--;
776 	}
777 
778 	lu_ref_del(&lock->l_reference, "user", lock);
779 	LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
780 }
781 
782 /**
783  * Removes reader/writer reference for LDLM lock \a lock.
784  * Locks LDLM lock first.
785  * If the lock is a client-side lock, its r/w refcount drops to zero and it
786  * is not blocked, the lock is added to the LRU list of the namespace.
787  * For blocked LDLM locks, if the r/w count drops to zero, blocking_ast is
788  * called.
789  */
790 void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
791 {
792 	struct ldlm_namespace *ns;
793 
794 	lock_res_and_lock(lock);
795 
796 	ns = ldlm_lock_to_ns(lock);
797 
798 	ldlm_lock_decref_internal_nolock(lock, mode);
799 
800 	if (lock->l_flags & LDLM_FL_LOCAL &&
801 	    !lock->l_readers && !lock->l_writers) {
802 		/* If this is a local lock on a server namespace and this was
803 		 * the last reference, cancel the lock. */
804 		CDEBUG(D_INFO, "forcing cancel of local lock\n");
805 		lock->l_flags |= LDLM_FL_CBPENDING;
806 	}
807 
808 	if (!lock->l_readers && !lock->l_writers &&
809 	    (lock->l_flags & LDLM_FL_CBPENDING)) {
810 		/* If we received a blocking AST and this was the last reference,
811 		 * run the callback. */
812 		if ((lock->l_flags & LDLM_FL_NS_SRV) && lock->l_export)
813 			CERROR("FL_CBPENDING set on non-local lock--just a warning\n");
814 
815 		LDLM_DEBUG(lock, "final decref done on cbpending lock");
816 
817 		LDLM_LOCK_GET(lock); /* dropped by bl thread */
818 		ldlm_lock_remove_from_lru(lock);
819 		unlock_res_and_lock(lock);
820 
821 		if (lock->l_flags & LDLM_FL_FAIL_LOC)
822 			OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
823 
824 		if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
825 		    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
826 			ldlm_handle_bl_callback(ns, NULL, lock);
827 	} else if (ns_is_client(ns) &&
828 		   !lock->l_readers && !lock->l_writers &&
829 		   !(lock->l_flags & LDLM_FL_NO_LRU) &&
830 		   !(lock->l_flags & LDLM_FL_BL_AST)) {
831 
832 		LDLM_DEBUG(lock, "add lock into lru list");
833 
834 		/* If this is a client-side namespace and this was the last
835 		 * reference, put it on the LRU. */
836 		ldlm_lock_add_to_lru(lock);
837 		unlock_res_and_lock(lock);
838 
839 		if (lock->l_flags & LDLM_FL_FAIL_LOC)
840 			OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
841 
842 		/* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
843 		 * are not supported by the server, otherwise, it is done on
844 		 * enqueue. */
845 		if (!exp_connect_cancelset(lock->l_conn_export) &&
846 		    !ns_connect_lru_resize(ns))
847 			ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
848 	} else {
849 		LDLM_DEBUG(lock, "do not add lock into lru list");
850 		unlock_res_and_lock(lock);
851 	}
852 }
853 
854 /**
855  * Decrease reader/writer refcount for LDLM lock with handle \a lockh
856  */
857 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
858 {
859 	struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
860 
861 	LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
862 	ldlm_lock_decref_internal(lock, mode);
863 	LDLM_LOCK_PUT(lock);
864 }
865 EXPORT_SYMBOL(ldlm_lock_decref);
866 
867 /**
868  * Decrease reader/writer refcount for LDLM lock with handle
869  * \a lockh and mark it for subsequent cancellation once r/w refcount
870  * drops to zero instead of putting into LRU.
871  *
872  * Typical usage is for GROUP locks which we cannot allow to be cached.
873  */
874 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
875 {
876 	struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
877 
878 	LASSERT(lock != NULL);
879 
880 	LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
881 	lock_res_and_lock(lock);
882 	lock->l_flags |= LDLM_FL_CBPENDING;
883 	unlock_res_and_lock(lock);
884 	ldlm_lock_decref_internal(lock, mode);
885 	LDLM_LOCK_PUT(lock);
886 }
887 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
888 
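/*
 * Insertion points into the three granted-lock lists (resource list, skiplist
 * mode group and skiplist policy group).  Filled in by search_granted_lock()
 * and consumed by ldlm_granted_list_add_lock().
 */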
889 struct sl_insert_point {
890 	struct list_head *res_link;
891 	struct list_head *mode_link;
892 	struct list_head *policy_link;
893 };
894 
895 /**
896  * Finds a position to insert the new lock into granted lock list.
897  *
898  * Used for locks eligible for skiplist optimization.
899  *
900  * Parameters:
901  *      queue [input]:  the granted list where search acts on;
902  *      req [input]:    the lock whose position to be located;
903  *      prev [output]:  positions within 3 lists to insert @req to
904  * Return Value:
905  *      filled @prev
906  * NOTE: called by
907  *  - ldlm_grant_lock_with_skiplist
908  */
909 static void search_granted_lock(struct list_head *queue,
910 				struct ldlm_lock *req,
911 				struct sl_insert_point *prev)
912 {
913 	struct list_head *tmp;
914 	struct ldlm_lock *lock, *mode_end, *policy_end;
915 
916 	list_for_each(tmp, queue) {
917 		lock = list_entry(tmp, struct ldlm_lock, l_res_link);
918 
919 		mode_end = list_entry(lock->l_sl_mode.prev,
920 					  struct ldlm_lock, l_sl_mode);
921 
922 		if (lock->l_req_mode != req->l_req_mode) {
923 			/* jump to last lock of mode group */
924 			tmp = &mode_end->l_res_link;
925 			continue;
926 		}
927 
928 		/* suitable mode group is found */
929 		if (lock->l_resource->lr_type == LDLM_PLAIN) {
930 			/* insert point is last lock of the mode group */
931 			prev->res_link = &mode_end->l_res_link;
932 			prev->mode_link = &mode_end->l_sl_mode;
933 			prev->policy_link = &req->l_sl_policy;
934 			return;
935 		} else if (lock->l_resource->lr_type == LDLM_IBITS) {
936 			for (;;) {
937 				policy_end =
938 					list_entry(lock->l_sl_policy.prev,
939 						       struct ldlm_lock,
940 						       l_sl_policy);
941 
942 				if (lock->l_policy_data.l_inodebits.bits ==
943 				    req->l_policy_data.l_inodebits.bits) {
944 					/* insert point is last lock of
945 					 * the policy group */
946 					prev->res_link =
947 						&policy_end->l_res_link;
948 					prev->mode_link =
949 						&policy_end->l_sl_mode;
950 					prev->policy_link =
951 						&policy_end->l_sl_policy;
952 					return;
953 				}
954 
955 				if (policy_end == mode_end)
956 					/* done with mode group */
957 					break;
958 
959 				/* go to next policy group within mode group */
960 				tmp = policy_end->l_res_link.next;
961 				lock = list_entry(tmp, struct ldlm_lock,
962 						      l_res_link);
963 			}  /* loop over policy groups within the mode group */
964 
965 			/* insert point is last lock of the mode group,
966 			 * new policy group is started */
967 			prev->res_link = &mode_end->l_res_link;
968 			prev->mode_link = &mode_end->l_sl_mode;
969 			prev->policy_link = &req->l_sl_policy;
970 			return;
971 		} else {
972 			LDLM_ERROR(lock,
973 				   "is not LDLM_PLAIN or LDLM_IBITS lock");
974 			LBUG();
975 		}
976 	}
977 
978 	/* insert point is last lock on the queue,
979 	 * new mode group and new policy group are started */
980 	prev->res_link = queue->prev;
981 	prev->mode_link = &req->l_sl_mode;
982 	prev->policy_link = &req->l_sl_policy;
983 }
984 
985 /**
986  * Add a lock into resource granted list after a position described by
987  * \a prev.
988  */
989 static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
990 				       struct sl_insert_point *prev)
991 {
992 	struct ldlm_resource *res = lock->l_resource;
993 
994 	check_res_locked(res);
995 
996 	ldlm_resource_dump(D_INFO, res);
997 	LDLM_DEBUG(lock, "About to add lock:");
998 
999 	if (lock->l_flags & LDLM_FL_DESTROYED) {
1000 		CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
1001 		return;
1002 	}
1003 
1004 	LASSERT(list_empty(&lock->l_res_link));
1005 	LASSERT(list_empty(&lock->l_sl_mode));
1006 	LASSERT(list_empty(&lock->l_sl_policy));
1007 
1008 	/*
1009 	 * lock->link == prev->link means the lock is the first lock in the group.
1010 	 * Don't re-add it to itself, to suppress kernel warnings.
1011 	 */
1012 	if (&lock->l_res_link != prev->res_link)
1013 		list_add(&lock->l_res_link, prev->res_link);
1014 	if (&lock->l_sl_mode != prev->mode_link)
1015 		list_add(&lock->l_sl_mode, prev->mode_link);
1016 	if (&lock->l_sl_policy != prev->policy_link)
1017 		list_add(&lock->l_sl_policy, prev->policy_link);
1018 }
1019 
1020 /**
1021  * Add a lock to granted list on a resource maintaining skiplist
1022  * correctness.
1023  */
1024 static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
1025 {
1026 	struct sl_insert_point prev;
1027 
1028 	LASSERT(lock->l_req_mode == lock->l_granted_mode);
1029 
1030 	search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
1031 	ldlm_granted_list_add_lock(lock, &prev);
1032 }
1033 
1034 /**
1035  * Perform lock granting bookkeeping.
1036  *
1037  * Includes putting the lock into granted list and updating lock mode.
1038  * NOTE: called by
1039  *  - ldlm_lock_enqueue
1040  *  - ldlm_reprocess_queue
1041  *  - ldlm_lock_convert
1042  *
1043  * must be called with lr_lock held
1044  */
1045 void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
1046 {
1047 	struct ldlm_resource *res = lock->l_resource;
1048 
1049 	check_res_locked(res);
1050 
1051 	lock->l_granted_mode = lock->l_req_mode;
1052 	if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
1053 		ldlm_grant_lock_with_skiplist(lock);
1054 	else if (res->lr_type == LDLM_EXTENT)
1055 		ldlm_extent_add_lock(res, lock);
1056 	else
1057 		ldlm_resource_add_lock(res, &res->lr_granted, lock);
1058 
1059 	if (lock->l_granted_mode < res->lr_most_restr)
1060 		res->lr_most_restr = lock->l_granted_mode;
1061 
1062 	if (work_list && lock->l_completion_ast != NULL)
1063 		ldlm_add_ast_work_item(lock, NULL, work_list);
1064 
1065 	ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
1066 }
1067 
1068 /**
1069  * Search for a lock with given properties in a queue.
1070  *
1071  * \retval a referenced lock or NULL.  See the flag descriptions below, in the
1072  * comment above ldlm_lock_match
1073  */
1074 static struct ldlm_lock *search_queue(struct list_head *queue,
1075 				      ldlm_mode_t *mode,
1076 				      ldlm_policy_data_t *policy,
1077 				      struct ldlm_lock *old_lock,
1078 				      __u64 flags, int unref)
1079 {
1080 	struct ldlm_lock *lock;
1081 	struct list_head       *tmp;
1082 
1083 	list_for_each(tmp, queue) {
1084 		ldlm_mode_t match;
1085 
1086 		lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1087 
1088 		if (lock == old_lock)
1089 			break;
1090 
1091 		/* Check if this lock can be matched.
1092 		 * Used by LU-2919(exclusive open) for open lease lock */
1093 		if (ldlm_is_excl(lock))
1094 			continue;
1095 
1096 		/* llite sometimes wants to match locks that will be
1097 		 * canceled when their users drop, but we allow it to match
1098 		 * if it passes in CBPENDING and the lock still has users.
1099 		 * this is generally only going to be used by children
1100 		 * whose parents already hold a lock so forward progress
1101 		 * can still happen. */
1102 		if (lock->l_flags & LDLM_FL_CBPENDING &&
1103 		    !(flags & LDLM_FL_CBPENDING))
1104 			continue;
1105 		if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
1106 		    lock->l_readers == 0 && lock->l_writers == 0)
1107 			continue;
1108 
1109 		if (!(lock->l_req_mode & *mode))
1110 			continue;
1111 		match = lock->l_req_mode;
1112 
1113 		if (lock->l_resource->lr_type == LDLM_EXTENT &&
1114 		    (lock->l_policy_data.l_extent.start >
1115 		     policy->l_extent.start ||
1116 		     lock->l_policy_data.l_extent.end < policy->l_extent.end))
1117 			continue;
1118 
1119 		if (unlikely(match == LCK_GROUP) &&
1120 		    lock->l_resource->lr_type == LDLM_EXTENT &&
1121 		    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
1122 			continue;
1123 
1124 		/* We match if we have an existing lock with the same or a
1125 		 * wider set of bits. */
1126 		if (lock->l_resource->lr_type == LDLM_IBITS &&
1127 		     ((lock->l_policy_data.l_inodebits.bits &
1128 		      policy->l_inodebits.bits) !=
1129 		      policy->l_inodebits.bits))
1130 			continue;
1131 
1132 		if (!unref && (lock->l_flags & LDLM_FL_GONE_MASK))
1133 			continue;
1134 
1135 		if ((flags & LDLM_FL_LOCAL_ONLY) &&
1136 		    !(lock->l_flags & LDLM_FL_LOCAL))
1137 			continue;
1138 
1139 		if (flags & LDLM_FL_TEST_LOCK) {
1140 			LDLM_LOCK_GET(lock);
1141 			ldlm_lock_touch_in_lru(lock);
1142 		} else {
1143 			ldlm_lock_addref_internal_nolock(lock, match);
1144 		}
1145 		*mode = match;
1146 		return lock;
1147 	}
1148 
1149 	return NULL;
1150 }
1151 
1152 void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
1153 {
1154 	if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
1155 		lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
1156 		wake_up_all(&lock->l_waitq);
1157 	}
1158 }
1159 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
1160 
1161 void ldlm_lock_fail_match(struct ldlm_lock *lock)
1162 {
1163 	lock_res_and_lock(lock);
1164 	ldlm_lock_fail_match_locked(lock);
1165 	unlock_res_and_lock(lock);
1166 }
1167 EXPORT_SYMBOL(ldlm_lock_fail_match);
1168 
1169 /**
1170  * Mark lock as "matchable" by OST.
1171  *
1172  * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
1173  * is not yet valid.
1174  * Assumes LDLM lock is already locked.
1175  */
1176 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
1177 {
1178 	lock->l_flags |= LDLM_FL_LVB_READY;
1179 	wake_up_all(&lock->l_waitq);
1180 }
1181 EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
1182 
1183 /**
1184  * Mark lock as "matchable" by OST.
1185  * Locks the lock and then \see ldlm_lock_allow_match_locked
1186  */
1187 void ldlm_lock_allow_match(struct ldlm_lock *lock)
1188 {
1189 	lock_res_and_lock(lock);
1190 	ldlm_lock_allow_match_locked(lock);
1191 	unlock_res_and_lock(lock);
1192 }
1193 EXPORT_SYMBOL(ldlm_lock_allow_match);
1194 
1195 /**
1196  * Attempt to find a lock with specified properties.
1197  *
1198  * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
1199  * set in \a flags
1200  *
1201  * Can be called in two ways:
1202  *
1203  * If 'ns' is NULL, then lockh describes an existing lock that we want to look
1204  * for a duplicate of.
1205  *
1206  * Otherwise, all of the fields must be filled in, to match against.
1207  *
1208  * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
1209  *     server (i.e., connh is NULL)
1210  * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
1211  *     list will be considered
1212  * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
1213  *     to be canceled can still be matched as long as they still have reader
1214  *     or writer references
1215  * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
1216  *     just tell us if we would have matched.
1217  *
1218  * \retval 1 if it finds an already-existing lock that is compatible; in this
1219  * case, lockh is filled in with an addref()ed lock
1220  *
1221  * We also check the security context, and if that fails we simply return 0
1222  * (to keep caller code unchanged); the context failure will be discovered by
1223  * the caller sometime later.
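 *
 * A typical client-side use (sketch only; variable names are illustrative):
 *
 *	struct lustre_handle lockh;
 *	ldlm_mode_t mode;
 *
 *	mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, &res_id, LDLM_EXTENT,
 *			       &policy, LCK_PR | LCK_PW, &lockh, 0);
 *	if (mode != 0) {
 *		... use the matched lock ...
 *		ldlm_lock_decref(&lockh, mode);
 *	}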
1224  */
1225 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
1226 			    const struct ldlm_res_id *res_id, ldlm_type_t type,
1227 			    ldlm_policy_data_t *policy, ldlm_mode_t mode,
1228 			    struct lustre_handle *lockh, int unref)
1229 {
1230 	struct ldlm_resource *res;
1231 	struct ldlm_lock *lock, *old_lock = NULL;
1232 	int rc = 0;
1233 
1234 	if (ns == NULL) {
1235 		old_lock = ldlm_handle2lock(lockh);
1236 		LASSERT(old_lock);
1237 
1238 		ns = ldlm_lock_to_ns(old_lock);
1239 		res_id = &old_lock->l_resource->lr_name;
1240 		type = old_lock->l_resource->lr_type;
1241 		mode = old_lock->l_req_mode;
1242 	}
1243 
1244 	res = ldlm_resource_get(ns, NULL, res_id, type, 0);
1245 	if (res == NULL) {
1246 		LASSERT(old_lock == NULL);
1247 		return 0;
1248 	}
1249 
1250 	LDLM_RESOURCE_ADDREF(res);
1251 	lock_res(res);
1252 
1253 	lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
1254 			    flags, unref);
1255 	if (lock != NULL) {
1256 		rc = 1;
1257 		goto out;
1258 	}
1259 	if (flags & LDLM_FL_BLOCK_GRANTED) {
1260 		rc = 0;
1261 		goto out;
1262 	}
1263 	lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
1264 			    flags, unref);
1265 	if (lock != NULL) {
1266 		rc = 1;
1267 		goto out;
1268 	}
1269 	lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
1270 			    flags, unref);
1271 	if (lock != NULL) {
1272 		rc = 1;
1273 		goto out;
1274 	}
1275 
1276  out:
1277 	unlock_res(res);
1278 	LDLM_RESOURCE_DELREF(res);
1279 	ldlm_resource_putref(res);
1280 
1281 	if (lock) {
1282 		ldlm_lock2handle(lock, lockh);
1283 		if ((flags & LDLM_FL_LVB_READY) &&
1284 		    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
1285 			__u64 wait_flags = LDLM_FL_LVB_READY |
1286 				LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
1287 			struct l_wait_info lwi;
1288 
1289 			if (lock->l_completion_ast) {
1290 				int err = lock->l_completion_ast(lock,
1291 							  LDLM_FL_WAIT_NOREPROC,
1292 								 NULL);
1293 				if (err) {
1294 					if (flags & LDLM_FL_TEST_LOCK)
1295 						LDLM_LOCK_RELEASE(lock);
1296 					else
1297 						ldlm_lock_decref_internal(lock,
1298 									  mode);
1299 					rc = 0;
1300 					goto out2;
1301 				}
1302 			}
1303 
1304 			lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
1305 					       NULL, LWI_ON_SIGNAL_NOOP, NULL);
1306 
1307 			/* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
1308 			l_wait_event(lock->l_waitq,
1309 				     lock->l_flags & wait_flags,
1310 				     &lwi);
1311 			if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
1312 				if (flags & LDLM_FL_TEST_LOCK)
1313 					LDLM_LOCK_RELEASE(lock);
1314 				else
1315 					ldlm_lock_decref_internal(lock, mode);
1316 				rc = 0;
1317 			}
1318 		}
1319 	}
1320  out2:
1321 	if (rc) {
1322 		LDLM_DEBUG(lock, "matched (%llu %llu)",
1323 			   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1324 				res_id->name[2] : policy->l_extent.start,
1325 			   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1326 				res_id->name[3] : policy->l_extent.end);
1327 
1328 		/* check user's security context */
1329 		if (lock->l_conn_export &&
1330 		    sptlrpc_import_check_ctx(
1331 				class_exp2cliimp(lock->l_conn_export))) {
1332 			if (!(flags & LDLM_FL_TEST_LOCK))
1333 				ldlm_lock_decref_internal(lock, mode);
1334 			rc = 0;
1335 		}
1336 
1337 		if (flags & LDLM_FL_TEST_LOCK)
1338 			LDLM_LOCK_RELEASE(lock);
1339 
1340 	} else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
1341 		LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res %llu/%llu (%llu %llu)",
1342 				  ns, type, mode, res_id->name[0],
1343 				  res_id->name[1],
1344 				  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1345 					res_id->name[2] : policy->l_extent.start,
1346 				  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1347 					res_id->name[3] : policy->l_extent.end);
1348 	}
1349 	if (old_lock)
1350 		LDLM_LOCK_PUT(old_lock);
1351 
1352 	return rc ? mode : 0;
1353 }
1354 EXPORT_SYMBOL(ldlm_lock_match);
1355 
1356 ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
1357 					__u64 *bits)
1358 {
1359 	struct ldlm_lock *lock;
1360 	ldlm_mode_t mode = 0;
1361 
1362 	lock = ldlm_handle2lock(lockh);
1363 	if (lock != NULL) {
1364 		lock_res_and_lock(lock);
1365 		if (lock->l_flags & LDLM_FL_GONE_MASK)
1366 			goto out;
1367 
1368 		if (lock->l_flags & LDLM_FL_CBPENDING &&
1369 		    lock->l_readers == 0 && lock->l_writers == 0)
1370 			goto out;
1371 
1372 		if (bits)
1373 			*bits = lock->l_policy_data.l_inodebits.bits;
1374 		mode = lock->l_granted_mode;
1375 		ldlm_lock_addref_internal_nolock(lock, mode);
1376 	}
1377 
1378 out:
1379 	if (lock != NULL) {
1380 		unlock_res_and_lock(lock);
1381 		LDLM_LOCK_PUT(lock);
1382 	}
1383 	return mode;
1384 }
1385 EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1386 
1387 /** The caller must guarantee that the buffer is large enough. */
1388 int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1389 		  enum req_location loc, void *data, int size)
1390 {
1391 	void *lvb;
1392 
1393 	LASSERT(data != NULL);
1394 	LASSERT(size >= 0);
1395 
1396 	switch (lock->l_lvb_type) {
1397 	case LVB_T_OST:
1398 		if (size == sizeof(struct ost_lvb)) {
1399 			if (loc == RCL_CLIENT)
1400 				lvb = req_capsule_client_swab_get(pill,
1401 						&RMF_DLM_LVB,
1402 						lustre_swab_ost_lvb);
1403 			else
1404 				lvb = req_capsule_server_swab_get(pill,
1405 						&RMF_DLM_LVB,
1406 						lustre_swab_ost_lvb);
1407 			if (unlikely(lvb == NULL)) {
1408 				LDLM_ERROR(lock, "no LVB");
1409 				return -EPROTO;
1410 			}
1411 
1412 			memcpy(data, lvb, size);
1413 		} else if (size == sizeof(struct ost_lvb_v1)) {
1414 			struct ost_lvb *olvb = data;
1415 
1416 			if (loc == RCL_CLIENT)
1417 				lvb = req_capsule_client_swab_get(pill,
1418 						&RMF_DLM_LVB,
1419 						lustre_swab_ost_lvb_v1);
1420 			else
1421 				lvb = req_capsule_server_sized_swab_get(pill,
1422 						&RMF_DLM_LVB, size,
1423 						lustre_swab_ost_lvb_v1);
1424 			if (unlikely(lvb == NULL)) {
1425 				LDLM_ERROR(lock, "no LVB");
1426 				return -EPROTO;
1427 			}
1428 
1429 			memcpy(data, lvb, size);
1430 			olvb->lvb_mtime_ns = 0;
1431 			olvb->lvb_atime_ns = 0;
1432 			olvb->lvb_ctime_ns = 0;
1433 		} else {
1434 			LDLM_ERROR(lock, "Replied unexpected ost LVB size %d",
1435 				   size);
1436 			return -EINVAL;
1437 		}
1438 		break;
1439 	case LVB_T_LQUOTA:
1440 		if (size == sizeof(struct lquota_lvb)) {
1441 			if (loc == RCL_CLIENT)
1442 				lvb = req_capsule_client_swab_get(pill,
1443 						&RMF_DLM_LVB,
1444 						lustre_swab_lquota_lvb);
1445 			else
1446 				lvb = req_capsule_server_swab_get(pill,
1447 						&RMF_DLM_LVB,
1448 						lustre_swab_lquota_lvb);
1449 			if (unlikely(lvb == NULL)) {
1450 				LDLM_ERROR(lock, "no LVB");
1451 				return -EPROTO;
1452 			}
1453 
1454 			memcpy(data, lvb, size);
1455 		} else {
1456 			LDLM_ERROR(lock,
1457 				   "Replied unexpected lquota LVB size %d",
1458 				   size);
1459 			return -EINVAL;
1460 		}
1461 		break;
1462 	case LVB_T_LAYOUT:
1463 		if (size == 0)
1464 			break;
1465 
1466 		if (loc == RCL_CLIENT)
1467 			lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1468 		else
1469 			lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1470 		if (unlikely(lvb == NULL)) {
1471 			LDLM_ERROR(lock, "no LVB");
1472 			return -EPROTO;
1473 		}
1474 
1475 		memcpy(data, lvb, size);
1476 		break;
1477 	default:
1478 		LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type);
1479 		dump_stack();
1480 		return -EINVAL;
1481 	}
1482 
1483 	return 0;
1484 }
1485 
1486 /**
1487  * Create and fill in new LDLM lock with specified properties.
1488  * Returns a referenced lock
1489  */
1490 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1491 				   const struct ldlm_res_id *res_id,
1492 				   ldlm_type_t type,
1493 				   ldlm_mode_t mode,
1494 				   const struct ldlm_callback_suite *cbs,
1495 				   void *data, __u32 lvb_len,
1496 				   enum lvb_type lvb_type)
1497 {
1498 	struct ldlm_lock *lock;
1499 	struct ldlm_resource *res;
1500 
1501 	res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1502 	if (res == NULL)
1503 		return NULL;
1504 
1505 	lock = ldlm_lock_new(res);
1506 
1507 	if (lock == NULL)
1508 		return NULL;
1509 
1510 	lock->l_req_mode = mode;
1511 	lock->l_ast_data = data;
1512 	lock->l_pid = current_pid();
1513 	if (ns_is_server(ns))
1514 		lock->l_flags |= LDLM_FL_NS_SRV;
1515 	if (cbs) {
1516 		lock->l_blocking_ast = cbs->lcs_blocking;
1517 		lock->l_completion_ast = cbs->lcs_completion;
1518 		lock->l_glimpse_ast = cbs->lcs_glimpse;
1519 	}
1520 
1521 	lock->l_tree_node = NULL;
1522 	/* if this is an extent lock, allocate the interval tree node */
1523 	if (type == LDLM_EXTENT) {
1524 		if (ldlm_interval_alloc(lock) == NULL)
1525 			goto out;
1526 	}
1527 
1528 	if (lvb_len) {
1529 		lock->l_lvb_len = lvb_len;
1530 		OBD_ALLOC(lock->l_lvb_data, lvb_len);
1531 		if (lock->l_lvb_data == NULL)
1532 			goto out;
1533 	}
1534 
1535 	lock->l_lvb_type = lvb_type;
1536 	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1537 		goto out;
1538 
1539 	return lock;
1540 
1541 out:
1542 	ldlm_lock_destroy(lock);
1543 	LDLM_LOCK_RELEASE(lock);
1544 	return NULL;
1545 }
1546 
1547 /**
1548  * Enqueue (request) a lock.
1549  *
1550  * Does not block. As a result of the enqueue, the lock is put on the
1551  * granted or waiting list.
1552  *
1553  * If the namespace has an intent policy set and the lock has the
1554  * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate lock
1555  * processing to the intent policy function.
1556  */
1557 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1558 			       struct ldlm_lock **lockp,
1559 			       void *cookie, __u64 *flags)
1560 {
1561 	struct ldlm_lock *lock = *lockp;
1562 	struct ldlm_resource *res = lock->l_resource;
1563 	int local = ns_is_client(ldlm_res_to_ns(res));
1564 	ldlm_error_t rc = ELDLM_OK;
1565 	struct ldlm_interval *node = NULL;
1566 
1567 	lock->l_last_activity = get_seconds();
1568 	/* policies are not executed on the client or during replay */
1569 	if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1570 	    && !local && ns->ns_policy) {
1571 		rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1572 				   NULL);
1573 		if (rc == ELDLM_LOCK_REPLACED) {
1574 			/* The lock that was returned has already been granted,
1575 			 * and placed into lockp.  If it's not the same as the
1576 			 * one we passed in, then destroy the old one and our
1577 			 * work here is done. */
1578 			if (lock != *lockp) {
1579 				ldlm_lock_destroy(lock);
1580 				LDLM_LOCK_RELEASE(lock);
1581 			}
1582 			*flags |= LDLM_FL_LOCK_CHANGED;
1583 			return 0;
1584 		} else if (rc != ELDLM_OK ||
1585 			   (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
1586 			ldlm_lock_destroy(lock);
1587 			return rc;
1588 		}
1589 	}
1590 
1591 	/* For a replaying lock, it might already be in the granted list, so
1592 	 * unlinking the lock will cause the interval node to be freed.  We
1593 	 * have to allocate the interval node early, otherwise we can't regrant
1594 	 * this lock in the future. - jay */
1595 	if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1596 		OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
1597 
1598 	lock_res_and_lock(lock);
1599 	if (local && lock->l_req_mode == lock->l_granted_mode) {
1600 		/* The server returned a blocked lock, but it was granted
1601 		 * before we got a chance to actually enqueue it.  We don't
1602 		 * need to do anything else. */
1603 		*flags &= ~(LDLM_FL_BLOCK_GRANTED |
1604 			    LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1605 		goto out;
1606 	}
1607 
1608 	ldlm_resource_unlink_lock(lock);
1609 	if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1610 		if (node == NULL) {
1611 			ldlm_lock_destroy_nolock(lock);
1612 			rc = -ENOMEM;
1613 			goto out;
1614 		}
1615 
1616 		INIT_LIST_HEAD(&node->li_group);
1617 		ldlm_interval_attach(node, lock);
1618 		node = NULL;
1619 	}
1620 
1621 	/* Some flags from the enqueue want to make it into the AST, via the
1622 	 * lock's l_flags. */
1623 	lock->l_flags |= *flags & LDLM_FL_AST_DISCARD_DATA;
1624 
1625 	/* This distinction between local lock trees is very important; a client
1626 	 * namespace only has information about locks taken by that client, and
1627 	 * thus doesn't have enough information to decide for itself if it can
1628 	 * be granted (below).  In this case, we do exactly what the server
1629 	 * tells us to do, as dictated by the 'flags'.
1630 	 *
1631 	 * We do exactly the same thing during recovery, when the server is
1632 	 * more or less trusting the clients not to lie.
1633 	 *
1634 	 * FIXME (bug 268): Detect obvious lies by checking compatibility in
1635 	 * granted/converting queues. */
1636 	if (local) {
1637 		if (*flags & LDLM_FL_BLOCK_CONV)
1638 			ldlm_resource_add_lock(res, &res->lr_converting, lock);
1639 		else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1640 			ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1641 		else
1642 			ldlm_grant_lock(lock, NULL);
1643 		goto out;
1644 	} else {
1645 		CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
1646 		LBUG();
1647 	}
1648 
1649 out:
1650 	unlock_res_and_lock(lock);
1651 	if (node)
1652 		OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1653 	return rc;
1654 }
1655 
1656 
1657 /**
1658  * Process a call to blocking AST callback for a lock in ast_work list
1659  */
1660 static int
1661 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1662 {
1663 	struct ldlm_cb_set_arg *arg = opaq;
1664 	struct ldlm_lock_desc   d;
1665 	int		     rc;
1666 	struct ldlm_lock       *lock;
1667 
1668 	if (list_empty(arg->list))
1669 		return -ENOENT;
1670 
1671 	lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1672 
1673 	/* nobody should touch l_bl_ast */
1674 	lock_res_and_lock(lock);
1675 	list_del_init(&lock->l_bl_ast);
1676 
1677 	LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1678 	LASSERT(lock->l_bl_ast_run == 0);
1679 	LASSERT(lock->l_blocking_lock);
1680 	lock->l_bl_ast_run++;
1681 	unlock_res_and_lock(lock);
1682 
1683 	ldlm_lock2desc(lock->l_blocking_lock, &d);
1684 
1685 	rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1686 	LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1687 	lock->l_blocking_lock = NULL;
1688 	LDLM_LOCK_RELEASE(lock);
1689 
1690 	return rc;
1691 }
1692 
1693 /**
1694  * Process a call to completion AST callback for a lock in ast_work list
1695  */
1696 static int
1697 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1698 {
1699 	struct ldlm_cb_set_arg  *arg = opaq;
1700 	int		      rc = 0;
1701 	struct ldlm_lock	*lock;
1702 	ldlm_completion_callback completion_callback;
1703 
1704 	if (list_empty(arg->list))
1705 		return -ENOENT;
1706 
1707 	lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1708 
1709 	/* It's possible to receive a completion AST before we've set
1710 	 * the l_completion_ast pointer: either because the AST arrived
1711 	 * before the reply, or simply because there's a small race
1712 	 * window between receiving the reply and finishing the local
1713 	 * enqueue. (bug 842)
1714 	 *
1715 	 * This can't happen with the blocking_ast, however, because we
1716 	 * will never call the local blocking_ast until we drop our
1717 	 * reader/writer reference, which we won't do until we get the
1718 	 * reply and finish enqueueing. */
1719 
1720 	/* nobody should touch l_cp_ast */
1721 	lock_res_and_lock(lock);
1722 	list_del_init(&lock->l_cp_ast);
1723 	LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1724 	/* save l_completion_ast since it can be changed by
1725 	 * mds_intent_policy(), see bug 14225 */
1726 	completion_callback = lock->l_completion_ast;
1727 	lock->l_flags &= ~LDLM_FL_CP_REQD;
1728 	unlock_res_and_lock(lock);
1729 
1730 	if (completion_callback != NULL)
1731 		rc = completion_callback(lock, 0, (void *)arg);
1732 	LDLM_LOCK_RELEASE(lock);
1733 
1734 	return rc;
1735 }
1736 
1737 /**
1738  * Process a call to revocation AST callback for a lock in ast_work list
1739  */
1740 static int
1741 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1742 {
1743 	struct ldlm_cb_set_arg *arg = opaq;
1744 	struct ldlm_lock_desc   desc;
1745 	int		     rc;
1746 	struct ldlm_lock       *lock;
1747 
1748 	if (list_empty(arg->list))
1749 		return -ENOENT;
1750 
1751 	lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1752 	list_del_init(&lock->l_rk_ast);
1753 
1754 	/* the desc just pretends to be exclusive */
1755 	ldlm_lock2desc(lock, &desc);
1756 	desc.l_req_mode = LCK_EX;
1757 	desc.l_granted_mode = 0;
1758 
1759 	rc = lock->l_blocking_ast(lock, &desc, (void *)arg, LDLM_CB_BLOCKING);
1760 	LDLM_LOCK_RELEASE(lock);
1761 
1762 	return rc;
1763 }
1764 
1765 /**
1766  * Process a call to glimpse AST callback for a lock in ast_work list
1767  */
1768 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1769 {
1770 	struct ldlm_cb_set_arg		*arg = opaq;
1771 	struct ldlm_glimpse_work	*gl_work;
1772 	struct ldlm_lock		*lock;
1773 	int				 rc = 0;
1774 
1775 	if (list_empty(arg->list))
1776 		return -ENOENT;
1777 
1778 	gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
1779 				 gl_list);
1780 	list_del_init(&gl_work->gl_list);
1781 
1782 	lock = gl_work->gl_lock;
1783 
1784 	/* transfer the glimpse descriptor to ldlm_cb_set_arg */
1785 	arg->gl_desc = gl_work->gl_desc;
1786 
1787 	/* invoke the actual glimpse callback */
1788 	if (lock->l_glimpse_ast(lock, (void *)arg) == 0)
1789 		rc = 1;
1790 
1791 	LDLM_LOCK_RELEASE(lock);
1792 
1793 	if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1794 		OBD_FREE_PTR(gl_work);
1795 
1796 	return rc;
1797 }
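
/*
 * The ldlm_work_*_ast_lock() helpers above share one producer contract:
 * each call detaches exactly one lock from arg->list, invokes the matching
 * AST callback for it and drops a lock reference with LDLM_LOCK_RELEASE();
 * when the list is empty they return -ENOENT, which tells the request set
 * built in ldlm_run_ast_work() below that no more work can be produced.
 */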
1798 
1799 /**
1800  * Process list of locks in need of ASTs being sent.
1801  *
1802  * Used on server to send multiple ASTs together instead of sending one by
1803  * one.
1804  */
1805 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
1806 		      enum ldlm_desc_ast_t ast_type)
1807 {
1808 	struct ldlm_cb_set_arg *arg;
1809 	set_producer_func       work_ast_lock;
1810 	int		     rc;
1811 
1812 	if (list_empty(rpc_list))
1813 		return 0;
1814 
1815 	OBD_ALLOC_PTR(arg);
1816 	if (arg == NULL)
1817 		return -ENOMEM;
1818 
1819 	atomic_set(&arg->restart, 0);
1820 	arg->list = rpc_list;
1821 
1822 	switch (ast_type) {
1823 	case LDLM_WORK_BL_AST:
1824 		arg->type = LDLM_BL_CALLBACK;
1825 		work_ast_lock = ldlm_work_bl_ast_lock;
1826 		break;
1827 	case LDLM_WORK_CP_AST:
1828 		arg->type = LDLM_CP_CALLBACK;
1829 		work_ast_lock = ldlm_work_cp_ast_lock;
1830 		break;
1831 	case LDLM_WORK_REVOKE_AST:
1832 		arg->type = LDLM_BL_CALLBACK;
1833 		work_ast_lock = ldlm_work_revoke_ast_lock;
1834 		break;
1835 	case LDLM_WORK_GL_AST:
1836 		arg->type = LDLM_GL_CALLBACK;
1837 		work_ast_lock = ldlm_work_gl_ast_lock;
1838 		break;
1839 	default:
1840 		LBUG();
1841 	}
1842 
1843 	/* We create a ptlrpc request set with flow control extension.
1844 	 * This request set will use the work_ast_lock function to produce new
1845 	 * requests and will send a new request each time one completes in order
1846 	 * to keep the number of requests in flight at ns_max_parallel_ast */
1847 	arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1848 				     work_ast_lock, arg);
1849 	if (arg->set == NULL) {
1850 		rc = -ENOMEM;
1851 		goto out;
1852 	}
1853 
1854 	ptlrpc_set_wait(arg->set);
1855 	ptlrpc_set_destroy(arg->set);
1856 
1857 	rc = atomic_read(&arg->restart) ? -ERESTART : 0;
1859 out:
1860 	OBD_FREE_PTR(arg);
1861 	return rc;
1862 }
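
/*
 * A hedged usage sketch for ldlm_run_ast_work(); "rpc_list" is just an
 * illustrative name for a list the caller populated with locks that need
 * completion ASTs:
 *
 *	LIST_HEAD(rpc_list);
 *	int rc;
 *
 *	... link locks onto rpc_list through their l_cp_ast member ...
 *	rc = ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
 *	if (rc == -ERESTART)
 *		at least one AST asked for the whole operation to be retried;
 *
 * At most ns->ns_max_parallel_ast callbacks are kept in flight at once.
 */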
1863 
1864 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1865 {
1866 	ldlm_reprocess_all(res);
1867 	return LDLM_ITER_CONTINUE;
1868 }
1869 
1870 static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1871 			      struct hlist_node *hnode, void *arg)
1872 {
1873 	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1874 	int    rc;
1875 
1876 	rc = reprocess_one_queue(res, arg);
1877 
1878 	return rc == LDLM_ITER_STOP;
1879 }
1880 
1881 /**
1882  * Iterate through all resources on a namespace attempting to grant waiting
1883  * locks.
1884  */
1885 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1886 {
1887 	if (ns != NULL) {
1888 		cfs_hash_for_each_nolock(ns->ns_rs_hash,
1889 					 ldlm_reprocess_res, NULL);
1890 	}
1891 }
1892 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1893 
1894 /**
1895  * Try to grant all waiting locks on a resource.
1896  *
1897  * Calls ldlm_reprocess_queue on converting and waiting queues.
1898  *
1899  * Typically called after some resource locks are cancelled to see
1900  * if anything could be granted as a result of the cancellation.
1901  */
1902 void ldlm_reprocess_all(struct ldlm_resource *res)
1903 {
1904 	LIST_HEAD(rpc_list);
1905 
1906 	if (!ns_is_client(ldlm_res_to_ns(res))) {
1907 		CERROR("This is a client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
1908 		LBUG();
1909 	}
1910 }
1911 
1912 /**
1913  * Helper function to call blocking AST for LDLM lock \a lock in a
1914  * "cancelling" mode.
1915  */
1916 void ldlm_cancel_callback(struct ldlm_lock *lock)
1917 {
1918 	check_res_locked(lock->l_resource);
1919 	if (!(lock->l_flags & LDLM_FL_CANCEL)) {
1920 		lock->l_flags |= LDLM_FL_CANCEL;
1921 		if (lock->l_blocking_ast) {
1922 			unlock_res_and_lock(lock);
1923 			lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
1924 					     LDLM_CB_CANCELING);
1925 			lock_res_and_lock(lock);
1926 		} else {
1927 			LDLM_DEBUG(lock, "no blocking ast");
1928 		}
1929 	}
1930 	lock->l_flags |= LDLM_FL_BL_DONE;
1931 }
1932 
1933 /**
1934  * Remove skiplist-enabled LDLM lock \a req from granted list
1935  */
1936 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1937 {
1938 	if (req->l_resource->lr_type != LDLM_PLAIN &&
1939 	    req->l_resource->lr_type != LDLM_IBITS)
1940 		return;
1941 
1942 	list_del_init(&req->l_sl_policy);
1943 	list_del_init(&req->l_sl_mode);
1944 }
1945 
1946 /**
1947  * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
1948  */
1949 void ldlm_lock_cancel(struct ldlm_lock *lock)
1950 {
1951 	struct ldlm_resource *res;
1952 	struct ldlm_namespace *ns;
1953 
1954 	lock_res_and_lock(lock);
1955 
1956 	res = lock->l_resource;
1957 	ns  = ldlm_res_to_ns(res);
1958 
1959 	/* Please do not, no matter how tempting, remove this LBUG without
1960 	 * talking to me first. -phik */
1961 	if (lock->l_readers || lock->l_writers) {
1962 		LDLM_ERROR(lock, "lock still has references");
1963 		LBUG();
1964 	}
1965 
1966 	if (lock->l_flags & LDLM_FL_WAITED)
1967 		ldlm_del_waiting_lock(lock);
1968 
1969 	/* Run the cancel callback (the blocking AST in cancel mode). */
1970 	ldlm_cancel_callback(lock);
1971 
1972 	/* Yes, second time, just in case it was added again while we were
1973 	 * running with no res lock in ldlm_cancel_callback */
1974 	if (lock->l_flags & LDLM_FL_WAITED)
1975 		ldlm_del_waiting_lock(lock);
1976 
1977 	ldlm_resource_unlink_lock(lock);
1978 	ldlm_lock_destroy_nolock(lock);
1979 
1980 	if (lock->l_granted_mode == lock->l_req_mode)
1981 		ldlm_pool_del(&ns->ns_pool, lock);
1982 
1983 	/* Make sure we will not be called again for the same lock, which is
1984 	 * possible if we do not zero out lock->l_granted_mode */
1985 	lock->l_granted_mode = LCK_MINMODE;
1986 	unlock_res_and_lock(lock);
1987 }
1988 EXPORT_SYMBOL(ldlm_lock_cancel);
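
/*
 * A minimal, hedged sketch of cancelling a lock by handle; it assumes the
 * caller has already dropped all of its reader/writer references, otherwise
 * the LBUG() above fires:
 *
 *	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 *
 *	if (lock != NULL) {
 *		ldlm_lock_cancel(lock);
 *		LDLM_LOCK_PUT(lock);
 *	}
 */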
1989 
1990 /**
1991  * Set opaque data into the lock that only makes sense to upper layer.
1992  */
1993 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1994 {
1995 	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1996 	int rc = -EINVAL;
1997 
1998 	if (lock) {
1999 		if (lock->l_ast_data == NULL)
2000 			lock->l_ast_data = data;
2001 		if (lock->l_ast_data == data)
2002 			rc = 0;
2003 		LDLM_LOCK_PUT(lock);
2004 	}
2005 	return rc;
2006 }
2007 EXPORT_SYMBOL(ldlm_lock_set_data);
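
/*
 * Hedged illustration of the first-setter-wins semantics above; "my_object"
 * is only an illustrative pointer, not a real Lustre symbol:
 *
 *	if (ldlm_lock_set_data(&lockh, my_object) != 0)
 *		the lock already carries someone else's l_ast_data;
 *
 * Passing the value that is already stored succeeds, so the call is
 * idempotent for the original setter.
 */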
2008 
2009 struct export_cl_data {
2010 	struct obd_export	*ecl_exp;
2011 	int			ecl_loop;
2012 };
2013 
2014 /**
2015  * Iterator function for ldlm_cancel_locks_for_export.
2016  * Cancels passed locks.
2017  */
2018 int ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2019 				    struct hlist_node *hnode, void *data)
2020 
2021 {
2022 	struct export_cl_data	*ecl = (struct export_cl_data *)data;
2023 	struct obd_export	*exp  = ecl->ecl_exp;
2024 	struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
2025 	struct ldlm_resource *res;
2026 
2027 	res = ldlm_resource_getref(lock->l_resource);
2028 	LDLM_LOCK_GET(lock);
2029 
2030 	LDLM_DEBUG(lock, "export %p", exp);
2031 	ldlm_res_lvbo_update(res, NULL, 1);
2032 	ldlm_lock_cancel(lock);
2033 	ldlm_reprocess_all(res);
2034 	ldlm_resource_putref(res);
2035 	LDLM_LOCK_RELEASE(lock);
2036 
2037 	ecl->ecl_loop++;
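	/* (x & -x) == x holds only for powers of two, so the debug message
	 * below is emitted at exponentially spaced intervals rather than for
	 * every cancelled lock. */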
2038 	if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2039 		CDEBUG(D_INFO,
2040 		       "Cancel lock %p for export %p (loop %d), still have %d locks left on hash table.\n",
2041 		       lock, exp, ecl->ecl_loop,
2042 		       atomic_read(&hs->hs_count));
2043 	}
2044 
2045 	return 0;
2046 }
2047 
2048 /**
2049  * Cancel all locks for given export.
2050  *
2051  * Typically called on client disconnection/eviction
2052  */
2053 void ldlm_cancel_locks_for_export(struct obd_export *exp)
2054 {
2055 	struct export_cl_data	ecl = {
2056 		.ecl_exp	= exp,
2057 		.ecl_loop	= 0,
2058 	};
2059 
2060 	cfs_hash_for_each_empty(exp->exp_lock_hash,
2061 				ldlm_cancel_locks_for_export_cb, &ecl);
2062 }
2063 
2064 /**
2065  * Downgrade an exclusive lock.
2066  *
2067  * A fast variant of ldlm_lock_convert for conversion of exclusive
2068  * locks. The conversion is always successful.
2069  * Used by Commit on Sharing (COS) code.
2070  *
2071  * \param lock A lock to convert
2072  * \param new_mode new lock mode
2073  */
2074 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2075 {
2076 	LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2077 	LASSERT(new_mode == LCK_COS);
2078 
2079 	lock_res_and_lock(lock);
2080 	ldlm_resource_unlink_lock(lock);
2081 	/*
2082 	 * Remove the lock from pool as it will be added again in
2083 	 * ldlm_grant_lock() called below.
2084 	 */
2085 	ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2086 
2087 	lock->l_req_mode = new_mode;
2088 	ldlm_grant_lock(lock, NULL);
2089 	unlock_res_and_lock(lock);
2090 	ldlm_reprocess_all(lock->l_resource);
2091 }
2092 EXPORT_SYMBOL(ldlm_lock_downgrade);
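
/*
 * Hedged Commit-on-Sharing usage sketch: a granted PW or EX lock may be
 * weakened in place, e.g.
 *
 *	if (lock->l_granted_mode == LCK_PW)
 *		ldlm_lock_downgrade(lock, LCK_COS);
 *
 * ldlm_grant_lock() is called directly inside the downgrade, so, as the
 * comment above says, the conversion always succeeds.
 */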
2093 
2094 /**
2095  * Attempt to convert already granted lock to a different mode.
2096  *
2097  * While lock conversion is not currently used, future client-side
2098  * optimizations could take advantage of it to avoid discarding cached
2099  * pages on a file.
2100  */
2101 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2102 					__u32 *flags)
2103 {
2104 	LIST_HEAD(rpc_list);
2105 	struct ldlm_resource *res;
2106 	struct ldlm_namespace *ns;
2107 	int granted = 0;
2108 	struct ldlm_interval *node;
2109 
2110 	/* Just return if mode is unchanged. */
2111 	if (new_mode == lock->l_granted_mode) {
2112 		*flags |= LDLM_FL_BLOCK_GRANTED;
2113 		return lock->l_resource;
2114 	}
2115 
2116 	/* We can't check the lock type here because the lock's bitlock is
2117 	 * not held, so do the allocation blindly. -jay */
2118 	OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
2119 	if (node == NULL)
2120 		/* Actually, this causes EDEADLOCK to be returned */
2121 		return NULL;
2122 
2123 	LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2124 		 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2125 
2126 	lock_res_and_lock(lock);
2127 
2128 	res = lock->l_resource;
2129 	ns  = ldlm_res_to_ns(res);
2130 
2131 	lock->l_req_mode = new_mode;
2132 	if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2133 		ldlm_resource_unlink_lock(lock);
2134 	} else {
2135 		ldlm_resource_unlink_lock(lock);
2136 		if (res->lr_type == LDLM_EXTENT) {
2137 			/* FIXME: ugly code, we have to attach the lock to an
2138 			 * interval node again since it may be granted
2139 			 * soon */
2140 			INIT_LIST_HEAD(&node->li_group);
2141 			ldlm_interval_attach(node, lock);
2142 			node = NULL;
2143 		}
2144 	}
2145 
2146 	/*
2147 	 * Remove old lock from the pool before adding the lock with new
2148 	 * mode below in ->policy()
2149 	 */
2150 	ldlm_pool_del(&ns->ns_pool, lock);
2151 
2152 	/* If this is a local resource, put it on the appropriate list. */
2153 	if (ns_is_client(ldlm_res_to_ns(res))) {
2154 		if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2155 			ldlm_resource_add_lock(res, &res->lr_converting, lock);
2156 		} else {
2157 			/* This should never happen, because of the way the
2158 			 * server handles conversions. */
2159 			LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
2160 				   *flags);
2161 			LBUG();
2162 
2163 			ldlm_grant_lock(lock, &rpc_list);
2164 			granted = 1;
2165 			/* FIXME: completion handling not with lr_lock held ! */
2166 			if (lock->l_completion_ast)
2167 				lock->l_completion_ast(lock, 0, NULL);
2168 		}
2169 	} else {
2170 		CERROR("This is a client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
2171 		LBUG();
2172 	}
2173 	unlock_res_and_lock(lock);
2174 
2175 	if (granted)
2176 		ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2177 	if (node)
2178 		OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2179 	return res;
2180 }
2181 EXPORT_SYMBOL(ldlm_lock_convert);
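
/*
 * Hedged usage sketch: the assertion above only admits a PR -> PW
 * conversion, and a NULL return means the interval node could not be
 * allocated (surfacing to callers as EDEADLOCK, per the comment above):
 *
 *	__u32 flags = 0;
 *
 *	if (ldlm_lock_convert(lock, LCK_PW, &flags) == NULL)
 *		bail out, treating the failure as a deadlock;
 *	else if (flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
 *		the lock was re-queued on lr_converting and is not usable
 *		in the new mode until it is granted again.
 */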
2182 
2183 /**
2184  * Print lock with lock handle \a lockh description into debug log.
2185  *
2186  * Used when printing all locks on a resource for debug purposes.
2187  */
2188 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2189 {
2190 	struct ldlm_lock *lock;
2191 
2192 	if (!((libcfs_debug | D_ERROR) & level))
2193 		return;
2194 
2195 	lock = ldlm_handle2lock(lockh);
2196 	if (lock == NULL)
2197 		return;
2198 
2199 	LDLM_DEBUG_LIMIT(level, lock, "###");
2200 
2201 	LDLM_LOCK_PUT(lock);
2202 }
2203 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2204 
2205 /**
2206  * Print lock information with custom message into debug log.
2207  * Helper function.
2208  */
2209 void _ldlm_lock_debug(struct ldlm_lock *lock,
2210 		      struct libcfs_debug_msg_data *msgdata,
2211 		      const char *fmt, ...)
2212 {
2213 	va_list args;
2214 	struct obd_export *exp = lock->l_export;
2215 	struct ldlm_resource *resource = lock->l_resource;
2216 	char *nid = "local";
2217 
2218 	va_start(args, fmt);
2219 
2220 	if (exp && exp->exp_connection) {
2221 		nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2222 	} else if (exp && exp->exp_obd != NULL) {
2223 		struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2224 
2225 		nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2226 	}
2227 
2228 	if (resource == NULL) {
2229 		libcfs_debug_vmsg2(msgdata, fmt, args,
2230 				   " ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2231 				   lock,
2232 				   lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2233 				   lock->l_readers, lock->l_writers,
2234 				   ldlm_lockname[lock->l_granted_mode],
2235 				   ldlm_lockname[lock->l_req_mode],
2236 				   lock->l_flags, nid, lock->l_remote_handle.cookie,
2237 				   exp ? atomic_read(&exp->exp_refcount) : -99,
2238 				   lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2239 		va_end(args);
2240 		return;
2241 	}
2242 
2243 	switch (resource->lr_type) {
2244 	case LDLM_EXTENT:
2245 		libcfs_debug_vmsg2(msgdata, fmt, args,
2246 				   " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2247 				   ldlm_lock_to_ns_name(lock), lock,
2248 				   lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2249 				   lock->l_readers, lock->l_writers,
2250 				   ldlm_lockname[lock->l_granted_mode],
2251 				   ldlm_lockname[lock->l_req_mode],
2252 				   PLDLMRES(resource),
2253 				   atomic_read(&resource->lr_refcount),
2254 				   ldlm_typename[resource->lr_type],
2255 				   lock->l_policy_data.l_extent.start,
2256 				   lock->l_policy_data.l_extent.end,
2257 				   lock->l_req_extent.start, lock->l_req_extent.end,
2258 				   lock->l_flags, nid, lock->l_remote_handle.cookie,
2259 				   exp ? atomic_read(&exp->exp_refcount) : -99,
2260 				   lock->l_pid, lock->l_callback_timeout,
2261 				   lock->l_lvb_type);
2262 		break;
2263 
2264 	case LDLM_FLOCK:
2265 		libcfs_debug_vmsg2(msgdata, fmt, args,
2266 				   " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s pid: %d [%llu->%llu] flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu\n",
2267 				   ldlm_lock_to_ns_name(lock), lock,
2268 				   lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2269 				   lock->l_readers, lock->l_writers,
2270 				   ldlm_lockname[lock->l_granted_mode],
2271 				   ldlm_lockname[lock->l_req_mode],
2272 				   PLDLMRES(resource),
2273 				   atomic_read(&resource->lr_refcount),
2274 				   ldlm_typename[resource->lr_type],
2275 				   lock->l_policy_data.l_flock.pid,
2276 				   lock->l_policy_data.l_flock.start,
2277 				   lock->l_policy_data.l_flock.end,
2278 				   lock->l_flags, nid, lock->l_remote_handle.cookie,
2279 				   exp ? atomic_read(&exp->exp_refcount) : -99,
2280 				   lock->l_pid, lock->l_callback_timeout);
2281 		break;
2282 
2283 	case LDLM_IBITS:
2284 		libcfs_debug_vmsg2(msgdata, fmt, args,
2285 				   " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2286 				   ldlm_lock_to_ns_name(lock),
2287 				   lock, lock->l_handle.h_cookie,
2288 				   atomic_read(&lock->l_refc),
2289 				   lock->l_readers, lock->l_writers,
2290 				   ldlm_lockname[lock->l_granted_mode],
2291 				   ldlm_lockname[lock->l_req_mode],
2292 				   PLDLMRES(resource),
2293 				   lock->l_policy_data.l_inodebits.bits,
2294 				   atomic_read(&resource->lr_refcount),
2295 				   ldlm_typename[resource->lr_type],
2296 				   lock->l_flags, nid, lock->l_remote_handle.cookie,
2297 				   exp ? atomic_read(&exp->exp_refcount) : -99,
2298 				   lock->l_pid, lock->l_callback_timeout,
2299 				   lock->l_lvb_type);
2300 		break;
2301 
2302 	default:
2303 		libcfs_debug_vmsg2(msgdata, fmt, args,
2304 				   " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2305 				   ldlm_lock_to_ns_name(lock),
2306 				   lock, lock->l_handle.h_cookie,
2307 				   atomic_read(&lock->l_refc),
2308 				   lock->l_readers, lock->l_writers,
2309 				   ldlm_lockname[lock->l_granted_mode],
2310 				   ldlm_lockname[lock->l_req_mode],
2311 				   PLDLMRES(resource),
2312 				   atomic_read(&resource->lr_refcount),
2313 				   ldlm_typename[resource->lr_type],
2314 				   lock->l_flags, nid, lock->l_remote_handle.cookie,
2315 				   exp ? atomic_read(&exp->exp_refcount) : -99,
2316 				   lock->l_pid, lock->l_callback_timeout,
2317 				   lock->l_lvb_type);
2318 		break;
2319 	}
2320 	va_end(args);
2321 }
2322 EXPORT_SYMBOL(_ldlm_lock_debug);
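
/*
 * _ldlm_lock_debug() is normally reached through the LDLM_DEBUG(),
 * LDLM_DEBUG_LIMIT() and LDLM_ERROR() wrappers used throughout this file,
 * for example:
 *
 *	LDLM_DEBUG(lock, "no blocking ast");
 *	LDLM_ERROR(lock, "lock still has references");
 *
 * Each emitted line carries the namespace, handle cookie, reference counts,
 * modes and the type-specific policy data formatted above.
 */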
2323