/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOV layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov
 *  @{
 */

static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
					       struct cl_lock *parent);

static int lov_lock_unuse(const struct lu_env *env,
			  const struct cl_lock_slice *slice);
/*****************************************************************************
 *
 * Lov lock operations.
 *
 */

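/**
 * Returns the environment and IO that should be used to operate on the
 * sub-lock of \a parent described by \a lls. Normally these come from the
 * per-stripe sub-IO; if the current IO targets a different object than the
 * parent lock, the top-level \a env is borrowed instead (see the FIXME
 * below).
 */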
static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
						   struct cl_lock *parent,
						   struct lov_lock_sub *lls)
{
	struct lov_sublock_env *subenv;
	struct lov_io	  *lio    = lov_env_io(env);
	struct cl_io	   *io     = lio->lis_cl.cis_io;
	struct lov_io_sub      *sub;

	subenv = &lov_env_session(env)->ls_subenv;

	/*
	 * FIXME: We tend to use the subio's env & io to call the sublock
	 * lock operations because an osc lock sometimes stores some control
	 * variables in the thread's IO information (currently only the
	 * lockless information). However, if the lock's host (object) is
	 * different from the object of the current IO, we have no way to
	 * get the subenv and subio, because they are not initialized at
	 * all. As a temporary fix, in this case we still borrow the
	 * parent's env to call the sublock operations.
	 */
	if (!io || !cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
		subenv->lse_env = env;
		subenv->lse_io  = io;
		subenv->lse_sub = NULL;
	} else {
		sub = lov_sub_get(env, lio, lls->sub_stripe);
		if (!IS_ERR(sub)) {
			subenv->lse_env = sub->sub_env;
			subenv->lse_io  = sub->sub_io;
			subenv->lse_sub = sub;
		} else {
			subenv = (void *)sub;
		}
	}
	return subenv;
}

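/**
 * Releases the sub-IO reference acquired by lov_sublock_env_get(), if any.
 */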
static void lov_sublock_env_put(struct lov_sublock_env *subenv)
{
	if (subenv && subenv->lse_sub)
		lov_sub_put(subenv->lse_sub);
}

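/**
 * Links an existing sub-lock into slot \a idx of top-lock \a lck: records
 * the back-pointer on \a link, takes a reference on the parent and adds a
 * hold flag and a lock user to the sub-lock.
 */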
static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
			      struct cl_lock *sublock, int idx,
			      struct lov_lock_link *link)
{
	struct lovsub_lock *lsl;
	struct cl_lock     *parent = lck->lls_cl.cls_lock;
	int		 rc;

	LASSERT(cl_lock_is_mutexed(parent));
	LASSERT(cl_lock_is_mutexed(sublock));

	lsl = cl2sub_lock(sublock);
	/*
	 * check that the sub-lock doesn't have a lock link to this top-lock.
	 */
	LASSERT(lov_lock_link_find(env, lck, lsl) == NULL);
	LASSERT(idx < lck->lls_nr);

	lck->lls_sub[idx].sub_lock = lsl;
	lck->lls_nr_filled++;
	LASSERT(lck->lls_nr_filled <= lck->lls_nr);
	list_add_tail(&link->lll_list, &lsl->lss_parents);
	link->lll_idx = idx;
	link->lll_super = lck;
	cl_lock_get(parent);
	lu_ref_add(&parent->cll_reference, "lov-child", sublock);
	lck->lls_sub[idx].sub_flags |= LSF_HELD;
	cl_lock_user_add(env, sublock);

	rc = lov_sublock_modify(env, lck, lsl, &sublock->cll_descr, idx);
	LASSERT(rc == 0); /* there is no way this can fail, currently */
}

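/**
 * Allocates a lov_lock_link and a sub-lock for slot \a idx of \a lck.
 * Returns the new sub-lock, or an ERR_PTR() on failure; on success the
 * link is returned through \a out for later adoption.
 */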
static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
					 const struct cl_io *io,
					 struct lov_lock *lck,
					 int idx, struct lov_lock_link **out)
{
	struct cl_lock       *sublock;
	struct cl_lock       *parent;
	struct lov_lock_link *link;

	LASSERT(idx < lck->lls_nr);

	link = kmem_cache_alloc(lov_lock_link_kmem, GFP_NOFS | __GFP_ZERO);
	if (link != NULL) {
		struct lov_sublock_env *subenv;
		struct lov_lock_sub  *lls;
		struct cl_lock_descr *descr;

		parent = lck->lls_cl.cls_lock;
		lls    = &lck->lls_sub[idx];
		descr  = &lls->sub_got;

		subenv = lov_sublock_env_get(env, parent, lls);
		if (!IS_ERR(subenv)) {
			/* CAVEAT: Don't try to add a field in lov_lock_sub
			 * to remember the subio. This is because a lock can
			 * be cached, but an IO cannot. This further means a
			 * sublock might be referenced in a different IO
			 * context. -jay */

			sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
					       descr, "lov-parent", parent);
			lov_sublock_env_put(subenv);
		} else {
			/* an error occurred */
			sublock = (void *)subenv;
		}

		if (!IS_ERR(sublock))
			*out = link;
		else
			kmem_cache_free(lov_lock_link_kmem, link);
	} else
		sublock = ERR_PTR(-ENOMEM);
	return sublock;
}

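/**
 * Undoes lov_sublock_lock(): drops the sub-environment, clears the
 * sub-lock's active parent and releases the closure.
 */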
static void lov_sublock_unlock(const struct lu_env *env,
			       struct lovsub_lock *lsl,
			       struct cl_lock_closure *closure,
			       struct lov_sublock_env *subenv)
{
	lov_sublock_env_put(subenv);
	lsl->lss_active = NULL;
	cl_lock_disclosure(env, closure);
}

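/**
 * Takes the mutex of the sub-lock described by \a lls through a closure
 * rooted at the top-lock. Returns 0 on success (optionally filling in
 * \a lsep), CLO_REPEAT if the sub-lock is being cancelled or freed
 * concurrently, or a negative error code.
 */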
static int lov_sublock_lock(const struct lu_env *env,
			    struct lov_lock *lck,
			    struct lov_lock_sub *lls,
			    struct cl_lock_closure *closure,
			    struct lov_sublock_env **lsep)
{
	struct lovsub_lock *sublock;
	struct cl_lock     *child;
	int		 result = 0;

	LASSERT(list_empty(&closure->clc_list));

	sublock = lls->sub_lock;
	child = sublock->lss_cl.cls_lock;
	result = cl_lock_closure_build(env, child, closure);
	if (result == 0) {
		struct cl_lock *parent = closure->clc_origin;

		LASSERT(cl_lock_is_mutexed(child));
		sublock->lss_active = parent;

		if (unlikely((child->cll_state == CLS_FREEING) ||
			     (child->cll_flags & CLF_CANCELLED))) {
			struct lov_lock_link *link;
			/*
			 * we could race with lock deletion, which temporarily
			 * puts the lock into the freeing state; see bug 19080.
			 */
			LASSERT(!(lls->sub_flags & LSF_HELD));

			link = lov_lock_link_find(env, lck, sublock);
			LASSERT(link != NULL);
			lov_lock_unlink(env, link, sublock);
			lov_sublock_unlock(env, sublock, closure, NULL);
			lck->lls_cancel_race = 1;
			result = CLO_REPEAT;
		} else if (lsep) {
			struct lov_sublock_env *subenv;

			subenv = lov_sublock_env_get(env, parent, lls);
			if (IS_ERR(subenv)) {
				lov_sublock_unlock(env, sublock,
						   closure, NULL);
				result = PTR_ERR(subenv);
			} else {
				*lsep = subenv;
			}
		}
	}
	return result;
}

/**
 * Updates the result of a top-lock operation from a result of sub-lock
 * sub-operations. Top-operations like lov_lock_{enqueue,use,unuse}() iterate
 * over sub-locks and lov_subresult() is used to calculate return value of a
 * top-operation. To this end, possible return values of sub-operations are
 * ordered as
 *
 *     - 0		  success
 *     - CLO_WAIT	   wait for event
 *     - CLO_REPEAT	 repeat top-operation
 *     - -ve		fundamental error
 *
 * Top-level return code can only go down through this list. CLO_REPEAT
 * overwrites CLO_WAIT, because lock mutex was released and sleeping condition
 * has to be rechecked by the upper layer.
 */
static int lov_subresult(int result, int rc)
{
	int result_rank;
	int rc_rank;

	LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
		 "result = %d\n", result);
	LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
		 "rc = %d\n", rc);
	CLASSERT(CLO_WAIT < CLO_REPEAT);

	/* calculate ranks in the ordering above */
	result_rank = result < 0 ? 1 + CLO_REPEAT : result;
	rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;

	if (result_rank < rc_rank)
		result = rc;
	return result;
}

/**
 * Creates sub-locks for a given lov_lock for the first time.
 *
 * Goes through all sub-objects of the top-object, and creates sub-locks on
 * every sub-object intersecting with the top-lock extent. This is complicated
 * by the fact that the top-lock (that is being created) can be accessed
 * concurrently through already created sub-locks (possibly shared with other
 * top-locks).
 */
static int lov_lock_sub_init(const struct lu_env *env,
			     struct lov_lock *lck, const struct cl_io *io)
{
	int result = 0;
	int i;
	int nr;
	u64 start;
	u64 end;
	u64 file_start;
	u64 file_end;

	struct lov_object       *loo    = cl2lov(lck->lls_cl.cls_obj);
	struct lov_layout_raid0 *r0     = lov_r0(loo);
	struct cl_lock	  *parent = lck->lls_cl.cls_lock;

	lck->lls_orig = parent->cll_descr;
	file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
	file_end   = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;

	for (i = 0, nr = 0; i < r0->lo_nr; i++) {
		/*
		 * XXX for wide striping a smarter algorithm that breaks out
		 * of the loop early is desirable.
		 */
		if (likely(r0->lo_sub[i] != NULL) &&
		    lov_stripe_intersects(loo->lo_lsm, i,
					  file_start, file_end, &start, &end))
			nr++;
	}
	LASSERT(nr > 0);
	lck->lls_sub = libcfs_kvzalloc(nr * sizeof(lck->lls_sub[0]), GFP_NOFS);
	if (lck->lls_sub == NULL)
		return -ENOMEM;

	lck->lls_nr = nr;
	/*
	 * First, fill in sub-lock descriptions in
	 * lck->lls_sub[].sub_descr. They are used by lov_sublock_alloc()
	 * (called below in this function, and by lov_lock_enqueue()) to
	 * create sub-locks. At this moment, no other thread can access
	 * top-lock.
	 */
	for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
		if (likely(r0->lo_sub[i] != NULL) &&
		    lov_stripe_intersects(loo->lo_lsm, i,
					  file_start, file_end, &start, &end)) {
			struct cl_lock_descr *descr;

			descr = &lck->lls_sub[nr].sub_descr;

			LASSERT(descr->cld_obj == NULL);
			descr->cld_obj   = lovsub2cl(r0->lo_sub[i]);
			descr->cld_start = cl_index(descr->cld_obj, start);
			descr->cld_end   = cl_index(descr->cld_obj, end);
			descr->cld_mode  = parent->cll_descr.cld_mode;
			descr->cld_gid   = parent->cll_descr.cld_gid;
			descr->cld_enq_flags   = parent->cll_descr.cld_enq_flags;
			/* XXX has no effect */
			lck->lls_sub[nr].sub_got = *descr;
			lck->lls_sub[nr].sub_stripe = i;
			nr++;
		}
	}
	LASSERT(nr == lck->lls_nr);

	/*
	 * Some sub-locks can be missing at this point. This is not a problem,
	 * because enqueue will create them anyway. The main duty of this
	 * function is to fill in sub-lock descriptions in a race-free manner.
	 */
	return result;
}

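/**
 * Drops the LSF_HELD state of sub-lock \a i: removes the lock user (if
 * \a deluser is set) and the hold taken on behalf of the top-lock. May
 * temporarily release the parent mutex if the sub-lock is about to be
 * cancelled, in which case CLO_REPEAT is folded into \a rc.
 */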
static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
			       int i, int deluser, int rc)
{
	struct cl_lock *parent = lck->lls_cl.cls_lock;

	LASSERT(cl_lock_is_mutexed(parent));

	if (lck->lls_sub[i].sub_flags & LSF_HELD) {
		struct cl_lock    *sublock;
		int dying;

		LASSERT(lck->lls_sub[i].sub_lock != NULL);
		sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
		LASSERT(cl_lock_is_mutexed(sublock));

		lck->lls_sub[i].sub_flags &= ~LSF_HELD;
		if (deluser)
			cl_lock_user_del(env, sublock);
		/*
		 * If the last hold is released, and cancellation is pending
		 * for a sub-lock, release parent mutex, to avoid keeping it
		 * while sub-lock is being paged out.
		 */
		dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
			 sublock->cll_descr.cld_mode == CLM_GROUP ||
			 (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
			sublock->cll_holds == 1;
		if (dying)
			cl_lock_mutex_put(env, parent);
		cl_lock_unhold(env, sublock, "lov-parent", parent);
		if (dying) {
			cl_lock_mutex_get(env, parent);
			rc = lov_subresult(rc, CLO_REPEAT);
		}
		/*
		 * From now on lck->lls_sub[i].sub_lock is a "weak" pointer,
		 * not backed by a reference on a
		 * sub-lock. lovsub_lock_delete() will clear
		 * lck->lls_sub[i].sub_lock under semaphores, just before
		 * sub-lock is destroyed.
		 */
	}
	return rc;
}

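/**
 * Counterpart of lov_sublock_release(): marks sub-lock \a i as held on
 * behalf of the top-lock by adding a hold and a lock user to it.
 */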
static void lov_sublock_hold(const struct lu_env *env, struct lov_lock *lck,
			     int i)
{
	struct cl_lock *parent = lck->lls_cl.cls_lock;

	LASSERT(cl_lock_is_mutexed(parent));

	if (!(lck->lls_sub[i].sub_flags & LSF_HELD)) {
		struct cl_lock *sublock;

		LASSERT(lck->lls_sub[i].sub_lock != NULL);
		sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
		LASSERT(cl_lock_is_mutexed(sublock));
		LASSERT(sublock->cll_state != CLS_FREEING);

		lck->lls_sub[i].sub_flags |= LSF_HELD;

		cl_lock_get_trust(sublock);
		cl_lock_hold_add(env, sublock, "lov-parent", parent);
		cl_lock_user_add(env, sublock);
		cl_lock_put(env, sublock);
	}
}

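/**
 * An implementation of cl_lock_operations::clo_fini() for the lov layer:
 * frees the sub-lock array and the lov_lock slice itself.
 */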
static void lov_lock_fini(const struct lu_env *env,
			  struct cl_lock_slice *slice)
{
	struct lov_lock *lck;
	int i;

	lck = cl2lov_lock(slice);
	LASSERT(lck->lls_nr_filled == 0);
	if (lck->lls_sub != NULL) {
		for (i = 0; i < lck->lls_nr; ++i)
			/*
			 * No sub-locks exist at this point, as a sub-lock
			 * holds a reference on its parent.
			 */
			LASSERT(lck->lls_sub[i].sub_lock == NULL);
		kvfree(lck->lls_sub);
	}
	kmem_cache_free(lov_lock_kmem, lck);
}

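/**
 * Waits until the lock conflicting with \a sublock goes away. The top-lock
 * mutex is dropped for the duration of the wait, hence the result is folded
 * with CLO_REPEAT so that the caller re-validates its state.
 */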
static int lov_lock_enqueue_wait(const struct lu_env *env,
				 struct lov_lock *lck,
				 struct cl_lock *sublock)
{
	struct cl_lock *lock = lck->lls_cl.cls_lock;
	int	     result;

	LASSERT(cl_lock_is_mutexed(lock));

	cl_lock_mutex_put(env, lock);
	result = cl_lock_enqueue_wait(env, sublock, 0);
	cl_lock_mutex_get(env, lock);
	return result ?: CLO_REPEAT;
}

/**
 * Tries to advance a state machine of a given sub-lock toward enqueuing of
 * the top-lock.
 *
 * \retval 0 if state-transition can proceed
 * \retval -ve otherwise.
 */
static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
				struct cl_lock *sublock,
				struct cl_io *io, __u32 enqflags, int last)
{
	int result;

	/* first, try to enqueue a sub-lock ... */
	result = cl_enqueue_try(env, sublock, io, enqflags);
	if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
		/* if it is enqueued, try to `wait' on it---maybe it's already
		 * granted */
		result = cl_wait_try(env, sublock);
		if (result == CLO_REENQUEUED)
			result = CLO_WAIT;
	}
	/*
	 * If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
	 * parallel, otherwise---enqueue has to wait until sub-lock is granted
	 * before proceeding to the next one.
	 */
	if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
	    (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
		result = 0;
	return result;
}

/**
 * Helper function for lov_lock_enqueue() that creates a missing sub-lock.
 */
static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
			    struct cl_io *io, struct lov_lock *lck, int idx)
{
	struct lov_lock_link *link = NULL;
	struct cl_lock       *sublock;
	int		   result;

	LASSERT(parent->cll_depth == 1);
	cl_lock_mutex_put(env, parent);
	sublock = lov_sublock_alloc(env, io, lck, idx, &link);
	if (!IS_ERR(sublock))
		cl_lock_mutex_get(env, sublock);
	cl_lock_mutex_get(env, parent);

	if (!IS_ERR(sublock)) {
		cl_lock_get_trust(sublock);
		if (parent->cll_state == CLS_QUEUING &&
		    lck->lls_sub[idx].sub_lock == NULL) {
			lov_sublock_adopt(env, lck, sublock, idx, link);
		} else {
			kmem_cache_free(lov_lock_link_kmem, link);
			/* another thread allocated the sub-lock, or enqueue
			 * is no longer going on */
			cl_lock_mutex_put(env, parent);
			cl_lock_unhold(env, sublock, "lov-parent", parent);
			cl_lock_mutex_get(env, parent);
		}
		cl_lock_mutex_put(env, sublock);
		cl_lock_put(env, sublock);
		result = CLO_REPEAT;
	} else
		result = PTR_ERR(sublock);
	return result;
}

/**
 * Implementation of cl_lock_operations::clo_enqueue() for lov layer. This
 * function is rather subtle, as it enqueues top-lock (i.e., advances top-lock
 * state machine from CLS_QUEUING to CLS_ENQUEUED states) by juggling sub-lock
 * state machines in the face of sub-locks sharing (by multiple top-locks),
 * and concurrent sub-lock cancellations.
 */
static int lov_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *io, __u32 enqflags)
{
	struct cl_lock	 *lock    = slice->cls_lock;
	struct lov_lock	*lck     = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, lock);
	int i;
	int result;
	enum cl_lock_state minstate;

	for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock     *sub;
		struct lov_lock_sub    *lls;
		struct cl_lock	 *sublock;
		struct lov_sublock_env *subenv;

		if (lock->cll_state != CLS_QUEUING) {
			/*
			 * Lock might have left QUEUING state if a previous
			 * iteration released its mutex. Stop enqueuing in
			 * this case and let the upper layer decide what to
			 * do.
			 */
			LASSERT(i > 0 && result != 0);
			break;
		}

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		/*
		 * Sub-lock might have been canceled, while top-lock was
		 * cached.
		 */
		if (sub == NULL) {
			result = lov_sublock_fill(env, lock, io, lck, i);
			/* lov_sublock_fill() released @lock mutex,
			 * restart. */
			break;
		}
		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			lov_sublock_hold(env, lck, i);
			rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
						  subenv->lse_io, enqflags,
						  i == lck->lls_nr - 1);
			minstate = min(minstate, sublock->cll_state);
			if (rc == CLO_WAIT) {
				switch (sublock->cll_state) {
				case CLS_QUEUING:
					/* take recursive mutex, the lock is
					 * released in lov_lock_enqueue_wait.
					 */
					cl_lock_mutex_get(env, sublock);
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					rc = lov_lock_enqueue_wait(env, lck,
								   sublock);
					break;
				case CLS_CACHED:
					cl_lock_get(sublock);
					/* take recursive mutex of sublock */
					cl_lock_mutex_get(env, sublock);
					/* need to release all locks in closure
					 * otherwise it may deadlock. LU-2683.*/
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					/* sublock and parent are held. */
					rc = lov_sublock_release(env, lck, i,
								 1, rc);
					cl_lock_mutex_put(env, sublock);
					cl_lock_put(env, sublock);
					break;
				default:
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					break;
				}
			} else {
				LASSERT(sublock->cll_conflict == NULL);
				lov_sublock_unlock(env, sub, closure, subenv);
			}
		}
		result = lov_subresult(result, rc);
		if (result != 0)
			break;
	}
	cl_lock_closure_fini(closure);
	return result ?: minstate >= CLS_ENQUEUED ? 0 : CLO_WAIT;
}

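/**
 * An implementation of cl_lock_operations::clo_unuse() for the lov layer:
 * unuses and releases every held sub-lock of the top-lock.
 */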
static int lov_lock_unuse(const struct lu_env *env,
			  const struct cl_lock_slice *slice)
{
	struct lov_lock	*lck     = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	int i;
	int result;

	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock     *sub;
		struct cl_lock	 *sublock;
		struct lov_lock_sub    *lls;
		struct lov_sublock_env *subenv;

		/* top-lock state cannot change concurrently, because a single
		 * thread (the one that released the last hold) carries
		 * unlocking to completion. */
		LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		if (sub == NULL)
			continue;

		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			if (lls->sub_flags & LSF_HELD) {
				LASSERT(sublock->cll_state == CLS_HELD ||
					sublock->cll_state == CLS_ENQUEUED);
				rc = cl_unuse_try(subenv->lse_env, sublock);
				rc = lov_sublock_release(env, lck, i, 0, rc);
			}
			lov_sublock_unlock(env, sub, closure, subenv);
		}
		result = lov_subresult(result, rc);
	}

	if (result == 0 && lck->lls_cancel_race) {
		lck->lls_cancel_race = 0;
		result = -ESTALE;
	}
	cl_lock_closure_fini(closure);
	return result;
}

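/**
 * An implementation of cl_lock_operations::clo_cancel() for the lov layer:
 * unuses held sub-locks and drops the holds taken on behalf of the top-lock.
 */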
static void lov_lock_cancel(const struct lu_env *env,
			   const struct cl_lock_slice *slice)
{
	struct lov_lock	*lck     = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	int i;
	int result;

	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock     *sub;
		struct cl_lock	 *sublock;
		struct lov_lock_sub    *lls;
		struct lov_sublock_env *subenv;

		/* top-lock state cannot change concurrently, because a single
		 * thread (the one that released the last hold) carries
		 * unlocking to completion. */
		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		if (sub == NULL)
			continue;

		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			if (!(lls->sub_flags & LSF_HELD)) {
				lov_sublock_unlock(env, sub, closure, subenv);
				continue;
			}

			switch (sublock->cll_state) {
			case CLS_HELD:
				rc = cl_unuse_try(subenv->lse_env, sublock);
				lov_sublock_release(env, lck, i, 0, 0);
				break;
			default:
				lov_sublock_release(env, lck, i, 1, 0);
				break;
			}
			lov_sublock_unlock(env, sub, closure, subenv);
		}

		if (rc == CLO_REPEAT) {
			--i;
			continue;
		}

		result = lov_subresult(result, rc);
	}

	if (result)
		CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
			      "lov_lock_cancel fails with %d.\n", result);

	cl_lock_closure_fini(closure);
}

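/**
 * An implementation of cl_lock_operations::clo_wait() for the lov layer:
 * waits until every sub-lock reaches at least the CLS_HELD state.
 */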
static int lov_lock_wait(const struct lu_env *env,
			 const struct cl_lock_slice *slice)
{
	struct lov_lock	*lck     = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	enum cl_lock_state      minstate;
	int		     reenqueued;
	int		     result;
	int		     i;

again:
	for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
	     i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock     *sub;
		struct cl_lock	 *sublock;
		struct lov_lock_sub    *lls;
		struct lov_sublock_env *subenv;

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		LASSERT(sub != NULL);
		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			LASSERT(sublock->cll_state >= CLS_ENQUEUED);
			if (sublock->cll_state < CLS_HELD)
				rc = cl_wait_try(env, sublock);

			minstate = min(minstate, sublock->cll_state);
			lov_sublock_unlock(env, sub, closure, subenv);
		}
		if (rc == CLO_REENQUEUED) {
			reenqueued++;
			rc = 0;
		}
		result = lov_subresult(result, rc);
		if (result != 0)
			break;
	}
	/* Each sublock can only be re-enqueued once, so this will not loop
	 * forever. */
	if (result == 0 && reenqueued != 0)
		goto again;
	cl_lock_closure_fini(closure);
	return result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT;
}

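/**
 * An implementation of cl_lock_operations::clo_use() for the lov layer:
 * brings every cached sub-lock back into use, or fails with -ESTALE if a
 * sub-lock has been cancelled in the meantime.
 */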
static int lov_lock_use(const struct lu_env *env,
			const struct cl_lock_slice *slice)
{
	struct lov_lock	*lck     = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	int		     result;
	int		     i;

	LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);

	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock     *sub;
		struct cl_lock	 *sublock;
		struct lov_lock_sub    *lls;
		struct lov_sublock_env *subenv;

		LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		if (sub == NULL) {
			/*
			 * Sub-lock might have been canceled, while top-lock was
			 * cached.
			 */
			result = -ESTALE;
			break;
		}

		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			LASSERT(sublock->cll_state != CLS_FREEING);
			lov_sublock_hold(env, lck, i);
			if (sublock->cll_state == CLS_CACHED) {
				rc = cl_use_try(subenv->lse_env, sublock, 0);
				if (rc != 0)
					rc = lov_sublock_release(env, lck,
								 i, 1, rc);
			} else if (sublock->cll_state == CLS_NEW) {
				/* Sub-lock might have been canceled, while
				 * top-lock was cached. */
				result = -ESTALE;
				lov_sublock_release(env, lck, i, 1, result);
			}
			lov_sublock_unlock(env, sub, closure, subenv);
		}
		result = lov_subresult(result, rc);
		if (result != 0)
			break;
	}

	if (lck->lls_cancel_race) {
		/*
		 * If unlocking happened at the same time, the sub-lock state
		 * should be FREEING, and lov_sublock_lock() should return
		 * CLO_REPEAT. In this case, return -ESTALE so that the upper
		 * layer resets the lock state to NEW.
		 */
		lck->lls_cancel_race = 0;
		LASSERT(result != 0);
		result = -ESTALE;
	}
	cl_lock_closure_fini(closure);
	return result;
}

#if 0
static int lock_lock_multi_match()
{
	struct cl_lock	  *lock    = slice->cls_lock;
	struct cl_lock_descr    *subneed = &lov_env_info(env)->lti_ldescr;
	struct lov_object       *loo     = cl2lov(lov->lls_cl.cls_obj);
	struct lov_layout_raid0 *r0      = lov_r0(loo);
	struct lov_lock_sub     *sub;
	struct cl_object	*subobj;
	u64  fstart;
	u64  fend;
	u64  start;
	u64  end;
	int i;

	fstart = cl_offset(need->cld_obj, need->cld_start);
	fend   = cl_offset(need->cld_obj, need->cld_end + 1) - 1;
	subneed->cld_mode = need->cld_mode;
	cl_lock_mutex_get(env, lock);
	for (i = 0; i < lov->lls_nr; ++i) {
		sub = &lov->lls_sub[i];
		if (sub->sub_lock == NULL)
			continue;
		subobj = sub->sub_descr.cld_obj;
		if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
					   fstart, fend, &start, &end))
			continue;
		subneed->cld_start = cl_index(subobj, start);
		subneed->cld_end   = cl_index(subobj, end);
		subneed->cld_obj   = subobj;
		if (!cl_lock_ext_match(&sub->sub_got, subneed)) {
			result = 0;
			break;
		}
	}
	cl_lock_mutex_put(env, lock);
}
#endif

/**
 * Check whether the extent region \a descr is covered by \a child for the
 * specific \a stripe.
 */
static int lov_lock_stripe_is_matching(const struct lu_env *env,
				       struct lov_object *lov, int stripe,
				       const struct cl_lock_descr *child,
				       const struct cl_lock_descr *descr)
{
	struct lov_stripe_md *lsm = lov->lo_lsm;
	u64 start;
	u64 end;
	int result;

	if (lov_r0(lov)->lo_nr == 1)
		return cl_lock_ext_match(child, descr);

	/*
	 * For a multi-stripe object:
	 * - make sure the descr only covers child's stripe, and
	 * - check if extent is matching.
	 */
	start = cl_offset(&lov->lo_cl, descr->cld_start);
	end   = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
	result = 0;
	/* glimpse should work on an object with a LOV EA hole. */
	if (end - start <= lsm->lsm_stripe_size) {
		int idx;

		idx = lov_stripe_number(lsm, start);
		if (idx == stripe ||
		    unlikely(lov_r0(lov)->lo_sub[idx] == NULL)) {
			idx = lov_stripe_number(lsm, end);
			if (idx == stripe ||
			    unlikely(lov_r0(lov)->lo_sub[idx] == NULL))
				result = 1;
		}
	}

	if (result != 0) {
		struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr;
		u64 sub_start;
		u64 sub_end;

		subd->cld_obj  = NULL;   /* don't need sub object at all */
		subd->cld_mode = descr->cld_mode;
		subd->cld_gid  = descr->cld_gid;
		result = lov_stripe_intersects(lsm, stripe, start, end,
					       &sub_start, &sub_end);
		LASSERT(result);
		subd->cld_start = cl_index(child->cld_obj, sub_start);
		subd->cld_end   = cl_index(child->cld_obj, sub_end);
		result = cl_lock_ext_match(child, subd);
	}
	return result;
}

/**
 * An implementation of cl_lock_operations::clo_fits_into() method.
 *
 * Checks whether a lock (given by \a slice) is suitable for \a
 * io. Multi-stripe locks can be used only for "quick" io, like truncate, or
 * O_APPEND write.
 *
 * \see ccc_lock_fits_into().
 */
static int lov_lock_fits_into(const struct lu_env *env,
			      const struct cl_lock_slice *slice,
			      const struct cl_lock_descr *need,
			      const struct cl_io *io)
{
	struct lov_lock   *lov = cl2lov_lock(slice);
	struct lov_object *obj = cl2lov(slice->cls_obj);
	int result;

	LASSERT(cl_object_same(need->cld_obj, slice->cls_obj));
	LASSERT(lov->lls_nr > 0);

	/* for the top lock, it's necessary to match enqueue flags, otherwise
	 * it will run into problems if a sublock is missing and has to be
	 * re-enqueued. */
	if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
		return 0;

	if (need->cld_mode == CLM_GROUP)
		/*
		 * always allow matching a group lock.
		 */
		result = cl_lock_ext_match(&lov->lls_orig, need);
	else if (lov->lls_nr == 1) {
		struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;

		result = lov_lock_stripe_is_matching(env,
						     cl2lov(slice->cls_obj),
						     lov->lls_sub[0].sub_stripe,
						     got, need);
	} else if (io->ci_type != CIT_SETATTR && io->ci_type != CIT_MISC &&
		   !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM)
		/*
		 * Multi-stripe locks are only suitable for `quick' IO and for
		 * glimpse.
		 */
		result = 0;
	else
		/*
		 * Most general case: multi-stripe existing lock, and
		 * (potentially) multi-stripe @need lock. Check that @need is
		 * covered by @lov's sub-locks.
		 *
		 * For now, ignore lock expansions made by the server, and
		 * match against original lock extent.
		 */
		result = cl_lock_ext_match(&lov->lls_orig, need);
	CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %d %d/%d: %d\n",
	       PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got),
	       lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr,
	       result);
	return result;
}

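/**
 * Severs the link between top-lock \a link->lll_super and sub-lock \a sub,
 * dropping the parent reference that lov_sublock_adopt() took.
 */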
void lov_lock_unlink(const struct lu_env *env,
		     struct lov_lock_link *link, struct lovsub_lock *sub)
{
	struct lov_lock *lck    = link->lll_super;
	struct cl_lock  *parent = lck->lls_cl.cls_lock;

	LASSERT(cl_lock_is_mutexed(parent));
	LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));

	list_del_init(&link->lll_list);
	LASSERT(lck->lls_sub[link->lll_idx].sub_lock == sub);
	/* yank this sub-lock from parent's array */
	lck->lls_sub[link->lll_idx].sub_lock = NULL;
	LASSERT(lck->lls_nr_filled > 0);
	lck->lls_nr_filled--;
	lu_ref_del(&parent->cll_reference, "lov-child", sub->lss_cl.cls_lock);
	cl_lock_put(env, parent);
	kmem_cache_free(lov_lock_link_kmem, link);
}

struct lov_lock_link *lov_lock_link_find(const struct lu_env *env,
					 struct lov_lock *lck,
					 struct lovsub_lock *sub)
{
	struct lov_lock_link *scan;

	LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));

	list_for_each_entry(scan, &sub->lss_parents, lll_list) {
		if (scan->lll_super == lck)
			return scan;
	}
	return NULL;
}

/**
 * An implementation of cl_lock_operations::clo_delete() method. This is
 * invoked for "top-to-bottom" delete, when lock destruction starts from the
 * top-lock, e.g., as a result of inode destruction.
 *
 * Unlinks top-lock from all its sub-locks. Sub-locks are not deleted there:
 * this is done separately elsewhere:
 *
 *     - for inode destruction, lov_object_delete() calls cl_object_kill() for
 *       each sub-object, purging its locks;
 *
 *     - in other cases (e.g., a fatal error with a top-lock) sub-locks are
 *       left in the cache.
 */
static void lov_lock_delete(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct lov_lock	*lck     = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	struct lov_lock_link   *link;
	int		     rc;
	int		     i;

	LASSERT(slice->cls_lock->cll_state == CLS_FREEING);

	for (i = 0; i < lck->lls_nr; ++i) {
		struct lov_lock_sub *lls = &lck->lls_sub[i];
		struct lovsub_lock  *lsl = lls->sub_lock;

		if (lsl == NULL) /* already removed */
			continue;

		rc = lov_sublock_lock(env, lck, lls, closure, NULL);
		if (rc == CLO_REPEAT) {
			--i;
			continue;
		}

		LASSERT(rc == 0);
		LASSERT(lsl->lss_cl.cls_lock->cll_state < CLS_FREEING);

		if (lls->sub_flags & LSF_HELD)
			lov_sublock_release(env, lck, i, 1, 0);

		link = lov_lock_link_find(env, lck, lsl);
		LASSERT(link != NULL);
		lov_lock_unlink(env, link, lsl);
		LASSERT(lck->lls_sub[i].sub_lock == NULL);

		lov_sublock_unlock(env, lsl, closure, NULL);
	}

	cl_lock_closure_fini(closure);
}

static int lov_lock_print(const struct lu_env *env, void *cookie,
			  lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);
	int	      i;

	(*p)(env, cookie, "%d\n", lck->lls_nr);
	for (i = 0; i < lck->lls_nr; ++i) {
		struct lov_lock_sub *sub;

		sub = &lck->lls_sub[i];
		(*p)(env, cookie, "    %d %x: ", i, sub->sub_flags);
		if (sub->sub_lock != NULL)
			cl_lock_print(env, cookie, p,
				      sub->sub_lock->lss_cl.cls_lock);
		else
			(*p)(env, cookie, "---\n");
	}
	return 0;
}

static const struct cl_lock_operations lov_lock_ops = {
	.clo_fini      = lov_lock_fini,
	.clo_enqueue   = lov_lock_enqueue,
	.clo_wait      = lov_lock_wait,
	.clo_use       = lov_lock_use,
	.clo_unuse     = lov_lock_unuse,
	.clo_cancel    = lov_lock_cancel,
	.clo_fits_into = lov_lock_fits_into,
	.clo_delete    = lov_lock_delete,
	.clo_print     = lov_lock_print
};

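/**
 * Lock allocation entry point for striped (RAID0) objects: allocates a
 * lov_lock slice and fills in its sub-lock descriptions.
 */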
int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
			struct cl_lock *lock, const struct cl_io *io)
{
	struct lov_lock *lck;
	int result;

	lck = kmem_cache_alloc(lov_lock_kmem, GFP_NOFS | __GFP_ZERO);
	if (lck != NULL) {
		cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
		result = lov_lock_sub_init(env, lck, io);
	} else
		result = -ENOMEM;
	return result;
}

static void lov_empty_lock_fini(const struct lu_env *env,
				struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);

	kmem_cache_free(lov_lock_kmem, lck);
}

static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
			lu_printer_t p, const struct cl_lock_slice *slice)
{
	(*p)(env, cookie, "empty\n");
	return 0;
}

/* XXX: more methods will be added later. */
static const struct cl_lock_operations lov_empty_lock_ops = {
	.clo_fini  = lov_empty_lock_fini,
	.clo_print = lov_empty_lock_print
};

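/**
 * Lock allocation entry point for objects without stripes: installs a
 * mostly-empty slice that only remembers the original lock descriptor.
 */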
int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
		struct cl_lock *lock, const struct cl_io *io)
{
	struct lov_lock *lck;
	int result = -ENOMEM;

	lck = kmem_cache_alloc(lov_lock_kmem, GFP_NOFS | __GFP_ZERO);
	if (lck != NULL) {
		cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
		lck->lls_orig = lock->cll_descr;
		result = 0;
	}
	return result;
}

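/**
 * Returns the per-thread lock closure used to lock sub-locks on behalf of
 * \a parent, asserting that it is not already in use.
 */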
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
					       struct cl_lock *parent)
{
	struct cl_lock_closure *closure;

	closure = &lov_env_info(env)->lti_closure;
	LASSERT(list_empty(&closure->clc_list));
	cl_lock_closure_init(env, closure, parent, 1);
	return closure;
}

/** @} lov */