/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be released only partially (and thus be
 * split into two parts), and that two adjacent locks from the same process
 * may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include "../include/lustre_dlm.h"
#include "../include/obd_support.h"
#include "../include/obd_class.h"
#include "../include/lustre_lib.h"
#include <linux/list.h>
#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *	      while safeguarding against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop cursor. pos MUST
 *	      have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head of the list.
 */
#define list_for_remaining_safe(pos, n, head) \
	for (n = pos->next; pos != (head); pos = n, n = pos->next)

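/**
 * Check whether two flock locks belong to the same owner: they must carry
 * the same flock owner id and have been created through the same export.
 */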
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return((new->l_policy_data.l_flock.owner ==
		lock->l_policy_data.l_flock.owner) &&
	       (new->l_export == lock->l_export));
}

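/**
 * Check whether the extents of two flock locks overlap. The comparison is
 * inclusive since flock extents are [start, end] byte ranges.
 */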
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return((new->l_policy_data.l_flock.start <=
		lock->l_policy_data.l_flock.end) &&
	       (new->l_policy_data.l_flock.end >=
		lock->l_policy_data.l_flock.start));
}

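/**
 * Record that \a req is blocked by \a lock and add \a req to the per-export
 * hash used for deadlock detection. Server side only; a client-side lock
 * has no export and is skipped.
 */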
static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
					    struct ldlm_lock *lock)
{
	/* For server only */
	if (req->l_export == NULL)
		return;

	LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

	req->l_policy_data.l_flock.blocking_owner =
		lock->l_policy_data.l_flock.owner;
	req->l_policy_data.l_flock.blocking_export =
		lock->l_export;
	req->l_policy_data.l_flock.blocking_refs = 0;

	cfs_hash_add(req->l_export->exp_flock_hash,
		     &req->l_policy_data.l_flock.owner,
		     &req->l_exp_flock_hash);
}

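/**
 * Remove \a req from the deadlock detection hash if it is hashed there.
 * Must be called with the resource lock held. Server side only.
 */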
static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
	/* For server only */
	if (req->l_export == NULL)
		return;

	check_res_locked(req->l_resource);
	if (req->l_export->exp_flock_hash != NULL &&
	    !hlist_unhashed(&req->l_exp_flock_hash))
		cfs_hash_del(req->l_export->exp_flock_hash,
			     &req->l_policy_data.l_flock.owner,
			     &req->l_exp_flock_hash);
}

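/**
 * Unlink a flock lock from its resource list and destroy it. On the client
 * side (LDLM_FL_WAIT_NOREPROC) the reference taken at enqueue time is
 * dropped as well, and the lock is flagged so that no CANCEL RPC is sent.
 */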
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
	LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
		   mode, flags);

	/* Safe to not lock here, since it should be empty anyway */
	LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

	list_del_init(&lock->l_res_link);
	if (flags == LDLM_FL_WAIT_NOREPROC &&
	    !(lock->l_flags & LDLM_FL_FAILED)) {
		/* client side - set a flag to prevent sending a CANCEL */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

		/* When we reach here we are under lock_res_and_lock(), so we
		 * need to call the nolock version of
		 * ldlm_lock_decref_internal(). */
		ldlm_lock_decref_internal_nolock(lock, mode);
	}

	ldlm_lock_destroy_nolock(lock);
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, iterate through all blocked POSIX locks for this export and
 * determine whether a deadlock would arise, i.e. whether one client holds
 * a lock on something and wants a lock on something else while another
 * client is in the opposite situation.
 */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
	struct obd_export *req_exp = req->l_export;
	struct obd_export *bl_exp = bl_lock->l_export;
	__u64 req_owner = req->l_policy_data.l_flock.owner;
	__u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

	/* For server only */
	if (req_exp == NULL)
		return 0;

	class_export_get(bl_exp);
	while (1) {
		struct obd_export *bl_exp_new;
		struct ldlm_lock *lock = NULL;
		struct ldlm_flock *flock;

		if (bl_exp->exp_flock_hash != NULL)
			lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
					       &bl_owner);
		if (lock == NULL)
			break;

		LASSERT(req != lock);
		flock = &lock->l_policy_data.l_flock;
		LASSERT(flock->owner == bl_owner);
		bl_owner = flock->blocking_owner;
		bl_exp_new = class_export_get(flock->blocking_export);
		class_export_put(bl_exp);

		cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
		bl_exp = bl_exp_new;

		if (bl_owner == req_owner && bl_exp == req_exp) {
			class_export_put(bl_exp);
			return 1;
		}
	}
	class_export_put(bl_exp);

	return 0;
}

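/**
 * Handle a deadlock detected for the waiting lock \a lock: flag the lock
 * with LDLM_FL_FLOCK_DEADLOCK and queue an AST so the client cancels it.
 * Clients that do not advertise OBD_CONNECT_FLOCK_DEAD cannot handle this,
 * so only an error is reported for them.
 */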
static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
					  struct list_head *work_list)
{
	CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

	if ((exp_connect_flags(lock->l_export) &
				OBD_CONNECT_FLOCK_DEAD) == 0) {
		CERROR(
		      "deadlock found, but client doesn't support flock cancellation\n");
	} else {
		LASSERT(lock->l_completion_ast);
		LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
		lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
			LDLM_FL_FLOCK_DEADLOCK;
		ldlm_flock_blocking_unlink(lock);
		ldlm_resource_unlink_lock(lock);
		ldlm_add_ast_work_item(lock, NULL, work_list);
	}
}

/**
 * Process a granting attempt for a flock lock.
 * Must be called with the ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent.
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent for them.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
			ldlm_error_t *err, struct list_head *work_list)
{
	struct ldlm_resource *res = req->l_resource;
	struct ldlm_namespace *ns = ldlm_res_to_ns(res);
	struct list_head *tmp;
	struct list_head *ownlocks = NULL;
	struct ldlm_lock *lock = NULL;
	struct ldlm_lock *new = req;
	struct ldlm_lock *new2 = NULL;
	ldlm_mode_t mode = req->l_req_mode;
	int local = ns_is_client(ns);
	int added = (mode == LCK_NL);
	int overlaps = 0;
	int splitted = 0;
	const struct ldlm_callback_suite null_cbs = { NULL };

	CDEBUG(D_DLMTRACE,
	       "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
	       *flags, new->l_policy_data.l_flock.owner,
	       new->l_policy_data.l_flock.pid, mode,
	       req->l_policy_data.l_flock.start,
	       req->l_policy_data.l_flock.end);

	*err = ELDLM_OK;

	if (local) {
		/* No blocking ASTs are sent to the clients for
		 * POSIX file & record locks */
		req->l_blocking_ast = NULL;
	} else {
		/* Called on the server for lock cancels. */
		req->l_blocking_ast = ldlm_flock_blocking_ast;
	}

reprocess:
	if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
		/* This loop determines where this process's locks start
		 * in the resource lr_granted list. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					  l_res_link);
			if (ldlm_same_flock_owner(lock, req)) {
				ownlocks = tmp;
				break;
			}
		}
	} else {
		int reprocess_failed = 0;

		lockmode_verify(mode);

		/* This loop determines if there are existing locks
		 * that conflict with the new lock request. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					  l_res_link);

			if (ldlm_same_flock_owner(lock, req)) {
				if (!ownlocks)
					ownlocks = tmp;
				continue;
			}

			/* locks are compatible, overlap doesn't matter */
			if (lockmode_compat(lock->l_granted_mode, mode))
				continue;

			if (!ldlm_flocks_overlap(lock, req))
				continue;

			if (!first_enq) {
				reprocess_failed = 1;
				if (ldlm_flock_deadlock(req, lock)) {
					ldlm_flock_cancel_on_deadlock(req,
							work_list);
					return LDLM_ITER_CONTINUE;
				}
				continue;
			}

			if (*flags & LDLM_FL_BLOCK_NOWAIT) {
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EAGAIN;
				return LDLM_ITER_STOP;
			}

			if (*flags & LDLM_FL_TEST_LOCK) {
				ldlm_flock_destroy(req, mode, *flags);
				req->l_req_mode = lock->l_granted_mode;
				req->l_policy_data.l_flock.pid =
					lock->l_policy_data.l_flock.pid;
				req->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
				req->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
				*flags |= LDLM_FL_LOCK_CHANGED;
				return LDLM_ITER_STOP;
			}

			/* add lock to blocking list before deadlock
			 * check to prevent race */
			ldlm_flock_blocking_link(req, lock);

			if (ldlm_flock_deadlock(req, lock)) {
				ldlm_flock_blocking_unlink(req);
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EDEADLK;
				return LDLM_ITER_STOP;
			}

			ldlm_resource_add_lock(res, &res->lr_waiting, req);
			*flags |= LDLM_FL_BLOCK_GRANTED;
			return LDLM_ITER_STOP;
		}
		if (reprocess_failed)
			return LDLM_ITER_CONTINUE;
	}

	if (*flags & LDLM_FL_TEST_LOCK) {
		ldlm_flock_destroy(req, mode, *flags);
		req->l_req_mode = LCK_NL;
		*flags |= LDLM_FL_LOCK_CHANGED;
		return LDLM_ITER_STOP;
	}

	/* In case we had slept on this lock request take it off of the
	 * deadlock detection hash list. */
	ldlm_flock_blocking_unlink(req);

	/* Scan the locks owned by this process that overlap this request.
	 * We may have to merge or split existing locks. */

	if (!ownlocks)
		ownlocks = &res->lr_granted;

	list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
		lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

		if (!ldlm_same_flock_owner(lock, new))
			break;

		if (lock->l_granted_mode == mode) {
			/* If the modes are the same then we need to process
			 * locks that overlap OR adjoin the new lock. The extra
			 * logic condition is necessary to deal with arithmetic
			 * overflow and underflow. */
			if ((new->l_policy_data.l_flock.start >
			     (lock->l_policy_data.l_flock.end + 1))
			    && (lock->l_policy_data.l_flock.end !=
				OBD_OBJECT_EOF))
				continue;

			if ((new->l_policy_data.l_flock.end <
			     (lock->l_policy_data.l_flock.start - 1))
			    && (lock->l_policy_data.l_flock.start != 0))
				break;

			if (new->l_policy_data.l_flock.start <
			    lock->l_policy_data.l_flock.start) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.start;
			} else {
				new->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
			}

			if (new->l_policy_data.l_flock.end >
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.end =
					new->l_policy_data.l_flock.end;
			} else {
				new->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
			}

			if (added) {
				ldlm_flock_destroy(lock, mode, *flags);
			} else {
				new = lock;
				added = 1;
			}
			continue;
		}

		if (new->l_policy_data.l_flock.start >
		    lock->l_policy_data.l_flock.end)
			continue;

		if (new->l_policy_data.l_flock.end <
		    lock->l_policy_data.l_flock.start)
			break;

		++overlaps;

		if (new->l_policy_data.l_flock.start <=
		    lock->l_policy_data.l_flock.start) {
			if (new->l_policy_data.l_flock.end <
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.end + 1;
				break;
			}
			ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
			continue;
		}
		if (new->l_policy_data.l_flock.end >=
		    lock->l_policy_data.l_flock.end) {
			lock->l_policy_data.l_flock.end =
				new->l_policy_data.l_flock.start - 1;
			continue;
		}

		/* split the existing lock into two locks */

		/* if this is an F_UNLCK operation then we could avoid
		 * allocating a new lock and use the req lock passed in
		 * with the request, but this would complicate the reply
		 * processing since updates to req get reflected in the
		 * reply. The client side replays the lock request, so
		 * it must see the original lock data in the reply. */

		/* XXX - if ldlm_lock_new() can sleep we should
		 * release the lr_lock, allocate the new lock,
		 * and restart processing this lock. */
		if (!new2) {
			unlock_res_and_lock(req);
			new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
						lock->l_granted_mode, &null_cbs,
						NULL, 0, LVB_T_NONE);
			lock_res_and_lock(req);
			if (!new2) {
				ldlm_flock_destroy(req, lock->l_granted_mode,
						   *flags);
				*err = -ENOLCK;
				return LDLM_ITER_STOP;
			}
			goto reprocess;
		}

		splitted = 1;

		new2->l_granted_mode = lock->l_granted_mode;
		new2->l_policy_data.l_flock.pid =
			new->l_policy_data.l_flock.pid;
		new2->l_policy_data.l_flock.owner =
			new->l_policy_data.l_flock.owner;
		new2->l_policy_data.l_flock.start =
			lock->l_policy_data.l_flock.start;
		new2->l_policy_data.l_flock.end =
			new->l_policy_data.l_flock.start - 1;
		lock->l_policy_data.l_flock.start =
			new->l_policy_data.l_flock.end + 1;
		new2->l_conn_export = lock->l_conn_export;
		if (lock->l_export != NULL) {
			new2->l_export = class_export_lock_get(lock->l_export,
							       new2);
			if (new2->l_export->exp_lock_hash &&
			    hlist_unhashed(&new2->l_exp_hash))
				cfs_hash_add(new2->l_export->exp_lock_hash,
					     &new2->l_remote_handle,
					     &new2->l_exp_hash);
		}
		if (*flags == LDLM_FL_WAIT_NOREPROC)
			ldlm_lock_addref_internal_nolock(new2,
							 lock->l_granted_mode);

		/* insert new2 at lock */
		ldlm_resource_add_lock(res, ownlocks, new2);
		LDLM_LOCK_RELEASE(new2);
		break;
	}

	/* if new2 was created but never used, destroy it */
	if (splitted == 0 && new2 != NULL)
		ldlm_lock_destroy_nolock(new2);

	/* At this point we're granting the lock request. */
	req->l_granted_mode = req->l_req_mode;

	/* Add req to the granted queue before calling ldlm_reprocess_all(). */
	if (!added) {
		list_del_init(&req->l_res_link);
		/* insert new lock before ownlocks in list. */
		ldlm_resource_add_lock(res, ownlocks, req);
	}

	if (*flags != LDLM_FL_WAIT_NOREPROC) {
		/* The only possible client-side caller of the flock policy
		 * function is ldlm_flock_completion_ast(), which always
		 * passes the LDLM_FL_WAIT_NOREPROC flag. */
		CERROR("Illegal parameter for client-side-only module.\n");
		LBUG();
	}

	/* In case we're reprocessing the requested lock we can't destroy
	 * it until after calling ldlm_add_ast_work_item() above so that laawi()
	 * can bump the reference count on \a req. Otherwise \a req
	 * could be freed before the completion AST can be sent. */
	if (added)
		ldlm_flock_destroy(req, mode, *flags);

	ldlm_resource_dump(D_INFO, res);
	return LDLM_ITER_CONTINUE;
}

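/**
 * State shared with ldlm_flock_interrupted_wait() while a client sleeps
 * waiting for a blocked flock lock to be granted.
 */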
struct ldlm_flock_wait_data {
	struct ldlm_lock	*fwd_lock;
	int			 fwd_generation;
};

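/**
 * Callback run when the wait for a blocked flock lock is interrupted:
 * unlink the lock from the deadlock detection hash and flag it so it is
 * not put on the LRU list when the wait is abandoned.
 */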
static void
ldlm_flock_interrupted_wait(void *data)
{
	struct ldlm_lock *lock;

	lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

	/* take lock off the deadlock detection hash list. */
	lock_res_and_lock(lock);
	ldlm_flock_blocking_unlink(lock);

	/* client side - set flag to prevent lock from being put on LRU list */
	lock->l_flags |= LDLM_FL_CBPENDING;
	unlock_res_and_lock(lock);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	struct file_lock		*getlk = lock->l_ast_data;
	struct obd_device		*obd;
	struct obd_import		*imp = NULL;
	struct ldlm_flock_wait_data	 fwd;
	struct l_wait_info		 lwi;
	ldlm_error_t			 err;
	int				 rc = 0;

	CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
	       flags, data, getlk);

	/* Import invalidation. We need to actually release the lock
	 * references being held, so that it can go away. There is no point
	 * in holding the lock even if the application still believes it has
	 * it, since the server has already dropped it. This applies only to
	 * granted locks. */
	if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
	    (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
		if (lock->l_req_mode == lock->l_granted_mode &&
		    lock->l_granted_mode != LCK_NL &&
		    NULL == data)
			ldlm_lock_decref_internal(lock, lock->l_req_mode);

		/* Need to wake up the waiter if we were evicted */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

	if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
		       LDLM_FL_BLOCK_CONV))) {
		if (NULL == data)
			/* mds granted the lock in the reply */
			goto granted;
		/* CP AST RPC: lock got granted, wake it up */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
	fwd.fwd_lock = lock;
	obd = class_exp2obd(lock->l_conn_export);

	/* if this is a local lock, there is no import */
	if (NULL != obd)
		imp = obd->u.cli.cl_import;

	if (NULL != imp) {
		spin_lock(&imp->imp_lock);
		fwd.fwd_generation = imp->imp_generation;
		spin_unlock(&imp->imp_lock);
	}

	lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

	/* Go to sleep until the lock is granted. */
	rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

granted:
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

	if (lock->l_flags & LDLM_FL_DESTROYED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
		return 0;
	}

	if (lock->l_flags & LDLM_FL_FAILED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
		return -EIO;
	}

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

	LDLM_DEBUG(lock, "client-side enqueue granted");

	lock_res_and_lock(lock);

	/* take lock off the deadlock detection hash list. */
	ldlm_flock_blocking_unlink(lock);

	/* ldlm_lock_enqueue() has already placed lock on the granted list. */
	list_del_init(&lock->l_res_link);

	if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
		LDLM_DEBUG(lock, "client-side enqueue deadlock received");
		rc = -EDEADLK;
	} else if (flags & LDLM_FL_TEST_LOCK) {
		/* fcntl(F_GETLK) request */
		/* The old mode was saved in getlk->fl_type so that if the
		 * mode in the lock changes we can decref the appropriate
		 * refcount. */
		ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
		switch (lock->l_granted_mode) {
		case LCK_PR:
			getlk->fl_type = F_RDLCK;
			break;
		case LCK_PW:
			getlk->fl_type = F_WRLCK;
			break;
		default:
			getlk->fl_type = F_UNLCK;
		}
		getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
		getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
		getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
	} else {
		__u64 noreproc = LDLM_FL_WAIT_NOREPROC;

		/* We need to reprocess the lock to do merges or splits
		 * with existing locks owned by this process. */
		ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
	}
	unlock_res_and_lock(lock);
	return rc;
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

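/**
 * Blocking AST handler for flock locks. It is only called when the lock is
 * being cancelled (LDLM_CB_CANCELING), so all it has to do is remove the
 * lock from the deadlock detection hash.
 */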
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag)
{
	LASSERT(lock);
	LASSERT(flag == LDLM_CB_CANCELING);

	/* take lock off the deadlock detection hash list. */
	lock_res_and_lock(lock);
	ldlm_flock_blocking_unlink(lock);
	unlock_res_and_lock(lock);
	return 0;
}

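/**
 * Convert a flock policy sent by a pre-2.1 client into the local format.
 * Such clients have no owner field, so the pid doubles as the owner (see
 * the compat note below).
 */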
void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	/* Compat code: old clients had no idea about the owner field and
	 * relied solely on pid for ownership. Introduced in LU-104, 2.1,
	 * April 2011 */
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

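/**
 * Convert a flock policy sent by a 2.1+ client, which carries a separate
 * owner field, into the local format.
 */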
void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

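/** Pack the local flock policy into the wire format. */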
void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
				     ldlm_wire_policy_data_t *wpolicy)
{
	memset(wpolicy, 0, sizeof(*wpolicy));
	wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
	wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
	wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
	wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
{
	return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(struct hlist_node *hnode)
{
	struct ldlm_lock *lock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
{
	return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(struct hlist_node *hnode)
{
	return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

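/**
 * Hash "get" callback: take a reference on the lock and on the export that
 * is blocking it, and account for it in blocking_refs.
 */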
static void
ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	LDLM_LOCK_GET(lock);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_get(flock->blocking_export);
	flock->blocking_refs++;
}

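/**
 * Hash "put" callback: drop the references taken in ldlm_export_flock_get()
 * and clear the blocking owner/export once the last reference is gone.
 */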
static void
ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	LDLM_LOCK_RELEASE(lock);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_put(flock->blocking_export);
	if (--flock->blocking_refs == 0) {
		flock->blocking_owner = 0;
		flock->blocking_export = NULL;
	}
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
	.hs_hash	= ldlm_export_flock_hash,
	.hs_key		= ldlm_export_flock_key,
	.hs_keycmp	= ldlm_export_flock_keycmp,
	.hs_object	= ldlm_export_flock_object,
	.hs_get		= ldlm_export_flock_get,
	.hs_put		= ldlm_export_flock_put,
	.hs_put_locked	= ldlm_export_flock_put,
};

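/**
 * Create the per-export flock hash used for deadlock detection. Only MDT
 * exports track flock locks; other obd types return immediately.
 */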
int ldlm_init_flock_export(struct obd_export *exp)
{
	if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
		return 0;

	exp->exp_flock_hash =
		cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
				HASH_EXP_LOCK_CUR_BITS,
				HASH_EXP_LOCK_MAX_BITS,
				HASH_EXP_LOCK_BKT_BITS, 0,
				CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
				&ldlm_export_flock_ops,
				CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
	if (!exp->exp_flock_hash)
		return -ENOMEM;

	return 0;
}
EXPORT_SYMBOL(ldlm_init_flock_export);

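/** Release the per-export flock hash created by ldlm_init_flock_export(). */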
void ldlm_destroy_flock_export(struct obd_export *exp)
{
	if (exp->exp_flock_hash) {
		cfs_hash_putref(exp->exp_flock_hash);
		exp->exp_flock_hash = NULL;
	}
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);