1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_lock for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40 
41 #define DEBUG_SUBSYSTEM S_LOV
42 
43 #include "lov_cl_internal.h"
44 
45 /** \addtogroup lov
46  *  @{
47  */
48 
49 static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
50 					       struct cl_lock *parent);
51 
52 static int lov_lock_unuse(const struct lu_env *env,
53 			  const struct cl_lock_slice *slice);
54 /*****************************************************************************
55  *
56  * Lov lock operations.
57  *
58  */
59 
lov_sublock_env_get(const struct lu_env * env,struct cl_lock * parent,struct lov_lock_sub * lls)60 static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
61 						   struct cl_lock *parent,
62 						   struct lov_lock_sub *lls)
63 {
64 	struct lov_sublock_env *subenv;
65 	struct lov_io	  *lio    = lov_env_io(env);
66 	struct cl_io	   *io     = lio->lis_cl.cis_io;
67 	struct lov_io_sub      *sub;
68 
69 	subenv = &lov_env_session(env)->ls_subenv;
70 
71 	/*
72 	 * FIXME: We tend to use the subio's env & io to call the sublock
73 	 * lock operations because osc lock sometimes stores some control
74 	 * variables in thread's IO information(Now only lockless information).
75 	 * However, if the lock's host(object) is different from the object
76 	 * for current IO, we have no way to get the subenv and subio because
77 	 * they are not initialized at all. As a temp fix, in this case,
78 	 * we still borrow the parent's env to call sublock operations.
79 	 */
80 	if (!io || !cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
81 		subenv->lse_env = env;
82 		subenv->lse_io  = io;
83 		subenv->lse_sub = NULL;
84 	} else {
85 		sub = lov_sub_get(env, lio, lls->sub_stripe);
86 		if (!IS_ERR(sub)) {
87 			subenv->lse_env = sub->sub_env;
88 			subenv->lse_io  = sub->sub_io;
89 			subenv->lse_sub = sub;
90 		} else {
91 			subenv = (void *)sub;
92 		}
93 	}
94 	return subenv;
95 }
96 
lov_sublock_env_put(struct lov_sublock_env * subenv)97 static void lov_sublock_env_put(struct lov_sublock_env *subenv)
98 {
99 	if (subenv && subenv->lse_sub)
100 		lov_sub_put(subenv->lse_sub);
101 }
102 
lov_sublock_adopt(const struct lu_env * env,struct lov_lock * lck,struct cl_lock * sublock,int idx,struct lov_lock_link * link)103 static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
104 			      struct cl_lock *sublock, int idx,
105 			      struct lov_lock_link *link)
106 {
107 	struct lovsub_lock *lsl;
108 	struct cl_lock     *parent = lck->lls_cl.cls_lock;
109 	int		 rc;
110 
111 	LASSERT(cl_lock_is_mutexed(parent));
112 	LASSERT(cl_lock_is_mutexed(sublock));
113 
114 	lsl = cl2sub_lock(sublock);
115 	/*
116 	 * check that sub-lock doesn't have lock link to this top-lock.
117 	 */
118 	LASSERT(lov_lock_link_find(env, lck, lsl) == NULL);
119 	LASSERT(idx < lck->lls_nr);
120 
121 	lck->lls_sub[idx].sub_lock = lsl;
122 	lck->lls_nr_filled++;
123 	LASSERT(lck->lls_nr_filled <= lck->lls_nr);
124 	list_add_tail(&link->lll_list, &lsl->lss_parents);
125 	link->lll_idx = idx;
126 	link->lll_super = lck;
127 	cl_lock_get(parent);
128 	lu_ref_add(&parent->cll_reference, "lov-child", sublock);
129 	lck->lls_sub[idx].sub_flags |= LSF_HELD;
130 	cl_lock_user_add(env, sublock);
131 
132 	rc = lov_sublock_modify(env, lck, lsl, &sublock->cll_descr, idx);
133 	LASSERT(rc == 0); /* there is no way this can fail, currently */
134 }
135 
lov_sublock_alloc(const struct lu_env * env,const struct cl_io * io,struct lov_lock * lck,int idx,struct lov_lock_link ** out)136 static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
137 					 const struct cl_io *io,
138 					 struct lov_lock *lck,
139 					 int idx, struct lov_lock_link **out)
140 {
141 	struct cl_lock       *sublock;
142 	struct cl_lock       *parent;
143 	struct lov_lock_link *link;
144 
145 	LASSERT(idx < lck->lls_nr);
146 
147 	OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, GFP_NOFS);
148 	if (link != NULL) {
149 		struct lov_sublock_env *subenv;
150 		struct lov_lock_sub  *lls;
151 		struct cl_lock_descr *descr;
152 
153 		parent = lck->lls_cl.cls_lock;
154 		lls    = &lck->lls_sub[idx];
155 		descr  = &lls->sub_got;
156 
157 		subenv = lov_sublock_env_get(env, parent, lls);
158 		if (!IS_ERR(subenv)) {
159 			/* CAVEAT: Don't try to add a field in lov_lock_sub
160 			 * to remember the subio. This is because lock is able
161 			 * to be cached, but this is not true for IO. This
162 			 * further means a sublock might be referenced in
163 			 * different io context. -jay */
164 
165 			sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
166 					       descr, "lov-parent", parent);
167 			lov_sublock_env_put(subenv);
168 		} else {
169 			/* error occurs. */
170 			sublock = (void *)subenv;
171 		}
172 
173 		if (!IS_ERR(sublock))
174 			*out = link;
175 		else
176 			OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
177 	} else
178 		sublock = ERR_PTR(-ENOMEM);
179 	return sublock;
180 }
181 
/**
 * Undoes lov_sublock_lock(): drops the sub-IO held through \a subenv (which
 * may be NULL), clears the sub-lock's lss_active pointer, and releases the
 * locks collected in \a closure via cl_lock_disclosure().
 */
static void lov_sublock_unlock(const struct lu_env *env,
			       struct lovsub_lock *lsl,
			       struct cl_lock_closure *closure,
			       struct lov_sublock_env *subenv)
{
	lov_sublock_env_put(subenv);
	/* no top-lock is operating on this sub-lock any more */
	lsl->lss_active = NULL;
	cl_lock_disclosure(env, closure);
}
191 
lov_sublock_lock(const struct lu_env * env,struct lov_lock * lck,struct lov_lock_sub * lls,struct cl_lock_closure * closure,struct lov_sublock_env ** lsep)192 static int lov_sublock_lock(const struct lu_env *env,
193 			    struct lov_lock *lck,
194 			    struct lov_lock_sub *lls,
195 			    struct cl_lock_closure *closure,
196 			    struct lov_sublock_env **lsep)
197 {
198 	struct lovsub_lock *sublock;
199 	struct cl_lock     *child;
200 	int		 result = 0;
201 
202 	LASSERT(list_empty(&closure->clc_list));
203 
204 	sublock = lls->sub_lock;
205 	child = sublock->lss_cl.cls_lock;
206 	result = cl_lock_closure_build(env, child, closure);
207 	if (result == 0) {
208 		struct cl_lock *parent = closure->clc_origin;
209 
210 		LASSERT(cl_lock_is_mutexed(child));
211 		sublock->lss_active = parent;
212 
213 		if (unlikely((child->cll_state == CLS_FREEING) ||
214 			     (child->cll_flags & CLF_CANCELLED))) {
215 			struct lov_lock_link *link;
216 			/*
217 			 * we could race with lock deletion which temporarily
218 			 * put the lock in freeing state, bug 19080.
219 			 */
220 			LASSERT(!(lls->sub_flags & LSF_HELD));
221 
222 			link = lov_lock_link_find(env, lck, sublock);
223 			LASSERT(link != NULL);
224 			lov_lock_unlink(env, link, sublock);
225 			lov_sublock_unlock(env, sublock, closure, NULL);
226 			lck->lls_cancel_race = 1;
227 			result = CLO_REPEAT;
228 		} else if (lsep) {
229 			struct lov_sublock_env *subenv;
230 			subenv = lov_sublock_env_get(env, parent, lls);
231 			if (IS_ERR(subenv)) {
232 				lov_sublock_unlock(env, sublock,
233 						   closure, NULL);
234 				result = PTR_ERR(subenv);
235 			} else {
236 				*lsep = subenv;
237 			}
238 		}
239 	}
240 	return result;
241 }
242 
243 /**
244  * Updates the result of a top-lock operation from a result of sub-lock
245  * sub-operations. Top-operations like lov_lock_{enqueue,use,unuse}() iterate
246  * over sub-locks and lov_subresult() is used to calculate return value of a
247  * top-operation. To this end, possible return values of sub-operations are
248  * ordered as
249  *
250  *     - 0		  success
251  *     - CLO_WAIT	   wait for event
252  *     - CLO_REPEAT	 repeat top-operation
253  *     - -ne		fundamental error
254  *
255  * Top-level return code can only go down through this list. CLO_REPEAT
256  * overwrites CLO_WAIT, because lock mutex was released and sleeping condition
257  * has to be rechecked by the upper layer.
258  */
lov_subresult(int result,int rc)259 static int lov_subresult(int result, int rc)
260 {
261 	int result_rank;
262 	int rc_rank;
263 
264 	LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
265 		 "result = %d", result);
266 	LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
267 		 "rc = %d\n", rc);
268 	CLASSERT(CLO_WAIT < CLO_REPEAT);
269 
270 	/* calculate ranks in the ordering above */
271 	result_rank = result < 0 ? 1 + CLO_REPEAT : result;
272 	rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
273 
274 	if (result_rank < rc_rank)
275 		result = rc;
276 	return result;
277 }
278 
279 /**
280  * Creates sub-locks for a given lov_lock for the first time.
281  *
282  * Goes through all sub-objects of top-object, and creates sub-locks on every
283  * sub-object intersecting with top-lock extent. This is complicated by the
284  * fact that top-lock (that is being created) can be accessed concurrently
285  * through already created sub-locks (possibly shared with other top-locks).
286  */
lov_lock_sub_init(const struct lu_env * env,struct lov_lock * lck,const struct cl_io * io)287 static int lov_lock_sub_init(const struct lu_env *env,
288 			     struct lov_lock *lck, const struct cl_io *io)
289 {
290 	int result = 0;
291 	int i;
292 	int nr;
293 	u64 start;
294 	u64 end;
295 	u64 file_start;
296 	u64 file_end;
297 
298 	struct lov_object       *loo    = cl2lov(lck->lls_cl.cls_obj);
299 	struct lov_layout_raid0 *r0     = lov_r0(loo);
300 	struct cl_lock	  *parent = lck->lls_cl.cls_lock;
301 
302 	lck->lls_orig = parent->cll_descr;
303 	file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
304 	file_end   = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;
305 
306 	for (i = 0, nr = 0; i < r0->lo_nr; i++) {
307 		/*
308 		 * XXX for wide striping smarter algorithm is desirable,
309 		 * breaking out of the loop, early.
310 		 */
311 		if (likely(r0->lo_sub[i] != NULL) &&
312 		    lov_stripe_intersects(loo->lo_lsm, i,
313 					  file_start, file_end, &start, &end))
314 			nr++;
315 	}
316 	LASSERT(nr > 0);
317 	OBD_ALLOC_LARGE(lck->lls_sub, nr * sizeof(lck->lls_sub[0]));
318 	if (lck->lls_sub == NULL)
319 		return -ENOMEM;
320 
321 	lck->lls_nr = nr;
322 	/*
323 	 * First, fill in sub-lock descriptions in
324 	 * lck->lls_sub[].sub_descr. They are used by lov_sublock_alloc()
325 	 * (called below in this function, and by lov_lock_enqueue()) to
326 	 * create sub-locks. At this moment, no other thread can access
327 	 * top-lock.
328 	 */
329 	for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
330 		if (likely(r0->lo_sub[i] != NULL) &&
331 		    lov_stripe_intersects(loo->lo_lsm, i,
332 					  file_start, file_end, &start, &end)) {
333 			struct cl_lock_descr *descr;
334 
335 			descr = &lck->lls_sub[nr].sub_descr;
336 
337 			LASSERT(descr->cld_obj == NULL);
338 			descr->cld_obj   = lovsub2cl(r0->lo_sub[i]);
339 			descr->cld_start = cl_index(descr->cld_obj, start);
340 			descr->cld_end   = cl_index(descr->cld_obj, end);
341 			descr->cld_mode  = parent->cll_descr.cld_mode;
342 			descr->cld_gid   = parent->cll_descr.cld_gid;
343 			descr->cld_enq_flags   = parent->cll_descr.cld_enq_flags;
344 			/* XXX has no effect */
345 			lck->lls_sub[nr].sub_got = *descr;
346 			lck->lls_sub[nr].sub_stripe = i;
347 			nr++;
348 		}
349 	}
350 	LASSERT(nr == lck->lls_nr);
351 
352 	/*
353 	 * Some sub-locks can be missing at this point. This is not a problem,
354 	 * because enqueue will create them anyway. Main duty of this function
355 	 * is to fill in sub-lock descriptions in a race free manner.
356 	 */
357 	return result;
358 }
359 
lov_sublock_release(const struct lu_env * env,struct lov_lock * lck,int i,int deluser,int rc)360 static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
361 			       int i, int deluser, int rc)
362 {
363 	struct cl_lock *parent = lck->lls_cl.cls_lock;
364 
365 	LASSERT(cl_lock_is_mutexed(parent));
366 
367 	if (lck->lls_sub[i].sub_flags & LSF_HELD) {
368 		struct cl_lock    *sublock;
369 		int dying;
370 
371 		LASSERT(lck->lls_sub[i].sub_lock != NULL);
372 		sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
373 		LASSERT(cl_lock_is_mutexed(sublock));
374 
375 		lck->lls_sub[i].sub_flags &= ~LSF_HELD;
376 		if (deluser)
377 			cl_lock_user_del(env, sublock);
378 		/*
379 		 * If the last hold is released, and cancellation is pending
380 		 * for a sub-lock, release parent mutex, to avoid keeping it
381 		 * while sub-lock is being paged out.
382 		 */
383 		dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
384 			 sublock->cll_descr.cld_mode == CLM_GROUP ||
385 			 (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
386 			sublock->cll_holds == 1;
387 		if (dying)
388 			cl_lock_mutex_put(env, parent);
389 		cl_lock_unhold(env, sublock, "lov-parent", parent);
390 		if (dying) {
391 			cl_lock_mutex_get(env, parent);
392 			rc = lov_subresult(rc, CLO_REPEAT);
393 		}
394 		/*
395 		 * From now on lck->lls_sub[i].sub_lock is a "weak" pointer,
396 		 * not backed by a reference on a
397 		 * sub-lock. lovsub_lock_delete() will clear
398 		 * lck->lls_sub[i].sub_lock under semaphores, just before
399 		 * sub-lock is destroyed.
400 		 */
401 	}
402 	return rc;
403 }
404 
lov_sublock_hold(const struct lu_env * env,struct lov_lock * lck,int i)405 static void lov_sublock_hold(const struct lu_env *env, struct lov_lock *lck,
406 			     int i)
407 {
408 	struct cl_lock *parent = lck->lls_cl.cls_lock;
409 
410 	LASSERT(cl_lock_is_mutexed(parent));
411 
412 	if (!(lck->lls_sub[i].sub_flags & LSF_HELD)) {
413 		struct cl_lock *sublock;
414 
415 		LASSERT(lck->lls_sub[i].sub_lock != NULL);
416 		sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
417 		LASSERT(cl_lock_is_mutexed(sublock));
418 		LASSERT(sublock->cll_state != CLS_FREEING);
419 
420 		lck->lls_sub[i].sub_flags |= LSF_HELD;
421 
422 		cl_lock_get_trust(sublock);
423 		cl_lock_hold_add(env, sublock, "lov-parent", parent);
424 		cl_lock_user_add(env, sublock);
425 		cl_lock_put(env, sublock);
426 	}
427 }
428 
lov_lock_fini(const struct lu_env * env,struct cl_lock_slice * slice)429 static void lov_lock_fini(const struct lu_env *env,
430 			  struct cl_lock_slice *slice)
431 {
432 	struct lov_lock *lck;
433 	int i;
434 
435 	lck = cl2lov_lock(slice);
436 	LASSERT(lck->lls_nr_filled == 0);
437 	if (lck->lls_sub != NULL) {
438 		for (i = 0; i < lck->lls_nr; ++i)
439 			/*
440 			 * No sub-locks exists at this point, as sub-lock has
441 			 * a reference on its parent.
442 			 */
443 			LASSERT(lck->lls_sub[i].sub_lock == NULL);
444 		OBD_FREE_LARGE(lck->lls_sub,
445 			       lck->lls_nr * sizeof(lck->lls_sub[0]));
446 	}
447 	OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
448 }
449 
lov_lock_enqueue_wait(const struct lu_env * env,struct lov_lock * lck,struct cl_lock * sublock)450 static int lov_lock_enqueue_wait(const struct lu_env *env,
451 				 struct lov_lock *lck,
452 				 struct cl_lock *sublock)
453 {
454 	struct cl_lock *lock = lck->lls_cl.cls_lock;
455 	int	     result;
456 
457 	LASSERT(cl_lock_is_mutexed(lock));
458 
459 	cl_lock_mutex_put(env, lock);
460 	result = cl_lock_enqueue_wait(env, sublock, 0);
461 	cl_lock_mutex_get(env, lock);
462 	return result ?: CLO_REPEAT;
463 }
464 
465 /**
466  * Tries to advance a state machine of a given sub-lock toward enqueuing of
467  * the top-lock.
468  *
469  * \retval 0 if state-transition can proceed
470  * \retval -ve otherwise.
471  */
lov_lock_enqueue_one(const struct lu_env * env,struct lov_lock * lck,struct cl_lock * sublock,struct cl_io * io,__u32 enqflags,int last)472 static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
473 				struct cl_lock *sublock,
474 				struct cl_io *io, __u32 enqflags, int last)
475 {
476 	int result;
477 
478 	/* first, try to enqueue a sub-lock ... */
479 	result = cl_enqueue_try(env, sublock, io, enqflags);
480 	if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
481 		/* if it is enqueued, try to `wait' on it---maybe it's already
482 		 * granted */
483 		result = cl_wait_try(env, sublock);
484 		if (result == CLO_REENQUEUED)
485 			result = CLO_WAIT;
486 	}
487 	/*
488 	 * If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
489 	 * parallel, otherwise---enqueue has to wait until sub-lock is granted
490 	 * before proceeding to the next one.
491 	 */
492 	if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
493 	    (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
494 		result = 0;
495 	return result;
496 }
497 
/**
 * Helper function for lov_lock_enqueue() that creates missing sub-lock.
 *
 * The top-lock mutex is dropped while the sub-lock is allocated, so the
 * top-lock has to be re-examined afterwards: the new sub-lock is adopted
 * into slot \a idx only if \a parent is still in CLS_QUEUING state and the
 * slot is still empty. Otherwise the link is freed and the hold taken by
 * lov_sublock_alloc() is released again.
 *
 * \retval CLO_REPEAT if a sub-lock was allocated (whether adopted or not),
 *	   because the top-lock mutex was released
 * \retval the error carried by the pointer lov_sublock_alloc() returned
 */
static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
			    struct cl_io *io, struct lov_lock *lck, int idx)
{
	struct lov_lock_link *link = NULL;
	struct cl_lock       *sublock;
	int		   result;

	LASSERT(parent->cll_depth == 1);
	/* allocate with the top-lock mutex released ... */
	cl_lock_mutex_put(env, parent);
	sublock = lov_sublock_alloc(env, io, lck, idx, &link);
	/* ... and take the sub-lock mutex before re-taking the parent's */
	if (!IS_ERR(sublock))
		cl_lock_mutex_get(env, sublock);
	cl_lock_mutex_get(env, parent);

	if (!IS_ERR(sublock)) {
		/* temporary reference, dropped by cl_lock_put() below */
		cl_lock_get_trust(sublock);
		if (parent->cll_state == CLS_QUEUING &&
		    lck->lls_sub[idx].sub_lock == NULL) {
			lov_sublock_adopt(env, lck, sublock, idx, link);
		} else {
			OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
			/* other thread allocated sub-lock, or enqueue is no
			 * longer going on */
			cl_lock_mutex_put(env, parent);
			cl_lock_unhold(env, sublock, "lov-parent", parent);
			cl_lock_mutex_get(env, parent);
		}
		cl_lock_mutex_put(env, sublock);
		cl_lock_put(env, sublock);
		result = CLO_REPEAT;
	} else
		result = PTR_ERR(sublock);
	return result;
}
535 
/**
 * Implementation of cl_lock_operations::clo_enqueue() for lov layer. This
 * function is rather subtle, as it enqueues top-lock (i.e., advances top-lock
 * state machine from CLS_QUEUING to CLS_ENQUEUED states) by juggling sub-lock
 * state machines in the face of sub-locks sharing (by multiple top-locks),
 * and concurrent sub-lock cancellations.
 *
 * Sub-locks are processed in slot order; the loop stops at the first
 * non-zero combined result, or as soon as a missing sub-lock had to be
 * created by lov_sublock_fill().
 *
 * \retval the first non-zero combined result (see lov_subresult()), if any
 * \retval 0 if every sub-lock visited reached at least CLS_ENQUEUED state
 * \retval CLO_WAIT otherwise
 */
static int lov_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *io, __u32 enqflags)
{
	struct cl_lock	 *lock    = slice->cls_lock;
	struct lov_lock	*lck     = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, lock);
	int i;
	int result;
	enum cl_lock_state minstate;

	/* minstate tracks the lowest state seen among visited sub-locks */
	for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock     *sub;
		struct lov_lock_sub    *lls;
		struct cl_lock	 *sublock;
		struct lov_sublock_env *subenv;

		if (lock->cll_state != CLS_QUEUING) {
			/*
			 * Lock might have left QUEUING state if previous
			 * iteration released its mutex. Stop enqueuing in this
			 * case and let the upper layer to decide what to do.
			 */
			LASSERT(i > 0 && result != 0);
			break;
		}

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		/*
		 * Sub-lock might have been canceled, while top-lock was
		 * cached.
		 */
		if (sub == NULL) {
			result = lov_sublock_fill(env, lock, io, lck, i);
			/* lov_sublock_fill() released @lock mutex,
			 * restart. */
			break;
		}
		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			lov_sublock_hold(env, lck, i);
			/* the last argument tells whether this is the last
			 * sub-lock */
			rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
						  subenv->lse_io, enqflags,
						  i == lck->lls_nr - 1);
			minstate = min(minstate, sublock->cll_state);
			if (rc == CLO_WAIT) {
				switch (sublock->cll_state) {
				case CLS_QUEUING:
					/* take recursive mutex, the lock is
					 * released in lov_lock_enqueue_wait.
					 */
					cl_lock_mutex_get(env, sublock);
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					rc = lov_lock_enqueue_wait(env, lck,
								   sublock);
					break;
				case CLS_CACHED:
					/* keep the sub-lock referenced while
					 * the closure is released */
					cl_lock_get(sublock);
					/* take recursive mutex of sublock */
					cl_lock_mutex_get(env, sublock);
					/* need to release all locks in closure
					 * otherwise it may deadlock. LU-2683.*/
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					/* sublock and parent are held. */
					rc = lov_sublock_release(env, lck, i,
								 1, rc);
					cl_lock_mutex_put(env, sublock);
					cl_lock_put(env, sublock);
					break;
				default:
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					break;
				}
			} else {
				LASSERT(sublock->cll_conflict == NULL);
				lov_sublock_unlock(env, sub, closure, subenv);
			}
		}
		result = lov_subresult(result, rc);
		if (result != 0)
			break;
	}
	cl_lock_closure_fini(closure);
	/* a non-zero result wins; otherwise report whether all visited
	 * sub-locks are at least enqueued */
	return result ?: minstate >= CLS_ENQUEUED ? 0 : CLO_WAIT;
}
634 
lov_lock_unuse(const struct lu_env * env,const struct cl_lock_slice * slice)635 static int lov_lock_unuse(const struct lu_env *env,
636 			  const struct cl_lock_slice *slice)
637 {
638 	struct lov_lock	*lck     = cl2lov_lock(slice);
639 	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
640 	int i;
641 	int result;
642 
643 	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
644 		int rc;
645 		struct lovsub_lock     *sub;
646 		struct cl_lock	 *sublock;
647 		struct lov_lock_sub    *lls;
648 		struct lov_sublock_env *subenv;
649 
650 		/* top-lock state cannot change concurrently, because single
651 		 * thread (one that released the last hold) carries unlocking
652 		 * to the completion. */
653 		LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
654 		lls = &lck->lls_sub[i];
655 		sub = lls->sub_lock;
656 		if (sub == NULL)
657 			continue;
658 
659 		sublock = sub->lss_cl.cls_lock;
660 		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
661 		if (rc == 0) {
662 			if (lls->sub_flags & LSF_HELD) {
663 				LASSERT(sublock->cll_state == CLS_HELD ||
664 					sublock->cll_state == CLS_ENQUEUED);
665 				rc = cl_unuse_try(subenv->lse_env, sublock);
666 				rc = lov_sublock_release(env, lck, i, 0, rc);
667 			}
668 			lov_sublock_unlock(env, sub, closure, subenv);
669 		}
670 		result = lov_subresult(result, rc);
671 	}
672 
673 	if (result == 0 && lck->lls_cancel_race) {
674 		lck->lls_cancel_race = 0;
675 		result = -ESTALE;
676 	}
677 	cl_lock_closure_fini(closure);
678 	return result;
679 }
680 
681 
/**
 * Cancels a top-lock by releasing the holds it has on its sub-locks.
 *
 * Every filled slot is locked in turn; slots not flagged LSF_HELD are
 * skipped. A sub-lock in CLS_HELD state is first passed to cl_unuse_try()
 * and then released without dropping a user; in any other state the hold is
 * released together with the user. A slot whose processing yields
 * CLO_REPEAT is retried. Failures are only logged, as the function returns
 * nothing.
 */
static void lov_lock_cancel(const struct lu_env *env,
			   const struct cl_lock_slice *slice)
{
	struct lov_lock	*lck     = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	int i;
	int result;

	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock     *sub;
		struct cl_lock	 *sublock;
		struct lov_lock_sub    *lls;
		struct lov_sublock_env *subenv;

		/* top-lock state cannot change concurrently, because single
		 * thread (one that released the last hold) carries unlocking
		 * to the completion. */
		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		if (sub == NULL)
			continue;

		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			/* nothing to release for a slot that is not held */
			if (!(lls->sub_flags & LSF_HELD)) {
				lov_sublock_unlock(env, sub, closure, subenv);
				continue;
			}

			/* the return value of lov_sublock_release() is not
			 * used in either branch */
			switch (sublock->cll_state) {
			case CLS_HELD:
				rc = cl_unuse_try(subenv->lse_env, sublock);
				lov_sublock_release(env, lck, i, 0, 0);
				break;
			default:
				lov_sublock_release(env, lck, i, 1, 0);
				break;
			}
			lov_sublock_unlock(env, sub, closure, subenv);
		}

		if (rc == CLO_REPEAT) {
			/* compensate for the loop's ++i so that the same slot
			 * is processed again */
			--i;
			continue;
		}

		result = lov_subresult(result, rc);
	}

	if (result)
		CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
			      "lov_lock_cancel fails with %d.\n", result);

	cl_lock_closure_fini(closure);
}
739 
lov_lock_wait(const struct lu_env * env,const struct cl_lock_slice * slice)740 static int lov_lock_wait(const struct lu_env *env,
741 			 const struct cl_lock_slice *slice)
742 {
743 	struct lov_lock	*lck     = cl2lov_lock(slice);
744 	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
745 	enum cl_lock_state      minstate;
746 	int		     reenqueued;
747 	int		     result;
748 	int		     i;
749 
750 again:
751 	for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
752 	     i < lck->lls_nr; ++i) {
753 		int rc;
754 		struct lovsub_lock     *sub;
755 		struct cl_lock	 *sublock;
756 		struct lov_lock_sub    *lls;
757 		struct lov_sublock_env *subenv;
758 
759 		lls = &lck->lls_sub[i];
760 		sub = lls->sub_lock;
761 		LASSERT(sub != NULL);
762 		sublock = sub->lss_cl.cls_lock;
763 		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
764 		if (rc == 0) {
765 			LASSERT(sublock->cll_state >= CLS_ENQUEUED);
766 			if (sublock->cll_state < CLS_HELD)
767 				rc = cl_wait_try(env, sublock);
768 
769 			minstate = min(minstate, sublock->cll_state);
770 			lov_sublock_unlock(env, sub, closure, subenv);
771 		}
772 		if (rc == CLO_REENQUEUED) {
773 			reenqueued++;
774 			rc = 0;
775 		}
776 		result = lov_subresult(result, rc);
777 		if (result != 0)
778 			break;
779 	}
780 	/* Each sublock only can be reenqueued once, so will not loop for
781 	 * ever. */
782 	if (result == 0 && reenqueued != 0)
783 		goto again;
784 	cl_lock_closure_fini(closure);
785 	return result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT;
786 }
787 
lov_lock_use(const struct lu_env * env,const struct cl_lock_slice * slice)788 static int lov_lock_use(const struct lu_env *env,
789 			const struct cl_lock_slice *slice)
790 {
791 	struct lov_lock	*lck     = cl2lov_lock(slice);
792 	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
793 	int		     result;
794 	int		     i;
795 
796 	LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
797 
798 	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
799 		int rc;
800 		struct lovsub_lock     *sub;
801 		struct cl_lock	 *sublock;
802 		struct lov_lock_sub    *lls;
803 		struct lov_sublock_env *subenv;
804 
805 		LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
806 
807 		lls = &lck->lls_sub[i];
808 		sub = lls->sub_lock;
809 		if (sub == NULL) {
810 			/*
811 			 * Sub-lock might have been canceled, while top-lock was
812 			 * cached.
813 			 */
814 			result = -ESTALE;
815 			break;
816 		}
817 
818 		sublock = sub->lss_cl.cls_lock;
819 		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
820 		if (rc == 0) {
821 			LASSERT(sublock->cll_state != CLS_FREEING);
822 			lov_sublock_hold(env, lck, i);
823 			if (sublock->cll_state == CLS_CACHED) {
824 				rc = cl_use_try(subenv->lse_env, sublock, 0);
825 				if (rc != 0)
826 					rc = lov_sublock_release(env, lck,
827 								 i, 1, rc);
828 			} else if (sublock->cll_state == CLS_NEW) {
829 				/* Sub-lock might have been canceled, while
830 				 * top-lock was cached. */
831 				result = -ESTALE;
832 				lov_sublock_release(env, lck, i, 1, result);
833 			}
834 			lov_sublock_unlock(env, sub, closure, subenv);
835 		}
836 		result = lov_subresult(result, rc);
837 		if (result != 0)
838 			break;
839 	}
840 
841 	if (lck->lls_cancel_race) {
842 		/*
843 		 * If there is unlocking happened at the same time, then
844 		 * sublock_lock state should be FREEING, and lov_sublock_lock
845 		 * should return CLO_REPEAT. In this case, it should return
846 		 * ESTALE, and up layer should reset the lock state to be NEW.
847 		 */
848 		lck->lls_cancel_race = 0;
849 		LASSERT(result != 0);
850 		result = -ESTALE;
851 	}
852 	cl_lock_closure_fini(closure);
853 	return result;
854 }
855 
/*
 * Disabled sketch, compiled out by the #if 0 below. As written it uses
 * identifiers it never declares (slice, env, lov, need, result) and has no
 * return statement, so it would not build if enabled. For each filled slot
 * it recomputes the per-stripe extent of the needed range and checks it
 * against the slot's sub_got descriptor with cl_lock_ext_match().
 */
#if 0
static int lock_lock_multi_match()
{
	struct cl_lock	  *lock    = slice->cls_lock;
	struct cl_lock_descr    *subneed = &lov_env_info(env)->lti_ldescr;
	struct lov_object       *loo     = cl2lov(lov->lls_cl.cls_obj);
	struct lov_layout_raid0 *r0      = lov_r0(loo);
	struct lov_lock_sub     *sub;
	struct cl_object	*subobj;
	u64  fstart;
	u64  fend;
	u64  start;
	u64  end;
	int i;

	fstart = cl_offset(need->cld_obj, need->cld_start);
	fend   = cl_offset(need->cld_obj, need->cld_end + 1) - 1;
	subneed->cld_mode = need->cld_mode;
	cl_lock_mutex_get(env, lock);
	for (i = 0; i < lov->lls_nr; ++i) {
		sub = &lov->lls_sub[i];
		if (sub->sub_lock == NULL)
			continue;
		subobj = sub->sub_descr.cld_obj;
		if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
					   fstart, fend, &start, &end))
			continue;
		subneed->cld_start = cl_index(subobj, start);
		subneed->cld_end   = cl_index(subobj, end);
		subneed->cld_obj   = subobj;
		if (!cl_lock_ext_match(&sub->sub_got, subneed)) {
			result = 0;
			break;
		}
	}
	cl_lock_mutex_put(env, lock);
}
#endif
894 
895 /**
896  * Check if the extent region \a descr is covered by \a child against the
897  * specific \a stripe.
898  */
lov_lock_stripe_is_matching(const struct lu_env * env,struct lov_object * lov,int stripe,const struct cl_lock_descr * child,const struct cl_lock_descr * descr)899 static int lov_lock_stripe_is_matching(const struct lu_env *env,
900 				       struct lov_object *lov, int stripe,
901 				       const struct cl_lock_descr *child,
902 				       const struct cl_lock_descr *descr)
903 {
904 	struct lov_stripe_md *lsm = lov->lo_lsm;
905 	u64 start;
906 	u64 end;
907 	int result;
908 
909 	if (lov_r0(lov)->lo_nr == 1)
910 		return cl_lock_ext_match(child, descr);
911 
912 	/*
913 	 * For a multi-stripes object:
914 	 * - make sure the descr only covers child's stripe, and
915 	 * - check if extent is matching.
916 	 */
917 	start = cl_offset(&lov->lo_cl, descr->cld_start);
918 	end   = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
919 	result = 0;
920 	/* glimpse should work on the object with LOV EA hole. */
921 	if (end - start <= lsm->lsm_stripe_size) {
922 		int idx;
923 
924 		idx = lov_stripe_number(lsm, start);
925 		if (idx == stripe ||
926 		    unlikely(lov_r0(lov)->lo_sub[idx] == NULL)) {
927 			idx = lov_stripe_number(lsm, end);
928 			if (idx == stripe ||
929 			    unlikely(lov_r0(lov)->lo_sub[idx] == NULL))
930 				result = 1;
931 		}
932 	}
933 
934 	if (result != 0) {
935 		struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr;
936 		u64 sub_start;
937 		u64 sub_end;
938 
939 		subd->cld_obj  = NULL;   /* don't need sub object at all */
940 		subd->cld_mode = descr->cld_mode;
941 		subd->cld_gid  = descr->cld_gid;
942 		result = lov_stripe_intersects(lsm, stripe, start, end,
943 					       &sub_start, &sub_end);
944 		LASSERT(result);
945 		subd->cld_start = cl_index(child->cld_obj, sub_start);
946 		subd->cld_end   = cl_index(child->cld_obj, sub_end);
947 		result = cl_lock_ext_match(child, subd);
948 	}
949 	return result;
950 }
951 
952 /**
953  * An implementation of cl_lock_operations::clo_fits_into() method.
954  *
955  * Checks whether a lock (given by \a slice) is suitable for \a
956  * io. Multi-stripe locks can be used only for "quick" io, like truncate, or
957  * O_APPEND write.
958  *
959  * \see ccc_lock_fits_into().
960  */
lov_lock_fits_into(const struct lu_env * env,const struct cl_lock_slice * slice,const struct cl_lock_descr * need,const struct cl_io * io)961 static int lov_lock_fits_into(const struct lu_env *env,
962 			      const struct cl_lock_slice *slice,
963 			      const struct cl_lock_descr *need,
964 			      const struct cl_io *io)
965 {
966 	struct lov_lock   *lov = cl2lov_lock(slice);
967 	struct lov_object *obj = cl2lov(slice->cls_obj);
968 	int result;
969 
970 	LASSERT(cl_object_same(need->cld_obj, slice->cls_obj));
971 	LASSERT(lov->lls_nr > 0);
972 
973 	/* for top lock, it's necessary to match enq flags otherwise it will
974 	 * run into problem if a sublock is missing and reenqueue. */
975 	if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
976 		return 0;
977 
978 	if (need->cld_mode == CLM_GROUP)
979 		/*
980 		 * always allow to match group lock.
981 		 */
982 		result = cl_lock_ext_match(&lov->lls_orig, need);
983 	else if (lov->lls_nr == 1) {
984 		struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
985 		result = lov_lock_stripe_is_matching(env,
986 						     cl2lov(slice->cls_obj),
987 						     lov->lls_sub[0].sub_stripe,
988 						     got, need);
989 	} else if (io->ci_type != CIT_SETATTR && io->ci_type != CIT_MISC &&
990 		   !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM)
991 		/*
992 		 * Multi-stripe locks are only suitable for `quick' IO and for
993 		 * glimpse.
994 		 */
995 		result = 0;
996 	else
997 		/*
998 		 * Most general case: multi-stripe existing lock, and
999 		 * (potentially) multi-stripe @need lock. Check that @need is
1000 		 * covered by @lov's sub-locks.
1001 		 *
1002 		 * For now, ignore lock expansions made by the server, and
1003 		 * match against original lock extent.
1004 		 */
1005 		result = cl_lock_ext_match(&lov->lls_orig, need);
1006 	CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %d %d/%d: %d\n",
1007 	       PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got),
1008 	       lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr,
1009 	       result);
1010 	return result;
1011 }
1012 
lov_lock_unlink(const struct lu_env * env,struct lov_lock_link * link,struct lovsub_lock * sub)1013 void lov_lock_unlink(const struct lu_env *env,
1014 		     struct lov_lock_link *link, struct lovsub_lock *sub)
1015 {
1016 	struct lov_lock *lck    = link->lll_super;
1017 	struct cl_lock  *parent = lck->lls_cl.cls_lock;
1018 
1019 	LASSERT(cl_lock_is_mutexed(parent));
1020 	LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));
1021 
1022 	list_del_init(&link->lll_list);
1023 	LASSERT(lck->lls_sub[link->lll_idx].sub_lock == sub);
1024 	/* yank this sub-lock from parent's array */
1025 	lck->lls_sub[link->lll_idx].sub_lock = NULL;
1026 	LASSERT(lck->lls_nr_filled > 0);
1027 	lck->lls_nr_filled--;
1028 	lu_ref_del(&parent->cll_reference, "lov-child", sub->lss_cl.cls_lock);
1029 	cl_lock_put(env, parent);
1030 	OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
1031 }
1032 
lov_lock_link_find(const struct lu_env * env,struct lov_lock * lck,struct lovsub_lock * sub)1033 struct lov_lock_link *lov_lock_link_find(const struct lu_env *env,
1034 					 struct lov_lock *lck,
1035 					 struct lovsub_lock *sub)
1036 {
1037 	struct lov_lock_link *scan;
1038 
1039 	LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));
1040 
1041 	list_for_each_entry(scan, &sub->lss_parents, lll_list) {
1042 		if (scan->lll_super == lck)
1043 			return scan;
1044 	}
1045 	return NULL;
1046 }
1047 
1048 /**
1049  * An implementation of cl_lock_operations::clo_delete() method. This is
1050  * invoked for "top-to-bottom" delete, when lock destruction starts from the
1051  * top-lock, e.g., as a result of inode destruction.
1052  *
1053  * Unlinks top-lock from all its sub-locks. Sub-locks are not deleted there:
1054  * this is done separately elsewhere:
1055  *
1056  *     - for inode destruction, lov_object_delete() calls cl_object_kill() for
1057  *       each sub-object, purging its locks;
1058  *
1059  *     - in other cases (e.g., a fatal error with a top-lock) sub-locks are
1060  *       left in the cache.
1061  */
lov_lock_delete(const struct lu_env * env,const struct cl_lock_slice * slice)1062 static void lov_lock_delete(const struct lu_env *env,
1063 			    const struct cl_lock_slice *slice)
1064 {
1065 	struct lov_lock	*lck     = cl2lov_lock(slice);
1066 	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
1067 	struct lov_lock_link   *link;
1068 	int		     rc;
1069 	int		     i;
1070 
1071 	LASSERT(slice->cls_lock->cll_state == CLS_FREEING);
1072 
1073 	for (i = 0; i < lck->lls_nr; ++i) {
1074 		struct lov_lock_sub *lls = &lck->lls_sub[i];
1075 		struct lovsub_lock  *lsl = lls->sub_lock;
1076 
1077 		if (lsl == NULL) /* already removed */
1078 			continue;
1079 
1080 		rc = lov_sublock_lock(env, lck, lls, closure, NULL);
1081 		if (rc == CLO_REPEAT) {
1082 			--i;
1083 			continue;
1084 		}
1085 
1086 		LASSERT(rc == 0);
1087 		LASSERT(lsl->lss_cl.cls_lock->cll_state < CLS_FREEING);
1088 
1089 		if (lls->sub_flags & LSF_HELD)
1090 			lov_sublock_release(env, lck, i, 1, 0);
1091 
1092 		link = lov_lock_link_find(env, lck, lsl);
1093 		LASSERT(link != NULL);
1094 		lov_lock_unlink(env, link, lsl);
1095 		LASSERT(lck->lls_sub[i].sub_lock == NULL);
1096 
1097 		lov_sublock_unlock(env, lsl, closure, NULL);
1098 	}
1099 
1100 	cl_lock_closure_fini(closure);
1101 }
1102 
/**
 * Print the top-lock behind \a slice through printer \a p.
 *
 * Emits the number of sub-lock slots, then one entry per slot with its
 * index and flags, followed either by the sub-lock (via cl_lock_print())
 * or by "---" when the slot is empty.
 *
 * \retval 0 always
 */
static int lov_lock_print(const struct lu_env *env, void *cookie,
			  lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);
	int idx;

	p(env, cookie, "%d\n", lck->lls_nr);

	for (idx = 0; idx < lck->lls_nr; ++idx) {
		struct lov_lock_sub *entry = &lck->lls_sub[idx];

		p(env, cookie, "    %d %x: ", idx, entry->sub_flags);

		if (entry->sub_lock == NULL) {
			p(env, cookie, "---\n");
			continue;
		}
		cl_lock_print(env, cookie, p, entry->sub_lock->lss_cl.cls_lock);
	}
	return 0;
}
1123 
1124 static const struct cl_lock_operations lov_lock_ops = {
1125 	.clo_fini      = lov_lock_fini,
1126 	.clo_enqueue   = lov_lock_enqueue,
1127 	.clo_wait      = lov_lock_wait,
1128 	.clo_use       = lov_lock_use,
1129 	.clo_unuse     = lov_lock_unuse,
1130 	.clo_cancel    = lov_lock_cancel,
1131 	.clo_fits_into = lov_lock_fits_into,
1132 	.clo_delete    = lov_lock_delete,
1133 	.clo_print     = lov_lock_print
1134 };
1135 
lov_lock_init_raid0(const struct lu_env * env,struct cl_object * obj,struct cl_lock * lock,const struct cl_io * io)1136 int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
1137 			struct cl_lock *lock, const struct cl_io *io)
1138 {
1139 	struct lov_lock *lck;
1140 	int result;
1141 
1142 	OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, GFP_NOFS);
1143 	if (lck != NULL) {
1144 		cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
1145 		result = lov_lock_sub_init(env, lck, io);
1146 	} else
1147 		result = -ENOMEM;
1148 	return result;
1149 }
1150 
lov_empty_lock_fini(const struct lu_env * env,struct cl_lock_slice * slice)1151 static void lov_empty_lock_fini(const struct lu_env *env,
1152 				struct cl_lock_slice *slice)
1153 {
1154 	struct lov_lock *lck = cl2lov_lock(slice);
1155 	OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
1156 }
1157 
/**
 * clo_print() method of lov_empty_lock_ops: there is nothing to describe,
 * so just emit the word "empty".
 *
 * \retval 0 always
 */
static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
				lu_printer_t p,
				const struct cl_lock_slice *slice)
{
	p(env, cookie, "empty\n");
	return 0;
}
1164 
1165 /* XXX: more methods will be added later. */
1166 static const struct cl_lock_operations lov_empty_lock_ops = {
1167 	.clo_fini  = lov_empty_lock_fini,
1168 	.clo_print = lov_empty_lock_print
1169 };
1170 
lov_lock_init_empty(const struct lu_env * env,struct cl_object * obj,struct cl_lock * lock,const struct cl_io * io)1171 int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
1172 		struct cl_lock *lock, const struct cl_io *io)
1173 {
1174 	struct lov_lock *lck;
1175 	int result = -ENOMEM;
1176 
1177 	OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, GFP_NOFS);
1178 	if (lck != NULL) {
1179 		cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
1180 		lck->lls_orig = lock->cll_descr;
1181 		result = 0;
1182 	}
1183 	return result;
1184 }
1185 
lov_closure_get(const struct lu_env * env,struct cl_lock * parent)1186 static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
1187 					       struct cl_lock *parent)
1188 {
1189 	struct cl_lock_closure *closure;
1190 
1191 	closure = &lov_env_info(env)->lti_closure;
1192 	LASSERT(list_empty(&closure->clc_list));
1193 	cl_lock_closure_init(env, closure, parent, 1);
1194 	return closure;
1195 }
1196 
1197 
1198 /** @} lov */
1199