/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOVSUB layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov
 *  @{
 */

/*****************************************************************************
 *
 * Lovsub lock operations.
 *
 */

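/**
 * Implementation of cl_lock_operations::clo_fini(): frees the lovsub lock
 * slice when the lock is finalized.
 */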
static void lovsub_lock_fini(const struct lu_env *env,
			     struct cl_lock_slice *slice)
{
	struct lovsub_lock   *lsl;

	lsl = cl2lovsub_lock(slice);
	LASSERT(list_empty(&lsl->lss_parents));
	kmem_cache_free(lovsub_lock_kmem, lsl);
}

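/**
 * Takes a reference on the parent top-lock of @lov and acquires its mutex.
 */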
static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
{
	struct cl_lock *parent;

	parent = lov->lls_cl.cls_lock;
	cl_lock_get(parent);
	lu_ref_add(&parent->cll_reference, "lovsub-parent", current);
	cl_lock_mutex_get(env, parent);
}

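/**
 * Releases the parent top-lock mutex and the reference taken by
 * lovsub_parent_lock().
 */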
static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
{
	struct cl_lock *parent;

	parent = lov->lls_cl.cls_lock;
	cl_lock_mutex_put(env, parent);
	lu_ref_del(&parent->cll_reference, "lovsub-parent", current);
	cl_lock_put(env, parent);
}

/**
 * Implements the cl_lock_operations::clo_state() method for the lovsub
 * layer, which is called whenever a sub-lock state changes.  Propagates the
 * state change to the top-locks.
 */
static void lovsub_lock_state(const struct lu_env *env,
			      const struct cl_lock_slice *slice,
			      enum cl_lock_state state)
{
	struct lovsub_lock   *sub = cl2lovsub_lock(slice);
	struct lov_lock_link *scan;

	LASSERT(cl_lock_is_mutexed(slice->cls_lock));

	list_for_each_entry(scan, &sub->lss_parents, lll_list) {
		struct lov_lock *lov    = scan->lll_super;
		struct cl_lock  *parent = lov->lls_cl.cls_lock;

		if (sub->lss_active != parent) {
			lovsub_parent_lock(env, lov);
			cl_lock_signal(env, parent);
			lovsub_parent_unlock(env, lov);
		}
	}
}

/**
 * Implementation of the cl_lock_operations::clo_weigh() method, estimating
 * the lock weight by asking the parent lock.
 */
static unsigned long lovsub_lock_weigh(const struct lu_env *env,
				       const struct cl_lock_slice *slice)
{
	struct lovsub_lock *lock = cl2lovsub_lock(slice);
	struct lov_lock    *lov;
	unsigned long       dumbbell;

	LASSERT(cl_lock_is_mutexed(slice->cls_lock));

	if (!list_empty(&lock->lss_parents)) {
		/*
		 * It is not clear whether all parents have to be asked and
		 * their estimations summed, or whether it is enough to ask
		 * one.  For the current usages, one is always enough.
		 */
		lov = container_of(lock->lss_parents.next,
				   struct lov_lock_link, lll_list)->lll_super;

		lovsub_parent_lock(env, lov);
		dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
		lovsub_parent_unlock(env, lov);
	} else {
		dumbbell = 0;
	}

	return dumbbell;
}

/**
 * Maps start/end offsets within a stripe to offsets within a file.
 */
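/*
 * Illustration of the mapping below, with made-up numbers: assume
 * lsm_stripe_count = 3 and a stripe size of 256 pages, so that
 * skip = 2 * 256 = 512.  In-stripe pages [256, 511] of stripe 1 then map
 * to file pages [256 + (256/256)*512 + 1*256, 511 + (511/256)*512 + 1*256]
 * = [1024, 1279], i.e. the second stripe unit of stripe 1 in the
 * round-robin file layout.
 */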
static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
				  struct lov_object *lov,
				  int stripe, struct cl_lock_descr *out)
{
	pgoff_t size; /* stripe size in pages */
	pgoff_t skip; /* how many pages in every stripe are occupied by
		       * "other" stripes */
	pgoff_t start;
	pgoff_t end;

	start = in->cld_start;
	end   = in->cld_end;

	if (lov->lo_lsm->lsm_stripe_count > 1) {
		size = cl_index(lov2cl(lov), lov->lo_lsm->lsm_stripe_size);
		skip = (lov->lo_lsm->lsm_stripe_count - 1) * size;

		/* XXX overflow check here? */
		start += start/size * skip + stripe * size;

		if (end != CL_PAGE_EOF) {
			end += end/size * skip + stripe * size;
			/*
			 * And check for overflow...
			 */
			if (end < in->cld_end)
				end = CL_PAGE_EOF;
		}
	}
	out->cld_start = start;
	out->cld_end   = end;
}

/**
 * Adjusts the parent lock extent when a sub-lock is attached to a parent.
 * This is called in two ways:
 *
 *     - as part of the receive call-back, when the server returns a granted
 *       extent to the client, and
 *
 *     - when a top-lock finds an existing sub-lock in the cache.
 *
 * Note that the lock mode is not propagated to the parent: i.e., if a
 * CLM_READ top-lock matches a CLM_WRITE sub-lock, the top-lock stays
 * CLM_READ.
 */
int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
		       struct lovsub_lock *sublock,
		       const struct cl_lock_descr *d, int idx)
{
	struct cl_lock       *parent;
	struct lovsub_object *subobj;
	struct cl_lock_descr *pd;
	struct cl_lock_descr *parent_descr;
	int		      result;

	parent       = lov->lls_cl.cls_lock;
	parent_descr = &parent->cll_descr;
	LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));

	subobj = cl2lovsub(sublock->lss_cl.cls_obj);
	pd     = &lov_env_info(env)->lti_ldescr;

	pd->cld_obj  = parent_descr->cld_obj;
	pd->cld_mode = parent_descr->cld_mode;
	pd->cld_gid  = parent_descr->cld_gid;
	lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
	lov->lls_sub[idx].sub_got = *d;
	/*
	 * Notify top-lock about modification, if lock description changes
	 * materially.
	 */
	if (!cl_lock_ext_match(parent_descr, pd))
		result = cl_lock_modify(env, parent, pd);
	else
		result = 0;
	return result;
}

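/**
 * Implementation of cl_lock_operations::clo_modify(): propagates a
 * description change of the sub-lock to all of its parent top-locks.
 */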
static int lovsub_lock_modify(const struct lu_env *env,
			      const struct cl_lock_slice *s,
			      const struct cl_lock_descr *d)
{
	struct lovsub_lock   *lock   = cl2lovsub_lock(s);
	struct lov_lock_link *scan;
	struct lov_lock      *lov;
	int		      result = 0;

	LASSERT(cl_lock_mode_match(d->cld_mode,
				   s->cls_lock->cll_descr.cld_mode));
	list_for_each_entry(scan, &lock->lss_parents, lll_list) {
		int rc;

		lov = scan->lll_super;
		lovsub_parent_lock(env, lov);
		rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
		lovsub_parent_unlock(env, lov);
		result = result ?: rc;
	}
	return result;
}

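/**
 * Implementation of cl_lock_operations::clo_closure(): adds every parent
 * top-lock of this sub-lock to the given closure.
 */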
static int lovsub_lock_closure(const struct lu_env *env,
			       const struct cl_lock_slice *slice,
			       struct cl_lock_closure *closure)
{
	struct lovsub_lock   *sub;
	struct cl_lock       *parent;
	struct lov_lock_link *scan;
	int		      result;

	LASSERT(cl_lock_is_mutexed(slice->cls_lock));

	sub    = cl2lovsub_lock(slice);
	result = 0;

	list_for_each_entry(scan, &sub->lss_parents, lll_list) {
		parent = scan->lll_super->lls_cl.cls_lock;
		result = cl_lock_closure_build(env, parent, closure);
		if (result != 0)
			break;
	}
	return result;
}

/**
 * A helper function for lovsub_lock_delete() that deals with a given parent
 * top-lock.
 */
static int lovsub_lock_delete_one(const struct lu_env *env,
				  struct cl_lock *child, struct lov_lock *lov)
{
	struct cl_lock *parent;
	int		result;

	parent = lov->lls_cl.cls_lock;
	if (parent->cll_error)
		return 0;

	result = 0;
	switch (parent->cll_state) {
	case CLS_ENQUEUED:
		/* See LU-1355 for the case that a glimpse lock is
		 * interrupted by signal */
		LASSERT(parent->cll_flags & CLF_CANCELLED);
		break;
	case CLS_QUEUING:
	case CLS_FREEING:
		cl_lock_signal(env, parent);
		break;
	case CLS_INTRANSIT:
		/*
		 * Here lies a problem: a sub-lock is canceled while top-lock
		 * is being unlocked. Top-lock cannot be moved into CLS_NEW
		 * state, because unlocking has to succeed eventually by
		 * placing lock into CLS_CACHED (or failing it), see
		 * cl_unuse_try(). Nor can top-lock be left in CLS_CACHED
		 * state, because lov maintains an invariant that all
		 * sub-locks exist in CLS_CACHED (this allows cached top-lock
		 * to be reused immediately). Nor can we wait for top-lock
		 * state to change, because this can be synchronous to the
		 * current thread.
		 *
		 * We know for sure that lov_lock_unuse() will be called at
		 * least one more time to finish un-using, so leave a mark on
		 * the top-lock, that will be seen by the next call to
		 * lov_lock_unuse().
		 */
		if (cl_lock_is_intransit(parent))
			lov->lls_cancel_race = 1;
		break;
	case CLS_CACHED:
		/*
		 * if a sub-lock is canceled, move its top-lock into CLS_NEW
		 * state to preserve the invariant that a top-lock in
		 * CLS_CACHED is immediately ready for re-use (i.e., has all
		 * sub-locks), and so that the next attempt to re-use the
		 * top-lock enqueues the missing sub-lock.
		 */
		cl_lock_state_set(env, parent, CLS_NEW);
		/* fall through */
	case CLS_NEW:
		/*
		 * if the last sub-lock is canceled, destroy the top-lock
		 * (which is now `empty') proactively.
		 */
		if (lov->lls_nr_filled == 0) {
			/* ... but unfortunately, this cannot be done easily,
			 * as cancellation of a top-lock might acquire mutexes
			 * of its other sub-locks, violating lock ordering,
			 * see cl_lock_{cancel,delete}() preconditions.
			 *
			 * To work around this, the mutex of this sub-lock is
			 * released, the top-lock is destroyed, and the
			 * sub-lock mutex is acquired again. The list of
			 * parents has to be re-scanned from the beginning
			 * after this.
			 *
			 * Only do this if no mutexes other than those on
			 * @child and @parent are held by the current thread.
			 *
			 * TODO: The locking model here is too complex,
			 * because the lock may be canceled and deleted
			 * voluntarily:
			 *    cl_lock_request
			 *      -> osc_lock_enqueue_wait
			 *        -> osc_lock_cancel_wait
			 *          -> cl_lock_delete
			 *            -> lovsub_lock_delete
			 *              -> cl_lock_cancel/delete
			 *                -> ...
			 *
			 * The better choice is to spawn a kernel thread for
			 * this purpose. -jay
			 */
			if (cl_lock_nr_mutexed(env) == 2) {
				cl_lock_mutex_put(env, child);
				cl_lock_cancel(env, parent);
				cl_lock_delete(env, parent);
				result = 1;
			}
		}
		break;
	case CLS_HELD:
		CL_LOCK_DEBUG(D_ERROR, env, parent, "Delete CLS_HELD lock\n");
		/* fall through */
	default:
		CERROR("Impossible state: %d\n", parent->cll_state);
		LBUG();
		break;
	}

	return result;
}

/**
 * An implementation of the cl_lock_operations::clo_delete() method. This is
 * invoked in "bottom-to-top" delete, when lock destruction starts from the
 * sub-lock (e.g., as a result of the ldlm lock LRU policy).
 */
static void lovsub_lock_delete(const struct lu_env *env,
			       const struct cl_lock_slice *slice)
{
	struct cl_lock     *child = slice->cls_lock;
	struct lovsub_lock *sub   = cl2lovsub_lock(slice);
	int restart;

	LASSERT(cl_lock_is_mutexed(child));

	/*
	 * Destruction of a sub-lock might take multiple iterations, because
	 * when the last sub-lock of a given top-lock is deleted, the
	 * top-lock is canceled proactively, and this requires releasing the
	 * sub-lock mutex. Once the sub-lock mutex has been released, the
	 * list of its parents has to be re-scanned from the beginning.
	 */
	do {
		struct lov_lock      *lov;
		struct lov_lock_link *scan;
		struct lov_lock_link *temp;
		struct lov_lock_sub  *subdata;

		restart = 0;
		list_for_each_entry_safe(scan, temp,
					 &sub->lss_parents, lll_list) {
			lov     = scan->lll_super;
			subdata = &lov->lls_sub[scan->lll_idx];
			lovsub_parent_lock(env, lov);
			subdata->sub_got = subdata->sub_descr;
			lov_lock_unlink(env, scan, sub);
			restart = lovsub_lock_delete_one(env, child, lov);
			lovsub_parent_unlock(env, lov);

			if (restart) {
				cl_lock_mutex_get(env, child);
				break;
			}
		}
	} while (restart);
}

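/**
 * Implementation of cl_lock_operations::clo_print(): prints the parent
 * top-locks of this sub-lock, with their lock descriptions.
 */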
static int lovsub_lock_print(const struct lu_env *env, void *cookie,
			     lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct lovsub_lock   *sub = cl2lovsub_lock(slice);
	struct lov_lock      *lov;
	struct lov_lock_link *scan;

	list_for_each_entry(scan, &sub->lss_parents, lll_list) {
		lov = scan->lll_super;
		(*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
		if (lov != NULL)
			cl_lock_descr_print(env, cookie, p,
					    &lov->lls_cl.cls_lock->cll_descr);
		(*p)(env, cookie, "] ");
	}
	return 0;
}

static const struct cl_lock_operations lovsub_lock_ops = {
	.clo_fini    = lovsub_lock_fini,
	.clo_state   = lovsub_lock_state,
	.clo_delete  = lovsub_lock_delete,
	.clo_modify  = lovsub_lock_modify,
	.clo_closure = lovsub_lock_closure,
	.clo_weigh   = lovsub_lock_weigh,
	.clo_print   = lovsub_lock_print
};

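/**
 * Allocates and initializes the lovsub slice of @lock and adds it to the
 * lock's slice list. Returns 0 on success, -ENOMEM if the slice cannot be
 * allocated.
 */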
int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
		     struct cl_lock *lock, const struct cl_io *io)
{
	struct lovsub_lock *lsk;
	int result;

	lsk = kmem_cache_alloc(lovsub_lock_kmem, GFP_NOFS | __GFP_ZERO);
	if (lsk != NULL) {
		INIT_LIST_HEAD(&lsk->lss_parents);
		cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
		result = 0;
	} else {
		result = -ENOMEM;
	}
	return result;
}

/** @} lov */