1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
28 * Developed under the sponsorship of the US Government under
29 * Subcontract No. B514193
30 *
31 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
32 * Use is subject to license terms.
33 *
34 * Copyright (c) 2010, 2012, Intel Corporation.
35 */
36 /*
37 * This file is part of Lustre, http://www.lustre.org/
38 * Lustre is a trademark of Sun Microsystems, Inc.
39 */
40
41 /**
42 * This file implements POSIX lock type for Lustre.
43 * Its policy properties are start and end of extent and PID.
44 *
45 * These locks are only done through MDS due to POSIX semantics requiring
46 * e.g. that locks could be only partially released and as such split into
47 * two parts, and also that two adjacent locks from the same process may be
48 * merged into a single wider lock.
49 *
50 * Lock modes are mapped like this:
51 * PR and PW for READ and WRITE locks
52 * NL to request a releasing of a portion of the lock
53 *
54 * These flock locks never timeout.
55 */
56
57 #define DEBUG_SUBSYSTEM S_LDLM
58
59 #include "../include/lustre_dlm.h"
60 #include "../include/obd_support.h"
61 #include "../include/obd_class.h"
62 #include "../include/lustre_lib.h"
63 #include <linux/list.h>
64 #include "ldlm_internal.h"
65
66 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
67 void *data, int flag);
68
69 /**
70 * list_for_remaining_safe - iterate over the remaining entries in a list
71 * and safeguard against removal of a list entry.
72 * \param pos the &struct list_head to use as a loop counter. pos MUST
73 * have been initialized prior to using it in this macro.
74 * \param n another &struct list_head to use as temporary storage
75 * \param head the head for your list.
76 */
77 #define list_for_remaining_safe(pos, n, head) \
78 for (n = pos->next; pos != (head); pos = n, n = pos->next)
79
80 static inline int
ldlm_same_flock_owner(struct ldlm_lock * lock,struct ldlm_lock * new)81 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
82 {
83 return((new->l_policy_data.l_flock.owner ==
84 lock->l_policy_data.l_flock.owner) &&
85 (new->l_export == lock->l_export));
86 }
87
88 static inline int
ldlm_flocks_overlap(struct ldlm_lock * lock,struct ldlm_lock * new)89 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
90 {
91 return((new->l_policy_data.l_flock.start <=
92 lock->l_policy_data.l_flock.end) &&
93 (new->l_policy_data.l_flock.end >=
94 lock->l_policy_data.l_flock.start));
95 }
96
ldlm_flock_blocking_link(struct ldlm_lock * req,struct ldlm_lock * lock)97 static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
98 struct ldlm_lock *lock)
99 {
100 /* For server only */
101 if (req->l_export == NULL)
102 return;
103
104 LASSERT(hlist_unhashed(&req->l_exp_flock_hash));
105
106 req->l_policy_data.l_flock.blocking_owner =
107 lock->l_policy_data.l_flock.owner;
108 req->l_policy_data.l_flock.blocking_export =
109 lock->l_export;
110 req->l_policy_data.l_flock.blocking_refs = 0;
111
112 cfs_hash_add(req->l_export->exp_flock_hash,
113 &req->l_policy_data.l_flock.owner,
114 &req->l_exp_flock_hash);
115 }
116
ldlm_flock_blocking_unlink(struct ldlm_lock * req)117 static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
118 {
119 /* For server only */
120 if (req->l_export == NULL)
121 return;
122
123 check_res_locked(req->l_resource);
124 if (req->l_export->exp_flock_hash != NULL &&
125 !hlist_unhashed(&req->l_exp_flock_hash))
126 cfs_hash_del(req->l_export->exp_flock_hash,
127 &req->l_policy_data.l_flock.owner,
128 &req->l_exp_flock_hash);
129 }
130
131 static inline void
ldlm_flock_destroy(struct ldlm_lock * lock,ldlm_mode_t mode,__u64 flags)132 ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
133 {
134 LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
135 mode, flags);
136
137 /* Safe to not lock here, since it should be empty anyway */
138 LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
139
140 list_del_init(&lock->l_res_link);
141 if (flags == LDLM_FL_WAIT_NOREPROC &&
142 !(lock->l_flags & LDLM_FL_FAILED)) {
143 /* client side - set a flag to prevent sending a CANCEL */
144 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
145
146 /* when reaching here, it is under lock_res_and_lock(). Thus,
147 need call the nolock version of ldlm_lock_decref_internal*/
148 ldlm_lock_decref_internal_nolock(lock, mode);
149 }
150
151 ldlm_lock_destroy_nolock(lock);
152 }
153
154 /**
155 * POSIX locks deadlock detection code.
156 *
157 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
158 * with, we need to iterate through all blocked POSIX locks for this
159 * export and see if there is a deadlock condition arising. (i.e. when
160 * one client holds a lock on something and want a lock on something
161 * else and at the same time another client has the opposite situation).
162 */
163 static int
ldlm_flock_deadlock(struct ldlm_lock * req,struct ldlm_lock * bl_lock)164 ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
165 {
166 struct obd_export *req_exp = req->l_export;
167 struct obd_export *bl_exp = bl_lock->l_export;
168 __u64 req_owner = req->l_policy_data.l_flock.owner;
169 __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
170
171 /* For server only */
172 if (req_exp == NULL)
173 return 0;
174
175 class_export_get(bl_exp);
176 while (1) {
177 struct obd_export *bl_exp_new;
178 struct ldlm_lock *lock = NULL;
179 struct ldlm_flock *flock;
180
181 if (bl_exp->exp_flock_hash != NULL)
182 lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
183 &bl_owner);
184 if (lock == NULL)
185 break;
186
187 LASSERT(req != lock);
188 flock = &lock->l_policy_data.l_flock;
189 LASSERT(flock->owner == bl_owner);
190 bl_owner = flock->blocking_owner;
191 bl_exp_new = class_export_get(flock->blocking_export);
192 class_export_put(bl_exp);
193
194 cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
195 bl_exp = bl_exp_new;
196
197 if (bl_owner == req_owner && bl_exp == req_exp) {
198 class_export_put(bl_exp);
199 return 1;
200 }
201 }
202 class_export_put(bl_exp);
203
204 return 0;
205 }
206
ldlm_flock_cancel_on_deadlock(struct ldlm_lock * lock,struct list_head * work_list)207 static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
208 struct list_head *work_list)
209 {
210 CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
211
212 if ((exp_connect_flags(lock->l_export) &
213 OBD_CONNECT_FLOCK_DEAD) == 0) {
214 CERROR(
215 "deadlock found, but client doesn't support flock canceliation\n");
216 } else {
217 LASSERT(lock->l_completion_ast);
218 LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
219 lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
220 LDLM_FL_FLOCK_DEADLOCK;
221 ldlm_flock_blocking_unlink(lock);
222 ldlm_resource_unlink_lock(lock);
223 ldlm_add_ast_work_item(lock, NULL, work_list);
224 }
225 }
226
227 /**
228 * Process a granting attempt for flock lock.
229 * Must be called under ns lock held.
230 *
231 * This function looks for any conflicts for \a lock in the granted or
232 * waiting queues. The lock is granted if no conflicts are found in
233 * either queue.
234 *
235 * It is also responsible for splitting a lock if a portion of the lock
236 * is released.
237 *
238 * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
239 * - blocking ASTs have already been sent
240 *
241 * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
242 * - blocking ASTs have not been sent yet, so list of conflicting locks
243 * would be collected and ASTs sent.
244 */
245 int
ldlm_process_flock_lock(struct ldlm_lock * req,__u64 * flags,int first_enq,ldlm_error_t * err,struct list_head * work_list)246 ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
247 ldlm_error_t *err, struct list_head *work_list)
248 {
249 struct ldlm_resource *res = req->l_resource;
250 struct ldlm_namespace *ns = ldlm_res_to_ns(res);
251 struct list_head *tmp;
252 struct list_head *ownlocks = NULL;
253 struct ldlm_lock *lock = NULL;
254 struct ldlm_lock *new = req;
255 struct ldlm_lock *new2 = NULL;
256 ldlm_mode_t mode = req->l_req_mode;
257 int local = ns_is_client(ns);
258 int added = (mode == LCK_NL);
259 int overlaps = 0;
260 int splitted = 0;
261 const struct ldlm_callback_suite null_cbs = { NULL };
262
263 CDEBUG(D_DLMTRACE,
264 "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
265 *flags, new->l_policy_data.l_flock.owner,
266 new->l_policy_data.l_flock.pid, mode,
267 req->l_policy_data.l_flock.start,
268 req->l_policy_data.l_flock.end);
269
270 *err = ELDLM_OK;
271
272 if (local) {
273 /* No blocking ASTs are sent to the clients for
274 * Posix file & record locks */
275 req->l_blocking_ast = NULL;
276 } else {
277 /* Called on the server for lock cancels. */
278 req->l_blocking_ast = ldlm_flock_blocking_ast;
279 }
280
281 reprocess:
282 if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
283 /* This loop determines where this processes locks start
284 * in the resource lr_granted list. */
285 list_for_each(tmp, &res->lr_granted) {
286 lock = list_entry(tmp, struct ldlm_lock,
287 l_res_link);
288 if (ldlm_same_flock_owner(lock, req)) {
289 ownlocks = tmp;
290 break;
291 }
292 }
293 } else {
294 int reprocess_failed = 0;
295
296 lockmode_verify(mode);
297
298 /* This loop determines if there are existing locks
299 * that conflict with the new lock request. */
300 list_for_each(tmp, &res->lr_granted) {
301 lock = list_entry(tmp, struct ldlm_lock,
302 l_res_link);
303
304 if (ldlm_same_flock_owner(lock, req)) {
305 if (!ownlocks)
306 ownlocks = tmp;
307 continue;
308 }
309
310 /* locks are compatible, overlap doesn't matter */
311 if (lockmode_compat(lock->l_granted_mode, mode))
312 continue;
313
314 if (!ldlm_flocks_overlap(lock, req))
315 continue;
316
317 if (!first_enq) {
318 reprocess_failed = 1;
319 if (ldlm_flock_deadlock(req, lock)) {
320 ldlm_flock_cancel_on_deadlock(req,
321 work_list);
322 return LDLM_ITER_CONTINUE;
323 }
324 continue;
325 }
326
327 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
328 ldlm_flock_destroy(req, mode, *flags);
329 *err = -EAGAIN;
330 return LDLM_ITER_STOP;
331 }
332
333 if (*flags & LDLM_FL_TEST_LOCK) {
334 ldlm_flock_destroy(req, mode, *flags);
335 req->l_req_mode = lock->l_granted_mode;
336 req->l_policy_data.l_flock.pid =
337 lock->l_policy_data.l_flock.pid;
338 req->l_policy_data.l_flock.start =
339 lock->l_policy_data.l_flock.start;
340 req->l_policy_data.l_flock.end =
341 lock->l_policy_data.l_flock.end;
342 *flags |= LDLM_FL_LOCK_CHANGED;
343 return LDLM_ITER_STOP;
344 }
345
346 /* add lock to blocking list before deadlock
347 * check to prevent race */
348 ldlm_flock_blocking_link(req, lock);
349
350 if (ldlm_flock_deadlock(req, lock)) {
351 ldlm_flock_blocking_unlink(req);
352 ldlm_flock_destroy(req, mode, *flags);
353 *err = -EDEADLK;
354 return LDLM_ITER_STOP;
355 }
356
357 ldlm_resource_add_lock(res, &res->lr_waiting, req);
358 *flags |= LDLM_FL_BLOCK_GRANTED;
359 return LDLM_ITER_STOP;
360 }
361 if (reprocess_failed)
362 return LDLM_ITER_CONTINUE;
363 }
364
365 if (*flags & LDLM_FL_TEST_LOCK) {
366 ldlm_flock_destroy(req, mode, *flags);
367 req->l_req_mode = LCK_NL;
368 *flags |= LDLM_FL_LOCK_CHANGED;
369 return LDLM_ITER_STOP;
370 }
371
372 /* In case we had slept on this lock request take it off of the
373 * deadlock detection hash list. */
374 ldlm_flock_blocking_unlink(req);
375
376 /* Scan the locks owned by this process that overlap this request.
377 * We may have to merge or split existing locks. */
378
379 if (!ownlocks)
380 ownlocks = &res->lr_granted;
381
382 list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
383 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
384
385 if (!ldlm_same_flock_owner(lock, new))
386 break;
387
388 if (lock->l_granted_mode == mode) {
389 /* If the modes are the same then we need to process
390 * locks that overlap OR adjoin the new lock. The extra
391 * logic condition is necessary to deal with arithmetic
392 * overflow and underflow. */
393 if ((new->l_policy_data.l_flock.start >
394 (lock->l_policy_data.l_flock.end + 1))
395 && (lock->l_policy_data.l_flock.end !=
396 OBD_OBJECT_EOF))
397 continue;
398
399 if ((new->l_policy_data.l_flock.end <
400 (lock->l_policy_data.l_flock.start - 1))
401 && (lock->l_policy_data.l_flock.start != 0))
402 break;
403
404 if (new->l_policy_data.l_flock.start <
405 lock->l_policy_data.l_flock.start) {
406 lock->l_policy_data.l_flock.start =
407 new->l_policy_data.l_flock.start;
408 } else {
409 new->l_policy_data.l_flock.start =
410 lock->l_policy_data.l_flock.start;
411 }
412
413 if (new->l_policy_data.l_flock.end >
414 lock->l_policy_data.l_flock.end) {
415 lock->l_policy_data.l_flock.end =
416 new->l_policy_data.l_flock.end;
417 } else {
418 new->l_policy_data.l_flock.end =
419 lock->l_policy_data.l_flock.end;
420 }
421
422 if (added) {
423 ldlm_flock_destroy(lock, mode, *flags);
424 } else {
425 new = lock;
426 added = 1;
427 }
428 continue;
429 }
430
431 if (new->l_policy_data.l_flock.start >
432 lock->l_policy_data.l_flock.end)
433 continue;
434
435 if (new->l_policy_data.l_flock.end <
436 lock->l_policy_data.l_flock.start)
437 break;
438
439 ++overlaps;
440
441 if (new->l_policy_data.l_flock.start <=
442 lock->l_policy_data.l_flock.start) {
443 if (new->l_policy_data.l_flock.end <
444 lock->l_policy_data.l_flock.end) {
445 lock->l_policy_data.l_flock.start =
446 new->l_policy_data.l_flock.end + 1;
447 break;
448 }
449 ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
450 continue;
451 }
452 if (new->l_policy_data.l_flock.end >=
453 lock->l_policy_data.l_flock.end) {
454 lock->l_policy_data.l_flock.end =
455 new->l_policy_data.l_flock.start - 1;
456 continue;
457 }
458
459 /* split the existing lock into two locks */
460
461 /* if this is an F_UNLCK operation then we could avoid
462 * allocating a new lock and use the req lock passed in
463 * with the request but this would complicate the reply
464 * processing since updates to req get reflected in the
465 * reply. The client side replays the lock request so
466 * it must see the original lock data in the reply. */
467
468 /* XXX - if ldlm_lock_new() can sleep we should
469 * release the lr_lock, allocate the new lock,
470 * and restart processing this lock. */
471 if (!new2) {
472 unlock_res_and_lock(req);
473 new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
474 lock->l_granted_mode, &null_cbs,
475 NULL, 0, LVB_T_NONE);
476 lock_res_and_lock(req);
477 if (!new2) {
478 ldlm_flock_destroy(req, lock->l_granted_mode,
479 *flags);
480 *err = -ENOLCK;
481 return LDLM_ITER_STOP;
482 }
483 goto reprocess;
484 }
485
486 splitted = 1;
487
488 new2->l_granted_mode = lock->l_granted_mode;
489 new2->l_policy_data.l_flock.pid =
490 new->l_policy_data.l_flock.pid;
491 new2->l_policy_data.l_flock.owner =
492 new->l_policy_data.l_flock.owner;
493 new2->l_policy_data.l_flock.start =
494 lock->l_policy_data.l_flock.start;
495 new2->l_policy_data.l_flock.end =
496 new->l_policy_data.l_flock.start - 1;
497 lock->l_policy_data.l_flock.start =
498 new->l_policy_data.l_flock.end + 1;
499 new2->l_conn_export = lock->l_conn_export;
500 if (lock->l_export != NULL) {
501 new2->l_export = class_export_lock_get(lock->l_export,
502 new2);
503 if (new2->l_export->exp_lock_hash &&
504 hlist_unhashed(&new2->l_exp_hash))
505 cfs_hash_add(new2->l_export->exp_lock_hash,
506 &new2->l_remote_handle,
507 &new2->l_exp_hash);
508 }
509 if (*flags == LDLM_FL_WAIT_NOREPROC)
510 ldlm_lock_addref_internal_nolock(new2,
511 lock->l_granted_mode);
512
513 /* insert new2 at lock */
514 ldlm_resource_add_lock(res, ownlocks, new2);
515 LDLM_LOCK_RELEASE(new2);
516 break;
517 }
518
519 /* if new2 is created but never used, destroy it*/
520 if (splitted == 0 && new2 != NULL)
521 ldlm_lock_destroy_nolock(new2);
522
523 /* At this point we're granting the lock request. */
524 req->l_granted_mode = req->l_req_mode;
525
526 /* Add req to the granted queue before calling ldlm_reprocess_all(). */
527 if (!added) {
528 list_del_init(&req->l_res_link);
529 /* insert new lock before ownlocks in list. */
530 ldlm_resource_add_lock(res, ownlocks, req);
531 }
532
533 if (*flags != LDLM_FL_WAIT_NOREPROC) {
534 /* The only one possible case for client-side calls flock
535 * policy function is ldlm_flock_completion_ast inside which
536 * carries LDLM_FL_WAIT_NOREPROC flag. */
537 CERROR("Illegal parameter for client-side-only module.\n");
538 LBUG();
539 }
540
541 /* In case we're reprocessing the requested lock we can't destroy
542 * it until after calling ldlm_add_ast_work_item() above so that laawi()
543 * can bump the reference count on \a req. Otherwise \a req
544 * could be freed before the completion AST can be sent. */
545 if (added)
546 ldlm_flock_destroy(req, mode, *flags);
547
548 ldlm_resource_dump(D_INFO, res);
549 return LDLM_ITER_CONTINUE;
550 }
551
552 struct ldlm_flock_wait_data {
553 struct ldlm_lock *fwd_lock;
554 int fwd_generation;
555 };
556
557 static void
ldlm_flock_interrupted_wait(void * data)558 ldlm_flock_interrupted_wait(void *data)
559 {
560 struct ldlm_lock *lock;
561
562 lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
563
564 /* take lock off the deadlock detection hash list. */
565 lock_res_and_lock(lock);
566 ldlm_flock_blocking_unlink(lock);
567
568 /* client side - set flag to prevent lock from being put on LRU list */
569 lock->l_flags |= LDLM_FL_CBPENDING;
570 unlock_res_and_lock(lock);
571 }
572
573 /**
574 * Flock completion callback function.
575 *
576 * \param lock [in,out]: A lock to be handled
577 * \param flags [in]: flags
578 * \param *data [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
579 *
580 * \retval 0 : success
581 * \retval <0 : failure
582 */
583 int
ldlm_flock_completion_ast(struct ldlm_lock * lock,__u64 flags,void * data)584 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
585 {
586 struct file_lock *getlk = lock->l_ast_data;
587 struct obd_device *obd;
588 struct obd_import *imp = NULL;
589 struct ldlm_flock_wait_data fwd;
590 struct l_wait_info lwi;
591 ldlm_error_t err;
592 int rc = 0;
593
594 CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
595 flags, data, getlk);
596
597 /* Import invalidation. We need to actually release the lock
598 * references being held, so that it can go away. No point in
599 * holding the lock even if app still believes it has it, since
600 * server already dropped it anyway. Only for granted locks too. */
601 if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
602 (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
603 if (lock->l_req_mode == lock->l_granted_mode &&
604 lock->l_granted_mode != LCK_NL &&
605 NULL == data)
606 ldlm_lock_decref_internal(lock, lock->l_req_mode);
607
608 /* Need to wake up the waiter if we were evicted */
609 wake_up(&lock->l_waitq);
610 return 0;
611 }
612
613 LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
614
615 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
616 LDLM_FL_BLOCK_CONV))) {
617 if (NULL == data)
618 /* mds granted the lock in the reply */
619 goto granted;
620 /* CP AST RPC: lock get granted, wake it up */
621 wake_up(&lock->l_waitq);
622 return 0;
623 }
624
625 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
626 fwd.fwd_lock = lock;
627 obd = class_exp2obd(lock->l_conn_export);
628
629 /* if this is a local lock, there is no import */
630 if (NULL != obd)
631 imp = obd->u.cli.cl_import;
632
633 if (NULL != imp) {
634 spin_lock(&imp->imp_lock);
635 fwd.fwd_generation = imp->imp_generation;
636 spin_unlock(&imp->imp_lock);
637 }
638
639 lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
640
641 /* Go to sleep until the lock is granted. */
642 rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
643
644 if (rc) {
645 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
646 rc);
647 return rc;
648 }
649
650 granted:
651 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
652
653 if (lock->l_flags & LDLM_FL_DESTROYED) {
654 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
655 return 0;
656 }
657
658 if (lock->l_flags & LDLM_FL_FAILED) {
659 LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
660 return -EIO;
661 }
662
663 if (rc) {
664 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
665 rc);
666 return rc;
667 }
668
669 LDLM_DEBUG(lock, "client-side enqueue granted");
670
671 lock_res_and_lock(lock);
672
673 /* take lock off the deadlock detection hash list. */
674 ldlm_flock_blocking_unlink(lock);
675
676 /* ldlm_lock_enqueue() has already placed lock on the granted list. */
677 list_del_init(&lock->l_res_link);
678
679 if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
680 LDLM_DEBUG(lock, "client-side enqueue deadlock received");
681 rc = -EDEADLK;
682 } else if (flags & LDLM_FL_TEST_LOCK) {
683 /* fcntl(F_GETLK) request */
684 /* The old mode was saved in getlk->fl_type so that if the mode
685 * in the lock changes we can decref the appropriate refcount.*/
686 ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
687 switch (lock->l_granted_mode) {
688 case LCK_PR:
689 getlk->fl_type = F_RDLCK;
690 break;
691 case LCK_PW:
692 getlk->fl_type = F_WRLCK;
693 break;
694 default:
695 getlk->fl_type = F_UNLCK;
696 }
697 getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
698 getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
699 getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
700 } else {
701 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
702
703 /* We need to reprocess the lock to do merges or splits
704 * with existing locks owned by this process. */
705 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
706 }
707 unlock_res_and_lock(lock);
708 return rc;
709 }
710 EXPORT_SYMBOL(ldlm_flock_completion_ast);
711
ldlm_flock_blocking_ast(struct ldlm_lock * lock,struct ldlm_lock_desc * desc,void * data,int flag)712 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
713 void *data, int flag)
714 {
715 LASSERT(lock);
716 LASSERT(flag == LDLM_CB_CANCELING);
717
718 /* take lock off the deadlock detection hash list. */
719 lock_res_and_lock(lock);
720 ldlm_flock_blocking_unlink(lock);
721 unlock_res_and_lock(lock);
722 return 0;
723 }
724
ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t * wpolicy,ldlm_policy_data_t * lpolicy)725 void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
726 ldlm_policy_data_t *lpolicy)
727 {
728 memset(lpolicy, 0, sizeof(*lpolicy));
729 lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
730 lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
731 lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
732 /* Compat code, old clients had no idea about owner field and
733 * relied solely on pid for ownership. Introduced in LU-104, 2.1,
734 * April 2011 */
735 lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
736 }
737
738
ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t * wpolicy,ldlm_policy_data_t * lpolicy)739 void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
740 ldlm_policy_data_t *lpolicy)
741 {
742 memset(lpolicy, 0, sizeof(*lpolicy));
743 lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
744 lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
745 lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
746 lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
747 }
748
ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t * lpolicy,ldlm_wire_policy_data_t * wpolicy)749 void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
750 ldlm_wire_policy_data_t *wpolicy)
751 {
752 memset(wpolicy, 0, sizeof(*wpolicy));
753 wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
754 wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
755 wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
756 wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
757 }
758
759 /*
760 * Export handle<->flock hash operations.
761 */
762 static unsigned
ldlm_export_flock_hash(struct cfs_hash * hs,const void * key,unsigned mask)763 ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
764 {
765 return cfs_hash_u64_hash(*(__u64 *)key, mask);
766 }
767
768 static void *
ldlm_export_flock_key(struct hlist_node * hnode)769 ldlm_export_flock_key(struct hlist_node *hnode)
770 {
771 struct ldlm_lock *lock;
772
773 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
774 return &lock->l_policy_data.l_flock.owner;
775 }
776
777 static int
ldlm_export_flock_keycmp(const void * key,struct hlist_node * hnode)778 ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
779 {
780 return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
781 }
782
783 static void *
ldlm_export_flock_object(struct hlist_node * hnode)784 ldlm_export_flock_object(struct hlist_node *hnode)
785 {
786 return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
787 }
788
789 static void
ldlm_export_flock_get(struct cfs_hash * hs,struct hlist_node * hnode)790 ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
791 {
792 struct ldlm_lock *lock;
793 struct ldlm_flock *flock;
794
795 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
796 LDLM_LOCK_GET(lock);
797
798 flock = &lock->l_policy_data.l_flock;
799 LASSERT(flock->blocking_export != NULL);
800 class_export_get(flock->blocking_export);
801 flock->blocking_refs++;
802 }
803
804 static void
ldlm_export_flock_put(struct cfs_hash * hs,struct hlist_node * hnode)805 ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
806 {
807 struct ldlm_lock *lock;
808 struct ldlm_flock *flock;
809
810 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
811 LDLM_LOCK_RELEASE(lock);
812
813 flock = &lock->l_policy_data.l_flock;
814 LASSERT(flock->blocking_export != NULL);
815 class_export_put(flock->blocking_export);
816 if (--flock->blocking_refs == 0) {
817 flock->blocking_owner = 0;
818 flock->blocking_export = NULL;
819 }
820 }
821
822 static cfs_hash_ops_t ldlm_export_flock_ops = {
823 .hs_hash = ldlm_export_flock_hash,
824 .hs_key = ldlm_export_flock_key,
825 .hs_keycmp = ldlm_export_flock_keycmp,
826 .hs_object = ldlm_export_flock_object,
827 .hs_get = ldlm_export_flock_get,
828 .hs_put = ldlm_export_flock_put,
829 .hs_put_locked = ldlm_export_flock_put,
830 };
831
ldlm_init_flock_export(struct obd_export * exp)832 int ldlm_init_flock_export(struct obd_export *exp)
833 {
834 if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
835 return 0;
836
837 exp->exp_flock_hash =
838 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
839 HASH_EXP_LOCK_CUR_BITS,
840 HASH_EXP_LOCK_MAX_BITS,
841 HASH_EXP_LOCK_BKT_BITS, 0,
842 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
843 &ldlm_export_flock_ops,
844 CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
845 if (!exp->exp_flock_hash)
846 return -ENOMEM;
847
848 return 0;
849 }
850 EXPORT_SYMBOL(ldlm_init_flock_export);
851
ldlm_destroy_flock_export(struct obd_export * exp)852 void ldlm_destroy_flock_export(struct obd_export *exp)
853 {
854 if (exp->exp_flock_hash) {
855 cfs_hash_putref(exp->exp_flock_hash);
856 exp->exp_flock_hash = NULL;
857 }
858 }
859 EXPORT_SYMBOL(ldlm_destroy_flock_export);
860