1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Lustre Page.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40 
41 #define DEBUG_SUBSYSTEM S_CLASS
42 
43 #include "../../include/linux/libcfs/libcfs.h"
44 #include "../include/obd_class.h"
45 #include "../include/obd_support.h"
46 #include <linux/list.h>
47 
48 #include "../include/cl_object.h"
49 #include "cl_internal.h"
50 
51 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
52 			    int radix);
53 
54 # define PASSERT(env, page, expr)				       \
55   do {								    \
56 	  if (unlikely(!(expr))) {				      \
57 		  CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
58 		  LASSERT(0);					   \
59 	  }							     \
60   } while (0)
61 
62 # define PINVRNT(env, page, exp) \
63 	((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
64 
/* Disable page statistics by default due to huge performance penalty. */
66 #define CS_PAGE_INC(o, item)
67 #define CS_PAGE_DEC(o, item)
68 #define CS_PAGESTATE_INC(o, state)
69 #define CS_PAGESTATE_DEC(o, state)
70 
71 /**
 * Internal version of cl_page_top(); it may only be called when the page is
 * known not to be freed, e.g., when the page is referenced, the radix tree
 * lock is held, or the page is owned.
75  */
static struct cl_page *cl_page_top_trusted(struct cl_page *page)
77 {
78 	while (page->cp_parent != NULL)
79 		page = page->cp_parent;
80 	return page;
81 }
82 
83 /**
84  * Internal version of cl_page_get().
85  *
 * This function can be used to obtain an initial reference to a previously
 * unreferenced cached page. It can be called only if concurrent page
 * reclamation is somehow prevented, e.g., by holding the page radix-tree
 * lock (cl_object_header::coh_page_guard) or by keeping a lock on the VM
 * page associated with \a page.
91  *
92  * Use with care! Not exported.
93  */
static void cl_page_get_trust(struct cl_page *page)
95 {
96 	LASSERT(atomic_read(&page->cp_ref) > 0);
97 	atomic_inc(&page->cp_ref);
98 }
99 
100 /**
101  * Returns a slice within a page, corresponding to the given layer in the
102  * device stack.
103  *
104  * \see cl_lock_at()
105  */
106 static const struct cl_page_slice *
cl_page_at_trusted(const struct cl_page *page,
108 		   const struct lu_device_type *dtype)
109 {
110 	const struct cl_page_slice *slice;
111 
112 	page = cl_page_top_trusted((struct cl_page *)page);
113 	do {
114 		list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
115 			if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
116 				return slice;
117 		}
118 		page = page->cp_child;
119 	} while (page != NULL);
120 	return NULL;
121 }
122 
123 /**
124  * Returns a page with given index in the given object, or NULL if no page is
125  * found. Acquires a reference on \a page.
126  *
127  * Locking: called under cl_object_header::coh_page_guard spin-lock.
128  */
struct cl_page *cl_page_lookup(struct cl_object_header *hdr, pgoff_t index)
130 {
131 	struct cl_page *page;
132 
133 	assert_spin_locked(&hdr->coh_page_guard);
134 
135 	page = radix_tree_lookup(&hdr->coh_tree, index);
136 	if (page != NULL)
137 		cl_page_get_trust(page);
138 	return page;
139 }
140 EXPORT_SYMBOL(cl_page_lookup);
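
/*
 * Illustrative sketch (not compiled in): how a caller could use
 * cl_page_lookup() under the coh_page_guard spin-lock to peek at a cached
 * page.  The helper name below is hypothetical; only symbols defined in
 * this file and in cl_object.h are assumed.
 */
#if 0
static struct cl_page *example_page_peek(struct cl_object *obj, pgoff_t index)
{
	struct cl_object_header *hdr = cl_object_header(obj);
	struct cl_page *page;

	spin_lock(&hdr->coh_page_guard);
	/* cl_page_lookup() takes a reference on the page, if found. */
	page = cl_page_lookup(hdr, index);
	spin_unlock(&hdr->coh_page_guard);

	/* The caller owns the reference and must drop it with cl_page_put(). */
	return page;
}
#endif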
141 
/**
 * Finds the pages of \a obj in the index range [start, end] and passes each
 * of them to \a cb.
 *
 * To avoid hogging the CPU for too long, the lookup may stop early and
 * return CLP_GANG_RESCHED, in which case the caller should reschedule and
 * implement retry logic.
 *
 * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
 * crucial in the face of [offset, EOF] locks.
 *
 * At least one covered page is passed to \a cb unless no page in the range
 * is covered.
 */
int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
155 			struct cl_io *io, pgoff_t start, pgoff_t end,
156 			cl_page_gang_cb_t cb, void *cbdata)
157 {
158 	struct cl_object_header *hdr;
159 	struct cl_page	  *page;
160 	struct cl_page	 **pvec;
161 	const struct cl_page_slice  *slice;
162 	const struct lu_device_type *dtype;
163 	pgoff_t		  idx;
164 	unsigned int	     nr;
165 	unsigned int	     i;
166 	unsigned int	     j;
167 	int		      res = CLP_GANG_OKAY;
168 	int		      tree_lock = 1;
169 
170 	idx = start;
171 	hdr = cl_object_header(obj);
172 	pvec = cl_env_info(env)->clt_pvec;
173 	dtype = cl_object_top(obj)->co_lu.lo_dev->ld_type;
174 	spin_lock(&hdr->coh_page_guard);
175 	while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec,
176 					    idx, CLT_PVEC_SIZE)) > 0) {
177 		int end_of_region = 0;
178 		idx = pvec[nr - 1]->cp_index + 1;
179 		for (i = 0, j = 0; i < nr; ++i) {
180 			page = pvec[i];
181 			pvec[i] = NULL;
182 
183 			LASSERT(page->cp_type == CPT_CACHEABLE);
184 			if (page->cp_index > end) {
185 				end_of_region = 1;
186 				break;
187 			}
188 			if (page->cp_state == CPS_FREEING)
189 				continue;
190 
191 			slice = cl_page_at_trusted(page, dtype);
			/*
			 * Pages for an lsm-less file have no underlying
			 * sub-page for osc, in case of ...
			 */
196 			PASSERT(env, page, slice != NULL);
197 
198 			page = slice->cpl_page;
199 			/*
200 			 * Can safely call cl_page_get_trust() under
201 			 * radix-tree spin-lock.
202 			 *
			 * XXX not true, because @page belongs to an object
			 * other than the one @hdr describes and is protected
			 * by a different tree lock.
205 			 */
206 			cl_page_get_trust(page);
207 			lu_ref_add_atomic(&page->cp_reference,
208 					  "gang_lookup", current);
209 			pvec[j++] = page;
210 		}
211 
212 		/*
213 		 * Here a delicate locking dance is performed. Current thread
214 		 * holds a reference to a page, but has to own it before it
215 		 * can be placed into queue. Owning implies waiting, so
216 		 * radix-tree lock is to be released. After a wait one has to
217 		 * check that pages weren't truncated (cl_page_own() returns
218 		 * error in the latter case).
219 		 */
220 		spin_unlock(&hdr->coh_page_guard);
221 		tree_lock = 0;
222 
223 		for (i = 0; i < j; ++i) {
224 			page = pvec[i];
225 			if (res == CLP_GANG_OKAY)
226 				res = (*cb)(env, io, page, cbdata);
227 			lu_ref_del(&page->cp_reference,
228 				   "gang_lookup", current);
229 			cl_page_put(env, page);
230 		}
231 		if (nr < CLT_PVEC_SIZE || end_of_region)
232 			break;
233 
234 		if (res == CLP_GANG_OKAY && need_resched())
235 			res = CLP_GANG_RESCHED;
236 		if (res != CLP_GANG_OKAY)
237 			break;
238 
239 		spin_lock(&hdr->coh_page_guard);
240 		tree_lock = 1;
241 	}
242 	if (tree_lock)
243 		spin_unlock(&hdr->coh_page_guard);
244 	return res;
245 }
246 EXPORT_SYMBOL(cl_page_gang_lookup);
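
/*
 * Illustrative sketch (not compiled in): the retry pattern expected by
 * cl_page_gang_lookup() callers.  When CLP_GANG_RESCHED is returned the
 * whole range is walked again, so the callback must be idempotent (here it
 * only records that at least one covered page exists).  All names prefixed
 * with "example_" are hypothetical.
 */
#if 0
static int example_seen_cb(const struct lu_env *env, struct cl_io *io,
			   struct cl_page *page, void *cbdata)
{
	*(int *)cbdata = 1;
	return CLP_GANG_OKAY;
}

static int example_range_has_pages(const struct lu_env *env,
				   struct cl_object *obj, struct cl_io *io,
				   pgoff_t start, pgoff_t end)
{
	int seen = 0;
	int res;

	do {
		res = cl_page_gang_lookup(env, obj, io, start, end,
					  example_seen_cb, &seen);
		if (res == CLP_GANG_RESCHED)
			cond_resched();
	} while (res == CLP_GANG_RESCHED && !seen);
	return seen;
}
#endif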
247 
static void cl_page_free(const struct lu_env *env, struct cl_page *page)
249 {
250 	struct cl_object *obj  = page->cp_obj;
251 	int pagesize = cl_object_header(obj)->coh_page_bufsize;
252 
253 	PASSERT(env, page, list_empty(&page->cp_batch));
254 	PASSERT(env, page, page->cp_owner == NULL);
255 	PASSERT(env, page, page->cp_req == NULL);
256 	PASSERT(env, page, page->cp_parent == NULL);
257 	PASSERT(env, page, page->cp_state == CPS_FREEING);
258 
259 	might_sleep();
260 	while (!list_empty(&page->cp_layers)) {
261 		struct cl_page_slice *slice;
262 
263 		slice = list_entry(page->cp_layers.next,
264 				       struct cl_page_slice, cpl_linkage);
265 		list_del_init(page->cp_layers.next);
266 		slice->cpl_ops->cpo_fini(env, slice);
267 	}
268 	CS_PAGE_DEC(obj, total);
269 	CS_PAGESTATE_DEC(obj, page->cp_state);
270 	lu_object_ref_del_at(&obj->co_lu, &page->cp_obj_ref, "cl_page", page);
271 	cl_object_put(env, obj);
272 	lu_ref_fini(&page->cp_reference);
273 	OBD_FREE(page, pagesize);
274 }
275 
276 /**
277  * Helper function updating page state. This is the only place in the code
278  * where cl_page::cp_state field is mutated.
279  */
static inline void cl_page_state_set_trust(struct cl_page *page,
281 					   enum cl_page_state state)
282 {
283 	/* bypass const. */
284 	*(enum cl_page_state *)&page->cp_state = state;
285 }
286 
static struct cl_page *cl_page_alloc(const struct lu_env *env,
288 		struct cl_object *o, pgoff_t ind, struct page *vmpage,
289 		enum cl_page_type type)
290 {
291 	struct cl_page	  *page;
292 	struct lu_object_header *head;
293 
294 	OBD_ALLOC_GFP(page, cl_object_header(o)->coh_page_bufsize,
295 			GFP_NOFS);
296 	if (page != NULL) {
297 		int result = 0;
298 		atomic_set(&page->cp_ref, 1);
299 		if (type == CPT_CACHEABLE) /* for radix tree */
300 			atomic_inc(&page->cp_ref);
301 		page->cp_obj = o;
302 		cl_object_get(o);
303 		lu_object_ref_add_at(&o->co_lu, &page->cp_obj_ref, "cl_page",
304 				     page);
305 		page->cp_index = ind;
306 		cl_page_state_set_trust(page, CPS_CACHED);
307 		page->cp_type = type;
308 		INIT_LIST_HEAD(&page->cp_layers);
309 		INIT_LIST_HEAD(&page->cp_batch);
310 		INIT_LIST_HEAD(&page->cp_flight);
311 		mutex_init(&page->cp_mutex);
312 		lu_ref_init(&page->cp_reference);
313 		head = o->co_lu.lo_header;
314 		list_for_each_entry(o, &head->loh_layers,
315 					co_lu.lo_linkage) {
316 			if (o->co_ops->coo_page_init != NULL) {
317 				result = o->co_ops->coo_page_init(env, o,
318 								  page, vmpage);
319 				if (result != 0) {
320 					cl_page_delete0(env, page, 0);
321 					cl_page_free(env, page);
322 					page = ERR_PTR(result);
323 					break;
324 				}
325 			}
326 		}
327 		if (result == 0) {
328 			CS_PAGE_INC(o, total);
329 			CS_PAGE_INC(o, create);
330 			CS_PAGESTATE_DEC(o, CPS_CACHED);
331 		}
332 	} else {
333 		page = ERR_PTR(-ENOMEM);
334 	}
335 	return page;
336 }
337 
338 /**
 * Returns a cl_page with index \a idx in the object \a o, associated with
 * the VM page \a vmpage.
 *
 * This is the main entry point into the cl_page caching interface. First, the
 * cache (implemented as a per-object radix tree) is consulted. If the page is
 * found there, it is returned immediately. Otherwise a new page is allocated
 * and returned. In either case, an additional reference to the page is
 * acquired.
346  *
347  * \see cl_object_find(), cl_lock_find()
348  */
static struct cl_page *cl_page_find0(const struct lu_env *env,
350 				     struct cl_object *o,
351 				     pgoff_t idx, struct page *vmpage,
352 				     enum cl_page_type type,
353 				     struct cl_page *parent)
354 {
355 	struct cl_page	  *page = NULL;
356 	struct cl_page	  *ghost = NULL;
357 	struct cl_object_header *hdr;
358 	int err;
359 
360 	LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
361 	might_sleep();
362 
363 	hdr = cl_object_header(o);
364 	CS_PAGE_INC(o, lookup);
365 
366 	CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
367 	       idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
368 	/* fast path. */
369 	if (type == CPT_CACHEABLE) {
370 		/* vmpage lock is used to protect the child/parent
371 		 * relationship */
372 		KLASSERT(PageLocked(vmpage));
373 		/*
374 		 * cl_vmpage_page() can be called here without any locks as
375 		 *
376 		 *     - "vmpage" is locked (which prevents ->private from
377 		 *       concurrent updates), and
378 		 *
379 		 *     - "o" cannot be destroyed while current thread holds a
380 		 *       reference on it.
381 		 */
382 		page = cl_vmpage_page(vmpage, o);
383 		PINVRNT(env, page,
384 			ergo(page != NULL,
385 			     cl_page_vmpage(env, page) == vmpage &&
386 			     (void *)radix_tree_lookup(&hdr->coh_tree,
387 						       idx) == page));
388 	}
389 
390 	if (page != NULL) {
391 		CS_PAGE_INC(o, hit);
392 		return page;
393 	}
394 
395 	/* allocate and initialize cl_page */
396 	page = cl_page_alloc(env, o, idx, vmpage, type);
397 	if (IS_ERR(page))
398 		return page;
399 
400 	if (type == CPT_TRANSIENT) {
401 		if (parent) {
402 			LASSERT(page->cp_parent == NULL);
403 			page->cp_parent = parent;
404 			parent->cp_child = page;
405 		}
406 		return page;
407 	}
408 
409 	/*
410 	 * XXX optimization: use radix_tree_preload() here, and change tree
411 	 * gfp mask to GFP_KERNEL in cl_object_header_init().
412 	 */
413 	spin_lock(&hdr->coh_page_guard);
414 	err = radix_tree_insert(&hdr->coh_tree, idx, page);
415 	if (err != 0) {
416 		ghost = page;
417 		/*
418 		 * Noted by Jay: a lock on \a vmpage protects cl_page_find()
419 		 * from this race, but
420 		 *
421 		 *     0. it's better to have cl_page interface "locally
422 		 *     consistent" so that its correctness can be reasoned
423 		 *     about without appealing to the (obscure world of) VM
424 		 *     locking.
425 		 *
426 		 *     1. handling this race allows ->coh_tree to remain
427 		 *     consistent even when VM locking is somehow busted,
428 		 *     which is very useful during diagnosing and debugging.
429 		 */
430 		page = ERR_PTR(err);
431 		CL_PAGE_DEBUG(D_ERROR, env, ghost,
432 			      "fail to insert into radix tree: %d\n", err);
433 	} else {
434 		if (parent) {
435 			LASSERT(page->cp_parent == NULL);
436 			page->cp_parent = parent;
437 			parent->cp_child = page;
438 		}
439 		hdr->coh_pages++;
440 	}
441 	spin_unlock(&hdr->coh_page_guard);
442 
443 	if (unlikely(ghost != NULL)) {
444 		cl_page_delete0(env, ghost, 0);
445 		cl_page_free(env, ghost);
446 	}
447 	return page;
448 }
449 
struct cl_page *cl_page_find(const struct lu_env *env, struct cl_object *o,
451 			     pgoff_t idx, struct page *vmpage,
452 			     enum cl_page_type type)
453 {
454 	return cl_page_find0(env, o, idx, vmpage, type, NULL);
455 }
456 EXPORT_SYMBOL(cl_page_find);
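
/*
 * Illustrative sketch (not compiled in): typical cl_page_find() usage for a
 * CPT_CACHEABLE page.  The VM page must be locked by the caller, and the
 * reference returned by cl_page_find() is dropped with cl_page_put() once
 * the caller is done.  The helper name is hypothetical.
 */
#if 0
static int example_page_attach(const struct lu_env *env, struct cl_object *obj,
			       pgoff_t index, struct page *vmpage)
{
	struct cl_page *page;

	/* vmpage is expected to be locked here (see the fast path above). */
	page = cl_page_find(env, obj, index, vmpage, CPT_CACHEABLE);
	if (IS_ERR(page))
		return PTR_ERR(page);

	/* ... own the page, queue it for IO, etc. ... */

	cl_page_put(env, page);
	return 0;
}
#endif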
457 
458 
struct cl_page *cl_page_find_sub(const struct lu_env *env, struct cl_object *o,
460 				 pgoff_t idx, struct page *vmpage,
461 				 struct cl_page *parent)
462 {
463 	return cl_page_find0(env, o, idx, vmpage, parent->cp_type, parent);
464 }
465 EXPORT_SYMBOL(cl_page_find_sub);
466 
static inline int cl_page_invariant(const struct cl_page *pg)
468 {
469 	struct cl_object_header *header;
470 	struct cl_page	  *parent;
471 	struct cl_page	  *child;
472 	struct cl_io	    *owner;
473 
474 	/*
475 	 * Page invariant is protected by a VM lock.
476 	 */
477 	LINVRNT(cl_page_is_vmlocked(NULL, pg));
478 
479 	header = cl_object_header(pg->cp_obj);
480 	parent = pg->cp_parent;
481 	child  = pg->cp_child;
482 	owner  = pg->cp_owner;
483 
484 	return cl_page_in_use(pg) &&
485 		ergo(parent != NULL, parent->cp_child == pg) &&
486 		ergo(child != NULL, child->cp_parent == pg) &&
487 		ergo(child != NULL, pg->cp_obj != child->cp_obj) &&
488 		ergo(parent != NULL, pg->cp_obj != parent->cp_obj) &&
489 		ergo(owner != NULL && parent != NULL,
490 		     parent->cp_owner == pg->cp_owner->ci_parent) &&
491 		ergo(owner != NULL && child != NULL,
492 		     child->cp_owner->ci_parent == owner) &&
493 		/*
494 		 * Either page is early in initialization (has neither child
495 		 * nor parent yet), or it is in the object radix tree.
496 		 */
497 		ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE,
498 		     (void *)radix_tree_lookup(&header->coh_tree,
499 					       pg->cp_index) == pg ||
500 		     (child == NULL && parent == NULL));
501 }
502 
static void cl_page_state_set0(const struct lu_env *env,
504 			       struct cl_page *page, enum cl_page_state state)
505 {
506 	enum cl_page_state old;
507 
508 	/*
509 	 * Matrix of allowed state transitions [old][new], for sanity
510 	 * checking.
511 	 */
512 	static const int allowed_transitions[CPS_NR][CPS_NR] = {
513 		[CPS_CACHED] = {
514 			[CPS_CACHED]  = 0,
515 			[CPS_OWNED]   = 1, /* io finds existing cached page */
516 			[CPS_PAGEIN]  = 0,
517 			[CPS_PAGEOUT] = 1, /* write-out from the cache */
518 			[CPS_FREEING] = 1, /* eviction on the memory pressure */
519 		},
520 		[CPS_OWNED] = {
521 			[CPS_CACHED]  = 1, /* release to the cache */
522 			[CPS_OWNED]   = 0,
523 			[CPS_PAGEIN]  = 1, /* start read immediately */
524 			[CPS_PAGEOUT] = 1, /* start write immediately */
525 			[CPS_FREEING] = 1, /* lock invalidation or truncate */
526 		},
527 		[CPS_PAGEIN] = {
528 			[CPS_CACHED]  = 1, /* io completion */
529 			[CPS_OWNED]   = 0,
530 			[CPS_PAGEIN]  = 0,
531 			[CPS_PAGEOUT] = 0,
532 			[CPS_FREEING] = 0,
533 		},
534 		[CPS_PAGEOUT] = {
535 			[CPS_CACHED]  = 1, /* io completion */
536 			[CPS_OWNED]   = 0,
537 			[CPS_PAGEIN]  = 0,
538 			[CPS_PAGEOUT] = 0,
539 			[CPS_FREEING] = 0,
540 		},
541 		[CPS_FREEING] = {
542 			[CPS_CACHED]  = 0,
543 			[CPS_OWNED]   = 0,
544 			[CPS_PAGEIN]  = 0,
545 			[CPS_PAGEOUT] = 0,
546 			[CPS_FREEING] = 0,
547 		}
548 	};
549 
550 	old = page->cp_state;
551 	PASSERT(env, page, allowed_transitions[old][state]);
552 	CL_PAGE_HEADER(D_TRACE, env, page, "%d -> %d\n", old, state);
553 	for (; page != NULL; page = page->cp_child) {
554 		PASSERT(env, page, page->cp_state == old);
555 		PASSERT(env, page,
556 			equi(state == CPS_OWNED, page->cp_owner != NULL));
557 
558 		CS_PAGESTATE_DEC(page->cp_obj, page->cp_state);
559 		CS_PAGESTATE_INC(page->cp_obj, state);
560 		cl_page_state_set_trust(page, state);
561 	}
562 }
563 
static void cl_page_state_set(const struct lu_env *env,
565 			      struct cl_page *page, enum cl_page_state state)
566 {
567 	cl_page_state_set0(env, page, state);
568 }
569 
570 /**
571  * Acquires an additional reference to a page.
572  *
573  * This can be called only by caller already possessing a reference to \a
574  * page.
575  *
576  * \see cl_object_get(), cl_lock_get().
577  */
void cl_page_get(struct cl_page *page)
579 {
580 	cl_page_get_trust(page);
581 }
582 EXPORT_SYMBOL(cl_page_get);
583 
584 /**
585  * Releases a reference to a page.
586  *
587  * When last reference is released, page is returned to the cache, unless it
588  * is in cl_page_state::CPS_FREEING state, in which case it is immediately
589  * destroyed.
590  *
591  * \see cl_object_put(), cl_lock_put().
592  */
void cl_page_put(const struct lu_env *env, struct cl_page *page)
594 {
595 	PASSERT(env, page, atomic_read(&page->cp_ref) > !!page->cp_parent);
596 
597 	CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
598 		       atomic_read(&page->cp_ref));
599 
600 	if (atomic_dec_and_test(&page->cp_ref)) {
601 		LASSERT(page->cp_state == CPS_FREEING);
602 
603 		LASSERT(atomic_read(&page->cp_ref) == 0);
604 		PASSERT(env, page, page->cp_owner == NULL);
605 		PASSERT(env, page, list_empty(&page->cp_batch));
606 		/*
607 		 * Page is no longer reachable by other threads. Tear
608 		 * it down.
609 		 */
610 		cl_page_free(env, page);
611 	}
612 }
613 EXPORT_SYMBOL(cl_page_put);
614 
615 /**
616  * Returns a VM page associated with a given cl_page.
617  */
struct page *cl_page_vmpage(const struct lu_env *env, struct cl_page *page)
619 {
620 	const struct cl_page_slice *slice;
621 
622 	/*
623 	 * Find uppermost layer with ->cpo_vmpage() method, and return its
624 	 * result.
625 	 */
626 	page = cl_page_top(page);
627 	do {
628 		list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
629 			if (slice->cpl_ops->cpo_vmpage != NULL)
630 				return slice->cpl_ops->cpo_vmpage(env, slice);
631 		}
632 		page = page->cp_child;
633 	} while (page != NULL);
634 	LBUG(); /* ->cpo_vmpage() has to be defined somewhere in the stack */
635 }
636 EXPORT_SYMBOL(cl_page_vmpage);
637 
638 /**
639  * Returns a cl_page associated with a VM page, and given cl_object.
640  */
struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
642 {
643 	struct cl_page *top;
644 	struct cl_page *page;
645 
646 	KLASSERT(PageLocked(vmpage));
647 
648 	/*
649 	 * NOTE: absence of races and liveness of data are guaranteed by page
650 	 *       lock on a "vmpage". That works because object destruction has
651 	 *       bottom-to-top pass.
652 	 */
653 
654 	/*
655 	 * This loop assumes that ->private points to the top-most page. This
656 	 * can be rectified easily.
657 	 */
658 	top = (struct cl_page *)vmpage->private;
659 	if (top == NULL)
660 		return NULL;
661 
662 	for (page = top; page != NULL; page = page->cp_child) {
663 		if (cl_object_same(page->cp_obj, obj)) {
664 			cl_page_get_trust(page);
665 			break;
666 		}
667 	}
668 	LASSERT(ergo(page, page->cp_type == CPT_CACHEABLE));
669 	return page;
670 }
671 EXPORT_SYMBOL(cl_vmpage_page);
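
/*
 * Illustrative sketch (not compiled in): going from a VM page back to the
 * cl_page of a given object.  cl_vmpage_page() takes its own reference, so
 * the cl_page remains valid after the VM page is unlocked.  The helper name
 * is hypothetical.
 */
#if 0
static void example_from_vmpage(const struct lu_env *env, struct page *vmpage,
				struct cl_object *obj)
{
	struct cl_page *page;

	lock_page(vmpage);
	page = cl_vmpage_page(vmpage, obj);
	unlock_page(vmpage);

	if (page != NULL) {
		/* ... inspect or transfer the page ... */
		cl_page_put(env, page);
	}
}
#endif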
672 
673 /**
674  * Returns the top-page for a given page.
675  *
676  * \see cl_object_top(), cl_io_top()
677  */
struct cl_page *cl_page_top(struct cl_page *page)
679 {
680 	return cl_page_top_trusted(page);
681 }
682 EXPORT_SYMBOL(cl_page_top);
683 
const struct cl_page_slice *cl_page_at(const struct cl_page *page,
685 				       const struct lu_device_type *dtype)
686 {
687 	return cl_page_at_trusted(page, dtype);
688 }
689 EXPORT_SYMBOL(cl_page_at);
690 
691 #define CL_PAGE_OP(opname) offsetof(struct cl_page_operations, opname)
692 
693 #define CL_PAGE_INVOKE(_env, _page, _op, _proto, ...)		   \
694 ({								      \
695 	const struct lu_env	*__env  = (_env);		    \
696 	struct cl_page	     *__page = (_page);		   \
697 	const struct cl_page_slice *__scan;			     \
698 	int			 __result;			   \
699 	ptrdiff_t		   __op   = (_op);		     \
700 	int		       (*__method)_proto;		    \
701 									\
702 	__result = 0;						   \
703 	__page = cl_page_top(__page);				   \
704 	do {							    \
705 		list_for_each_entry(__scan, &__page->cp_layers,     \
706 					cpl_linkage) {		  \
707 			__method = *(void **)((char *)__scan->cpl_ops + \
708 					      __op);		    \
709 			if (__method != NULL) {			 \
710 				__result = (*__method)(__env, __scan,   \
711 						       ## __VA_ARGS__); \
712 				if (__result != 0)		      \
713 					break;			  \
714 			}					       \
715 		}						       \
716 		__page = __page->cp_child;			      \
717 	} while (__page != NULL && __result == 0);		      \
718 	if (__result > 0)					       \
719 		__result = 0;					   \
720 	__result;						       \
721 })
722 
723 #define CL_PAGE_INVOID(_env, _page, _op, _proto, ...)		   \
724 do {								    \
725 	const struct lu_env	*__env  = (_env);		    \
726 	struct cl_page	     *__page = (_page);		   \
727 	const struct cl_page_slice *__scan;			     \
728 	ptrdiff_t		   __op   = (_op);		     \
729 	void		      (*__method)_proto;		    \
730 									\
731 	__page = cl_page_top(__page);				   \
732 	do {							    \
733 		list_for_each_entry(__scan, &__page->cp_layers,     \
734 					cpl_linkage) {		  \
735 			__method = *(void **)((char *)__scan->cpl_ops + \
736 					      __op);		    \
737 			if (__method != NULL)			   \
738 				(*__method)(__env, __scan,	      \
739 					    ## __VA_ARGS__);	    \
740 		}						       \
741 		__page = __page->cp_child;			      \
742 	} while (__page != NULL);				       \
743 } while (0)
744 
745 #define CL_PAGE_INVOID_REVERSE(_env, _page, _op, _proto, ...)	       \
746 do {									\
747 	const struct lu_env	*__env  = (_env);			\
748 	struct cl_page	     *__page = (_page);		       \
749 	const struct cl_page_slice *__scan;				 \
750 	ptrdiff_t		   __op   = (_op);			 \
751 	void		      (*__method)_proto;			\
752 									    \
753 	/* get to the bottom page. */				       \
754 	while (__page->cp_child != NULL)				    \
755 		__page = __page->cp_child;				  \
756 	do {								\
757 		list_for_each_entry_reverse(__scan, &__page->cp_layers, \
758 						cpl_linkage) {	      \
759 			__method = *(void **)((char *)__scan->cpl_ops +     \
760 					      __op);			\
761 			if (__method != NULL)			       \
762 				(*__method)(__env, __scan,		  \
763 					    ## __VA_ARGS__);		\
764 		}							   \
765 		__page = __page->cp_parent;				 \
766 	} while (__page != NULL);					   \
767 } while (0)
768 
static int cl_page_invoke(const struct lu_env *env,
770 			  struct cl_io *io, struct cl_page *page, ptrdiff_t op)
771 
772 {
773 	PINVRNT(env, page, cl_object_same(page->cp_obj, io->ci_obj));
774 	return CL_PAGE_INVOKE(env, page, op,
775 			      (const struct lu_env *,
776 			       const struct cl_page_slice *, struct cl_io *),
777 			      io);
778 }
779 
static void cl_page_invoid(const struct lu_env *env,
781 			   struct cl_io *io, struct cl_page *page, ptrdiff_t op)
782 
783 {
784 	PINVRNT(env, page, cl_object_same(page->cp_obj, io->ci_obj));
785 	CL_PAGE_INVOID(env, page, op,
786 		       (const struct lu_env *,
787 			const struct cl_page_slice *, struct cl_io *), io);
788 }
789 
static void cl_page_owner_clear(struct cl_page *page)
791 {
792 	for (page = cl_page_top(page); page != NULL; page = page->cp_child) {
793 		if (page->cp_owner != NULL) {
794 			LASSERT(page->cp_owner->ci_owned_nr > 0);
795 			page->cp_owner->ci_owned_nr--;
796 			page->cp_owner = NULL;
797 			page->cp_task = NULL;
798 		}
799 	}
800 }
801 
static void cl_page_owner_set(struct cl_page *page)
803 {
804 	for (page = cl_page_top(page); page != NULL; page = page->cp_child) {
805 		LASSERT(page->cp_owner != NULL);
806 		page->cp_owner->ci_owned_nr++;
807 	}
808 }
809 
void cl_page_disown0(const struct lu_env *env,
811 		     struct cl_io *io, struct cl_page *pg)
812 {
813 	enum cl_page_state state;
814 
815 	state = pg->cp_state;
816 	PINVRNT(env, pg, state == CPS_OWNED || state == CPS_FREEING);
817 	PINVRNT(env, pg, cl_page_invariant(pg));
818 	cl_page_owner_clear(pg);
819 
820 	if (state == CPS_OWNED)
821 		cl_page_state_set(env, pg, CPS_CACHED);
822 	/*
823 	 * Completion call-backs are executed in the bottom-up order, so that
824 	 * uppermost layer (llite), responsible for VFS/VM interaction runs
825 	 * last and can release locks safely.
826 	 */
827 	CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_disown),
828 			       (const struct lu_env *,
829 				const struct cl_page_slice *, struct cl_io *),
830 			       io);
831 }
832 
833 /**
834  * returns true, iff page is owned by the given io.
835  */
int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
837 {
838 	LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
839 	return pg->cp_state == CPS_OWNED && pg->cp_owner == io;
840 }
841 EXPORT_SYMBOL(cl_page_is_owned);
842 
843 /**
844  * Try to own a page by IO.
845  *
 * Waits until the page is in the cl_page_state::CPS_CACHED state, and then
 * switches it into the cl_page_state::CPS_OWNED state.
848  *
849  * \pre  !cl_page_is_owned(pg, io)
850  * \post result == 0 iff cl_page_is_owned(pg, io)
851  *
852  * \retval 0   success
853  *
 * \retval -ve failure, e.g., the page was destroyed (and landed in
 *	     cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
 *	     or the page was owned by another thread, or was part of an
 *	     ongoing IO.
857  *
858  * \see cl_page_disown()
859  * \see cl_page_operations::cpo_own()
860  * \see cl_page_own_try()
861  * \see cl_page_own
862  */
static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
864 			struct cl_page *pg, int nonblock)
865 {
866 	int result;
867 
868 	PINVRNT(env, pg, !cl_page_is_owned(pg, io));
869 
870 	pg = cl_page_top(pg);
871 	io = cl_io_top(io);
872 
873 	if (pg->cp_state == CPS_FREEING) {
874 		result = -ENOENT;
875 	} else {
876 		result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own),
877 					(const struct lu_env *,
878 					 const struct cl_page_slice *,
879 					 struct cl_io *, int),
880 					io, nonblock);
881 		if (result == 0) {
882 			PASSERT(env, pg, pg->cp_owner == NULL);
883 			PASSERT(env, pg, pg->cp_req == NULL);
884 			pg->cp_owner = io;
885 			pg->cp_task  = current;
886 			cl_page_owner_set(pg);
887 			if (pg->cp_state != CPS_FREEING) {
888 				cl_page_state_set(env, pg, CPS_OWNED);
889 			} else {
890 				cl_page_disown0(env, io, pg);
891 				result = -ENOENT;
892 			}
893 		}
894 	}
895 	PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
896 	return result;
897 }
898 
899 /**
900  * Own a page, might be blocked.
901  *
902  * \see cl_page_own0()
903  */
int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
905 {
906 	return cl_page_own0(env, io, pg, 0);
907 }
908 EXPORT_SYMBOL(cl_page_own);
909 
910 /**
911  * Nonblock version of cl_page_own().
912  *
913  * \see cl_page_own0()
914  */
int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
916 		    struct cl_page *pg)
917 {
918 	return cl_page_own0(env, io, pg, 1);
919 }
920 EXPORT_SYMBOL(cl_page_own_try);
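
/*
 * Illustrative sketch (not compiled in): non-blocking ownership with the
 * usual own/discard/disown sequence (compare page_prune_cb() below, which
 * uses the blocking cl_page_own()).  The helper name is hypothetical.
 */
#if 0
static int example_try_discard(const struct lu_env *env, struct cl_io *io,
			       struct cl_page *pg)
{
	int rc;

	rc = cl_page_own_try(env, io, pg);
	if (rc != 0)
		return rc;	/* owned elsewhere, or being freed */

	cl_page_discard(env, io, pg);
	cl_page_disown(env, io, pg);
	return 0;
}
#endif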
921 
922 
923 /**
924  * Assume page ownership.
925  *
926  * Called when page is already locked by the hosting VM.
927  *
928  * \pre !cl_page_is_owned(pg, io)
929  * \post cl_page_is_owned(pg, io)
930  *
931  * \see cl_page_operations::cpo_assume()
932  */
void cl_page_assume(const struct lu_env *env,
934 		    struct cl_io *io, struct cl_page *pg)
935 {
936 	PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
937 
938 	pg = cl_page_top(pg);
939 	io = cl_io_top(io);
940 
941 	cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_assume));
942 	PASSERT(env, pg, pg->cp_owner == NULL);
943 	pg->cp_owner = io;
944 	pg->cp_task = current;
945 	cl_page_owner_set(pg);
946 	cl_page_state_set(env, pg, CPS_OWNED);
947 }
948 EXPORT_SYMBOL(cl_page_assume);
949 
950 /**
951  * Releases page ownership without unlocking the page.
952  *
953  * Moves page into cl_page_state::CPS_CACHED without releasing a lock on the
954  * underlying VM page (as VM is supposed to do this itself).
955  *
956  * \pre   cl_page_is_owned(pg, io)
957  * \post !cl_page_is_owned(pg, io)
958  *
959  * \see cl_page_assume()
960  */
void cl_page_unassume(const struct lu_env *env,
962 		      struct cl_io *io, struct cl_page *pg)
963 {
964 	PINVRNT(env, pg, cl_page_is_owned(pg, io));
965 	PINVRNT(env, pg, cl_page_invariant(pg));
966 
967 	pg = cl_page_top(pg);
968 	io = cl_io_top(io);
969 	cl_page_owner_clear(pg);
970 	cl_page_state_set(env, pg, CPS_CACHED);
971 	CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_unassume),
972 			       (const struct lu_env *,
973 				const struct cl_page_slice *, struct cl_io *),
974 			       io);
975 }
976 EXPORT_SYMBOL(cl_page_unassume);
977 
978 /**
979  * Releases page ownership.
980  *
981  * Moves page into cl_page_state::CPS_CACHED.
982  *
983  * \pre   cl_page_is_owned(pg, io)
984  * \post !cl_page_is_owned(pg, io)
985  *
986  * \see cl_page_own()
987  * \see cl_page_operations::cpo_disown()
988  */
void cl_page_disown(const struct lu_env *env,
990 		    struct cl_io *io, struct cl_page *pg)
991 {
992 	PINVRNT(env, pg, cl_page_is_owned(pg, io));
993 
994 	pg = cl_page_top(pg);
995 	io = cl_io_top(io);
996 	cl_page_disown0(env, io, pg);
997 }
998 EXPORT_SYMBOL(cl_page_disown);
999 
1000 /**
1001  * Called when page is to be removed from the object, e.g., as a result of
1002  * truncate.
1003  *
1004  * Calls cl_page_operations::cpo_discard() top-to-bottom.
1005  *
1006  * \pre cl_page_is_owned(pg, io)
1007  *
1008  * \see cl_page_operations::cpo_discard()
1009  */
void cl_page_discard(const struct lu_env *env,
1011 		     struct cl_io *io, struct cl_page *pg)
1012 {
1013 	PINVRNT(env, pg, cl_page_is_owned(pg, io));
1014 	PINVRNT(env, pg, cl_page_invariant(pg));
1015 
1016 	cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_discard));
1017 }
1018 EXPORT_SYMBOL(cl_page_discard);
1019 
1020 /**
1021  * Version of cl_page_delete() that can be called for not fully constructed
 * pages, e.g., in an error handling cl_page_find()->cl_page_delete0()
 * path. Doesn't check the page invariant.
1024  */
static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
1026 			    int radix)
1027 {
1028 	struct cl_page *tmp = pg;
1029 
1030 	PASSERT(env, pg, pg == cl_page_top(pg));
1031 	PASSERT(env, pg, pg->cp_state != CPS_FREEING);
1032 
1033 	/*
	 * Sever all ways to obtain new pointers to @pg.
1035 	 */
1036 	cl_page_owner_clear(pg);
1037 
1038 	/*
	 * Unexport the page before freeing it, so that its content is
	 * considered invalid. We have to do this because a CPS_FREEING
	 * cl_page may NOT be under the protection of a cl_lock.
	 * If this page is later found by other threads, it will be forced
	 * to be re-read.
1045 	 */
1046 	cl_page_export(env, pg, 0);
1047 	cl_page_state_set0(env, pg, CPS_FREEING);
1048 
1049 	CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete),
1050 		       (const struct lu_env *, const struct cl_page_slice *));
1051 
1052 	if (tmp->cp_type == CPT_CACHEABLE) {
1053 		if (!radix)
1054 			/* !radix means that @pg is not yet in the radix tree,
1055 			 * skip removing it.
1056 			 */
1057 			tmp = pg->cp_child;
1058 		for (; tmp != NULL; tmp = tmp->cp_child) {
1059 			void		    *value;
1060 			struct cl_object_header *hdr;
1061 
1062 			hdr = cl_object_header(tmp->cp_obj);
1063 			spin_lock(&hdr->coh_page_guard);
1064 			value = radix_tree_delete(&hdr->coh_tree,
1065 						  tmp->cp_index);
1066 			PASSERT(env, tmp, value == tmp);
1067 			PASSERT(env, tmp, hdr->coh_pages > 0);
1068 			hdr->coh_pages--;
1069 			spin_unlock(&hdr->coh_page_guard);
1070 			cl_page_put(env, tmp);
1071 		}
1072 	}
1073 }
1074 
1075 /**
1076  * Called when a decision is made to throw page out of memory.
1077  *
1078  * Notifies all layers about page destruction by calling
1079  * cl_page_operations::cpo_delete() method top-to-bottom.
1080  *
1081  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
1082  * where transition to this state happens).
1083  *
1084  * Eliminates all venues through which new references to the page can be
1085  * obtained:
1086  *
1087  *     - removes page from the radix trees,
1088  *
1089  *     - breaks linkage from VM page to cl_page.
1090  *
1091  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
1092  * drain after some time, at which point page will be recycled.
1093  *
1094  * \pre  pg == cl_page_top(pg)
1095  * \pre  VM page is locked
1096  * \post pg->cp_state == CPS_FREEING
1097  *
1098  * \see cl_page_operations::cpo_delete()
1099  */
void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
1101 {
1102 	PINVRNT(env, pg, cl_page_invariant(pg));
1103 	cl_page_delete0(env, pg, 1);
1104 }
1105 EXPORT_SYMBOL(cl_page_delete);
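
/*
 * Illustrative sketch (not compiled in): how a VM-level invalidation path
 * could drive cl_page_delete().  The VM page is assumed to be locked and
 * \a obj is assumed to be the top object, so that cl_vmpage_page() returns
 * the top page.  The helper name is hypothetical.
 */
#if 0
static void example_invalidate(const struct lu_env *env, struct page *vmpage,
			       struct cl_object *obj)
{
	struct cl_page *page;

	page = cl_vmpage_page(vmpage, obj);
	if (page != NULL) {
		cl_page_delete(env, page);
		cl_page_put(env, page);
	}
}
#endif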
1106 
1107 /**
1108  * Unmaps page from user virtual memory.
1109  *
1110  * Calls cl_page_operations::cpo_unmap() through all layers top-to-bottom. The
1111  * layer responsible for VM interaction has to unmap page from user space
1112  * virtual memory.
1113  *
1114  * \see cl_page_operations::cpo_unmap()
1115  */
int cl_page_unmap(const struct lu_env *env,
1117 		  struct cl_io *io, struct cl_page *pg)
1118 {
1119 	PINVRNT(env, pg, cl_page_is_owned(pg, io));
1120 	PINVRNT(env, pg, cl_page_invariant(pg));
1121 
1122 	return cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_unmap));
1123 }
1124 EXPORT_SYMBOL(cl_page_unmap);
1125 
1126 /**
1127  * Marks page up-to-date.
1128  *
1129  * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
 * layer responsible for VM interaction has to mark/clear the page as
 * up-to-date according to the \a uptodate argument.
1132  *
1133  * \see cl_page_operations::cpo_export()
1134  */
void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate)
1136 {
1137 	PINVRNT(env, pg, cl_page_invariant(pg));
1138 	CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_export),
1139 		       (const struct lu_env *,
1140 			const struct cl_page_slice *, int), uptodate);
1141 }
1142 EXPORT_SYMBOL(cl_page_export);
1143 
1144 /**
1145  * Returns true, iff \a pg is VM locked in a suitable sense by the calling
1146  * thread.
1147  */
int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg)
1149 {
1150 	int result;
1151 	const struct cl_page_slice *slice;
1152 
1153 	pg = cl_page_top_trusted((struct cl_page *)pg);
1154 	slice = container_of(pg->cp_layers.next,
1155 			     const struct cl_page_slice, cpl_linkage);
1156 	PASSERT(env, pg, slice->cpl_ops->cpo_is_vmlocked != NULL);
1157 	/*
1158 	 * Call ->cpo_is_vmlocked() directly instead of going through
1159 	 * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
1160 	 * cl_page_invariant().
1161 	 */
1162 	result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
1163 	PASSERT(env, pg, result == -EBUSY || result == -ENODATA);
1164 	return result == -EBUSY;
1165 }
1166 EXPORT_SYMBOL(cl_page_is_vmlocked);
1167 
static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
1169 {
1170 	return crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN;
1171 }
1172 
static void cl_page_io_start(const struct lu_env *env,
1174 			     struct cl_page *pg, enum cl_req_type crt)
1175 {
1176 	/*
1177 	 * Page is queued for IO, change its state.
1178 	 */
1179 	cl_page_owner_clear(pg);
1180 	cl_page_state_set(env, pg, cl_req_type_state(crt));
1181 }
1182 
1183 /**
1184  * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
1185  * called top-to-bottom. Every layer either agrees to submit this page (by
1186  * returning 0), or requests to omit this page (by returning -EALREADY). Layer
1187  * handling interactions with the VM also has to inform VM that page is under
1188  * transfer now.
1189  */
int cl_page_prep(const struct lu_env *env, struct cl_io *io,
1191 		 struct cl_page *pg, enum cl_req_type crt)
1192 {
1193 	int result;
1194 
1195 	PINVRNT(env, pg, cl_page_is_owned(pg, io));
1196 	PINVRNT(env, pg, cl_page_invariant(pg));
1197 	PINVRNT(env, pg, crt < CRT_NR);
1198 
1199 	/*
1200 	 * XXX this has to be called bottom-to-top, so that llite can set up
1201 	 * PG_writeback without risking other layers deciding to skip this
1202 	 * page.
1203 	 */
1204 	if (crt >= CRT_NR)
1205 		return -EINVAL;
1206 	result = cl_page_invoke(env, io, pg, CL_PAGE_OP(io[crt].cpo_prep));
1207 	if (result == 0)
1208 		cl_page_io_start(env, pg, crt);
1209 
1210 	KLASSERT(ergo(crt == CRT_WRITE && pg->cp_type == CPT_CACHEABLE,
1211 		      equi(result == 0,
1212 			   PageWriteback(cl_page_vmpage(env, pg)))));
1213 	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1214 	return result;
1215 }
1216 EXPORT_SYMBOL(cl_page_prep);
1217 
1218 /**
1219  * Notify layers about transfer completion.
1220  *
 * Invoked by the transfer sub-system (which is a part of osc) to notify
 * layers that a transfer of which this page is a part has completed.
1223  *
1224  * Completion call-backs are executed in the bottom-up order, so that
1225  * uppermost layer (llite), responsible for the VFS/VM interaction runs last
1226  * and can release locks safely.
1227  *
1228  * \pre  pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1229  * \post pg->cp_state == CPS_CACHED
1230  *
1231  * \see cl_page_operations::cpo_completion()
1232  */
void cl_page_completion(const struct lu_env *env,
1234 			struct cl_page *pg, enum cl_req_type crt, int ioret)
1235 {
1236 	struct cl_sync_io *anchor = pg->cp_sync_io;
1237 
1238 	PASSERT(env, pg, crt < CRT_NR);
1239 	/* cl_page::cp_req already cleared by the caller (osc_completion()) */
1240 	PASSERT(env, pg, pg->cp_req == NULL);
1241 	PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
1242 
1243 	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
1244 	if (crt == CRT_READ && ioret == 0) {
1245 		PASSERT(env, pg, !(pg->cp_flags & CPF_READ_COMPLETED));
1246 		pg->cp_flags |= CPF_READ_COMPLETED;
1247 	}
1248 
1249 	cl_page_state_set(env, pg, CPS_CACHED);
1250 	if (crt >= CRT_NR)
1251 		return;
1252 	CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(io[crt].cpo_completion),
1253 			       (const struct lu_env *,
1254 				const struct cl_page_slice *, int), ioret);
1255 	if (anchor) {
1256 		LASSERT(cl_page_is_vmlocked(env, pg));
1257 		LASSERT(pg->cp_sync_io == anchor);
1258 		pg->cp_sync_io = NULL;
1259 	}
1260 	/*
1261 	 * As page->cp_obj is pinned by a reference from page->cp_req, it is
1262 	 * safe to call cl_page_put() without risking object destruction in a
1263 	 * non-blocking context.
1264 	 */
1265 	cl_page_put(env, pg);
1266 
1267 	if (anchor)
1268 		cl_sync_io_note(anchor, ioret);
1269 }
1270 EXPORT_SYMBOL(cl_page_completion);
1271 
1272 /**
1273  * Notify layers that transfer formation engine decided to yank this page from
1274  * the cache and to make it a part of a transfer.
1275  *
1276  * \pre  pg->cp_state == CPS_CACHED
1277  * \post pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1278  *
1279  * \see cl_page_operations::cpo_make_ready()
1280  */
int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
1282 		       enum cl_req_type crt)
1283 {
1284 	int result;
1285 
1286 	PINVRNT(env, pg, crt < CRT_NR);
1287 
1288 	if (crt >= CRT_NR)
1289 		return -EINVAL;
1290 	result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(io[crt].cpo_make_ready),
1291 				(const struct lu_env *,
1292 				 const struct cl_page_slice *));
1293 	if (result == 0) {
1294 		PASSERT(env, pg, pg->cp_state == CPS_CACHED);
1295 		cl_page_io_start(env, pg, crt);
1296 	}
1297 	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1298 	return result;
1299 }
1300 EXPORT_SYMBOL(cl_page_make_ready);
1301 
1302 /**
1303  * Notify layers that high level io decided to place this page into a cache
1304  * for future transfer.
1305  *
1306  * The layer implementing transfer engine (osc) has to register this page in
1307  * its queues.
1308  *
1309  * \pre  cl_page_is_owned(pg, io)
1310  * \post cl_page_is_owned(pg, io)
1311  *
1312  * \see cl_page_operations::cpo_cache_add()
1313  */
int cl_page_cache_add(const struct lu_env *env, struct cl_io *io,
1315 		      struct cl_page *pg, enum cl_req_type crt)
1316 {
1317 	const struct cl_page_slice *scan;
1318 	int result = 0;
1319 
1320 	PINVRNT(env, pg, crt < CRT_NR);
1321 	PINVRNT(env, pg, cl_page_is_owned(pg, io));
1322 	PINVRNT(env, pg, cl_page_invariant(pg));
1323 
1324 	if (crt >= CRT_NR)
1325 		return -EINVAL;
1326 
1327 	list_for_each_entry(scan, &pg->cp_layers, cpl_linkage) {
1328 		if (scan->cpl_ops->io[crt].cpo_cache_add == NULL)
1329 			continue;
1330 
1331 		result = scan->cpl_ops->io[crt].cpo_cache_add(env, scan, io);
1332 		if (result != 0)
1333 			break;
1334 	}
1335 	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1336 	return result;
1337 }
1338 EXPORT_SYMBOL(cl_page_cache_add);
1339 
1340 /**
 * Called when a page is being written back at the kernel's initiative.
1342  *
1343  * \pre  cl_page_is_owned(pg, io)
1344  * \post ergo(result == 0, pg->cp_state == CPS_PAGEOUT)
1345  *
1346  * \see cl_page_operations::cpo_flush()
1347  */
int cl_page_flush(const struct lu_env *env, struct cl_io *io,
1349 		  struct cl_page *pg)
1350 {
1351 	int result;
1352 
1353 	PINVRNT(env, pg, cl_page_is_owned(pg, io));
1354 	PINVRNT(env, pg, cl_page_invariant(pg));
1355 
1356 	result = cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_flush));
1357 
1358 	CL_PAGE_HEADER(D_TRACE, env, pg, "%d\n", result);
1359 	return result;
1360 }
1361 EXPORT_SYMBOL(cl_page_flush);
1362 
1363 /**
 * Checks whether the page is protected by an extent lock of at least the
 * required mode.
1366  *
1367  * \return the same as in cl_page_operations::cpo_is_under_lock() method.
1368  * \see cl_page_operations::cpo_is_under_lock()
1369  */
int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
1371 			  struct cl_page *page)
1372 {
1373 	int rc;
1374 
1375 	PINVRNT(env, page, cl_page_invariant(page));
1376 
1377 	rc = CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_is_under_lock),
1378 			    (const struct lu_env *,
1379 			     const struct cl_page_slice *, struct cl_io *),
1380 			    io);
1381 	PASSERT(env, page, rc != 0);
1382 	return rc;
1383 }
1384 EXPORT_SYMBOL(cl_page_is_under_lock);
1385 
static int page_prune_cb(const struct lu_env *env, struct cl_io *io,
1387 			 struct cl_page *page, void *cbdata)
1388 {
1389 	cl_page_own(env, io, page);
1390 	cl_page_unmap(env, io, page);
1391 	cl_page_discard(env, io, page);
1392 	cl_page_disown(env, io, page);
1393 	return CLP_GANG_OKAY;
1394 }
1395 
1396 /**
1397  * Purges all cached pages belonging to the object \a obj.
1398  */
int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
1400 {
1401 	struct cl_thread_info   *info;
1402 	struct cl_object	*obj = cl_object_top(clobj);
1403 	struct cl_io	    *io;
1404 	int		      result;
1405 
1406 	info  = cl_env_info(env);
1407 	io    = &info->clt_io;
1408 
1409 	/*
1410 	 * initialize the io. This is ugly since we never do IO in this
1411 	 * function, we just make cl_page_list functions happy. -jay
1412 	 */
1413 	io->ci_obj = obj;
1414 	io->ci_ignore_layout = 1;
1415 	result = cl_io_init(env, io, CIT_MISC, obj);
1416 	if (result != 0) {
1417 		cl_io_fini(env, io);
1418 		return io->ci_result;
1419 	}
1420 
1421 	do {
1422 		result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
1423 					     page_prune_cb, NULL);
1424 		if (result == CLP_GANG_RESCHED)
1425 			cond_resched();
1426 	} while (result != CLP_GANG_OKAY);
1427 
1428 	cl_io_fini(env, io);
1429 	return result;
1430 }
1431 EXPORT_SYMBOL(cl_pages_prune);
1432 
1433 /**
1434  * Tells transfer engine that only part of a page is to be transmitted.
1435  *
1436  * \see cl_page_operations::cpo_clip()
1437  */
void cl_page_clip(const struct lu_env *env, struct cl_page *pg,
1439 		  int from, int to)
1440 {
1441 	PINVRNT(env, pg, cl_page_invariant(pg));
1442 
1443 	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", from, to);
1444 	CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_clip),
1445 		       (const struct lu_env *,
		       const struct cl_page_slice *, int, int),
1447 		       from, to);
1448 }
1449 EXPORT_SYMBOL(cl_page_clip);
1450 
1451 /**
 * Prints a human-readable representation of \a pg through \a printer.
1453  */
void cl_page_header_print(const struct lu_env *env, void *cookie,
1455 			  lu_printer_t printer, const struct cl_page *pg)
1456 {
1457 	(*printer)(env, cookie,
1458 		   "page@%p[%d %p:%lu ^%p_%p %d %d %d %p %p %#x]\n",
1459 		   pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1460 		   pg->cp_index, pg->cp_parent, pg->cp_child,
1461 		   pg->cp_state, pg->cp_error, pg->cp_type,
1462 		   pg->cp_owner, pg->cp_req, pg->cp_flags);
1463 }
1464 EXPORT_SYMBOL(cl_page_header_print);
1465 
1466 /**
 * Prints a human-readable representation of \a pg through \a printer.
1468  */
void cl_page_print(const struct lu_env *env, void *cookie,
1470 		   lu_printer_t printer, const struct cl_page *pg)
1471 {
1472 	struct cl_page *scan;
1473 
1474 	for (scan = cl_page_top((struct cl_page *)pg);
1475 	     scan != NULL; scan = scan->cp_child)
1476 		cl_page_header_print(env, cookie, printer, scan);
1477 	CL_PAGE_INVOKE(env, (struct cl_page *)pg, CL_PAGE_OP(cpo_print),
1478 		       (const struct lu_env *env,
1479 			const struct cl_page_slice *slice,
1480 			void *cookie, lu_printer_t p), cookie, printer);
1481 	(*printer)(env, cookie, "end page@%p\n", pg);
1482 }
1483 EXPORT_SYMBOL(cl_page_print);
1484 
1485 /**
1486  * Cancel a page which is still in a transfer.
1487  */
int cl_page_cancel(const struct lu_env *env, struct cl_page *page)
1489 {
1490 	return CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_cancel),
1491 			      (const struct lu_env *,
1492 			       const struct cl_page_slice *));
1493 }
1494 EXPORT_SYMBOL(cl_page_cancel);
1495 
1496 /**
 * Converts a page index within object \a obj into a byte offset.
1498  */
loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1500 {
1501 	/*
1502 	 * XXX for now.
1503 	 */
1504 	return (loff_t)idx << PAGE_CACHE_SHIFT;
1505 }
1506 EXPORT_SYMBOL(cl_offset);
1507 
1508 /**
 * Converts a byte offset within object \a obj into a page index.
1510  */
pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1512 {
1513 	/*
1514 	 * XXX for now.
1515 	 */
1516 	return offset >> PAGE_CACHE_SHIFT;
1517 }
1518 EXPORT_SYMBOL(cl_index);
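
/*
 * For example, with a 4KB page size, byte offset 10000 falls into page
 * index 2, which starts at byte 8192: cl_index(obj, 10000) == 2 and
 * cl_offset(obj, 2) == 8192.
 */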
1519 
int cl_page_size(const struct cl_object *obj)
1521 {
1522 	return 1 << PAGE_CACHE_SHIFT;
1523 }
1524 EXPORT_SYMBOL(cl_page_size);
1525 
1526 /**
1527  * Adds page slice to the compound page.
1528  *
1529  * This is called by cl_object_operations::coo_page_init() methods to add a
1530  * per-layer state to the page. New state is added at the end of
1531  * cl_page::cp_layers list, that is, it is at the bottom of the stack.
1532  *
1533  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1534  */
void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
1536 		       struct cl_object *obj,
1537 		       const struct cl_page_operations *ops)
1538 {
1539 	list_add_tail(&slice->cpl_linkage, &page->cp_layers);
1540 	slice->cpl_obj  = obj;
1541 	slice->cpl_ops  = ops;
1542 	slice->cpl_page = page;
1543 }
1544 EXPORT_SYMBOL(cl_page_slice_add);
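
/*
 * Illustrative sketch (not compiled in): a layer's coo_page_init() method
 * registering its slice with cl_page_slice_add().  Where the slice storage
 * comes from is layer-specific (real layers carve it out of the per-page
 * buffer sized by cl_object_header::coh_page_bufsize); here that detail is
 * elided behind a hypothetical helper.  All "example_" names and
 * example_page_ops are hypothetical.
 */
#if 0
struct example_page {
	struct cl_page_slice	ep_cl;
};

static const struct cl_page_operations example_page_ops;

static int example_page_init(const struct lu_env *env, struct cl_object *obj,
			     struct cl_page *page, struct page *vmpage)
{
	/* hypothetical helper returning this layer's slice storage */
	struct example_page *ep = example_slice_storage(page);

	cl_page_slice_add(page, &ep->ep_cl, obj, &example_page_ops);
	return 0;
}
#endif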
1545 
int  cl_page_init(void)
1547 {
1548 	return 0;
1549 }
1550 
void cl_page_fini(void)
1552 {
1553 }
1554