1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_LMV
38 #include <linux/slab.h>
39 #include <linux/module.h>
40 #include <linux/init.h>
41 #include <linux/pagemap.h>
42 #include <linux/mm.h>
43 #include <asm/div64.h>
44 #include <linux/seq_file.h>
45 #include <linux/namei.h>
46 #include <linux/uaccess.h>
47 
48 #include "../include/lustre/lustre_idl.h"
49 #include "../include/obd_support.h"
50 #include "../include/lustre_lib.h"
51 #include "../include/lustre_net.h"
52 #include "../include/obd_class.h"
53 #include "../include/lprocfs_status.h"
54 #include "../include/lustre_lite.h"
55 #include "../include/lustre_fid.h"
56 #include "lmv_internal.h"
57 
lmv_activate_target(struct lmv_obd * lmv,struct lmv_tgt_desc * tgt,int activate)58 static void lmv_activate_target(struct lmv_obd *lmv,
59 				struct lmv_tgt_desc *tgt,
60 				int activate)
61 {
62 	if (tgt->ltd_active == activate)
63 		return;
64 
65 	tgt->ltd_active = activate;
66 	lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
67 }
68 
69 /**
70  * Error codes:
71  *
72  *  -EINVAL  : UUID can't be found in the LMV's target list
73  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
74  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
75  */
lmv_set_mdc_active(struct lmv_obd * lmv,struct obd_uuid * uuid,int activate)76 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
77 			      int activate)
78 {
79 	struct lmv_tgt_desc    *uninitialized_var(tgt);
80 	struct obd_device      *obd;
81 	int		     i;
82 	int		     rc = 0;
83 
84 	CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
85 	       lmv, uuid->uuid, activate);
86 
87 	spin_lock(&lmv->lmv_lock);
88 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
89 		tgt = lmv->tgts[i];
90 		if (tgt == NULL || tgt->ltd_exp == NULL)
91 			continue;
92 
93 		CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i,
94 		       tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
95 
96 		if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
97 			break;
98 	}
99 
100 	if (i == lmv->desc.ld_tgt_count) {
101 		rc = -EINVAL;
102 		goto out_lmv_lock;
103 	}
104 
105 	obd = class_exp2obd(tgt->ltd_exp);
106 	if (obd == NULL) {
107 		rc = -ENOTCONN;
108 		goto out_lmv_lock;
109 	}
110 
111 	CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112 	       obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113 	       obd->obd_type->typ_name, i);
114 	LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
115 
116 	if (tgt->ltd_active == activate) {
117 		CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118 		       activate ? "" : "in");
119 		goto out_lmv_lock;
120 	}
121 
122 	CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
123 	       activate ? "" : "in");
124 	lmv_activate_target(lmv, tgt, activate);
125 
126  out_lmv_lock:
127 	spin_unlock(&lmv->lmv_lock);
128 	return rc;
129 }
130 
lmv_get_uuid(struct obd_export * exp)131 static struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
132 {
133 	struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
134 
135 	return obd_get_uuid(lmv->tgts[0]->ltd_exp);
136 }
137 
lmv_notify(struct obd_device * obd,struct obd_device * watched,enum obd_notify_event ev,void * data)138 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
139 		      enum obd_notify_event ev, void *data)
140 {
141 	struct obd_connect_data *conn_data;
142 	struct lmv_obd	  *lmv = &obd->u.lmv;
143 	struct obd_uuid	 *uuid;
144 	int		      rc = 0;
145 
146 	if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
147 		CERROR("unexpected notification of %s %s!\n",
148 		       watched->obd_type->typ_name,
149 		       watched->obd_name);
150 		return -EINVAL;
151 	}
152 
153 	uuid = &watched->u.cli.cl_target_uuid;
154 	if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
155 		/*
156 		 * Set MDC as active before notifying the observer, so the
157 		 * observer can use the MDC normally.
158 		 */
159 		rc = lmv_set_mdc_active(lmv, uuid,
160 					ev == OBD_NOTIFY_ACTIVE);
161 		if (rc) {
162 			CERROR("%sactivation of %s failed: %d\n",
163 			       ev == OBD_NOTIFY_ACTIVE ? "" : "de",
164 			       uuid->uuid, rc);
165 			return rc;
166 		}
167 	} else if (ev == OBD_NOTIFY_OCD) {
168 		conn_data = &watched->u.cli.cl_import->imp_connect_data;
169 		/*
170 		 * XXX: Make sure that ocd_connect_flags from all targets are
171 		 * the same. Otherwise one of MDTs runs wrong version or
172 		 * something like this.  --umka
173 		 */
174 		obd->obd_self_export->exp_connect_data = *conn_data;
175 	}
176 #if 0
177 	else if (ev == OBD_NOTIFY_DISCON) {
178 		/*
179 		 * For disconnect event, flush fld cache for failout MDS case.
180 		 */
181 		fld_client_flush(&lmv->lmv_fld);
182 	}
183 #endif
184 	/*
185 	 * Pass the notification up the chain.
186 	 */
187 	if (obd->obd_observer)
188 		rc = obd_notify(obd->obd_observer, watched, ev, data);
189 
190 	return rc;
191 }
192 
193 /**
194  * This is fake connect function. Its purpose is to initialize lmv and say
195  * caller that everything is okay. Real connection will be performed later.
196  */
lmv_connect(const struct lu_env * env,struct obd_export ** exp,struct obd_device * obd,struct obd_uuid * cluuid,struct obd_connect_data * data,void * localdata)197 static int lmv_connect(const struct lu_env *env,
198 		       struct obd_export **exp, struct obd_device *obd,
199 		       struct obd_uuid *cluuid, struct obd_connect_data *data,
200 		       void *localdata)
201 {
202 	struct proc_dir_entry *lmv_proc_dir;
203 	struct lmv_obd	*lmv = &obd->u.lmv;
204 	struct lustre_handle  conn = { 0 };
205 	int		    rc = 0;
206 
207 	/*
208 	 * We don't want to actually do the underlying connections more than
209 	 * once, so keep track.
210 	 */
211 	lmv->refcount++;
212 	if (lmv->refcount > 1) {
213 		*exp = NULL;
214 		return 0;
215 	}
216 
217 	rc = class_connect(&conn, obd, cluuid);
218 	if (rc) {
219 		CERROR("class_connection() returned %d\n", rc);
220 		return rc;
221 	}
222 
223 	*exp = class_conn2export(&conn);
224 	class_export_get(*exp);
225 
226 	lmv->exp = *exp;
227 	lmv->connected = 0;
228 	lmv->cluuid = *cluuid;
229 
230 	if (data)
231 		lmv->conn_data = *data;
232 
233 	if (obd->obd_proc_private != NULL) {
234 		lmv_proc_dir = obd->obd_proc_private;
235 	} else {
236 		lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
237 						NULL, NULL);
238 		if (IS_ERR(lmv_proc_dir)) {
239 			CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
240 			       obd->obd_type->typ_name, obd->obd_name);
241 			lmv_proc_dir = NULL;
242 		}
243 		obd->obd_proc_private = lmv_proc_dir;
244 	}
245 
246 	/*
247 	 * All real clients should perform actual connection right away, because
248 	 * it is possible, that LMV will not have opportunity to connect targets
249 	 * and MDC stuff will be called directly, for instance while reading
250 	 * ../mdc/../kbytesfree procfs file, etc.
251 	 */
252 	if (data->ocd_connect_flags & OBD_CONNECT_REAL)
253 		rc = lmv_check_connect(obd);
254 
255 	if (rc && lmv_proc_dir) {
256 		lprocfs_remove(&lmv_proc_dir);
257 		obd->obd_proc_private = NULL;
258 	}
259 
260 	return rc;
261 }
262 
lmv_set_timeouts(struct obd_device * obd)263 static void lmv_set_timeouts(struct obd_device *obd)
264 {
265 	struct lmv_tgt_desc   *tgt;
266 	struct lmv_obd	*lmv;
267 	int		    i;
268 
269 	lmv = &obd->u.lmv;
270 	if (lmv->server_timeout == 0)
271 		return;
272 
273 	if (lmv->connected == 0)
274 		return;
275 
276 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
277 		tgt = lmv->tgts[i];
278 		if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
279 			continue;
280 
281 		obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
282 				   KEY_INTERMDS, 0, NULL, NULL);
283 	}
284 }
285 
lmv_init_ea_size(struct obd_export * exp,int easize,int def_easize,int cookiesize,int def_cookiesize)286 static int lmv_init_ea_size(struct obd_export *exp, int easize,
287 			    int def_easize, int cookiesize, int def_cookiesize)
288 {
289 	struct obd_device   *obd = exp->exp_obd;
290 	struct lmv_obd      *lmv = &obd->u.lmv;
291 	int		  i;
292 	int		  rc = 0;
293 	int		  change = 0;
294 
295 	if (lmv->max_easize < easize) {
296 		lmv->max_easize = easize;
297 		change = 1;
298 	}
299 	if (lmv->max_def_easize < def_easize) {
300 		lmv->max_def_easize = def_easize;
301 		change = 1;
302 	}
303 	if (lmv->max_cookiesize < cookiesize) {
304 		lmv->max_cookiesize = cookiesize;
305 		change = 1;
306 	}
307 	if (lmv->max_def_cookiesize < def_cookiesize) {
308 		lmv->max_def_cookiesize = def_cookiesize;
309 		change = 1;
310 	}
311 	if (change == 0)
312 		return 0;
313 
314 	if (lmv->connected == 0)
315 		return 0;
316 
317 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
318 		if (lmv->tgts[i] == NULL ||
319 		    lmv->tgts[i]->ltd_exp == NULL ||
320 		    lmv->tgts[i]->ltd_active == 0) {
321 			CWARN("%s: NULL export for %d\n", obd->obd_name, i);
322 			continue;
323 		}
324 
325 		rc = md_init_ea_size(lmv->tgts[i]->ltd_exp, easize, def_easize,
326 				     cookiesize, def_cookiesize);
327 		if (rc) {
328 			CERROR("%s: obd_init_ea_size() failed on MDT target %d: rc = %d.\n",
329 			       obd->obd_name, i, rc);
330 			break;
331 		}
332 	}
333 	return rc;
334 }
335 
336 #define MAX_STRING_SIZE 128
337 
lmv_connect_mdc(struct obd_device * obd,struct lmv_tgt_desc * tgt)338 static int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
339 {
340 	struct proc_dir_entry   *lmv_proc_dir;
341 	struct lmv_obd	  *lmv = &obd->u.lmv;
342 	struct obd_uuid	 *cluuid = &lmv->cluuid;
343 	struct obd_uuid	  lmv_mdc_uuid = { "LMV_MDC_UUID" };
344 	struct obd_device       *mdc_obd;
345 	struct obd_export       *mdc_exp;
346 	struct lu_fld_target     target;
347 	int		      rc;
348 
349 	mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
350 					&obd->obd_uuid);
351 	if (!mdc_obd) {
352 		CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
353 		return -EINVAL;
354 	}
355 
356 	CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
357 		mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
358 		tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
359 		cluuid->uuid);
360 
361 	if (!mdc_obd->obd_set_up) {
362 		CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
363 		return -EINVAL;
364 	}
365 
366 	rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
367 			 &lmv->conn_data, NULL);
368 	if (rc) {
369 		CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
370 		return rc;
371 	}
372 
373 	/*
374 	 * Init fid sequence client for this mdc and add new fld target.
375 	 */
376 	rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
377 	if (rc)
378 		return rc;
379 
380 	target.ft_srv = NULL;
381 	target.ft_exp = mdc_exp;
382 	target.ft_idx = tgt->ltd_idx;
383 
384 	fld_client_add_target(&lmv->lmv_fld, &target);
385 
386 	rc = obd_register_observer(mdc_obd, obd);
387 	if (rc) {
388 		obd_disconnect(mdc_exp);
389 		CERROR("target %s register_observer error %d\n",
390 		       tgt->ltd_uuid.uuid, rc);
391 		return rc;
392 	}
393 
394 	if (obd->obd_observer) {
395 		/*
396 		 * Tell the observer about the new target.
397 		 */
398 		rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
399 				OBD_NOTIFY_ACTIVE,
400 				(void *)(tgt - lmv->tgts[0]));
401 		if (rc) {
402 			obd_disconnect(mdc_exp);
403 			return rc;
404 		}
405 	}
406 
407 	tgt->ltd_active = 1;
408 	tgt->ltd_exp = mdc_exp;
409 	lmv->desc.ld_active_tgt_count++;
410 
411 	md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize,
412 			lmv->max_cookiesize, lmv->max_def_cookiesize);
413 
414 	CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
415 		mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
416 		atomic_read(&obd->obd_refcount));
417 
418 	lmv_proc_dir = obd->obd_proc_private;
419 	if (lmv_proc_dir) {
420 		struct proc_dir_entry *mdc_symlink;
421 
422 		LASSERT(mdc_obd->obd_type != NULL);
423 		LASSERT(mdc_obd->obd_type->typ_name != NULL);
424 		mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
425 						  lmv_proc_dir,
426 						  "../../../%s/%s",
427 						  mdc_obd->obd_type->typ_name,
428 						  mdc_obd->obd_name);
429 		if (mdc_symlink == NULL) {
430 			CERROR("Could not register LMV target /proc/fs/lustre/%s/%s/target_obds/%s.",
431 			       obd->obd_type->typ_name, obd->obd_name,
432 			       mdc_obd->obd_name);
433 			lprocfs_remove(&lmv_proc_dir);
434 			obd->obd_proc_private = NULL;
435 		}
436 	}
437 	return 0;
438 }
439 
lmv_del_target(struct lmv_obd * lmv,int index)440 static void lmv_del_target(struct lmv_obd *lmv, int index)
441 {
442 	if (lmv->tgts[index] == NULL)
443 		return;
444 
445 	OBD_FREE_PTR(lmv->tgts[index]);
446 	lmv->tgts[index] = NULL;
447 	return;
448 }
449 
/*
 * Add a new target with UUID @uuidp at position @index of the LMV target
 * table, growing the table (to the next power of two) when needed.  If the
 * LMV is already connected the new target is connected right away.
 *
 * The whole operation runs under the LMV init lock.
 *
 * @gen is not used by this function.
 *
 * Returns 0 on success, -EINVAL when the very first target has no attached
 * MDC, -EEXIST when @index is already occupied, -ENOMEM on allocation
 * failure, or the error returned by lmv_connect_mdc().
 */
static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
			   __u32 index, int gen)
{
	struct lmv_obd      *lmv = &obd->u.lmv;
	struct lmv_tgt_desc *tgt;
	__u32		  orig_tgt_count;
	int		  rc = 0;

	CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);

	lmv_init_lock(lmv);

	/* For the first target, make sure its MDC exists at all. */
	if (lmv->desc.ld_tgt_count == 0) {
		struct obd_device *mdc_obd;

		mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
						&obd->obd_uuid);
		if (!mdc_obd) {
			lmv_init_unlock(lmv);
			CERROR("%s: Target %s not attached: rc = %d\n",
			       obd->obd_name, uuidp->uuid, -EINVAL);
			return -EINVAL;
		}
	}

	if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
		tgt = lmv->tgts[index];
		CERROR("%s: UUID %s already assigned at LOV target index %d: rc = %d\n",
		       obd->obd_name,
		       obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
		lmv_init_unlock(lmv);
		return -EEXIST;
	}

	if (index >= lmv->tgts_size) {
		/* We need to reallocate the lmv target array. */
		struct lmv_tgt_desc **newtgts, **old = NULL;
		__u32 newsize = 1;
		__u32 oldsize = 0;

		while (newsize < index + 1)
			newsize <<= 1;
		OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
		if (newtgts == NULL) {
			lmv_init_unlock(lmv);
			return -ENOMEM;
		}

		if (lmv->tgts_size) {
			memcpy(newtgts, lmv->tgts,
			       sizeof(*newtgts) * lmv->tgts_size);
			old = lmv->tgts;
			oldsize = lmv->tgts_size;
		}

		lmv->tgts = newtgts;
		lmv->tgts_size = newsize;
		/*
		 * NOTE(review): a read barrier after publishing the new
		 * array looks unusual - confirm the intended ordering.
		 */
		smp_rmb();
		if (old)
			OBD_FREE(old, sizeof(*old) * oldsize);

		CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
		       lmv->tgts_size);
	}

	OBD_ALLOC_PTR(tgt);
	if (!tgt) {
		lmv_init_unlock(lmv);
		return -ENOMEM;
	}

	mutex_init(&tgt->ltd_fid_mutex);
	tgt->ltd_idx = index;
	tgt->ltd_uuid = *uuidp;
	tgt->ltd_active = 0;
	lmv->tgts[index] = tgt;

	/*
	 * Remember the count before (possibly) raising it, so a failed
	 * connect can restore exactly the previous value.
	 */
	orig_tgt_count = lmv->desc.ld_tgt_count;
	if (index >= orig_tgt_count)
		lmv->desc.ld_tgt_count = index + 1;

	if (lmv->connected) {
		rc = lmv_connect_mdc(obd, tgt);
		if (rc) {
			spin_lock(&lmv->lmv_lock);
			/*
			 * Undo the count change made above.  A plain
			 * decrement is wrong when the count was not raised
			 * (index below the old count) or was raised by more
			 * than one (index beyond the old count).
			 */
			lmv->desc.ld_tgt_count = orig_tgt_count;
			/*
			 * NOTE(review): the zeroed descriptor stays in
			 * lmv->tgts[index], so the slot still reads as
			 * occupied - confirm this is intended.
			 */
			memset(tgt, 0, sizeof(*tgt));
			spin_unlock(&lmv->lmv_lock);
		} else {
			int easize = sizeof(struct lmv_stripe_md) +
				lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
			lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
		}
	}

	lmv_init_unlock(lmv);
	return rc;
}
545 
lmv_check_connect(struct obd_device * obd)546 int lmv_check_connect(struct obd_device *obd)
547 {
548 	struct lmv_obd       *lmv = &obd->u.lmv;
549 	struct lmv_tgt_desc  *tgt;
550 	int		   i;
551 	int		   rc;
552 	int		   easize;
553 
554 	if (lmv->connected)
555 		return 0;
556 
557 	lmv_init_lock(lmv);
558 	if (lmv->connected) {
559 		lmv_init_unlock(lmv);
560 		return 0;
561 	}
562 
563 	if (lmv->desc.ld_tgt_count == 0) {
564 		lmv_init_unlock(lmv);
565 		CERROR("%s: no targets configured.\n", obd->obd_name);
566 		return -EINVAL;
567 	}
568 
569 	CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
570 	       lmv->cluuid.uuid, obd->obd_name);
571 
572 	LASSERT(lmv->tgts != NULL);
573 
574 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
575 		tgt = lmv->tgts[i];
576 		if (tgt == NULL)
577 			continue;
578 		rc = lmv_connect_mdc(obd, tgt);
579 		if (rc)
580 			goto out_disc;
581 	}
582 
583 	lmv_set_timeouts(obd);
584 	class_export_put(lmv->exp);
585 	lmv->connected = 1;
586 	easize = lmv_get_easize(lmv);
587 	lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
588 	lmv_init_unlock(lmv);
589 	return 0;
590 
591  out_disc:
592 	while (i-- > 0) {
593 		int rc2;
594 		tgt = lmv->tgts[i];
595 		if (tgt == NULL)
596 			continue;
597 		tgt->ltd_active = 0;
598 		if (tgt->ltd_exp) {
599 			--lmv->desc.ld_active_tgt_count;
600 			rc2 = obd_disconnect(tgt->ltd_exp);
601 			if (rc2) {
602 				CERROR("LMV target %s disconnect on MDC idx %d: error %d\n",
603 				       tgt->ltd_uuid.uuid, i, rc2);
604 			}
605 		}
606 	}
607 	class_disconnect(lmv->exp);
608 	lmv_init_unlock(lmv);
609 	return rc;
610 }
611 
lmv_disconnect_mdc(struct obd_device * obd,struct lmv_tgt_desc * tgt)612 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
613 {
614 	struct proc_dir_entry  *lmv_proc_dir;
615 	struct lmv_obd	 *lmv = &obd->u.lmv;
616 	struct obd_device      *mdc_obd;
617 	int		     rc;
618 
619 	LASSERT(tgt != NULL);
620 	LASSERT(obd != NULL);
621 
622 	mdc_obd = class_exp2obd(tgt->ltd_exp);
623 
624 	if (mdc_obd) {
625 		mdc_obd->obd_force = obd->obd_force;
626 		mdc_obd->obd_fail = obd->obd_fail;
627 		mdc_obd->obd_no_recov = obd->obd_no_recov;
628 	}
629 
630 	lmv_proc_dir = obd->obd_proc_private;
631 	if (lmv_proc_dir)
632 		lprocfs_remove_proc_entry(mdc_obd->obd_name, lmv_proc_dir);
633 
634 	rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
635 	if (rc)
636 		CERROR("Can't finalize fids factory\n");
637 
638 	CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
639 	       tgt->ltd_exp->exp_obd->obd_name,
640 	       tgt->ltd_exp->exp_obd->obd_uuid.uuid);
641 
642 	obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
643 	rc = obd_disconnect(tgt->ltd_exp);
644 	if (rc) {
645 		if (tgt->ltd_active) {
646 			CERROR("Target %s disconnect error %d\n",
647 			       tgt->ltd_uuid.uuid, rc);
648 		}
649 	}
650 
651 	lmv_activate_target(lmv, tgt, 0);
652 	tgt->ltd_exp = NULL;
653 	return 0;
654 }
655 
lmv_disconnect(struct obd_export * exp)656 static int lmv_disconnect(struct obd_export *exp)
657 {
658 	struct obd_device     *obd = class_exp2obd(exp);
659 	struct lmv_obd	*lmv = &obd->u.lmv;
660 	int		    rc;
661 	int		    i;
662 
663 	if (!lmv->tgts)
664 		goto out_local;
665 
666 	/*
667 	 * Only disconnect the underlying layers on the final disconnect.
668 	 */
669 	lmv->refcount--;
670 	if (lmv->refcount != 0)
671 		goto out_local;
672 
673 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
674 		if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
675 			continue;
676 
677 		lmv_disconnect_mdc(obd, lmv->tgts[i]);
678 	}
679 
680 	if (obd->obd_proc_private)
681 		lprocfs_remove((struct proc_dir_entry **)&obd->obd_proc_private);
682 	else
683 		CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
684 		       obd->obd_type->typ_name, obd->obd_name);
685 
686 out_local:
687 	/*
688 	 * This is the case when no real connection is established by
689 	 * lmv_check_connect().
690 	 */
691 	if (!lmv->connected)
692 		class_export_put(exp);
693 	rc = class_disconnect(exp);
694 	if (lmv->refcount == 0)
695 		lmv->connected = 0;
696 	return rc;
697 }
698 
lmv_fid2path(struct obd_export * exp,int len,void * karg,void * uarg)699 static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
700 {
701 	struct obd_device	*obddev = class_exp2obd(exp);
702 	struct lmv_obd		*lmv = &obddev->u.lmv;
703 	struct getinfo_fid2path *gf;
704 	struct lmv_tgt_desc     *tgt;
705 	struct getinfo_fid2path *remote_gf = NULL;
706 	int			remote_gf_size = 0;
707 	int			rc;
708 
709 	gf = (struct getinfo_fid2path *)karg;
710 	tgt = lmv_find_target(lmv, &gf->gf_fid);
711 	if (IS_ERR(tgt))
712 		return PTR_ERR(tgt);
713 
714 repeat_fid2path:
715 	rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
716 	if (rc != 0 && rc != -EREMOTE)
717 		goto out_fid2path;
718 
719 	/* If remote_gf != NULL, it means just building the
720 	 * path on the remote MDT, copy this path segment to gf */
721 	if (remote_gf != NULL) {
722 		struct getinfo_fid2path *ori_gf;
723 		char *ptr;
724 
725 		ori_gf = (struct getinfo_fid2path *)karg;
726 		if (strlen(ori_gf->gf_path) +
727 		    strlen(gf->gf_path) > ori_gf->gf_pathlen) {
728 			rc = -EOVERFLOW;
729 			goto out_fid2path;
730 		}
731 
732 		ptr = ori_gf->gf_path;
733 
734 		memmove(ptr + strlen(gf->gf_path) + 1, ptr,
735 			strlen(ori_gf->gf_path));
736 
737 		strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
738 		ptr += strlen(gf->gf_path);
739 		*ptr = '/';
740 	}
741 
742 	CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n",
743 	       tgt->ltd_exp->exp_obd->obd_name,
744 	       gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
745 	       gf->gf_linkno);
746 
747 	if (rc == 0)
748 		goto out_fid2path;
749 
750 	/* sigh, has to go to another MDT to do path building further */
751 	if (remote_gf == NULL) {
752 		remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
753 		OBD_ALLOC(remote_gf, remote_gf_size);
754 		if (remote_gf == NULL) {
755 			rc = -ENOMEM;
756 			goto out_fid2path;
757 		}
758 		remote_gf->gf_pathlen = PATH_MAX;
759 	}
760 
761 	if (!fid_is_sane(&gf->gf_fid)) {
762 		CERROR("%s: invalid FID "DFID": rc = %d\n",
763 		       tgt->ltd_exp->exp_obd->obd_name,
764 		       PFID(&gf->gf_fid), -EINVAL);
765 		rc = -EINVAL;
766 		goto out_fid2path;
767 	}
768 
769 	tgt = lmv_find_target(lmv, &gf->gf_fid);
770 	if (IS_ERR(tgt)) {
771 		rc = -EINVAL;
772 		goto out_fid2path;
773 	}
774 
775 	remote_gf->gf_fid = gf->gf_fid;
776 	remote_gf->gf_recno = -1;
777 	remote_gf->gf_linkno = -1;
778 	memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
779 	gf = remote_gf;
780 	goto repeat_fid2path;
781 
782 out_fid2path:
783 	if (remote_gf != NULL)
784 		OBD_FREE(remote_gf, remote_gf_size);
785 	return rc;
786 }
787 
lmv_hsm_req_count(struct lmv_obd * lmv,const struct hsm_user_request * hur,const struct lmv_tgt_desc * tgt_mds)788 static int lmv_hsm_req_count(struct lmv_obd *lmv,
789 			     const struct hsm_user_request *hur,
790 			     const struct lmv_tgt_desc *tgt_mds)
791 {
792 	int			i, nr = 0;
793 	struct lmv_tgt_desc    *curr_tgt;
794 
795 	/* count how many requests must be sent to the given target */
796 	for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
797 		curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
798 		if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
799 			nr++;
800 	}
801 	return nr;
802 }
803 
lmv_hsm_req_build(struct lmv_obd * lmv,struct hsm_user_request * hur_in,const struct lmv_tgt_desc * tgt_mds,struct hsm_user_request * hur_out)804 static void lmv_hsm_req_build(struct lmv_obd *lmv,
805 			      struct hsm_user_request *hur_in,
806 			      const struct lmv_tgt_desc *tgt_mds,
807 			      struct hsm_user_request *hur_out)
808 {
809 	int			i, nr_out;
810 	struct lmv_tgt_desc    *curr_tgt;
811 
812 	/* build the hsm_user_request for the given target */
813 	hur_out->hur_request = hur_in->hur_request;
814 	nr_out = 0;
815 	for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
816 		curr_tgt = lmv_find_target(lmv,
817 					&hur_in->hur_user_item[i].hui_fid);
818 		if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
819 			hur_out->hur_user_item[nr_out] =
820 				hur_in->hur_user_item[i];
821 			nr_out++;
822 		}
823 	}
824 	hur_out->hur_request.hr_itemcount = nr_out;
825 	memcpy(hur_data(hur_out), hur_data(hur_in),
826 	       hur_in->hur_request.hr_data_len);
827 }
828 
lmv_hsm_ct_unregister(struct lmv_obd * lmv,unsigned int cmd,int len,struct lustre_kernelcomm * lk,void * uarg)829 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
830 				 struct lustre_kernelcomm *lk, void *uarg)
831 {
832 	int	i, rc = 0;
833 
834 	/* unregister request (call from llapi_hsm_copytool_fini) */
835 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
836 		/* best effort: try to clean as much as possible
837 		 * (continue on error) */
838 		obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
839 	}
840 
841 	/* Whatever the result, remove copytool from kuc groups.
842 	 * Unreached coordinators will get EPIPE on next requests
843 	 * and will unregister automatically.
844 	 */
845 	rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
846 	return rc;
847 }
848 
lmv_hsm_ct_register(struct lmv_obd * lmv,unsigned int cmd,int len,struct lustre_kernelcomm * lk,void * uarg)849 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
850 			       struct lustre_kernelcomm *lk, void *uarg)
851 {
852 	struct file	*filp;
853 	int		 i, j, err;
854 	int		 rc = 0;
855 	bool		 any_set = false;
856 
857 	/* All or nothing: try to register to all MDS.
858 	 * In case of failure, unregister from previous MDS,
859 	 * except if it because of inactive target. */
860 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
861 		err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
862 				   len, lk, uarg);
863 		if (err) {
864 			if (lmv->tgts[i]->ltd_active) {
865 				/* permanent error */
866 				CERROR("error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
867 				       lmv->tgts[i]->ltd_uuid.uuid,
868 				       i, cmd, err);
869 				rc = err;
870 				lk->lk_flags |= LK_FLG_STOP;
871 				/* unregister from previous MDS */
872 				for (j = 0; j < i; j++)
873 					obd_iocontrol(cmd,
874 						  lmv->tgts[j]->ltd_exp,
875 						  len, lk, uarg);
876 				return rc;
877 			}
878 			/* else: transient error.
879 			 * kuc will register to the missing MDT
880 			 * when it is back */
881 		} else {
882 			any_set = true;
883 		}
884 	}
885 
886 	if (!any_set)
887 		/* no registration done: return error */
888 		return -ENOTCONN;
889 
890 	/* at least one registration done, with no failure */
891 	filp = fget(lk->lk_wfd);
892 	if (filp == NULL) {
893 		return -EBADF;
894 	}
895 	rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, lk->lk_data);
896 	if (rc != 0 && filp != NULL)
897 		fput(filp);
898 	return rc;
899 }
900 
901 
902 
903 
lmv_iocontrol(unsigned int cmd,struct obd_export * exp,int len,void * karg,void * uarg)904 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
905 			 int len, void *karg, void *uarg)
906 {
907 	struct obd_device    *obddev = class_exp2obd(exp);
908 	struct lmv_obd       *lmv = &obddev->u.lmv;
909 	int		   i = 0;
910 	int		   rc = 0;
911 	int		   set = 0;
912 	int		   count = lmv->desc.ld_tgt_count;
913 
914 	if (count == 0)
915 		return -ENOTTY;
916 
917 	switch (cmd) {
918 	case IOC_OBD_STATFS: {
919 		struct obd_ioctl_data *data = karg;
920 		struct obd_device *mdc_obd;
921 		struct obd_statfs stat_buf = {0};
922 		__u32 index;
923 
924 		memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
925 		if (index >= count)
926 			return -ENODEV;
927 
928 		if (lmv->tgts[index] == NULL ||
929 		    lmv->tgts[index]->ltd_active == 0)
930 			return -ENODATA;
931 
932 		mdc_obd = class_exp2obd(lmv->tgts[index]->ltd_exp);
933 		if (!mdc_obd)
934 			return -EINVAL;
935 
936 		/* copy UUID */
937 		if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
938 				     min((int) data->ioc_plen2,
939 					 (int) sizeof(struct obd_uuid))))
940 			return -EFAULT;
941 
942 		rc = obd_statfs(NULL, lmv->tgts[index]->ltd_exp, &stat_buf,
943 				cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
944 				0);
945 		if (rc)
946 			return rc;
947 		if (copy_to_user(data->ioc_pbuf1, &stat_buf,
948 				     min((int) data->ioc_plen1,
949 					 (int) sizeof(stat_buf))))
950 			return -EFAULT;
951 		break;
952 	}
953 	case OBD_IOC_QUOTACTL: {
954 		struct if_quotactl *qctl = karg;
955 		struct lmv_tgt_desc *tgt = NULL;
956 		struct obd_quotactl *oqctl;
957 
958 		if (qctl->qc_valid == QC_MDTIDX) {
959 			if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
960 				return -EINVAL;
961 
962 			tgt = lmv->tgts[qctl->qc_idx];
963 			if (tgt == NULL || tgt->ltd_exp == NULL)
964 				return -EINVAL;
965 		} else if (qctl->qc_valid == QC_UUID) {
966 			for (i = 0; i < count; i++) {
967 				tgt = lmv->tgts[i];
968 				if (tgt == NULL)
969 					continue;
970 				if (!obd_uuid_equals(&tgt->ltd_uuid,
971 						     &qctl->obd_uuid))
972 					continue;
973 
974 				if (tgt->ltd_exp == NULL)
975 					return -EINVAL;
976 
977 				break;
978 			}
979 		} else {
980 			return -EINVAL;
981 		}
982 
983 		if (i >= count)
984 			return -EAGAIN;
985 
986 		LASSERT(tgt && tgt->ltd_exp);
987 		OBD_ALLOC_PTR(oqctl);
988 		if (!oqctl)
989 			return -ENOMEM;
990 
991 		QCTL_COPY(oqctl, qctl);
992 		rc = obd_quotactl(tgt->ltd_exp, oqctl);
993 		if (rc == 0) {
994 			QCTL_COPY(qctl, oqctl);
995 			qctl->qc_valid = QC_MDTIDX;
996 			qctl->obd_uuid = tgt->ltd_uuid;
997 		}
998 		OBD_FREE_PTR(oqctl);
999 		break;
1000 	}
1001 	case OBD_IOC_CHANGELOG_SEND:
1002 	case OBD_IOC_CHANGELOG_CLEAR: {
1003 		struct ioc_changelog *icc = karg;
1004 
1005 		if (icc->icc_mdtindex >= count)
1006 			return -ENODEV;
1007 
1008 		if (lmv->tgts[icc->icc_mdtindex] == NULL ||
1009 		    lmv->tgts[icc->icc_mdtindex]->ltd_exp == NULL ||
1010 		    lmv->tgts[icc->icc_mdtindex]->ltd_active == 0)
1011 			return -ENODEV;
1012 		rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex]->ltd_exp,
1013 				   sizeof(*icc), icc, NULL);
1014 		break;
1015 	}
1016 	case LL_IOC_GET_CONNECT_FLAGS: {
1017 		if (lmv->tgts[0] == NULL)
1018 			return -ENODATA;
1019 		rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
1020 		break;
1021 	}
1022 	case OBD_IOC_FID2PATH: {
1023 		rc = lmv_fid2path(exp, len, karg, uarg);
1024 		break;
1025 	}
1026 	case LL_IOC_HSM_STATE_GET:
1027 	case LL_IOC_HSM_STATE_SET:
1028 	case LL_IOC_HSM_ACTION: {
1029 		struct md_op_data	*op_data = karg;
1030 		struct lmv_tgt_desc	*tgt;
1031 
1032 		tgt = lmv_find_target(lmv, &op_data->op_fid1);
1033 		if (IS_ERR(tgt))
1034 				return PTR_ERR(tgt);
1035 
1036 		if (tgt->ltd_exp == NULL)
1037 				return -EINVAL;
1038 
1039 		rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1040 		break;
1041 	}
1042 	case LL_IOC_HSM_PROGRESS: {
1043 		const struct hsm_progress_kernel *hpk = karg;
1044 		struct lmv_tgt_desc	*tgt;
1045 
1046 		tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1047 		if (IS_ERR(tgt))
1048 			return PTR_ERR(tgt);
1049 		rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1050 		break;
1051 	}
1052 	case LL_IOC_HSM_REQUEST: {
1053 		struct hsm_user_request *hur = karg;
1054 		struct lmv_tgt_desc	*tgt;
1055 		unsigned int reqcount = hur->hur_request.hr_itemcount;
1056 
1057 		if (reqcount == 0)
1058 			return 0;
1059 
1060 		/* if the request is about a single fid
1061 		 * or if there is a single MDS, no need to split
1062 		 * the request. */
1063 		if (reqcount == 1 || count == 1) {
1064 			tgt = lmv_find_target(lmv,
1065 					      &hur->hur_user_item[0].hui_fid);
1066 			if (IS_ERR(tgt))
1067 				return PTR_ERR(tgt);
1068 			rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1069 		} else {
1070 			/* split fid list to their respective MDS */
1071 			for (i = 0; i < count; i++) {
1072 				unsigned int		nr, reqlen;
1073 				int			rc1;
1074 				struct hsm_user_request *req;
1075 
1076 				nr = lmv_hsm_req_count(lmv, hur, lmv->tgts[i]);
1077 				if (nr == 0) /* nothing for this MDS */
1078 					continue;
1079 
1080 				/* build a request with fids for this MDS */
1081 				reqlen = offsetof(typeof(*hur),
1082 						  hur_user_item[nr])
1083 					 + hur->hur_request.hr_data_len;
1084 				OBD_ALLOC_LARGE(req, reqlen);
1085 				if (req == NULL)
1086 					return -ENOMEM;
1087 
1088 				lmv_hsm_req_build(lmv, hur, lmv->tgts[i], req);
1089 
1090 				rc1 = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
1091 						    reqlen, req, uarg);
1092 				if (rc1 != 0 && rc == 0)
1093 					rc = rc1;
1094 				OBD_FREE_LARGE(req, reqlen);
1095 			}
1096 		}
1097 		break;
1098 	}
1099 	case LL_IOC_LOV_SWAP_LAYOUTS: {
1100 		struct md_op_data	*op_data = karg;
1101 		struct lmv_tgt_desc	*tgt1, *tgt2;
1102 
1103 		tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1104 		if (IS_ERR(tgt1))
1105 			return PTR_ERR(tgt1);
1106 
1107 		tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1108 		if (IS_ERR(tgt2))
1109 			return PTR_ERR(tgt2);
1110 
1111 		if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1112 			return -EINVAL;
1113 
1114 		/* only files on same MDT can have their layouts swapped */
1115 		if (tgt1->ltd_idx != tgt2->ltd_idx)
1116 			return -EPERM;
1117 
1118 		rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1119 		break;
1120 	}
1121 	case LL_IOC_HSM_CT_START: {
1122 		struct lustre_kernelcomm *lk = karg;
1123 		if (lk->lk_flags & LK_FLG_STOP)
1124 			rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1125 		else
1126 			rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1127 		break;
1128 	}
1129 	default:
1130 		for (i = 0; i < count; i++) {
1131 			struct obd_device *mdc_obd;
1132 			int err;
1133 
1134 			if (lmv->tgts[i] == NULL ||
1135 			    lmv->tgts[i]->ltd_exp == NULL)
1136 				continue;
1137 			/* ll_umount_begin() sets force flag but for lmv, not
1138 			 * mdc. Let's pass it through */
1139 			mdc_obd = class_exp2obd(lmv->tgts[i]->ltd_exp);
1140 			mdc_obd->obd_force = obddev->obd_force;
1141 			err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len,
1142 					    karg, uarg);
1143 			if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
1144 				return err;
1145 			} else if (err) {
1146 				if (lmv->tgts[i]->ltd_active) {
1147 					CERROR("error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
1148 					       lmv->tgts[i]->ltd_uuid.uuid,
1149 					       i, cmd, err);
1150 					if (!rc)
1151 						rc = err;
1152 				}
1153 			} else
1154 				set = 1;
1155 		}
1156 		if (!set && !rc)
1157 			rc = -EIO;
1158 	}
1159 	return rc;
1160 }
1161 
1162 #if 0
/*
 * Name-based placement: add up the first @len bytes of @name and reduce
 * the sum modulo @count to obtain a target index.
 */
static int lmv_all_chars_policy(int count, const char *name,
				int len)
{
	unsigned int sum = 0;
	int remaining;

	/* Walk the name from its last byte down to its first. */
	for (remaining = len; remaining > 0; remaining--)
		sum += name[remaining - 1];

	return sum % count;
}
1173 
1174 static int lmv_nid_policy(struct lmv_obd *lmv)
1175 {
1176 	struct obd_import *imp;
1177 	__u32	      id;
1178 
1179 	/*
1180 	 * XXX: To get nid we assume that underlying obd device is mdc.
1181 	 */
1182 	imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
1183 	id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
1184 	return id % lmv->desc.ld_tgt_count;
1185 }
1186 
1187 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1188 			  enum placement_policy placement)
1189 {
1190 	switch (placement) {
1191 	case PLACEMENT_CHAR_POLICY:
1192 		return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
1193 					    op_data->op_name,
1194 					    op_data->op_namelen);
1195 	case PLACEMENT_NID_POLICY:
1196 		return lmv_nid_policy(lmv);
1197 
1198 	default:
1199 		break;
1200 	}
1201 
1202 	CERROR("Unsupported placement policy %x\n", placement);
1203 	return -EINVAL;
1204 }
1205 #endif
1206 
1207 /**
1208  * This is _inode_ placement policy function (not name).
1209  */
lmv_placement_policy(struct obd_device * obd,struct md_op_data * op_data,u32 * mds)1210 static int lmv_placement_policy(struct obd_device *obd,
1211 				struct md_op_data *op_data, u32 *mds)
1212 {
1213 	struct lmv_obd	  *lmv = &obd->u.lmv;
1214 
1215 	LASSERT(mds != NULL);
1216 
1217 	if (lmv->desc.ld_tgt_count == 1) {
1218 		*mds = 0;
1219 		return 0;
1220 	}
1221 
1222 	/**
1223 	 * If stripe_offset is provided during setdirstripe
1224 	 * (setdirstripe -i xx), xx MDS will be chosen.
1225 	 */
1226 	if (op_data->op_cli_flags & CLI_SET_MEA) {
1227 		struct lmv_user_md *lum;
1228 
1229 		lum = (struct lmv_user_md *)op_data->op_data;
1230 		if (lum->lum_type == LMV_STRIPE_TYPE &&
1231 		    lum->lum_stripe_offset != -1) {
1232 			if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
1233 				CERROR("%s: Stripe_offset %d > MDT count %d: rc = %d\n",
1234 				       obd->obd_name,
1235 				       lum->lum_stripe_offset,
1236 				       lmv->desc.ld_tgt_count, -ERANGE);
1237 				return -ERANGE;
1238 			}
1239 			*mds = lum->lum_stripe_offset;
1240 			return 0;
1241 		}
1242 	}
1243 
1244 	/* Allocate new fid on target according to operation type and parent
1245 	 * home mds. */
1246 	*mds = op_data->op_mds;
1247 	return 0;
1248 }
1249 
/*
 * Allocate a new fid from the target with index @mds.
 *
 * The allocation is done under the target's ltd_fid_mutex.  Returns 0 on
 * success (positive results from obd_fid_alloc() are folded to 0),
 * -ENODEV when the target is inactive or has no export, the error from
 * lmv_get_target(), or a negative error from obd_fid_alloc().
 */
int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds)
{
	struct lmv_tgt_desc *target = lmv_get_target(lmv, mds);
	int result;

	if (IS_ERR(target))
		return PTR_ERR(target);

	/*
	 * New seq alloc and FLD setup should be atomic. Otherwise we may find
	 * on server that seq in new allocated fid is not yet known.
	 */
	mutex_lock(&target->ltd_fid_mutex);

	if (target->ltd_active != 0 && target->ltd_exp != NULL) {
		/* Ask the underlying target layer for a new fid. */
		result = obd_fid_alloc(target->ltd_exp, fid, NULL);
		if (result > 0) {
			LASSERT(fid_is_sane(fid));
			result = 0;
		}
	} else {
		result = -ENODEV;
	}

	mutex_unlock(&target->ltd_fid_mutex);
	return result;
}
1283 
lmv_fid_alloc(struct obd_export * exp,struct lu_fid * fid,struct md_op_data * op_data)1284 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1285 		  struct md_op_data *op_data)
1286 {
1287 	struct obd_device     *obd = class_exp2obd(exp);
1288 	struct lmv_obd	*lmv = &obd->u.lmv;
1289 	u32		       mds = 0;
1290 	int		    rc;
1291 
1292 	LASSERT(op_data != NULL);
1293 	LASSERT(fid != NULL);
1294 
1295 	rc = lmv_placement_policy(obd, op_data, &mds);
1296 	if (rc) {
1297 		CERROR("Can't get target for allocating fid, rc %d\n",
1298 		       rc);
1299 		return rc;
1300 	}
1301 
1302 	rc = __lmv_fid_alloc(lmv, fid, mds);
1303 	if (rc) {
1304 		CERROR("Can't alloc new fid, rc %d\n", rc);
1305 		return rc;
1306 	}
1307 
1308 	return rc;
1309 }
1310 
lmv_setup(struct obd_device * obd,struct lustre_cfg * lcfg)1311 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1312 {
1313 	struct lmv_obd	     *lmv = &obd->u.lmv;
1314 	struct lprocfs_static_vars  lvars;
1315 	struct lmv_desc	    *desc;
1316 	int			 rc;
1317 
1318 	if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1319 		CERROR("LMV setup requires a descriptor\n");
1320 		return -EINVAL;
1321 	}
1322 
1323 	desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1324 	if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1325 		CERROR("Lmv descriptor size wrong: %d > %d\n",
1326 		       (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1327 		return -EINVAL;
1328 	}
1329 
1330 	OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * 32);
1331 	if (lmv->tgts == NULL)
1332 		return -ENOMEM;
1333 	lmv->tgts_size = 32;
1334 
1335 	obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1336 	lmv->desc.ld_tgt_count = 0;
1337 	lmv->desc.ld_active_tgt_count = 0;
1338 	lmv->max_cookiesize = 0;
1339 	lmv->max_def_easize = 0;
1340 	lmv->max_easize = 0;
1341 	lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1342 
1343 	spin_lock_init(&lmv->lmv_lock);
1344 	mutex_init(&lmv->init_mutex);
1345 
1346 	lprocfs_lmv_init_vars(&lvars);
1347 
1348 	lprocfs_obd_setup(obd, lvars.obd_vars);
1349 #if defined (CONFIG_PROC_FS)
1350 	{
1351 		rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1352 					0444, &lmv_proc_target_fops, obd);
1353 		if (rc)
1354 			CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1355 			       obd->obd_name, rc);
1356        }
1357 #endif
1358 	rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1359 			     LUSTRE_CLI_FLD_HASH_DHT);
1360 	if (rc) {
1361 		CERROR("Can't init FLD, err %d\n", rc);
1362 		goto out;
1363 	}
1364 
1365 	return 0;
1366 
1367 out:
1368 	return rc;
1369 }
1370 
lmv_cleanup(struct obd_device * obd)1371 static int lmv_cleanup(struct obd_device *obd)
1372 {
1373 	struct lmv_obd   *lmv = &obd->u.lmv;
1374 
1375 	fld_client_fini(&lmv->lmv_fld);
1376 	if (lmv->tgts != NULL) {
1377 		int i;
1378 		for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1379 			if (lmv->tgts[i] == NULL)
1380 				continue;
1381 			lmv_del_target(lmv, i);
1382 		}
1383 		OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1384 		lmv->tgts_size = 0;
1385 	}
1386 	return 0;
1387 }
1388 
/*
 * Handle a configuration record for an LMV device.
 *
 * Only LCFG_ADD_MDC is supported: buffer 1 carries the target uuid,
 * buffer 2 the target index and buffer 3 the generation, all as text.
 * The index is an unsigned quantity and is therefore parsed with "%u",
 * matching the __u32 it is stored in.
 *
 * Returns the result of lmv_add_target(), or -EINVAL for an oversized
 * uuid, an unparsable index/generation, or an unknown command.
 */
static int lmv_process_config(struct obd_device *obd, u32 len, void *buf)
{
	struct lustre_cfg	*lcfg = buf;
	struct obd_uuid		obd_uuid;
	int			gen;
	__u32			index;

	switch (lcfg->lcfg_command) {
	case LCFG_ADD_MDC:
		/* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
		 * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
		if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
			return -EINVAL;

		obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));

		if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1)
			return -EINVAL;

		if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
			return -EINVAL;

		return lmv_add_target(obd, &obd_uuid, index, gen);
	default:
		CERROR("Unknown command: %d\n", lcfg->lcfg_command);
		return -EINVAL;
	}
}
1426 
/*
 * Collect statfs data from the connected targets into @osfs.
 *
 * The result of target 0 is copied as a whole; for every later target
 * only os_bavail, os_blocks, os_ffree and os_files are added on top.
 * With OBD_STATFS_FOR_MDT0 in @flags the walk stops right after target 0.
 *
 * Returns 0 on success, the lmv_check_connect() error, -ENOMEM, or the
 * first obd_statfs() failure (which also ends the walk).
 */
static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct lmv_obd *lmv = &obd->u.lmv;
	struct obd_statfs *tgt_sfs;
	int rc = 0;
	int idx;

	rc = lmv_check_connect(obd);
	if (rc)
		return rc;

	OBD_ALLOC(tgt_sfs, sizeof(*tgt_sfs));
	if (tgt_sfs == NULL)
		return -ENOMEM;

	for (idx = 0; idx < lmv->desc.ld_tgt_count; idx++) {
		struct lmv_tgt_desc *tgt = lmv->tgts[idx];

		/* Skip empty slots and targets without an export. */
		if (tgt == NULL || tgt->ltd_exp == NULL)
			continue;

		rc = obd_statfs(env, tgt->ltd_exp, tgt_sfs,
				max_age, flags);
		if (rc) {
			CERROR("can't stat MDS #%d (%s), error %d\n", idx,
			       tgt->ltd_exp->exp_obd->obd_name,
			       rc);
			break;
		}

		if (idx != 0) {
			osfs->os_bavail += tgt_sfs->os_bavail;
			osfs->os_blocks += tgt_sfs->os_blocks;
			osfs->os_ffree += tgt_sfs->os_ffree;
			osfs->os_files += tgt_sfs->os_files;
			continue;
		}

		*osfs = *tgt_sfs;
		/* A statfs issued for mount only needs the data of MDT0,
		 * not the merged view of all MDTs, so clients can be
		 * mounted as long as MDT0 is in service. */
		if (flags & OBD_STATFS_FOR_MDT0)
			break;
	}

	OBD_FREE(tgt_sfs, sizeof(*tgt_sfs));
	return rc;
}
1479 
lmv_getstatus(struct obd_export * exp,struct lu_fid * fid,struct obd_capa ** pc)1480 static int lmv_getstatus(struct obd_export *exp,
1481 			 struct lu_fid *fid,
1482 			 struct obd_capa **pc)
1483 {
1484 	struct obd_device    *obd = exp->exp_obd;
1485 	struct lmv_obd       *lmv = &obd->u.lmv;
1486 	int		   rc;
1487 
1488 	rc = lmv_check_connect(obd);
1489 	if (rc)
1490 		return rc;
1491 
1492 	rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid, pc);
1493 	return rc;
1494 }
1495 
/*
 * Route a getxattr request to the target that lmv_find_target() returns
 * for @fid.  Returns the connection-check error, the target lookup error,
 * or the result of md_getxattr().
 */
static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
			struct obd_capa *oc, u64 valid, const char *name,
			const char *input, int input_size, int output_size,
			int flags, struct ptlrpc_request **request)
{
	struct obd_device *obd = exp->exp_obd;
	struct lmv_obd *lmv = &obd->u.lmv;
	struct lmv_tgt_desc *target;
	int err;

	err = lmv_check_connect(obd);
	if (err)
		return err;

	target = lmv_find_target(lmv, fid);
	if (IS_ERR(target))
		return PTR_ERR(target);

	return md_getxattr(target->ltd_exp, fid, oc, valid, name, input,
			   input_size, output_size, flags, request);
}
1519 
/*
 * Route a setxattr request to the target that lmv_find_target() returns
 * for @fid.  Returns the connection-check error, the target lookup error,
 * or the result of md_setxattr().
 */
static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
			struct obd_capa *oc, u64 valid, const char *name,
			const char *input, int input_size, int output_size,
			int flags, __u32 suppgid,
			struct ptlrpc_request **request)
{
	struct obd_device *obd = exp->exp_obd;
	struct lmv_obd *lmv = &obd->u.lmv;
	struct lmv_tgt_desc *target;
	int err;

	err = lmv_check_connect(obd);
	if (err)
		return err;

	target = lmv_find_target(lmv, fid);
	if (IS_ERR(target))
		return PTR_ERR(target);

	return md_setxattr(target->ltd_exp, fid, oc, valid, name, input,
			   input_size, output_size, flags, suppgid,
			   request);
}
1545 
lmv_getattr(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)1546 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1547 		       struct ptlrpc_request **request)
1548 {
1549 	struct obd_device       *obd = exp->exp_obd;
1550 	struct lmv_obd	  *lmv = &obd->u.lmv;
1551 	struct lmv_tgt_desc     *tgt;
1552 	int		      rc;
1553 
1554 	rc = lmv_check_connect(obd);
1555 	if (rc)
1556 		return rc;
1557 
1558 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
1559 	if (IS_ERR(tgt))
1560 		return PTR_ERR(tgt);
1561 
1562 	if (op_data->op_flags & MF_GET_MDT_IDX) {
1563 		op_data->op_mds = tgt->ltd_idx;
1564 		return 0;
1565 	}
1566 
1567 	rc = md_getattr(tgt->ltd_exp, op_data, request);
1568 
1569 	return rc;
1570 }
1571 
lmv_null_inode(struct obd_export * exp,const struct lu_fid * fid)1572 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1573 {
1574 	struct obd_device   *obd = exp->exp_obd;
1575 	struct lmv_obd      *lmv = &obd->u.lmv;
1576 	int		  i;
1577 	int		  rc;
1578 
1579 	rc = lmv_check_connect(obd);
1580 	if (rc)
1581 		return rc;
1582 
1583 	CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1584 
1585 	/*
1586 	 * With DNE every object can have two locks in different namespaces:
1587 	 * lookup lock in space of MDT storing direntry and update/open lock in
1588 	 * space of MDT storing inode.
1589 	 */
1590 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1591 		if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1592 			continue;
1593 		md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1594 	}
1595 
1596 	return 0;
1597 }
1598 
/*
 * Run md_find_cbdata() for @fid on each target that has an export, in
 * index order, stopping at the first non-zero result and returning it.
 * Returns 0 when every call returned 0 (or no target was eligible).
 */
static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
			   ldlm_iterator_t it, void *data)
{
	struct obd_device *obd = exp->exp_obd;
	struct lmv_obd *lmv = &obd->u.lmv;
	int idx = 0;
	int rc;

	rc = lmv_check_connect(obd);
	if (rc)
		return rc;

	CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));

	/*
	 * With DNE every object can have two locks in different namespaces:
	 * lookup lock in space of MDT storing direntry and update/open lock in
	 * space of MDT storing inode.
	 */
	while (idx < lmv->desc.ld_tgt_count) {
		if (lmv->tgts[idx] != NULL && lmv->tgts[idx]->ltd_exp != NULL) {
			rc = md_find_cbdata(lmv->tgts[idx]->ltd_exp, fid,
					    it, data);
			if (rc)
				return rc;
		}
		idx++;
	}

	return rc;
}
1628 
1629 
lmv_close(struct obd_export * exp,struct md_op_data * op_data,struct md_open_data * mod,struct ptlrpc_request ** request)1630 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1631 		     struct md_open_data *mod, struct ptlrpc_request **request)
1632 {
1633 	struct obd_device     *obd = exp->exp_obd;
1634 	struct lmv_obd	*lmv = &obd->u.lmv;
1635 	struct lmv_tgt_desc   *tgt;
1636 	int		    rc;
1637 
1638 	rc = lmv_check_connect(obd);
1639 	if (rc)
1640 		return rc;
1641 
1642 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
1643 	if (IS_ERR(tgt))
1644 		return PTR_ERR(tgt);
1645 
1646 	CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1647 	rc = md_close(tgt->ltd_exp, op_data, mod, request);
1648 	return rc;
1649 }
1650 
1651 struct lmv_tgt_desc
lmv_locate_mds(struct lmv_obd * lmv,struct md_op_data * op_data,struct lu_fid * fid)1652 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1653 		struct lu_fid *fid)
1654 {
1655 	struct lmv_tgt_desc *tgt;
1656 
1657 	tgt = lmv_find_target(lmv, fid);
1658 	if (IS_ERR(tgt))
1659 		return tgt;
1660 
1661 	op_data->op_mds = tgt->ltd_idx;
1662 
1663 	return tgt;
1664 }
1665 
/*
 * Create an object: locate the target for op_data->op_fid1, allocate the
 * new fid into op_data->op_fid2 and issue md_create() on that target with
 * MF_MDC_CANCEL_FID1 set in op_data->op_flags.
 *
 * Returns -EIO when no target is active, otherwise the first error met
 * or the result of md_create().
 */
static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
		      const void *data, int datalen, int mode, __u32 uid,
		      __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
		      struct ptlrpc_request **request)
{
	struct obd_device *obd = exp->exp_obd;
	struct lmv_obd *lmv = &obd->u.lmv;
	struct lmv_tgt_desc *target;
	int rc;

	rc = lmv_check_connect(obd);
	if (rc)
		return rc;

	if (lmv->desc.ld_active_tgt_count == 0)
		return -EIO;

	target = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
	if (IS_ERR(target))
		return PTR_ERR(target);

	rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
	if (rc)
		return rc;

	CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
	       op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
	       op_data->op_mds);

	op_data->op_flags |= MF_MDC_CANCEL_FID1;
	rc = md_create(target->ltd_exp, op_data, data, datalen, mode, uid, gid,
		       cap_effective, rdev, request);

	/* Only trace the new fid when the create produced a reply. */
	if (rc == 0 && *request != NULL) {
		CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
	}

	return rc;
}
1706 
lmv_done_writing(struct obd_export * exp,struct md_op_data * op_data,struct md_open_data * mod)1707 static int lmv_done_writing(struct obd_export *exp,
1708 			    struct md_op_data *op_data,
1709 			    struct md_open_data *mod)
1710 {
1711 	struct obd_device     *obd = exp->exp_obd;
1712 	struct lmv_obd	*lmv = &obd->u.lmv;
1713 	struct lmv_tgt_desc   *tgt;
1714 	int		    rc;
1715 
1716 	rc = lmv_check_connect(obd);
1717 	if (rc)
1718 		return rc;
1719 
1720 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
1721 	if (IS_ERR(tgt))
1722 		return PTR_ERR(tgt);
1723 
1724 	rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1725 	return rc;
1726 }
1727 
1728 static int
lmv_enqueue_remote(struct obd_export * exp,struct ldlm_enqueue_info * einfo,struct lookup_intent * it,struct md_op_data * op_data,struct lustre_handle * lockh,void * lmm,int lmmsize,__u64 extra_lock_flags)1729 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1730 		   struct lookup_intent *it, struct md_op_data *op_data,
1731 		   struct lustre_handle *lockh, void *lmm, int lmmsize,
1732 		   __u64 extra_lock_flags)
1733 {
1734 	struct ptlrpc_request      *req = it->d.lustre.it_data;
1735 	struct obd_device	  *obd = exp->exp_obd;
1736 	struct lmv_obd	     *lmv = &obd->u.lmv;
1737 	struct lustre_handle	plock;
1738 	struct lmv_tgt_desc	*tgt;
1739 	struct md_op_data	  *rdata;
1740 	struct lu_fid	       fid1;
1741 	struct mdt_body	    *body;
1742 	int			 rc = 0;
1743 	int			 pmode;
1744 
1745 	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1746 	LASSERT(body != NULL);
1747 
1748 	if (!(body->valid & OBD_MD_MDS))
1749 		return 0;
1750 
1751 	CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1752 	       LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1753 
1754 	/*
1755 	 * We got LOOKUP lock, but we really need attrs.
1756 	 */
1757 	pmode = it->d.lustre.it_lock_mode;
1758 	LASSERT(pmode != 0);
1759 	memcpy(&plock, lockh, sizeof(plock));
1760 	it->d.lustre.it_lock_mode = 0;
1761 	it->d.lustre.it_data = NULL;
1762 	fid1 = body->fid1;
1763 
1764 	ptlrpc_req_finished(req);
1765 
1766 	tgt = lmv_find_target(lmv, &fid1);
1767 	if (IS_ERR(tgt)) {
1768 		rc = PTR_ERR(tgt);
1769 		goto out;
1770 	}
1771 
1772 	OBD_ALLOC_PTR(rdata);
1773 	if (rdata == NULL) {
1774 		rc = -ENOMEM;
1775 		goto out;
1776 	}
1777 
1778 	rdata->op_fid1 = fid1;
1779 	rdata->op_bias = MDS_CROSS_REF;
1780 
1781 	rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1782 			lmm, lmmsize, NULL, extra_lock_flags);
1783 	OBD_FREE_PTR(rdata);
1784 out:
1785 	ldlm_lock_decref(&plock, pmode);
1786 	return rc;
1787 }
1788 
1789 static int
lmv_enqueue(struct obd_export * exp,struct ldlm_enqueue_info * einfo,struct lookup_intent * it,struct md_op_data * op_data,struct lustre_handle * lockh,void * lmm,int lmmsize,struct ptlrpc_request ** req,__u64 extra_lock_flags)1790 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1791 	    struct lookup_intent *it, struct md_op_data *op_data,
1792 	    struct lustre_handle *lockh, void *lmm, int lmmsize,
1793 	    struct ptlrpc_request **req, __u64 extra_lock_flags)
1794 {
1795 	struct obd_device	*obd = exp->exp_obd;
1796 	struct lmv_obd	   *lmv = &obd->u.lmv;
1797 	struct lmv_tgt_desc      *tgt;
1798 	int		       rc;
1799 
1800 	rc = lmv_check_connect(obd);
1801 	if (rc)
1802 		return rc;
1803 
1804 	CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1805 	       LL_IT2STR(it), PFID(&op_data->op_fid1));
1806 
1807 	tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1808 	if (IS_ERR(tgt))
1809 		return PTR_ERR(tgt);
1810 
1811 	CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1812 	       LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1813 
1814 	rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1815 			lmm, lmmsize, req, extra_lock_flags);
1816 
1817 	if (rc == 0 && it && it->it_op == IT_OPEN) {
1818 		rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1819 					lmm, lmmsize, extra_lock_flags);
1820 	}
1821 	return rc;
1822 }
1823 
1824 static int
lmv_getattr_name(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)1825 lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
1826 		 struct ptlrpc_request **request)
1827 {
1828 	struct ptlrpc_request   *req = NULL;
1829 	struct obd_device       *obd = exp->exp_obd;
1830 	struct lmv_obd	  *lmv = &obd->u.lmv;
1831 	struct lmv_tgt_desc     *tgt;
1832 	struct mdt_body	 *body;
1833 	int		      rc;
1834 
1835 	rc = lmv_check_connect(obd);
1836 	if (rc)
1837 		return rc;
1838 
1839 	tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1840 	if (IS_ERR(tgt))
1841 		return PTR_ERR(tgt);
1842 
1843 	CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1844 	       op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1845 	       tgt->ltd_idx);
1846 
1847 	rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1848 	if (rc != 0)
1849 		return rc;
1850 
1851 	body = req_capsule_server_get(&(*request)->rq_pill,
1852 				      &RMF_MDT_BODY);
1853 	LASSERT(body != NULL);
1854 
1855 	if (body->valid & OBD_MD_MDS) {
1856 		struct lu_fid rid = body->fid1;
1857 		CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1858 		       PFID(&rid));
1859 
1860 		tgt = lmv_find_target(lmv, &rid);
1861 		if (IS_ERR(tgt)) {
1862 			ptlrpc_req_finished(*request);
1863 			return PTR_ERR(tgt);
1864 		}
1865 
1866 		op_data->op_fid1 = rid;
1867 		op_data->op_valid |= OBD_MD_FLCROSSREF;
1868 		op_data->op_namelen = 0;
1869 		op_data->op_name = NULL;
1870 		rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1871 		ptlrpc_req_finished(*request);
1872 		*request = req;
1873 	}
1874 
1875 	return rc;
1876 }
1877 
/*
 * Map a single MF_MDC_CANCEL_FIDn flag value to the address of the
 * matching op_fidn member of op_data; any other value yields NULL.
 *
 * Both arguments are parenthesized so that expression arguments (for
 * example "flags & mask" or "ptr + 1") expand with the intended
 * precedence.  They may still be evaluated more than once, so avoid
 * arguments with side effects.
 */
#define md_op_data_fid(op_data, fl)				\
	((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :	\
	 (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :	\
	 (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :	\
	 (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :	\
	 NULL)
1884 
/*
 * Early lock cancel for the fid of @op_data selected by @flag.
 *
 * If that fid is not sane nothing is done.  If it lives on a target other
 * than @op_tgt, md_cancel_unused() is called there (LCF_ASYNC) for the
 * inodebits in @bits and lock mode @mode.  If it lives on @op_tgt itself,
 * @flag is merely added to op_data->op_flags.
 *
 * Returns 0, the target lookup error, or the md_cancel_unused() result.
 */
static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
			    int op_tgt, ldlm_mode_t mode, int bits, int flag)
{
	struct lu_fid *fid = md_op_data_fid(op_data, flag);
	struct obd_device *obd = exp->exp_obd;
	struct lmv_obd *lmv = &obd->u.lmv;
	ldlm_policy_data_t policy = {{0}};
	struct lmv_tgt_desc *target;

	if (!fid_is_sane(fid))
		return 0;

	target = lmv_find_target(lmv, fid);
	if (IS_ERR(target))
		return PTR_ERR(target);

	if (target->ltd_idx == op_tgt) {
		/* Same target as the operation: just flag the fid. */
		CDEBUG(D_INODE,
		       "EARLY_CANCEL skip operation target %d on "DFID"\n",
		       op_tgt, PFID(fid));
		op_data->op_flags |= flag;
		return 0;
	}

	CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
	policy.l_inodebits.bits = bits;
	return md_cancel_unused(target->ltd_exp, fid, &policy,
				mode, LCF_ASYNC, NULL);
}
1917 
1918 /*
1919  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1920  * op_data->op_fid2
1921  */
lmv_link(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)1922 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1923 		    struct ptlrpc_request **request)
1924 {
1925 	struct obd_device       *obd = exp->exp_obd;
1926 	struct lmv_obd	  *lmv = &obd->u.lmv;
1927 	struct lmv_tgt_desc     *tgt;
1928 	int		      rc;
1929 
1930 	rc = lmv_check_connect(obd);
1931 	if (rc)
1932 		return rc;
1933 
1934 	LASSERT(op_data->op_namelen != 0);
1935 
1936 	CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1937 	       PFID(&op_data->op_fid2), op_data->op_namelen,
1938 	       op_data->op_name, PFID(&op_data->op_fid1));
1939 
1940 	op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1941 	op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1942 	op_data->op_cap = cfs_curproc_cap_pack();
1943 	tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1944 	if (IS_ERR(tgt))
1945 		return PTR_ERR(tgt);
1946 
1947 	/*
1948 	 * Cancel UPDATE lock on child (fid1).
1949 	 */
1950 	op_data->op_flags |= MF_MDC_CANCEL_FID2;
1951 	rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1952 			      MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1953 	if (rc != 0)
1954 		return rc;
1955 
1956 	rc = md_link(tgt->ltd_exp, op_data, request);
1957 
1958 	return rc;
1959 }
1960 
lmv_rename(struct obd_export * exp,struct md_op_data * op_data,const char * old,int oldlen,const char * new,int newlen,struct ptlrpc_request ** request)1961 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1962 		      const char *old, int oldlen, const char *new, int newlen,
1963 		      struct ptlrpc_request **request)
1964 {
1965 	struct obd_device       *obd = exp->exp_obd;
1966 	struct lmv_obd	  *lmv = &obd->u.lmv;
1967 	struct lmv_tgt_desc     *src_tgt;
1968 	struct lmv_tgt_desc     *tgt_tgt;
1969 	int			rc;
1970 
1971 	LASSERT(oldlen != 0);
1972 
1973 	CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
1974 	       oldlen, old, PFID(&op_data->op_fid1),
1975 	       newlen, new, PFID(&op_data->op_fid2));
1976 
1977 	rc = lmv_check_connect(obd);
1978 	if (rc)
1979 		return rc;
1980 
1981 	op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1982 	op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1983 	op_data->op_cap = cfs_curproc_cap_pack();
1984 	src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1985 	if (IS_ERR(src_tgt))
1986 		return PTR_ERR(src_tgt);
1987 
1988 	tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1989 	if (IS_ERR(tgt_tgt))
1990 		return PTR_ERR(tgt_tgt);
1991 	/*
1992 	 * LOOKUP lock on src child (fid3) should also be cancelled for
1993 	 * src_tgt in mdc_rename.
1994 	 */
1995 	op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1996 
1997 	/*
1998 	 * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1999 	 * own target.
2000 	 */
2001 	rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
2002 			      LCK_EX, MDS_INODELOCK_UPDATE,
2003 			      MF_MDC_CANCEL_FID2);
2004 
2005 	/*
2006 	 * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
2007 	 */
2008 	if (rc == 0) {
2009 		rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
2010 				      LCK_EX, MDS_INODELOCK_LOOKUP,
2011 				      MF_MDC_CANCEL_FID4);
2012 	}
2013 
2014 	/*
2015 	 * Cancel all the locks on tgt child (fid4).
2016 	 */
2017 	if (rc == 0)
2018 		rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
2019 				      LCK_EX, MDS_INODELOCK_FULL,
2020 				      MF_MDC_CANCEL_FID4);
2021 
2022 	if (rc == 0)
2023 		rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
2024 			       new, newlen, request);
2025 	return rc;
2026 }
2027 
lmv_setattr(struct obd_export * exp,struct md_op_data * op_data,void * ea,int ealen,void * ea2,int ea2len,struct ptlrpc_request ** request,struct md_open_data ** mod)2028 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2029 		       void *ea, int ealen, void *ea2, int ea2len,
2030 		       struct ptlrpc_request **request,
2031 		       struct md_open_data **mod)
2032 {
2033 	struct obd_device       *obd = exp->exp_obd;
2034 	struct lmv_obd	  *lmv = &obd->u.lmv;
2035 	struct lmv_tgt_desc     *tgt;
2036 	int		      rc = 0;
2037 
2038 	rc = lmv_check_connect(obd);
2039 	if (rc)
2040 		return rc;
2041 
2042 	CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2043 	       PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2044 
2045 	op_data->op_flags |= MF_MDC_CANCEL_FID1;
2046 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
2047 	if (IS_ERR(tgt))
2048 		return PTR_ERR(tgt);
2049 
2050 	rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
2051 			ea2len, request, mod);
2052 
2053 	return rc;
2054 }
2055 
lmv_sync(struct obd_export * exp,const struct lu_fid * fid,struct obd_capa * oc,struct ptlrpc_request ** request)2056 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
2057 		    struct obd_capa *oc, struct ptlrpc_request **request)
2058 {
2059 	struct obd_device	 *obd = exp->exp_obd;
2060 	struct lmv_obd	    *lmv = &obd->u.lmv;
2061 	struct lmv_tgt_desc       *tgt;
2062 	int			rc;
2063 
2064 	rc = lmv_check_connect(obd);
2065 	if (rc)
2066 		return rc;
2067 
2068 	tgt = lmv_find_target(lmv, fid);
2069 	if (IS_ERR(tgt))
2070 		return PTR_ERR(tgt);
2071 
2072 	rc = md_sync(tgt->ltd_exp, fid, oc, request);
2073 	return rc;
2074 }
2075 
2076 /*
2077  * Adjust a set of pages, each page containing an array of lu_dirpages,
2078  * so that each page can be used as a single logical lu_dirpage.
2079  *
2080  * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
2081  * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
2082  * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
2083  * value is used as a cookie to request the next lu_dirpage in a
2084  * directory listing that spans multiple pages (two in this example):
2085  *   ________
 *  |        |
2087  * .|--------v-------   -----.
2088  * |s|e|f|p|ent|ent| ... |ent|
2089  * '--|--------------   -----'   Each CFS_PAGE contains a single
 *    '------.                   lu_dirpage.
2091  * .---------v-------   -----.
2092  * |s|e|f|p|ent| 0 | ... | 0 |
2093  * '-----------------   -----'
2094  *
2095  * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
2096  * larger than LU_PAGE_SIZE, a single host page may contain multiple
2097  * lu_dirpages. After reading the lu_dirpages from the MDS, the
2098  * ldp_hash_end of the first lu_dirpage refers to the one immediately
2099  * after it in the same CFS_PAGE (arrows simplified for brevity, but
2100  * in general e0==s1, e1==s2, etc.):
2101  *
2102  * .--------------------   -----.
2103  * |s0|e0|f0|p|ent|ent| ... |ent|
2104  * |---v----------------   -----|
2105  * |s1|e1|f1|p|ent|ent| ... |ent|
2106  * |---v----------------   -----|  Here, each CFS_PAGE contains
 *             ...                 multiple lu_dirpages.
2108  * |---v----------------   -----|
2109  * |s'|e'|f'|p|ent|ent| ... |ent|
2110  * '---|----------------   -----'
2111  *     v
2112  * .----------------------------.
 * |        next CFS_PAGE       |
2114  *
2115  * This structure is transformed into a single logical lu_dirpage as follows:
2116  *
2117  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
2118  *   labeled 'next CFS_PAGE'.
2119  *
2120  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
2121  *   a hash collision with the next page exists.
2122  *
2123  * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
2124  *   to the first entry of the next lu_dirpage.
2125  */
#if PAGE_CACHE_SIZE > LU_PAGE_SIZE
/*
 * Rewrite, in place, each of the first @ncfspgs host pages in @pages so that
 * the lu_dirpages packed inside it can be read as one logical lu_dirpage.
 *
 * @pages:   host pages to fix up; each is kmap()ed for the duration of its
 *           own fix-up and kunmap()ed afterwards.
 * @ncfspgs: number of host pages to process.
 * @nlupgs:  total number of LU_PAGE_SIZE pages contained in those host
 *           pages; used as a running counter across all host pages and
 *           asserted to be exactly 0 once the last page is done.
 *
 * Only built when the host page is larger than a lu_dirpage; otherwise the
 * name expands to an empty statement.
 */
static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
{
	int i;

	for (i = 0; i < ncfspgs; i++) {
		struct lu_dirpage	*dp = kmap(pages[i]);
		/* Header of the first lu_dirpage; it receives the merged
		 * hash end and collide flag after the walk. */
		struct lu_dirpage	*first = dp;
		struct lu_dirent	*end_dirent = NULL;
		struct lu_dirent	*ent;
		/* Both values are carried around exactly as stored in the
		 * page; no byte-order conversion is applied to them here. */
		__u64			hash_end = dp->ldp_hash_end;
		__u32			flags = dp->ldp_flags;

		/* nlupgs is pre-decremented on every pass, including the one
		 * that terminates the loop. */
		while (--nlupgs > 0) {
			/* Empty-bodied loop: walk to the end of this
			 * lu_dirpage.  Afterwards end_dirent is the last
			 * entry, or NULL if lu_dirent_start() returned NULL. */
			ent = lu_dirent_start(dp);
			for (end_dirent = ent; ent != NULL;
			     end_dirent = ent, ent = lu_dirent_next(ent));

			/* Advance dp to next lu_dirpage. */
			dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);

			/* Check if we've reached the end of the CFS_PAGE. */
			if (!((unsigned long)dp & ~CFS_PAGE_MASK))
				break;

			/* Save the hash and flags of this lu_dirpage. */
			hash_end = dp->ldp_hash_end;
			flags = dp->ldp_flags;

			/* Check if lu_dirpage contains no entries. */
			if (!end_dirent)
				break;

			/* Enlarge the end entry lde_reclen from 0 to
			 * first entry of next lu_dirpage. */
			LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
			end_dirent->lde_reclen =
				cpu_to_le16((char *)(dp->ldp_entries) -
					    (char *)end_dirent);
		}

		/* Publish the values of the last lu_dirpage visited through
		 * the first header.  Only the LDF_COLLIDE bit is replaced;
		 * the mask is converted with cpu_to_le32() because the flags
		 * word is manipulated in its stored form. */
		first->ldp_hash_end = hash_end;
		first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
		first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);

		kunmap(pages[i]);
	}
	/* Every lu_dirpage must have been accounted for. */
	LASSERTF(nlupgs == 0, "left = %d", nlupgs);
}
#else
/* Host page is not larger than a lu_dirpage: nothing to adjust. */
#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
#endif	/* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
2178 
lmv_readpage(struct obd_export * exp,struct md_op_data * op_data,struct page ** pages,struct ptlrpc_request ** request)2179 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
2180 			struct page **pages, struct ptlrpc_request **request)
2181 {
2182 	struct obd_device	*obd = exp->exp_obd;
2183 	struct lmv_obd		*lmv = &obd->u.lmv;
2184 	__u64			offset = op_data->op_offset;
2185 	int			rc;
2186 	int			ncfspgs; /* pages read in PAGE_CACHE_SIZE */
2187 	int			nlupgs; /* pages read in LU_PAGE_SIZE */
2188 	struct lmv_tgt_desc	*tgt;
2189 
2190 	rc = lmv_check_connect(obd);
2191 	if (rc)
2192 		return rc;
2193 
2194 	CDEBUG(D_INODE, "READPAGE at %#llx from "DFID"\n",
2195 	       offset, PFID(&op_data->op_fid1));
2196 
2197 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
2198 	if (IS_ERR(tgt))
2199 		return PTR_ERR(tgt);
2200 
2201 	rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
2202 	if (rc != 0)
2203 		return rc;
2204 
2205 	ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + PAGE_CACHE_SIZE - 1)
2206 		 >> PAGE_CACHE_SHIFT;
2207 	nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
2208 	LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
2209 	LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
2210 
2211 	CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
2212 	       op_data->op_npages);
2213 
2214 	lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
2215 
2216 	return rc;
2217 }
2218 
lmv_unlink(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)2219 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2220 		      struct ptlrpc_request **request)
2221 {
2222 	struct obd_device       *obd = exp->exp_obd;
2223 	struct lmv_obd	  *lmv = &obd->u.lmv;
2224 	struct lmv_tgt_desc     *tgt = NULL;
2225 	struct mdt_body		*body;
2226 	int		     rc;
2227 
2228 	rc = lmv_check_connect(obd);
2229 	if (rc)
2230 		return rc;
2231 retry:
2232 	/* Send unlink requests to the MDT where the child is located */
2233 	if (likely(!fid_is_zero(&op_data->op_fid2)))
2234 		tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2235 	else
2236 		tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2237 	if (IS_ERR(tgt))
2238 		return PTR_ERR(tgt);
2239 
2240 	op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2241 	op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2242 	op_data->op_cap = cfs_curproc_cap_pack();
2243 
2244 	/*
2245 	 * If child's fid is given, cancel unused locks for it if it is from
2246 	 * another export than parent.
2247 	 *
2248 	 * LOOKUP lock for child (fid3) should also be cancelled on parent
2249 	 * tgt_tgt in mdc_unlink().
2250 	 */
2251 	op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2252 
2253 	/*
2254 	 * Cancel FULL locks on child (fid3).
2255 	 */
2256 	rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
2257 			      MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2258 
2259 	if (rc != 0)
2260 		return rc;
2261 
2262 	CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
2263 	       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2264 
2265 	rc = md_unlink(tgt->ltd_exp, op_data, request);
2266 	if (rc != 0 && rc != -EREMOTE)
2267 		return rc;
2268 
2269 	body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2270 	if (body == NULL)
2271 		return -EPROTO;
2272 
2273 	/* Not cross-ref case, just get out of here. */
2274 	if (likely(!(body->valid & OBD_MD_MDS)))
2275 		return 0;
2276 
2277 	CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2278 	       exp->exp_obd->obd_name, PFID(&body->fid1));
2279 
2280 	/* This is a remote object, try remote MDT, Note: it may
2281 	 * try more than 1 time here, Considering following case
2282 	 * /mnt/lustre is root on MDT0, remote1 is on MDT1
2283 	 * 1. Initially A does not know where remote1 is, it send
2284 	 *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2285 	 *    resend unlink RPC to MDT1 (retry 1st time).
2286 	 *
2287 	 * 2. During the unlink RPC in flight,
2288 	 *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2289 	 *    and create new remote1, but on MDT0
2290 	 *
2291 	 * 3. MDT1 get unlink RPC(from A), then do remote lock on
2292 	 *    /mnt/lustre, then lookup get fid of remote1, and find
2293 	 *    it is remote dir again, and replay -EREMOTE again.
2294 	 *
2295 	 * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2296 	 *
2297 	 * In theory, it might try unlimited time here, but it should
2298 	 * be very rare case.  */
2299 	op_data->op_fid2 = body->fid1;
2300 	ptlrpc_req_finished(*request);
2301 	*request = NULL;
2302 
2303 	goto retry;
2304 }
2305 
lmv_precleanup(struct obd_device * obd,enum obd_cleanup_stage stage)2306 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2307 {
2308 	struct lmv_obd *lmv = &obd->u.lmv;
2309 
2310 	switch (stage) {
2311 	case OBD_CLEANUP_EARLY:
2312 		/* XXX: here should be calling obd_precleanup() down to
2313 		 * stack. */
2314 		break;
2315 	case OBD_CLEANUP_EXPORTS:
2316 		fld_client_proc_fini(&lmv->lmv_fld);
2317 		lprocfs_obd_cleanup(obd);
2318 		break;
2319 	default:
2320 		break;
2321 	}
2322 	return 0;
2323 }
2324 
/*
 * Answer a get_info query on the LMV export.
 *
 * - "remote_flag": ask each connected target in turn and return 0 as soon
 *   as one of them answers successfully; -EINVAL if none does.
 * - EA/cookie size keys and KEY_CONN_DATA: forwarded to the first target;
 *   a successful KEY_CONN_DATA answer is also copied into
 *   exp->exp_connect_data.
 * - KEY_TGT_COUNT: answered locally from the LMV descriptor.
 *
 * Any other key yields -EINVAL.
 */
static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
			__u32 keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	struct obd_device *dev = class_exp2obd(exp);
	struct lmv_obd *lmv;
	int err;

	if (dev == NULL) {
		CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
		       exp->exp_handle.h_cookie);
		return -EINVAL;
	}
	lmv = &dev->u.lmv;

	if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
		int idx;

		err = lmv_check_connect(dev);
		if (err != 0)
			return err;

		LASSERT(*vallen == sizeof(__u32));
		for (idx = 0; idx < lmv->desc.ld_tgt_count; idx++) {
			struct lmv_tgt_desc *target = lmv->tgts[idx];

			/* All targets should be connected by now; skip any
			 * slot that is not. */
			if (target == NULL || target->ltd_exp == NULL)
				continue;

			if (obd_get_info(env, target->ltd_exp, keylen, key,
					 vallen, val, NULL) == 0)
				return 0;
		}
		return -EINVAL;
	}

	if (KEY_IS(KEY_MAX_EASIZE) ||
	    KEY_IS(KEY_DEFAULT_EASIZE) ||
	    KEY_IS(KEY_MAX_COOKIESIZE) ||
	    KEY_IS(KEY_DEFAULT_COOKIESIZE) ||
	    KEY_IS(KEY_CONN_DATA)) {
		err = lmv_check_connect(dev);
		if (err != 0)
			return err;

		/* The first MDS is expected to know the LOV descriptor, so
		 * the request is forwarded there. */
		err = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
				   vallen, val, NULL);
		if (err == 0 && KEY_IS(KEY_CONN_DATA))
			exp->exp_connect_data = *(struct obd_connect_data *)val;
		return err;
	}

	if (KEY_IS(KEY_TGT_COUNT)) {
		*((int *)val) = lmv->desc.ld_tgt_count;
		return 0;
	}

	CDEBUG(D_IOCTL, "Invalid key\n");
	return -EINVAL;
}
2389 
/*
 * Broadcast a set_info request to every connected target.
 *
 * Only KEY_READ_ONLY and KEY_FLUSH_CTX are accepted; any other key returns
 * -EINVAL.  All targets are tried even after a failure, and the first
 * error seen (if any) is returned.
 */
static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set)
{
	struct obd_device *dev = class_exp2obd(exp);
	struct lmv_obd *lmv;
	int first_err = 0;
	int idx;

	if (dev == NULL) {
		CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
		       exp->exp_handle.h_cookie);
		return -EINVAL;
	}

	if (!KEY_IS(KEY_READ_ONLY) && !KEY_IS(KEY_FLUSH_CTX))
		return -EINVAL;

	lmv = &dev->u.lmv;
	for (idx = 0; idx < lmv->desc.ld_tgt_count; idx++) {
		struct lmv_tgt_desc *target = lmv->tgts[idx];
		int err;

		if (target == NULL || target->ltd_exp == NULL)
			continue;

		err = obd_set_info_async(env, target->ltd_exp,
					 keylen, key, vallen, val, set);
		if (first_err == 0 && err != 0)
			first_err = err;
	}

	return first_err;
}
2427 
lmv_packmd(struct obd_export * exp,struct lov_mds_md ** lmmp,struct lov_stripe_md * lsm)2428 static int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2429 		      struct lov_stripe_md *lsm)
2430 {
2431 	struct obd_device	 *obd = class_exp2obd(exp);
2432 	struct lmv_obd	    *lmv = &obd->u.lmv;
2433 	struct lmv_stripe_md      *meap;
2434 	struct lmv_stripe_md      *lsmp;
2435 	int			mea_size;
2436 	int			i;
2437 
2438 	mea_size = lmv_get_easize(lmv);
2439 	if (!lmmp)
2440 		return mea_size;
2441 
2442 	if (*lmmp && !lsm) {
2443 		OBD_FREE_LARGE(*lmmp, mea_size);
2444 		*lmmp = NULL;
2445 		return 0;
2446 	}
2447 
2448 	if (*lmmp == NULL) {
2449 		OBD_ALLOC_LARGE(*lmmp, mea_size);
2450 		if (*lmmp == NULL)
2451 			return -ENOMEM;
2452 	}
2453 
2454 	if (!lsm)
2455 		return mea_size;
2456 
2457 	lsmp = (struct lmv_stripe_md *)lsm;
2458 	meap = (struct lmv_stripe_md *)*lmmp;
2459 
2460 	if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2461 	    lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2462 		return -EINVAL;
2463 
2464 	meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2465 	meap->mea_count = cpu_to_le32(lsmp->mea_count);
2466 	meap->mea_master = cpu_to_le32(lsmp->mea_master);
2467 
2468 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2469 		meap->mea_ids[i] = lsmp->mea_ids[i];
2470 		fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
2471 	}
2472 
2473 	return mea_size;
2474 }
2475 
lmv_unpackmd(struct obd_export * exp,struct lov_stripe_md ** lsmp,struct lov_mds_md * lmm,int lmm_size)2476 static int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2477 			struct lov_mds_md *lmm, int lmm_size)
2478 {
2479 	struct obd_device	  *obd = class_exp2obd(exp);
2480 	struct lmv_stripe_md      **tmea = (struct lmv_stripe_md **)lsmp;
2481 	struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
2482 	struct lmv_obd	     *lmv = &obd->u.lmv;
2483 	int			 mea_size;
2484 	int			 i;
2485 	__u32		       magic;
2486 
2487 	mea_size = lmv_get_easize(lmv);
2488 	if (lsmp == NULL)
2489 		return mea_size;
2490 
2491 	if (*lsmp != NULL && lmm == NULL) {
2492 		OBD_FREE_LARGE(*tmea, mea_size);
2493 		*lsmp = NULL;
2494 		return 0;
2495 	}
2496 
2497 	LASSERT(mea_size == lmm_size);
2498 
2499 	OBD_ALLOC_LARGE(*tmea, mea_size);
2500 	if (*tmea == NULL)
2501 		return -ENOMEM;
2502 
2503 	if (!lmm)
2504 		return mea_size;
2505 
2506 	if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2507 	    mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2508 	    mea->mea_magic == MEA_MAGIC_HASH_SEGMENT) {
2509 		magic = le32_to_cpu(mea->mea_magic);
2510 	} else {
2511 		/*
2512 		 * Old mea is not handled here.
2513 		 */
2514 		CERROR("Old not supportable EA is found\n");
2515 		LBUG();
2516 	}
2517 
2518 	(*tmea)->mea_magic = magic;
2519 	(*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2520 	(*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2521 
2522 	for (i = 0; i < (*tmea)->mea_count; i++) {
2523 		(*tmea)->mea_ids[i] = mea->mea_ids[i];
2524 		fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2525 	}
2526 	return mea_size;
2527 }
2528 
/*
 * Cancel unused locks for @fid on every target that is present, connected
 * and active.  All such targets are visited; the first non-zero result
 * from md_cancel_unused() is what gets returned (0 if none).
 */
static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
			     ldlm_policy_data_t *policy, ldlm_mode_t mode,
			     ldlm_cancel_flags_t flags, void *opaque)
{
	struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
	int first_err = 0;
	int idx;

	LASSERT(fid != NULL);

	for (idx = 0; idx < lmv->desc.ld_tgt_count; idx++) {
		struct lmv_tgt_desc *target = lmv->tgts[idx];
		int err;

		if (target == NULL || target->ltd_exp == NULL ||
		    target->ltd_active == 0)
			continue;

		err = md_cancel_unused(target->ltd_exp, fid,
				       policy, mode, flags, opaque);
		if (first_err == 0)
			first_err = err;
	}

	return first_err;
}
2553 
/*
 * Delegate md_set_lock_data() to the export of the first target (tgts[0]).
 */
static int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
			     __u64 *bits)
{
	struct obd_export *first_exp = exp->exp_obd->u.lmv.tgts[0]->ltd_exp;

	return md_set_lock_data(first_exp, lockh, data, bits);
}
2563 
/*
 * Look for a matching lock for @fid on every present, connected and active
 * target, returning the first non-zero mode reported by md_lock_match(),
 * or 0 when no target has a match.
 *
 * With CMD an object can hold two locks in different namespaces: a lookup
 * lock on the MDS storing the direntry and an update/open lock on the MDS
 * storing the inode.  That is why every target is checked rather than
 * only the one the fid was created on.
 */
static ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
				  const struct lu_fid *fid, ldlm_type_t type,
				  ldlm_policy_data_t *policy, ldlm_mode_t mode,
				  struct lustre_handle *lockh)
{
	struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
	int idx;

	CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));

	for (idx = 0; idx < lmv->desc.ld_tgt_count; idx++) {
		struct lmv_tgt_desc *target = lmv->tgts[idx];
		ldlm_mode_t matched;

		if (target == NULL || target->ltd_exp == NULL ||
		    target->ltd_active == 0)
			continue;

		matched = md_lock_match(target->ltd_exp, flags, fid,
					type, policy, mode, lockh);
		if (matched != 0)
			return matched;
	}

	return 0;
}
2596 
lmv_get_lustre_md(struct obd_export * exp,struct ptlrpc_request * req,struct obd_export * dt_exp,struct obd_export * md_exp,struct lustre_md * md)2597 static int lmv_get_lustre_md(struct obd_export *exp,
2598 			     struct ptlrpc_request *req,
2599 			     struct obd_export *dt_exp,
2600 			     struct obd_export *md_exp,
2601 			     struct lustre_md *md)
2602 {
2603 	struct lmv_obd	  *lmv = &exp->exp_obd->u.lmv;
2604 
2605 	return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2606 }
2607 
lmv_free_lustre_md(struct obd_export * exp,struct lustre_md * md)2608 static int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2609 {
2610 	struct obd_device       *obd = exp->exp_obd;
2611 	struct lmv_obd	  *lmv = &obd->u.lmv;
2612 
2613 	if (md->mea)
2614 		obd_free_memmd(exp, (void *)&md->mea);
2615 	return md_free_lustre_md(lmv->tgts[0]->ltd_exp, md);
2616 }
2617 
lmv_set_open_replay_data(struct obd_export * exp,struct obd_client_handle * och,struct lookup_intent * it)2618 static int lmv_set_open_replay_data(struct obd_export *exp,
2619 				    struct obd_client_handle *och,
2620 				    struct lookup_intent *it)
2621 {
2622 	struct obd_device       *obd = exp->exp_obd;
2623 	struct lmv_obd	  *lmv = &obd->u.lmv;
2624 	struct lmv_tgt_desc     *tgt;
2625 
2626 	tgt = lmv_find_target(lmv, &och->och_fid);
2627 	if (IS_ERR(tgt))
2628 		return PTR_ERR(tgt);
2629 
2630 	return md_set_open_replay_data(tgt->ltd_exp, och, it);
2631 }
2632 
lmv_clear_open_replay_data(struct obd_export * exp,struct obd_client_handle * och)2633 static int lmv_clear_open_replay_data(struct obd_export *exp,
2634 				      struct obd_client_handle *och)
2635 {
2636 	struct obd_device       *obd = exp->exp_obd;
2637 	struct lmv_obd	  *lmv = &obd->u.lmv;
2638 	struct lmv_tgt_desc     *tgt;
2639 
2640 	tgt = lmv_find_target(lmv, &och->och_fid);
2641 	if (IS_ERR(tgt))
2642 		return PTR_ERR(tgt);
2643 
2644 	return md_clear_open_replay_data(tgt->ltd_exp, och);
2645 }
2646 
/*
 * Forward md_get_remote_perm() for @fid to the target selected by
 * lmv_find_target(), after checking the LMV is connected.
 */
static int lmv_get_remote_perm(struct obd_export *exp,
			       const struct lu_fid *fid,
			       struct obd_capa *oc, __u32 suppgid,
			       struct ptlrpc_request **request)
{
	struct obd_device *dev = exp->exp_obd;
	struct lmv_tgt_desc *target;
	int err;

	err = lmv_check_connect(dev);
	if (err != 0)
		return err;

	target = lmv_find_target(&dev->u.lmv, fid);
	if (IS_ERR(target))
		return PTR_ERR(target);

	return md_get_remote_perm(target->ltd_exp, fid, oc, suppgid, request);
}
2668 
/*
 * Forward md_renew_capa() to the target that lmv_find_target() resolves
 * for the FID stored in the capability (oc->c_capa.lc_fid), after checking
 * the LMV is connected.
 */
static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
			  renew_capa_cb_t cb)
{
	struct obd_device *dev = exp->exp_obd;
	struct lmv_tgt_desc *target;
	int err;

	err = lmv_check_connect(dev);
	if (err != 0)
		return err;

	target = lmv_find_target(&dev->u.lmv, &oc->c_capa.lc_fid);
	if (IS_ERR(target))
		return PTR_ERR(target);

	return md_renew_capa(target->ltd_exp, oc, cb);
}
2688 
lmv_unpack_capa(struct obd_export * exp,struct ptlrpc_request * req,const struct req_msg_field * field,struct obd_capa ** oc)2689 static int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
2690 			   const struct req_msg_field *field,
2691 			   struct obd_capa **oc)
2692 {
2693 	struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2694 
2695 	return md_unpack_capa(lmv->tgts[0]->ltd_exp, req, field, oc);
2696 }
2697 
lmv_intent_getattr_async(struct obd_export * exp,struct md_enqueue_info * minfo,struct ldlm_enqueue_info * einfo)2698 static int lmv_intent_getattr_async(struct obd_export *exp,
2699 				    struct md_enqueue_info *minfo,
2700 				    struct ldlm_enqueue_info *einfo)
2701 {
2702 	struct md_op_data       *op_data = &minfo->mi_data;
2703 	struct obd_device       *obd = exp->exp_obd;
2704 	struct lmv_obd	  *lmv = &obd->u.lmv;
2705 	struct lmv_tgt_desc     *tgt = NULL;
2706 	int		      rc;
2707 
2708 	rc = lmv_check_connect(obd);
2709 	if (rc)
2710 		return rc;
2711 
2712 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
2713 	if (IS_ERR(tgt))
2714 		return PTR_ERR(tgt);
2715 
2716 	rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
2717 	return rc;
2718 }
2719 
/*
 * Forward md_revalidate_lock() for @fid to the target selected by
 * lmv_find_target(), after checking the LMV is connected.
 */
static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
			       struct lu_fid *fid, __u64 *bits)
{
	struct obd_device *dev = exp->exp_obd;
	struct lmv_tgt_desc *target;
	int err;

	err = lmv_check_connect(dev);
	if (err != 0)
		return err;

	target = lmv_find_target(&dev->u.lmv, fid);
	if (IS_ERR(target))
		return PTR_ERR(target);

	return md_revalidate_lock(target->ltd_exp, it, fid, bits);
}
2739 
2740 /**
2741  * For lmv, only need to send request to master MDT, and the master MDT will
2742  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2743  * we directly fetch data from the slave MDTs.
2744  */
lmv_quotactl(struct obd_device * unused,struct obd_export * exp,struct obd_quotactl * oqctl)2745 static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2746 			struct obd_quotactl *oqctl)
2747 {
2748 	struct obd_device   *obd = class_exp2obd(exp);
2749 	struct lmv_obd      *lmv = &obd->u.lmv;
2750 	struct lmv_tgt_desc *tgt = lmv->tgts[0];
2751 	int		  rc = 0, i;
2752 	__u64		curspace, curinodes;
2753 
2754 	if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
2755 		CERROR("master lmv inactive\n");
2756 		return -EIO;
2757 	}
2758 
2759 	if (oqctl->qc_cmd != Q_GETOQUOTA) {
2760 		rc = obd_quotactl(tgt->ltd_exp, oqctl);
2761 		return rc;
2762 	}
2763 
2764 	curspace = curinodes = 0;
2765 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2766 		int err;
2767 		tgt = lmv->tgts[i];
2768 
2769 		if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
2770 			continue;
2771 		if (!tgt->ltd_active) {
2772 			CDEBUG(D_HA, "mdt %d is inactive.\n", i);
2773 			continue;
2774 		}
2775 
2776 		err = obd_quotactl(tgt->ltd_exp, oqctl);
2777 		if (err) {
2778 			CERROR("getquota on mdt %d failed. %d\n", i, err);
2779 			if (!rc)
2780 				rc = err;
2781 		} else {
2782 			curspace += oqctl->qc_dqblk.dqb_curspace;
2783 			curinodes += oqctl->qc_dqblk.dqb_curinodes;
2784 		}
2785 	}
2786 	oqctl->qc_dqblk.dqb_curspace = curspace;
2787 	oqctl->qc_dqblk.dqb_curinodes = curinodes;
2788 
2789 	return rc;
2790 }
2791 
lmv_quotacheck(struct obd_device * unused,struct obd_export * exp,struct obd_quotactl * oqctl)2792 static int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
2793 			  struct obd_quotactl *oqctl)
2794 {
2795 	struct obd_device   *obd = class_exp2obd(exp);
2796 	struct lmv_obd      *lmv = &obd->u.lmv;
2797 	struct lmv_tgt_desc *tgt;
2798 	int		  i, rc = 0;
2799 
2800 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2801 		int err;
2802 		tgt = lmv->tgts[i];
2803 		if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
2804 			CERROR("lmv idx %d inactive\n", i);
2805 			return -EIO;
2806 		}
2807 
2808 		err = obd_quotacheck(tgt->ltd_exp, oqctl);
2809 		if (err && !rc)
2810 			rc = err;
2811 	}
2812 
2813 	return rc;
2814 }
2815 
2816 static struct obd_ops lmv_obd_ops = {
2817 	.o_owner		= THIS_MODULE,
2818 	.o_setup		= lmv_setup,
2819 	.o_cleanup	      = lmv_cleanup,
2820 	.o_precleanup	   = lmv_precleanup,
2821 	.o_process_config       = lmv_process_config,
2822 	.o_connect	      = lmv_connect,
2823 	.o_disconnect	   = lmv_disconnect,
2824 	.o_statfs	       = lmv_statfs,
2825 	.o_get_info	     = lmv_get_info,
2826 	.o_set_info_async       = lmv_set_info_async,
2827 	.o_packmd	       = lmv_packmd,
2828 	.o_unpackmd	     = lmv_unpackmd,
2829 	.o_notify	       = lmv_notify,
2830 	.o_get_uuid	     = lmv_get_uuid,
2831 	.o_iocontrol	    = lmv_iocontrol,
2832 	.o_quotacheck	   = lmv_quotacheck,
2833 	.o_quotactl	     = lmv_quotactl
2834 };
2835 
2836 static struct md_ops lmv_md_ops = {
2837 	.m_getstatus	    = lmv_getstatus,
2838 	.m_null_inode		= lmv_null_inode,
2839 	.m_find_cbdata	  = lmv_find_cbdata,
2840 	.m_close		= lmv_close,
2841 	.m_create	       = lmv_create,
2842 	.m_done_writing	 = lmv_done_writing,
2843 	.m_enqueue	      = lmv_enqueue,
2844 	.m_getattr	      = lmv_getattr,
2845 	.m_getxattr	     = lmv_getxattr,
2846 	.m_getattr_name	 = lmv_getattr_name,
2847 	.m_intent_lock	  = lmv_intent_lock,
2848 	.m_link		 = lmv_link,
2849 	.m_rename	       = lmv_rename,
2850 	.m_setattr	      = lmv_setattr,
2851 	.m_setxattr	     = lmv_setxattr,
2852 	.m_sync		 = lmv_sync,
2853 	.m_readpage	     = lmv_readpage,
2854 	.m_unlink	       = lmv_unlink,
2855 	.m_init_ea_size	 = lmv_init_ea_size,
2856 	.m_cancel_unused	= lmv_cancel_unused,
2857 	.m_set_lock_data	= lmv_set_lock_data,
2858 	.m_lock_match	   = lmv_lock_match,
2859 	.m_get_lustre_md	= lmv_get_lustre_md,
2860 	.m_free_lustre_md       = lmv_free_lustre_md,
2861 	.m_set_open_replay_data = lmv_set_open_replay_data,
2862 	.m_clear_open_replay_data = lmv_clear_open_replay_data,
2863 	.m_renew_capa	   = lmv_renew_capa,
2864 	.m_unpack_capa	  = lmv_unpack_capa,
2865 	.m_get_remote_perm      = lmv_get_remote_perm,
2866 	.m_intent_getattr_async = lmv_intent_getattr_async,
2867 	.m_revalidate_lock      = lmv_revalidate_lock
2868 };
2869 
lmv_init(void)2870 static int __init lmv_init(void)
2871 {
2872 	struct lprocfs_static_vars lvars;
2873 	int			rc;
2874 
2875 	lprocfs_lmv_init_vars(&lvars);
2876 
2877 	rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2878 				 lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2879 	return rc;
2880 }
2881 
/* Module exit point: unregister the obd type named LUSTRE_LMV_NAME. */
static void lmv_exit(void)
{
	class_unregister_type(LUSTRE_LMV_NAME);
}
2886 
/* Module metadata, followed by the init/exit hooks for this driver. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lmv_init);
module_exit(lmv_exit);
2893