/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/obdclass/dt_object.c
 *
 * Dt Object.
 * Generic functions from dt_object.h
 *
 * Author: Nikita Danilov <nikita@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_CLASS

#include "../include/obd.h"
#include "../include/dt_object.h"
#include <linux/list.h>
/* fid_be_to_cpu() */
#include "../include/lustre_fid.h"

#include "../include/lustre_quota.h"

/* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
LU_KEY_INIT(dt_global, struct dt_thread_info);
LU_KEY_FINI(dt_global, struct dt_thread_info);

struct lu_context_key dt_key = {
	.lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL,
	.lct_init = dt_global_key_init,
	.lct_fini = dt_global_key_fini
};
EXPORT_SYMBOL(dt_key);

/* no lock is necessary to protect the list, because call-backs
 * are added during system startup. Please refer to "struct dt_device".
 */
void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb)
{
	list_add(&cb->dtc_linkage, &dev->dd_txn_callbacks);
}
EXPORT_SYMBOL(dt_txn_callback_add);

void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb)
{
	list_del_init(&cb->dtc_linkage);
}
EXPORT_SYMBOL(dt_txn_callback_del);

int dt_txn_hook_start(const struct lu_env *env,
		      struct dt_device *dev, struct thandle *th)
{
	int rc = 0;
	struct dt_txn_callback *cb;

	if (th->th_local)
		return 0;

	list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
		if (cb->dtc_txn_start == NULL ||
		    !(cb->dtc_tag & env->le_ctx.lc_tags))
			continue;
		rc = cb->dtc_txn_start(env, th, cb->dtc_cookie);
		if (rc < 0)
			break;
	}
	return rc;
}
EXPORT_SYMBOL(dt_txn_hook_start);

int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
{
	struct dt_device       *dev = txn->th_dev;
	struct dt_txn_callback *cb;
	int			rc = 0;

	if (txn->th_local)
		return 0;

	list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
		if (cb->dtc_txn_stop == NULL ||
		    !(cb->dtc_tag & env->le_ctx.lc_tags))
			continue;
		rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
		if (rc < 0)
			break;
	}
	return rc;
}
EXPORT_SYMBOL(dt_txn_hook_stop);

void dt_txn_hook_commit(struct thandle *txn)
{
	struct dt_txn_callback *cb;

	if (txn->th_local)
		return;

	list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
			    dtc_linkage) {
		if (cb->dtc_txn_commit)
			cb->dtc_txn_commit(txn, cb->dtc_cookie);
	}
}
EXPORT_SYMBOL(dt_txn_hook_commit);
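
/*
 * Usage sketch (hypothetical caller, not part of this file): a subsystem
 * that wants to be called at transaction start would embed a
 * struct dt_txn_callback and register it once at setup time, e.g.:
 *
 *	static int my_txn_start(const struct lu_env *env, struct thandle *th,
 *				void *cookie)
 *	{
 *		return 0;	// declare extra credits, bump counters, ...
 *	}
 *
 *	static struct dt_txn_callback my_cb = {
 *		.dtc_txn_start	= my_txn_start,
 *		.dtc_tag	= LCT_MD_THREAD,
 *		.dtc_cookie	= NULL,
 *	};
 *
 *	dt_txn_callback_add(dt_dev, &my_cb);
 *	...
 *	dt_txn_callback_del(dt_dev, &my_cb);
 *
 * my_txn_start/my_cb/dt_dev are illustrative names only; the fields and the
 * callback signature are the ones consumed by dt_txn_hook_start() above.
 */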

int dt_device_init(struct dt_device *dev, struct lu_device_type *t)
{
	INIT_LIST_HEAD(&dev->dd_txn_callbacks);
	return lu_device_init(&dev->dd_lu_dev, t);
}
EXPORT_SYMBOL(dt_device_init);

void dt_device_fini(struct dt_device *dev)
{
	lu_device_fini(&dev->dd_lu_dev);
}
EXPORT_SYMBOL(dt_device_fini);

int dt_object_init(struct dt_object *obj,
		   struct lu_object_header *h, struct lu_device *d)
{
	return lu_object_init(&obj->do_lu, h, d);
}
EXPORT_SYMBOL(dt_object_init);

void dt_object_fini(struct dt_object *obj)
{
	lu_object_fini(&obj->do_lu);
}
EXPORT_SYMBOL(dt_object_fini);

int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj)
{
	if (obj->do_index_ops == NULL)
		obj->do_ops->do_index_try(env, obj, &dt_directory_features);
	return obj->do_index_ops != NULL;
}
EXPORT_SYMBOL(dt_try_as_dir);

enum dt_format_type dt_mode_to_dft(__u32 mode)
{
	enum dt_format_type result;

	switch (mode & S_IFMT) {
	case S_IFDIR:
		result = DFT_DIR;
		break;
	case S_IFREG:
		result = DFT_REGULAR;
		break;
	case S_IFLNK:
		result = DFT_SYM;
		break;
	case S_IFCHR:
	case S_IFBLK:
	case S_IFIFO:
	case S_IFSOCK:
		result = DFT_NODE;
		break;
	default:
		LBUG();
		break;
	}
	return result;
}
EXPORT_SYMBOL(dt_mode_to_dft);

/**
 * Lookup fid for object named \a name in directory \a dir.
 */
int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
		  const char *name, struct lu_fid *fid)
{
	if (dt_try_as_dir(env, dir))
		return dt_lookup(env, dir, (struct dt_rec *)fid,
				 (const struct dt_key *)name, BYPASS_CAPA);
	return -ENOTDIR;
}
EXPORT_SYMBOL(dt_lookup_dir);

/* This differs from dt_locate() in that the top-level device is passed in
 * explicitly as \a top_dev instead of being taken from the lu_site. */
struct dt_object *dt_locate_at(const struct lu_env *env,
			       struct dt_device *dev, const struct lu_fid *fid,
			       struct lu_device *top_dev)
{
	struct lu_object *lo, *n;

	lo = lu_object_find_at(env, top_dev, fid, NULL);
	if (IS_ERR(lo))
		return (void *)lo;

	LASSERT(lo != NULL);

	list_for_each_entry(n, &lo->lo_header->loh_layers, lo_linkage) {
		if (n->lo_dev == &dev->dd_lu_dev)
			return container_of0(n, struct dt_object, do_lu);
	}
	return ERR_PTR(-ENOENT);
}
EXPORT_SYMBOL(dt_locate_at);

/**
 * Find an object named \a entry in given \a dfh->dfh_o directory.
 */
static int dt_find_entry(const struct lu_env *env, const char *entry, void *data)
{
	struct dt_find_hint	*dfh = data;
	struct dt_device	*dt = dfh->dfh_dt;
	struct lu_fid		*fid = dfh->dfh_fid;
	struct dt_object	*obj = dfh->dfh_o;
	int			 result;

	result = dt_lookup_dir(env, obj, entry, fid);
	lu_object_put(env, &obj->do_lu);
	if (result == 0) {
		obj = dt_locate(env, dt, fid);
		if (IS_ERR(obj))
			result = PTR_ERR(obj);
	}
	dfh->dfh_o = obj;
	return result;
}

/**
 * Abstract function which parses path name. This function feeds each
 * path component to \a entry_func.
 */
int dt_path_parser(const struct lu_env *env,
		   char *path, dt_entry_func_t entry_func,
		   void *data)
{
	char *e;
	int rc = 0;

	while (1) {
		e = strsep(&path, "/");
		if (e == NULL)
			break;

		if (e[0] == 0) {
			if (!path || path[0] == '\0')
				break;
			continue;
		}
		rc = entry_func(env, e, data);
		if (rc)
			break;
	}

	return rc;
}

struct dt_object *
dt_store_resolve(const struct lu_env *env, struct dt_device *dt,
		 const char *path, struct lu_fid *fid)
{
	struct dt_thread_info *info = dt_info(env);
	struct dt_find_hint   *dfh = &info->dti_dfh;
	struct dt_object      *obj;
	char		      *local = info->dti_buf;
	int		       result;

	dfh->dfh_dt = dt;
	dfh->dfh_fid = fid;

	strncpy(local, path, DT_MAX_PATH);
	local[DT_MAX_PATH - 1] = '\0';

	result = dt->dd_ops->dt_root_get(env, dt, fid);
	if (result == 0) {
		obj = dt_locate(env, dt, fid);
		if (!IS_ERR(obj)) {
			dfh->dfh_o = obj;
			result = dt_path_parser(env, local, dt_find_entry, dfh);
			if (result != 0)
				obj = ERR_PTR(result);
			else
				obj = dfh->dfh_o;
		}
	} else {
		obj = ERR_PTR(result);
	}
	return obj;
}
EXPORT_SYMBOL(dt_store_resolve);

static struct dt_object *dt_reg_open(const struct lu_env *env,
				     struct dt_device *dt,
				     struct dt_object *p,
				     const char *name,
				     struct lu_fid *fid)
{
	struct dt_object *o;
	int result;

	result = dt_lookup_dir(env, p, name, fid);
	if (result == 0)
		o = dt_locate(env, dt, fid);
	else
		o = ERR_PTR(result);

	return o;
}

/**
 * Open dt object named \a filename from \a dirname directory.
 *      \param  dt      dt device
 *      \param  fid     on success, object fid is stored in *fid
 */
struct dt_object *dt_store_open(const struct lu_env *env,
				struct dt_device *dt,
				const char *dirname,
				const char *filename,
				struct lu_fid *fid)
{
	struct dt_object *file;
	struct dt_object *dir;

	dir = dt_store_resolve(env, dt, dirname, fid);
	if (!IS_ERR(dir)) {
		file = dt_reg_open(env, dt, dir,
				   filename, fid);
		lu_object_put(env, &dir->do_lu);
	} else {
		file = dir;
	}
	return file;
}
EXPORT_SYMBOL(dt_store_open);
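
/*
 * Illustrative caller sketch (hypothetical, not part of this file): open a
 * file by directory/name relative to the device root and drop the reference
 * when done.  env/dev and the path names are assumed to be provided by the
 * caller.
 *
 *	struct lu_fid fid;
 *	struct dt_object *o;
 *
 *	o = dt_store_open(env, dev, "somedir", "somefile", &fid);
 *	if (IS_ERR(o))
 *		return PTR_ERR(o);
 *	// ... access the object, e.g. via dt_record_read()/dt_record_write()
 *	lu_object_put(env, &o->do_lu);
 */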

struct dt_object *dt_find_or_create(const struct lu_env *env,
				    struct dt_device *dt,
				    const struct lu_fid *fid,
				    struct dt_object_format *dof,
				    struct lu_attr *at)
{
	struct dt_object *dto;
	struct thandle *th;
	int rc;

	dto = dt_locate(env, dt, fid);
	if (IS_ERR(dto))
		return dto;

	LASSERT(dto != NULL);
	if (dt_object_exists(dto))
		return dto;

	th = dt_trans_create(env, dt);
	if (IS_ERR(th)) {
		rc = PTR_ERR(th);
		goto out;
	}

	rc = dt_declare_create(env, dto, at, NULL, dof, th);
	if (rc)
		goto trans_stop;

	rc = dt_trans_start_local(env, dt, th);
	if (rc)
		goto trans_stop;

	dt_write_lock(env, dto, 0);
	if (dt_object_exists(dto)) {
		rc = 0;
		goto unlock;
	}

	CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(fid));

	rc = dt_create(env, dto, at, NULL, dof, th);
	if (rc)
		goto unlock;
	LASSERT(dt_object_exists(dto));
unlock:
	dt_write_unlock(env, dto);
trans_stop:
	dt_trans_stop(env, dt, th);
out:
	if (rc) {
		lu_object_put(env, &dto->do_lu);
		return ERR_PTR(rc);
	}
	return dto;
}
EXPORT_SYMBOL(dt_find_or_create);
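
/*
 * For reference, the creation above follows the two-phase transaction
 * pattern used throughout this file: dt_trans_create() builds the handle,
 * dt_declare_create() declares the intended change to the backend,
 * dt_trans_start_local() opens the transaction, dt_create() does the work
 * under dt_write_lock(), and dt_trans_stop() closes the transaction whether
 * or not the create succeeded.
 */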

/* dt class init function. */
int dt_global_init(void)
{
	LU_CONTEXT_KEY_INIT(&dt_key);
	return lu_context_key_register(&dt_key);
}

void dt_global_fini(void)
{
	lu_context_key_degister(&dt_key);
}

/**
 * Generic read helper.
 *
 * Unlike dt_record_read(), a short read is not treated as an error here;
 * the number of bytes actually read is returned to the caller.
 *
 * \param env  lustre environment
 * \param dt   object to be read
 * \param buf  lu_buf to be filled, with buffer pointer and length
 * \param pos  position to start reading, updated as data is read
 *
 * \retval real size of data read
 * \retval -ve errno on failure
 */
int dt_read(const struct lu_env *env, struct dt_object *dt,
	    struct lu_buf *buf, loff_t *pos)
{
	LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
	return dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
}
EXPORT_SYMBOL(dt_read);

/**
 * Read structures of fixed size from storage.  Unlike dt_read(), using
 * dt_record_read() will return an error for partial reads.
 *
 * \param env  lustre environment
 * \param dt   object to be read
 * \param buf  lu_buf to be filled, with buffer pointer and length
 * \param pos  position to start reading, updated as data is read
 *
 * \retval 0 on successfully reading full buffer
 * \retval -EFAULT on short read
 * \retval -ve errno on failure
 */
int dt_record_read(const struct lu_env *env, struct dt_object *dt,
		   struct lu_buf *buf, loff_t *pos)
{
	int rc;

	LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");

	rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);

	if (rc == buf->lb_len)
		rc = 0;
	else if (rc >= 0)
		rc = -EFAULT;
	return rc;
}
EXPORT_SYMBOL(dt_record_read);

int dt_record_write(const struct lu_env *env, struct dt_object *dt,
		    const struct lu_buf *buf, loff_t *pos, struct thandle *th)
{
	int rc;

	LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
	LASSERT(th != NULL);
	LASSERT(dt->do_body_ops);
	LASSERT(dt->do_body_ops->dbo_write);
	rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
	if (rc == buf->lb_len)
		rc = 0;
	else if (rc >= 0)
		rc = -EFAULT;
	return rc;
}
EXPORT_SYMBOL(dt_record_write);
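
/*
 * Illustrative sketch (hypothetical caller, not part of this file): read and
 * rewrite one fixed-size record with the helpers above.  "struct my_rec" is
 * an assumed on-disk structure; env/obj/th are assumed to be prepared by the
 * caller, with the write covered by an already started transaction.
 *
 *	struct my_rec rec;
 *	struct lu_buf buf = { .lb_buf = &rec, .lb_len = sizeof(rec) };
 *	loff_t pos = 0;
 *	int rc;
 *
 *	rc = dt_record_read(env, obj, &buf, &pos);   // 0, -EFAULT or -errno
 *	if (rc == 0) {
 *		rec.counter++;
 *		pos = 0;
 *		rc = dt_record_write(env, obj, &buf, &pos, th);
 *	}
 */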

int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
			   struct thandle *th)
{
	struct lu_buf vbuf;
	char *xname = XATTR_NAME_VERSION;

	LASSERT(o);
	vbuf.lb_buf = NULL;
	vbuf.lb_len = sizeof(dt_obj_version_t);
	return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th);
}
EXPORT_SYMBOL(dt_declare_version_set);

void dt_version_set(const struct lu_env *env, struct dt_object *o,
		    dt_obj_version_t version, struct thandle *th)
{
	struct lu_buf vbuf;
	char *xname = XATTR_NAME_VERSION;
	int rc;

	LASSERT(o);
	vbuf.lb_buf = &version;
	vbuf.lb_len = sizeof(version);

	rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA);
	if (rc < 0)
		CDEBUG(D_INODE, "Can't set version, rc %d\n", rc);
}
EXPORT_SYMBOL(dt_version_set);

dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o)
{
	struct lu_buf vbuf;
	char *xname = XATTR_NAME_VERSION;
	dt_obj_version_t version;
	int rc;

	LASSERT(o);
	vbuf.lb_buf = &version;
	vbuf.lb_len = sizeof(version);
	rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA);
	if (rc != sizeof(version)) {
		CDEBUG(D_INODE, "Can't get version, rc %d\n", rc);
		version = 0;
	}
	return version;
}
EXPORT_SYMBOL(dt_version_get);
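
/*
 * Illustrative sketch (hypothetical, not part of this file): the version is
 * stored in the XATTR_NAME_VERSION extended attribute, so a typical update
 * is declared, then written under a transaction, and later read back:
 *
 *	rc = dt_declare_version_set(env, obj, th);
 *	// ... dt_trans_start_local(), other declared operations ...
 *	dt_version_set(env, obj, new_version, th);
 *	// ... dt_trans_stop() ...
 *	cur = dt_version_get(env, obj);	  // returns 0 if the xattr is absent
 *
 * env/obj/th/new_version are assumed to be set up by the caller.
 */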

/* list of all supported index types */

/* directories */
const struct dt_index_features dt_directory_features;
EXPORT_SYMBOL(dt_directory_features);

/* scrub iterator */
const struct dt_index_features dt_otable_features;
EXPORT_SYMBOL(dt_otable_features);

/* lfsck */
const struct dt_index_features dt_lfsck_features = {
	.dif_flags		= DT_IND_UPDATE,
	.dif_keysize_min	= sizeof(struct lu_fid),
	.dif_keysize_max	= sizeof(struct lu_fid),
	.dif_recsize_min	= sizeof(__u8),
	.dif_recsize_max	= sizeof(__u8),
	.dif_ptrsize		= 4
};
EXPORT_SYMBOL(dt_lfsck_features);

/* accounting indexes */
const struct dt_index_features dt_acct_features = {
	.dif_flags		= DT_IND_UPDATE,
	.dif_keysize_min	= sizeof(__u64), /* 64-bit uid/gid */
	.dif_keysize_max	= sizeof(__u64), /* 64-bit uid/gid */
	.dif_recsize_min	= sizeof(struct lquota_acct_rec), /* 16 bytes */
	.dif_recsize_max	= sizeof(struct lquota_acct_rec), /* 16 bytes */
	.dif_ptrsize		= 4
};
EXPORT_SYMBOL(dt_acct_features);

/* global quota files */
const struct dt_index_features dt_quota_glb_features = {
	.dif_flags		= DT_IND_UPDATE,
	/* a different key would have to be used for per-directory quota */
	.dif_keysize_min	= sizeof(__u64), /* 64-bit uid/gid */
	.dif_keysize_max	= sizeof(__u64), /* 64-bit uid/gid */
	.dif_recsize_min	= sizeof(struct lquota_glb_rec), /* 32 bytes */
	.dif_recsize_max	= sizeof(struct lquota_glb_rec), /* 32 bytes */
	.dif_ptrsize		= 4
};
EXPORT_SYMBOL(dt_quota_glb_features);

/* slave quota files */
const struct dt_index_features dt_quota_slv_features = {
	.dif_flags		= DT_IND_UPDATE,
	/* a different key would have to be used for per-directory quota */
	.dif_keysize_min	= sizeof(__u64), /* 64-bit uid/gid */
	.dif_keysize_max	= sizeof(__u64), /* 64-bit uid/gid */
	.dif_recsize_min	= sizeof(struct lquota_slv_rec), /* 8 bytes */
	.dif_recsize_max	= sizeof(struct lquota_slv_rec), /* 8 bytes */
	.dif_ptrsize		= 4
};
EXPORT_SYMBOL(dt_quota_slv_features);

/* helper function returning what dt_index_features structure should be used
 * based on the FID sequence. This is used by the OBD_IDX_READ RPC */
static inline const struct dt_index_features *dt_index_feat_select(__u64 seq,
								   __u32 mode)
{
	if (seq == FID_SEQ_QUOTA_GLB) {
		/* global quota index */
		if (!S_ISREG(mode))
			/* global quota index should be a regular file */
			return ERR_PTR(-ENOENT);
		return &dt_quota_glb_features;
	} else if (seq == FID_SEQ_QUOTA) {
		/* quota slave index */
		if (!S_ISREG(mode))
			/* slave index should be a regular file */
			return ERR_PTR(-ENOENT);
		return &dt_quota_slv_features;
	} else if (seq >= FID_SEQ_NORMAL) {
		/* object is part of the namespace, verify that it is a
		 * directory */
		if (!S_ISDIR(mode))
			/* sorry, we can only deal with directories */
			return ERR_PTR(-ENOTDIR);
		return &dt_directory_features;
	}

	return ERR_PTR(-EOPNOTSUPP);
}

/*
 * Fill a lu_idxpage with key/record pairs read for transfer via OBD_IDX_READ
 * RPC
 *
 * \param env  - is the environment passed by the caller
 * \param lp   - is a pointer to the lu_page to fill
 * \param nob  - is the maximum number of bytes that should be copied
 * \param iops - is the index operation vector associated with the index object
 * \param it   - is a pointer to the current iterator
 * \param attr - is the index attribute to pass to iops->rec()
 * \param arg  - is a pointer to the idx_info structure
 */
static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
			       int nob, const struct dt_it_ops *iops,
			       struct dt_it *it, __u32 attr, void *arg)
{
	struct idx_info		*ii = (struct idx_info *)arg;
	struct lu_idxpage	*lip = &lp->lp_idx;
	char			*entry;
	int			 rc, size;

	/* no support for variable key & record size for now */
	LASSERT((ii->ii_flags & II_FL_VARKEY) == 0);
	LASSERT((ii->ii_flags & II_FL_VARREC) == 0);

	/* initialize the header of the new container */
	memset(lip, 0, LIP_HDR_SIZE);
	lip->lip_magic = LIP_MAGIC;
	nob -= LIP_HDR_SIZE;

	/* compute size needed to store a key/record pair */
	size = ii->ii_recsize + ii->ii_keysize;
	if ((ii->ii_flags & II_FL_NOHASH) == 0)
		/* add hash if the client wants it */
		size += sizeof(__u64);

	entry = lip->lip_entries;
	do {
		char		*tmp_entry = entry;
		struct dt_key	*key;
		__u64		 hash;

		/* fetch 64-bit hash value */
		hash = iops->store(env, it);
		ii->ii_hash_end = hash;

		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
			if (lip->lip_nr != 0) {
				rc = 0;
				goto out;
			}
		}

		if (nob < size) {
			if (lip->lip_nr == 0)
				rc = -EINVAL;
			else
				rc = 0;
			goto out;
		}

		if ((ii->ii_flags & II_FL_NOHASH) == 0) {
			/* client wants the 64-bit hash value associated with
			 * each record */
			memcpy(tmp_entry, &hash, sizeof(hash));
			tmp_entry += sizeof(hash);
		}

		/* then the key value */
		LASSERT(iops->key_size(env, it) == ii->ii_keysize);
		key = iops->key(env, it);
		memcpy(tmp_entry, key, ii->ii_keysize);
		tmp_entry += ii->ii_keysize;

		/* and finally the record */
		rc = iops->rec(env, it, (struct dt_rec *)tmp_entry, attr);
		if (rc != -ESTALE) {
			if (rc != 0)
				goto out;

			/* hash/key/record successfully copied! */
			lip->lip_nr++;
			if (unlikely(lip->lip_nr == 1 && ii->ii_count == 0))
				ii->ii_hash_start = hash;
			entry = tmp_entry + ii->ii_recsize;
			nob -= size;
		}

		/* move on to the next record */
		do {
			rc = iops->next(env, it);
		} while (rc == -ESTALE);

	} while (rc == 0);

	goto out;
out:
	if (rc >= 0 && lip->lip_nr > 0)
		/* one more container */
		ii->ii_count++;
	if (rc > 0)
		/* no more entries */
		ii->ii_hash_end = II_END_OFF;
	return rc;
}
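
/*
 * For reference, each entry emitted by dt_index_page_build() above is packed
 * back to back inside lip_entries as:
 *
 *	[ 64-bit hash (unless II_FL_NOHASH) ][ key (ii_keysize) ][ record (ii_recsize) ]
 *
 * so one container holds lip_nr such entries after LIP_HDR_SIZE bytes of
 * header.
 */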

/*
 * Walk index and fill lu_page containers with key/record pairs
 *
 * \param env    - is the environment passed by the caller
 * \param obj    - is the index object to parse
 * \param rdpg   - is the lu_rdpg descriptor associated with the transfer
 * \param filler - is the callback function responsible for filling a lu_page
 *		   with key/record pairs in the format wanted by the caller
 * \param arg    - is an opaque argument passed to the filler function
 *
 * \retval sum (in bytes) of all filled lu_pages
 * \retval -ve errno on failure
 */
int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
		  const struct lu_rdpg *rdpg, dt_index_page_build_t filler,
		  void *arg)
{
	struct dt_it		*it;
	const struct dt_it_ops	*iops;
	unsigned int		 pageidx, nob, nlupgs = 0;
	int			 rc;

	LASSERT(rdpg->rp_pages != NULL);
	LASSERT(obj->do_index_ops != NULL);

	nob = rdpg->rp_count;
	if (nob <= 0)
		return -EFAULT;

	/* Iterate through index and fill containers from @rdpg */
	iops = &obj->do_index_ops->dio_it;
	LASSERT(iops != NULL);
	it = iops->init(env, obj, rdpg->rp_attrs, BYPASS_CAPA);
	if (IS_ERR(it))
		return PTR_ERR(it);

	rc = iops->load(env, it, rdpg->rp_hash);
	if (rc == 0) {
		/*
		 * Iterator didn't find record with exactly the key requested.
		 *
		 * It is currently either
		 *
		 *     - positioned above record with key less than
		 *     requested---skip it.
		 *     - or not positioned at all (is in IAM_IT_SKEWED
		 *     state)---position it on the next item.
		 */
		rc = iops->next(env, it);
	} else if (rc > 0) {
		rc = 0;
	}

	/* Fill containers one after the other. There might be multiple
	 * containers per physical page.
	 *
	 * At this point and across for-loop:
	 *  rc == 0 -> ok, proceed.
	 *  rc >  0 -> end of index.
	 *  rc <  0 -> error. */
	for (pageidx = 0; rc == 0 && nob > 0; pageidx++) {
		union lu_page	*lp;
		int		 i;

		LASSERT(pageidx < rdpg->rp_npages);
		lp = kmap(rdpg->rp_pages[pageidx]);

		/* fill lu pages */
		for (i = 0; i < LU_PAGE_COUNT; i++, lp++, nob -= LU_PAGE_SIZE) {
			rc = filler(env, lp, min_t(int, nob, LU_PAGE_SIZE),
				    iops, it, rdpg->rp_attrs, arg);
			if (rc < 0)
				break;
			/* one more lu_page */
			nlupgs++;
			if (rc > 0)
				/* end of index */
				break;
		}
		kunmap(rdpg->rp_pages[pageidx]);
	}

	iops->put(env, it);
	iops->fini(env, it);

	if (rc >= 0)
		rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);

	return rc;
}
EXPORT_SYMBOL(dt_index_walk);

/**
 * Walk key/record pairs of an index and copy them into 4KB containers to be
 * transferred over the network. This is the common handler for OBD_IDX_READ
 * RPC processing.
 *
 * \param env  - is the environment passed by the caller
 * \param dev  - is the dt_device storing the index
 * \param ii   - is the idx_info structure packed by the client in the
 *		 OBD_IDX_READ request
 * \param rdpg - is the lu_rdpg descriptor
 *
 * \retval on success, return sum (in bytes) of all filled containers
 * \retval appropriate error otherwise.
 */
int dt_index_read(const struct lu_env *env, struct dt_device *dev,
		  struct idx_info *ii, const struct lu_rdpg *rdpg)
{
	const struct dt_index_features	*feat;
	struct dt_object		*obj;
	int				 rc;

	/* rp_count shouldn't be zero and should be a multiple of the container
	 * size */
	if (rdpg->rp_count <= 0 || (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0)
		return -EFAULT;

	if (fid_seq(&ii->ii_fid) >= FID_SEQ_NORMAL)
		/* we don't support directory transfer via OBD_IDX_READ for the
		 * time being */
		return -EOPNOTSUPP;

	if (!fid_is_quota(&ii->ii_fid))
		/* block access to all local files except quota files */
		return -EPERM;

	/* lookup index object subject to the transfer */
	obj = dt_locate(env, dev, &ii->ii_fid);
	if (IS_ERR(obj))
		return PTR_ERR(obj);
	if (dt_object_exists(obj) == 0) {
		rc = -ENOENT;
		goto out;
	}

	/* fetch index features associated with index object */
	feat = dt_index_feat_select(fid_seq(&ii->ii_fid),
				    lu_object_attr(&obj->do_lu));
	if (IS_ERR(feat)) {
		rc = PTR_ERR(feat);
		goto out;
	}

	/* load index feature if not done already */
	if (obj->do_index_ops == NULL) {
		rc = obj->do_ops->do_index_try(env, obj, feat);
		if (rc)
			goto out;
	}

	/* fill ii_flags with supported index features */
	ii->ii_flags &= II_FL_NOHASH;

	ii->ii_keysize = feat->dif_keysize_max;
	if ((feat->dif_flags & DT_IND_VARKEY) != 0) {
		/* key size is variable */
		ii->ii_flags |= II_FL_VARKEY;
		/* we don't support variable key size for the time being */
		rc = -EOPNOTSUPP;
		goto out;
	}

	ii->ii_recsize = feat->dif_recsize_max;
	if ((feat->dif_flags & DT_IND_VARREC) != 0) {
		/* record size is variable */
		ii->ii_flags |= II_FL_VARREC;
		/* we don't support variable record size for the time being */
		rc = -EOPNOTSUPP;
		goto out;
	}

	if ((feat->dif_flags & DT_IND_NONUNQ) != 0)
		/* key isn't necessarily unique */
		ii->ii_flags |= II_FL_NONUNQ;

	dt_read_lock(env, obj, 0);
	/* fetch object version before walking the index */
	ii->ii_version = dt_version_get(env, obj);

	/* walk the index and fill lu_idxpages with key/record pairs */
	rc = dt_index_walk(env, obj, rdpg, dt_index_page_build, ii);
	dt_read_unlock(env, obj);

	if (rc == 0) {
		/* index is empty */
		LASSERT(ii->ii_count == 0);
		ii->ii_hash_end = II_END_OFF;
	}

	goto out;
out:
	lu_object_put(env, &obj->do_lu);
	return rc;
}
EXPORT_SYMBOL(dt_index_read);
#if defined(CONFIG_PROC_FS)

int lprocfs_dt_rd_blksize(char *page, char **start, off_t off,
			  int count, int *eof, void *data)
{
	struct dt_device *dt = data;
	struct obd_statfs osfs;
	int rc = dt_statfs(NULL, dt, &osfs);

	if (rc == 0) {
		*eof = 1;
		rc = snprintf(page, count, "%u\n",
			      (unsigned) osfs.os_bsize);
	}

	return rc;
}
EXPORT_SYMBOL(lprocfs_dt_rd_blksize);

int lprocfs_dt_rd_kbytestotal(char *page, char **start, off_t off,
			      int count, int *eof, void *data)
{
	struct dt_device *dt = data;
	struct obd_statfs osfs;
	int rc = dt_statfs(NULL, dt, &osfs);

	if (rc == 0) {
		__u32 blk_size = osfs.os_bsize >> 10;
		__u64 result = osfs.os_blocks;

		/* convert a count of os_bsize blocks into kilobytes:
		 * result = os_blocks * (os_bsize / 1024), computed by
		 * shifting since os_bsize is a power of two */
		while (blk_size >>= 1)
			result <<= 1;

		*eof = 1;
		rc = snprintf(page, count, "%llu\n", result);
	}

	return rc;
}
EXPORT_SYMBOL(lprocfs_dt_rd_kbytestotal);

int lprocfs_dt_rd_kbytesfree(char *page, char **start, off_t off,
			     int count, int *eof, void *data)
{
	struct dt_device *dt = data;
	struct obd_statfs osfs;
	int rc = dt_statfs(NULL, dt, &osfs);

	if (rc == 0) {
		__u32 blk_size = osfs.os_bsize >> 10;
		__u64 result = osfs.os_bfree;

		while (blk_size >>= 1)
			result <<= 1;

		*eof = 1;
		rc = snprintf(page, count, "%llu\n", result);
	}

	return rc;
}
EXPORT_SYMBOL(lprocfs_dt_rd_kbytesfree);

int lprocfs_dt_rd_kbytesavail(char *page, char **start, off_t off,
			      int count, int *eof, void *data)
{
	struct dt_device *dt = data;
	struct obd_statfs osfs;
	int rc = dt_statfs(NULL, dt, &osfs);

	if (rc == 0) {
		__u32 blk_size = osfs.os_bsize >> 10;
		__u64 result = osfs.os_bavail;

		while (blk_size >>= 1)
			result <<= 1;

		*eof = 1;
		rc = snprintf(page, count, "%llu\n", result);
	}

	return rc;
}
EXPORT_SYMBOL(lprocfs_dt_rd_kbytesavail);

int lprocfs_dt_rd_filestotal(char *page, char **start, off_t off,
			     int count, int *eof, void *data)
{
	struct dt_device *dt = data;
	struct obd_statfs osfs;
	int rc = dt_statfs(NULL, dt, &osfs);

	if (rc == 0) {
		*eof = 1;
		rc = snprintf(page, count, "%llu\n", osfs.os_files);
	}

	return rc;
}
EXPORT_SYMBOL(lprocfs_dt_rd_filestotal);

int lprocfs_dt_rd_filesfree(char *page, char **start, off_t off,
			    int count, int *eof, void *data)
{
	struct dt_device *dt = data;
	struct obd_statfs osfs;
	int rc = dt_statfs(NULL, dt, &osfs);

	if (rc == 0) {
		*eof = 1;
		rc = snprintf(page, count, "%llu\n", osfs.os_ffree);
	}

	return rc;
}
EXPORT_SYMBOL(lprocfs_dt_rd_filesfree);

#endif /* CONFIG_PROC_FS */