1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_LMV
38 #include <linux/slab.h>
39 #include <linux/module.h>
40 #include <linux/init.h>
41 #include <linux/pagemap.h>
42 #include <linux/mm.h>
43 #include <asm/div64.h>
44 #include <linux/seq_file.h>
45 #include <linux/namei.h>
46 #include <linux/uaccess.h>
47 
48 #include "../include/lustre/lustre_idl.h"
49 #include "../include/obd_support.h"
50 #include "../include/lustre_lib.h"
51 #include "../include/lustre_net.h"
52 #include "../include/obd_class.h"
53 #include "../include/lprocfs_status.h"
54 #include "../include/lustre_lite.h"
55 #include "../include/lustre_fid.h"
56 #include "lmv_internal.h"
57 
lmv_activate_target(struct lmv_obd * lmv,struct lmv_tgt_desc * tgt,int activate)58 static void lmv_activate_target(struct lmv_obd *lmv,
59 				struct lmv_tgt_desc *tgt,
60 				int activate)
61 {
62 	if (tgt->ltd_active == activate)
63 		return;
64 
65 	tgt->ltd_active = activate;
66 	lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
67 }
68 
69 /**
70  * Error codes:
71  *
72  *  -EINVAL  : UUID can't be found in the LMV's target list
73  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
74  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
75  */
lmv_set_mdc_active(struct lmv_obd * lmv,struct obd_uuid * uuid,int activate)76 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
77 			      int activate)
78 {
79 	struct lmv_tgt_desc    *uninitialized_var(tgt);
80 	struct obd_device      *obd;
81 	int		     i;
82 	int		     rc = 0;
83 
84 	CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
85 	       lmv, uuid->uuid, activate);
86 
87 	spin_lock(&lmv->lmv_lock);
88 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
89 		tgt = lmv->tgts[i];
90 		if (tgt == NULL || tgt->ltd_exp == NULL)
91 			continue;
92 
93 		CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i,
94 		       tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
95 
96 		if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
97 			break;
98 	}
99 
100 	if (i == lmv->desc.ld_tgt_count) {
101 		rc = -EINVAL;
102 		goto out_lmv_lock;
103 	}
104 
105 	obd = class_exp2obd(tgt->ltd_exp);
106 	if (obd == NULL) {
107 		rc = -ENOTCONN;
108 		goto out_lmv_lock;
109 	}
110 
111 	CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112 	       obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113 	       obd->obd_type->typ_name, i);
114 	LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
115 
116 	if (tgt->ltd_active == activate) {
117 		CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118 		       activate ? "" : "in");
119 		goto out_lmv_lock;
120 	}
121 
122 	CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
123 	       activate ? "" : "in");
124 	lmv_activate_target(lmv, tgt, activate);
125 
126  out_lmv_lock:
127 	spin_unlock(&lmv->lmv_lock);
128 	return rc;
129 }
130 
lmv_get_uuid(struct obd_export * exp)131 static struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
132 {
133 	struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
134 
135 	return obd_get_uuid(lmv->tgts[0]->ltd_exp);
136 }
137 
lmv_notify(struct obd_device * obd,struct obd_device * watched,enum obd_notify_event ev,void * data)138 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
139 		      enum obd_notify_event ev, void *data)
140 {
141 	struct obd_connect_data *conn_data;
142 	struct lmv_obd	  *lmv = &obd->u.lmv;
143 	struct obd_uuid	 *uuid;
144 	int		      rc = 0;
145 
146 	if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
147 		CERROR("unexpected notification of %s %s!\n",
148 		       watched->obd_type->typ_name,
149 		       watched->obd_name);
150 		return -EINVAL;
151 	}
152 
153 	uuid = &watched->u.cli.cl_target_uuid;
154 	if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
155 		/*
156 		 * Set MDC as active before notifying the observer, so the
157 		 * observer can use the MDC normally.
158 		 */
159 		rc = lmv_set_mdc_active(lmv, uuid,
160 					ev == OBD_NOTIFY_ACTIVE);
161 		if (rc) {
162 			CERROR("%sactivation of %s failed: %d\n",
163 			       ev == OBD_NOTIFY_ACTIVE ? "" : "de",
164 			       uuid->uuid, rc);
165 			return rc;
166 		}
167 	} else if (ev == OBD_NOTIFY_OCD) {
168 		conn_data = &watched->u.cli.cl_import->imp_connect_data;
169 		/*
170 		 * XXX: Make sure that ocd_connect_flags from all targets are
171 		 * the same. Otherwise one of MDTs runs wrong version or
172 		 * something like this.  --umka
173 		 */
174 		obd->obd_self_export->exp_connect_data = *conn_data;
175 	}
176 #if 0
177 	else if (ev == OBD_NOTIFY_DISCON) {
178 		/*
179 		 * For disconnect event, flush fld cache for failout MDS case.
180 		 */
181 		fld_client_flush(&lmv->lmv_fld);
182 	}
183 #endif
184 	/*
185 	 * Pass the notification up the chain.
186 	 */
187 	if (obd->obd_observer)
188 		rc = obd_notify(obd->obd_observer, watched, ev, data);
189 
190 	return rc;
191 }
192 
193 /**
194  * This is fake connect function. Its purpose is to initialize lmv and say
195  * caller that everything is okay. Real connection will be performed later.
196  */
lmv_connect(const struct lu_env * env,struct obd_export ** exp,struct obd_device * obd,struct obd_uuid * cluuid,struct obd_connect_data * data,void * localdata)197 static int lmv_connect(const struct lu_env *env,
198 		       struct obd_export **exp, struct obd_device *obd,
199 		       struct obd_uuid *cluuid, struct obd_connect_data *data,
200 		       void *localdata)
201 {
202 	struct lmv_obd	*lmv = &obd->u.lmv;
203 	struct lustre_handle  conn = { 0 };
204 	int		    rc = 0;
205 
206 	/*
207 	 * We don't want to actually do the underlying connections more than
208 	 * once, so keep track.
209 	 */
210 	lmv->refcount++;
211 	if (lmv->refcount > 1) {
212 		*exp = NULL;
213 		return 0;
214 	}
215 
216 	rc = class_connect(&conn, obd, cluuid);
217 	if (rc) {
218 		CERROR("class_connection() returned %d\n", rc);
219 		return rc;
220 	}
221 
222 	*exp = class_conn2export(&conn);
223 	class_export_get(*exp);
224 
225 	lmv->exp = *exp;
226 	lmv->connected = 0;
227 	lmv->cluuid = *cluuid;
228 
229 	if (data)
230 		lmv->conn_data = *data;
231 
232 	lmv->lmv_tgts_kobj = kobject_create_and_add("target_obds",
233 						    &obd->obd_kobj);
234 	/*
235 	 * All real clients should perform actual connection right away, because
236 	 * it is possible, that LMV will not have opportunity to connect targets
237 	 * and MDC stuff will be called directly, for instance while reading
238 	 * ../mdc/../kbytesfree procfs file, etc.
239 	 */
240 	if (data->ocd_connect_flags & OBD_CONNECT_REAL)
241 		rc = lmv_check_connect(obd);
242 
243 	if (rc && lmv->lmv_tgts_kobj)
244 		kobject_put(lmv->lmv_tgts_kobj);
245 
246 	return rc;
247 }
248 
lmv_set_timeouts(struct obd_device * obd)249 static void lmv_set_timeouts(struct obd_device *obd)
250 {
251 	struct lmv_tgt_desc   *tgt;
252 	struct lmv_obd	*lmv;
253 	int		    i;
254 
255 	lmv = &obd->u.lmv;
256 	if (lmv->server_timeout == 0)
257 		return;
258 
259 	if (lmv->connected == 0)
260 		return;
261 
262 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
263 		tgt = lmv->tgts[i];
264 		if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
265 			continue;
266 
267 		obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
268 				   KEY_INTERMDS, 0, NULL, NULL);
269 	}
270 }
271 
lmv_init_ea_size(struct obd_export * exp,int easize,int def_easize,int cookiesize,int def_cookiesize)272 static int lmv_init_ea_size(struct obd_export *exp, int easize,
273 			    int def_easize, int cookiesize, int def_cookiesize)
274 {
275 	struct obd_device   *obd = exp->exp_obd;
276 	struct lmv_obd      *lmv = &obd->u.lmv;
277 	int		  i;
278 	int		  rc = 0;
279 	int		  change = 0;
280 
281 	if (lmv->max_easize < easize) {
282 		lmv->max_easize = easize;
283 		change = 1;
284 	}
285 	if (lmv->max_def_easize < def_easize) {
286 		lmv->max_def_easize = def_easize;
287 		change = 1;
288 	}
289 	if (lmv->max_cookiesize < cookiesize) {
290 		lmv->max_cookiesize = cookiesize;
291 		change = 1;
292 	}
293 	if (lmv->max_def_cookiesize < def_cookiesize) {
294 		lmv->max_def_cookiesize = def_cookiesize;
295 		change = 1;
296 	}
297 	if (change == 0)
298 		return 0;
299 
300 	if (lmv->connected == 0)
301 		return 0;
302 
303 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
304 		if (lmv->tgts[i] == NULL ||
305 		    lmv->tgts[i]->ltd_exp == NULL ||
306 		    lmv->tgts[i]->ltd_active == 0) {
307 			CWARN("%s: NULL export for %d\n", obd->obd_name, i);
308 			continue;
309 		}
310 
311 		rc = md_init_ea_size(lmv->tgts[i]->ltd_exp, easize, def_easize,
312 				     cookiesize, def_cookiesize);
313 		if (rc) {
314 			CERROR("%s: obd_init_ea_size() failed on MDT target %d: rc = %d.\n",
315 			       obd->obd_name, i, rc);
316 			break;
317 		}
318 	}
319 	return rc;
320 }
321 
322 #define MAX_STRING_SIZE 128
323 
lmv_connect_mdc(struct obd_device * obd,struct lmv_tgt_desc * tgt)324 static int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
325 {
326 	struct lmv_obd	  *lmv = &obd->u.lmv;
327 	struct obd_uuid	 *cluuid = &lmv->cluuid;
328 	struct obd_uuid	  lmv_mdc_uuid = { "LMV_MDC_UUID" };
329 	struct obd_device       *mdc_obd;
330 	struct obd_export       *mdc_exp;
331 	struct lu_fld_target     target;
332 	int		      rc;
333 
334 	mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
335 					&obd->obd_uuid);
336 	if (!mdc_obd) {
337 		CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
338 		return -EINVAL;
339 	}
340 
341 	CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
342 		mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
343 		tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
344 		cluuid->uuid);
345 
346 	if (!mdc_obd->obd_set_up) {
347 		CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
348 		return -EINVAL;
349 	}
350 
351 	rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
352 			 &lmv->conn_data, NULL);
353 	if (rc) {
354 		CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
355 		return rc;
356 	}
357 
358 	/*
359 	 * Init fid sequence client for this mdc and add new fld target.
360 	 */
361 	rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
362 	if (rc)
363 		return rc;
364 
365 	target.ft_srv = NULL;
366 	target.ft_exp = mdc_exp;
367 	target.ft_idx = tgt->ltd_idx;
368 
369 	fld_client_add_target(&lmv->lmv_fld, &target);
370 
371 	rc = obd_register_observer(mdc_obd, obd);
372 	if (rc) {
373 		obd_disconnect(mdc_exp);
374 		CERROR("target %s register_observer error %d\n",
375 		       tgt->ltd_uuid.uuid, rc);
376 		return rc;
377 	}
378 
379 	if (obd->obd_observer) {
380 		/*
381 		 * Tell the observer about the new target.
382 		 */
383 		rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
384 				OBD_NOTIFY_ACTIVE,
385 				(void *)(tgt - lmv->tgts[0]));
386 		if (rc) {
387 			obd_disconnect(mdc_exp);
388 			return rc;
389 		}
390 	}
391 
392 	tgt->ltd_active = 1;
393 	tgt->ltd_exp = mdc_exp;
394 	lmv->desc.ld_active_tgt_count++;
395 
396 	md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize,
397 			lmv->max_cookiesize, lmv->max_def_cookiesize);
398 
399 	CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
400 		mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
401 		atomic_read(&obd->obd_refcount));
402 
403 	if (lmv->lmv_tgts_kobj)
404 		/* Even if we failed to create the link, that's fine */
405 		rc = sysfs_create_link(lmv->lmv_tgts_kobj, &mdc_obd->obd_kobj,
406 				       mdc_obd->obd_name);
407 	return 0;
408 }
409 
lmv_del_target(struct lmv_obd * lmv,int index)410 static void lmv_del_target(struct lmv_obd *lmv, int index)
411 {
412 	if (lmv->tgts[index] == NULL)
413 		return;
414 
415 	kfree(lmv->tgts[index]);
416 	lmv->tgts[index] = NULL;
417 	return;
418 }
419 
lmv_add_target(struct obd_device * obd,struct obd_uuid * uuidp,__u32 index,int gen)420 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
421 			   __u32 index, int gen)
422 {
423 	struct lmv_obd      *lmv = &obd->u.lmv;
424 	struct lmv_tgt_desc *tgt;
425 	int		  rc = 0;
426 
427 	CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
428 
429 	lmv_init_lock(lmv);
430 
431 	if (lmv->desc.ld_tgt_count == 0) {
432 		struct obd_device *mdc_obd;
433 
434 		mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
435 						&obd->obd_uuid);
436 		if (!mdc_obd) {
437 			lmv_init_unlock(lmv);
438 			CERROR("%s: Target %s not attached: rc = %d\n",
439 			       obd->obd_name, uuidp->uuid, -EINVAL);
440 			return -EINVAL;
441 		}
442 	}
443 
444 	if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
445 		tgt = lmv->tgts[index];
446 		CERROR("%s: UUID %s already assigned at LOV target index %d: rc = %d\n",
447 		       obd->obd_name,
448 		       obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
449 		lmv_init_unlock(lmv);
450 		return -EEXIST;
451 	}
452 
453 	if (index >= lmv->tgts_size) {
454 		/* We need to reallocate the lmv target array. */
455 		struct lmv_tgt_desc **newtgts, **old = NULL;
456 		__u32 newsize = 1;
457 		__u32 oldsize = 0;
458 
459 		while (newsize < index + 1)
460 			newsize <<= 1;
461 		newtgts = kcalloc(newsize, sizeof(*newtgts), GFP_NOFS);
462 		if (newtgts == NULL) {
463 			lmv_init_unlock(lmv);
464 			return -ENOMEM;
465 		}
466 
467 		if (lmv->tgts_size) {
468 			memcpy(newtgts, lmv->tgts,
469 			       sizeof(*newtgts) * lmv->tgts_size);
470 			old = lmv->tgts;
471 			oldsize = lmv->tgts_size;
472 		}
473 
474 		lmv->tgts = newtgts;
475 		lmv->tgts_size = newsize;
476 		smp_rmb();
477 		kfree(old);
478 
479 		CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
480 		       lmv->tgts_size);
481 	}
482 
483 	tgt = kzalloc(sizeof(*tgt), GFP_NOFS);
484 	if (!tgt) {
485 		lmv_init_unlock(lmv);
486 		return -ENOMEM;
487 	}
488 
489 	mutex_init(&tgt->ltd_fid_mutex);
490 	tgt->ltd_idx = index;
491 	tgt->ltd_uuid = *uuidp;
492 	tgt->ltd_active = 0;
493 	lmv->tgts[index] = tgt;
494 	if (index >= lmv->desc.ld_tgt_count)
495 		lmv->desc.ld_tgt_count = index + 1;
496 
497 	if (lmv->connected) {
498 		rc = lmv_connect_mdc(obd, tgt);
499 		if (rc) {
500 			spin_lock(&lmv->lmv_lock);
501 			lmv->desc.ld_tgt_count--;
502 			memset(tgt, 0, sizeof(*tgt));
503 			spin_unlock(&lmv->lmv_lock);
504 		} else {
505 			int easize = sizeof(struct lmv_stripe_md) +
506 				lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
507 			lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
508 		}
509 	}
510 
511 	lmv_init_unlock(lmv);
512 	return rc;
513 }
514 
lmv_check_connect(struct obd_device * obd)515 int lmv_check_connect(struct obd_device *obd)
516 {
517 	struct lmv_obd       *lmv = &obd->u.lmv;
518 	struct lmv_tgt_desc  *tgt;
519 	int		   i;
520 	int		   rc;
521 	int		   easize;
522 
523 	if (lmv->connected)
524 		return 0;
525 
526 	lmv_init_lock(lmv);
527 	if (lmv->connected) {
528 		lmv_init_unlock(lmv);
529 		return 0;
530 	}
531 
532 	if (lmv->desc.ld_tgt_count == 0) {
533 		lmv_init_unlock(lmv);
534 		CERROR("%s: no targets configured.\n", obd->obd_name);
535 		return -EINVAL;
536 	}
537 
538 	CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
539 	       lmv->cluuid.uuid, obd->obd_name);
540 
541 	LASSERT(lmv->tgts != NULL);
542 
543 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
544 		tgt = lmv->tgts[i];
545 		if (tgt == NULL)
546 			continue;
547 		rc = lmv_connect_mdc(obd, tgt);
548 		if (rc)
549 			goto out_disc;
550 	}
551 
552 	lmv_set_timeouts(obd);
553 	class_export_put(lmv->exp);
554 	lmv->connected = 1;
555 	easize = lmv_get_easize(lmv);
556 	lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
557 	lmv_init_unlock(lmv);
558 	return 0;
559 
560  out_disc:
561 	while (i-- > 0) {
562 		int rc2;
563 
564 		tgt = lmv->tgts[i];
565 		if (tgt == NULL)
566 			continue;
567 		tgt->ltd_active = 0;
568 		if (tgt->ltd_exp) {
569 			--lmv->desc.ld_active_tgt_count;
570 			rc2 = obd_disconnect(tgt->ltd_exp);
571 			if (rc2) {
572 				CERROR("LMV target %s disconnect on MDC idx %d: error %d\n",
573 				       tgt->ltd_uuid.uuid, i, rc2);
574 			}
575 		}
576 	}
577 	class_disconnect(lmv->exp);
578 	lmv_init_unlock(lmv);
579 	return rc;
580 }
581 
lmv_disconnect_mdc(struct obd_device * obd,struct lmv_tgt_desc * tgt)582 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
583 {
584 	struct lmv_obd	 *lmv = &obd->u.lmv;
585 	struct obd_device      *mdc_obd;
586 	int		     rc;
587 
588 	LASSERT(tgt != NULL);
589 	LASSERT(obd != NULL);
590 
591 	mdc_obd = class_exp2obd(tgt->ltd_exp);
592 
593 	if (mdc_obd) {
594 		mdc_obd->obd_force = obd->obd_force;
595 		mdc_obd->obd_fail = obd->obd_fail;
596 		mdc_obd->obd_no_recov = obd->obd_no_recov;
597 
598 		if (lmv->lmv_tgts_kobj)
599 			sysfs_remove_link(lmv->lmv_tgts_kobj,
600 					  mdc_obd->obd_name);
601 	}
602 
603 	rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
604 	if (rc)
605 		CERROR("Can't finalize fids factory\n");
606 
607 	CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
608 	       tgt->ltd_exp->exp_obd->obd_name,
609 	       tgt->ltd_exp->exp_obd->obd_uuid.uuid);
610 
611 	obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
612 	rc = obd_disconnect(tgt->ltd_exp);
613 	if (rc) {
614 		if (tgt->ltd_active) {
615 			CERROR("Target %s disconnect error %d\n",
616 			       tgt->ltd_uuid.uuid, rc);
617 		}
618 	}
619 
620 	lmv_activate_target(lmv, tgt, 0);
621 	tgt->ltd_exp = NULL;
622 	return 0;
623 }
624 
lmv_disconnect(struct obd_export * exp)625 static int lmv_disconnect(struct obd_export *exp)
626 {
627 	struct obd_device     *obd = class_exp2obd(exp);
628 	struct lmv_obd	*lmv = &obd->u.lmv;
629 	int		    rc;
630 	int		    i;
631 
632 	if (!lmv->tgts)
633 		goto out_local;
634 
635 	/*
636 	 * Only disconnect the underlying layers on the final disconnect.
637 	 */
638 	lmv->refcount--;
639 	if (lmv->refcount != 0)
640 		goto out_local;
641 
642 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
643 		if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
644 			continue;
645 
646 		lmv_disconnect_mdc(obd, lmv->tgts[i]);
647 	}
648 
649 	if (lmv->lmv_tgts_kobj)
650 		kobject_put(lmv->lmv_tgts_kobj);
651 
652 out_local:
653 	/*
654 	 * This is the case when no real connection is established by
655 	 * lmv_check_connect().
656 	 */
657 	if (!lmv->connected)
658 		class_export_put(exp);
659 	rc = class_disconnect(exp);
660 	if (lmv->refcount == 0)
661 		lmv->connected = 0;
662 	return rc;
663 }
664 
lmv_fid2path(struct obd_export * exp,int len,void * karg,void * uarg)665 static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
666 {
667 	struct obd_device	*obddev = class_exp2obd(exp);
668 	struct lmv_obd		*lmv = &obddev->u.lmv;
669 	struct getinfo_fid2path *gf;
670 	struct lmv_tgt_desc     *tgt;
671 	struct getinfo_fid2path *remote_gf = NULL;
672 	int			remote_gf_size = 0;
673 	int			rc;
674 
675 	gf = (struct getinfo_fid2path *)karg;
676 	tgt = lmv_find_target(lmv, &gf->gf_fid);
677 	if (IS_ERR(tgt))
678 		return PTR_ERR(tgt);
679 
680 repeat_fid2path:
681 	rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
682 	if (rc != 0 && rc != -EREMOTE)
683 		goto out_fid2path;
684 
685 	/* If remote_gf != NULL, it means just building the
686 	 * path on the remote MDT, copy this path segment to gf */
687 	if (remote_gf != NULL) {
688 		struct getinfo_fid2path *ori_gf;
689 		char *ptr;
690 
691 		ori_gf = (struct getinfo_fid2path *)karg;
692 		if (strlen(ori_gf->gf_path) +
693 		    strlen(gf->gf_path) > ori_gf->gf_pathlen) {
694 			rc = -EOVERFLOW;
695 			goto out_fid2path;
696 		}
697 
698 		ptr = ori_gf->gf_path;
699 
700 		memmove(ptr + strlen(gf->gf_path) + 1, ptr,
701 			strlen(ori_gf->gf_path));
702 
703 		strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
704 		ptr += strlen(gf->gf_path);
705 		*ptr = '/';
706 	}
707 
708 	CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n",
709 	       tgt->ltd_exp->exp_obd->obd_name,
710 	       gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
711 	       gf->gf_linkno);
712 
713 	if (rc == 0)
714 		goto out_fid2path;
715 
716 	/* sigh, has to go to another MDT to do path building further */
717 	if (remote_gf == NULL) {
718 		remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
719 		remote_gf = kzalloc(remote_gf_size, GFP_NOFS);
720 		if (!remote_gf) {
721 			rc = -ENOMEM;
722 			goto out_fid2path;
723 		}
724 		remote_gf->gf_pathlen = PATH_MAX;
725 	}
726 
727 	if (!fid_is_sane(&gf->gf_fid)) {
728 		CERROR("%s: invalid FID "DFID": rc = %d\n",
729 		       tgt->ltd_exp->exp_obd->obd_name,
730 		       PFID(&gf->gf_fid), -EINVAL);
731 		rc = -EINVAL;
732 		goto out_fid2path;
733 	}
734 
735 	tgt = lmv_find_target(lmv, &gf->gf_fid);
736 	if (IS_ERR(tgt)) {
737 		rc = -EINVAL;
738 		goto out_fid2path;
739 	}
740 
741 	remote_gf->gf_fid = gf->gf_fid;
742 	remote_gf->gf_recno = -1;
743 	remote_gf->gf_linkno = -1;
744 	memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
745 	gf = remote_gf;
746 	goto repeat_fid2path;
747 
748 out_fid2path:
749 	kfree(remote_gf);
750 	return rc;
751 }
752 
lmv_hsm_req_count(struct lmv_obd * lmv,const struct hsm_user_request * hur,const struct lmv_tgt_desc * tgt_mds)753 static int lmv_hsm_req_count(struct lmv_obd *lmv,
754 			     const struct hsm_user_request *hur,
755 			     const struct lmv_tgt_desc *tgt_mds)
756 {
757 	int			i, nr = 0;
758 	struct lmv_tgt_desc    *curr_tgt;
759 
760 	/* count how many requests must be sent to the given target */
761 	for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
762 		curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
763 		if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
764 			nr++;
765 	}
766 	return nr;
767 }
768 
lmv_hsm_req_build(struct lmv_obd * lmv,struct hsm_user_request * hur_in,const struct lmv_tgt_desc * tgt_mds,struct hsm_user_request * hur_out)769 static void lmv_hsm_req_build(struct lmv_obd *lmv,
770 			      struct hsm_user_request *hur_in,
771 			      const struct lmv_tgt_desc *tgt_mds,
772 			      struct hsm_user_request *hur_out)
773 {
774 	int			i, nr_out;
775 	struct lmv_tgt_desc    *curr_tgt;
776 
777 	/* build the hsm_user_request for the given target */
778 	hur_out->hur_request = hur_in->hur_request;
779 	nr_out = 0;
780 	for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
781 		curr_tgt = lmv_find_target(lmv,
782 					&hur_in->hur_user_item[i].hui_fid);
783 		if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
784 			hur_out->hur_user_item[nr_out] =
785 				hur_in->hur_user_item[i];
786 			nr_out++;
787 		}
788 	}
789 	hur_out->hur_request.hr_itemcount = nr_out;
790 	memcpy(hur_data(hur_out), hur_data(hur_in),
791 	       hur_in->hur_request.hr_data_len);
792 }
793 
lmv_hsm_ct_unregister(struct lmv_obd * lmv,unsigned int cmd,int len,struct lustre_kernelcomm * lk,void * uarg)794 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
795 				 struct lustre_kernelcomm *lk, void *uarg)
796 {
797 	int	i, rc = 0;
798 
799 	/* unregister request (call from llapi_hsm_copytool_fini) */
800 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
801 		/* best effort: try to clean as much as possible
802 		 * (continue on error) */
803 		obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
804 	}
805 
806 	/* Whatever the result, remove copytool from kuc groups.
807 	 * Unreached coordinators will get EPIPE on next requests
808 	 * and will unregister automatically.
809 	 */
810 	rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
811 	return rc;
812 }
813 
lmv_hsm_ct_register(struct lmv_obd * lmv,unsigned int cmd,int len,struct lustre_kernelcomm * lk,void * uarg)814 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
815 			       struct lustre_kernelcomm *lk, void *uarg)
816 {
817 	struct file	*filp;
818 	int		 i, j, err;
819 	int		 rc = 0;
820 	bool		 any_set = false;
821 
822 	/* All or nothing: try to register to all MDS.
823 	 * In case of failure, unregister from previous MDS,
824 	 * except if it because of inactive target. */
825 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
826 		err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
827 				   len, lk, uarg);
828 		if (err) {
829 			if (lmv->tgts[i]->ltd_active) {
830 				/* permanent error */
831 				CERROR("error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
832 				       lmv->tgts[i]->ltd_uuid.uuid,
833 				       i, cmd, err);
834 				rc = err;
835 				lk->lk_flags |= LK_FLG_STOP;
836 				/* unregister from previous MDS */
837 				for (j = 0; j < i; j++)
838 					obd_iocontrol(cmd,
839 						  lmv->tgts[j]->ltd_exp,
840 						  len, lk, uarg);
841 				return rc;
842 			}
843 			/* else: transient error.
844 			 * kuc will register to the missing MDT
845 			 * when it is back */
846 		} else {
847 			any_set = true;
848 		}
849 	}
850 
851 	if (!any_set)
852 		/* no registration done: return error */
853 		return -ENOTCONN;
854 
855 	/* at least one registration done, with no failure */
856 	filp = fget(lk->lk_wfd);
857 	if (filp == NULL) {
858 		return -EBADF;
859 	}
860 	rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, lk->lk_data);
861 	if (rc != 0 && filp != NULL)
862 		fput(filp);
863 	return rc;
864 }
865 
lmv_iocontrol(unsigned int cmd,struct obd_export * exp,int len,void * karg,void * uarg)866 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
867 			 int len, void *karg, void *uarg)
868 {
869 	struct obd_device    *obddev = class_exp2obd(exp);
870 	struct lmv_obd       *lmv = &obddev->u.lmv;
871 	int		   i = 0;
872 	int		   rc = 0;
873 	int		   set = 0;
874 	int		   count = lmv->desc.ld_tgt_count;
875 
876 	if (count == 0)
877 		return -ENOTTY;
878 
879 	switch (cmd) {
880 	case IOC_OBD_STATFS: {
881 		struct obd_ioctl_data *data = karg;
882 		struct obd_device *mdc_obd;
883 		struct obd_statfs stat_buf = {0};
884 		__u32 index;
885 
886 		memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
887 		if (index >= count)
888 			return -ENODEV;
889 
890 		if (lmv->tgts[index] == NULL ||
891 		    lmv->tgts[index]->ltd_active == 0)
892 			return -ENODATA;
893 
894 		mdc_obd = class_exp2obd(lmv->tgts[index]->ltd_exp);
895 		if (!mdc_obd)
896 			return -EINVAL;
897 
898 		/* copy UUID */
899 		if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
900 				     min((int) data->ioc_plen2,
901 					 (int) sizeof(struct obd_uuid))))
902 			return -EFAULT;
903 
904 		rc = obd_statfs(NULL, lmv->tgts[index]->ltd_exp, &stat_buf,
905 				cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
906 				0);
907 		if (rc)
908 			return rc;
909 		if (copy_to_user(data->ioc_pbuf1, &stat_buf,
910 				     min((int) data->ioc_plen1,
911 					 (int) sizeof(stat_buf))))
912 			return -EFAULT;
913 		break;
914 	}
915 	case OBD_IOC_QUOTACTL: {
916 		struct if_quotactl *qctl = karg;
917 		struct lmv_tgt_desc *tgt = NULL;
918 		struct obd_quotactl *oqctl;
919 
920 		if (qctl->qc_valid == QC_MDTIDX) {
921 			if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
922 				return -EINVAL;
923 
924 			tgt = lmv->tgts[qctl->qc_idx];
925 			if (tgt == NULL || tgt->ltd_exp == NULL)
926 				return -EINVAL;
927 		} else if (qctl->qc_valid == QC_UUID) {
928 			for (i = 0; i < count; i++) {
929 				tgt = lmv->tgts[i];
930 				if (tgt == NULL)
931 					continue;
932 				if (!obd_uuid_equals(&tgt->ltd_uuid,
933 						     &qctl->obd_uuid))
934 					continue;
935 
936 				if (tgt->ltd_exp == NULL)
937 					return -EINVAL;
938 
939 				break;
940 			}
941 		} else {
942 			return -EINVAL;
943 		}
944 
945 		if (i >= count)
946 			return -EAGAIN;
947 
948 		LASSERT(tgt && tgt->ltd_exp);
949 		oqctl = kzalloc(sizeof(*oqctl), GFP_NOFS);
950 		if (!oqctl)
951 			return -ENOMEM;
952 
953 		QCTL_COPY(oqctl, qctl);
954 		rc = obd_quotactl(tgt->ltd_exp, oqctl);
955 		if (rc == 0) {
956 			QCTL_COPY(qctl, oqctl);
957 			qctl->qc_valid = QC_MDTIDX;
958 			qctl->obd_uuid = tgt->ltd_uuid;
959 		}
960 		kfree(oqctl);
961 		break;
962 	}
963 	case OBD_IOC_CHANGELOG_SEND:
964 	case OBD_IOC_CHANGELOG_CLEAR: {
965 		struct ioc_changelog *icc = karg;
966 
967 		if (icc->icc_mdtindex >= count)
968 			return -ENODEV;
969 
970 		if (lmv->tgts[icc->icc_mdtindex] == NULL ||
971 		    lmv->tgts[icc->icc_mdtindex]->ltd_exp == NULL ||
972 		    lmv->tgts[icc->icc_mdtindex]->ltd_active == 0)
973 			return -ENODEV;
974 		rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex]->ltd_exp,
975 				   sizeof(*icc), icc, NULL);
976 		break;
977 	}
978 	case LL_IOC_GET_CONNECT_FLAGS: {
979 		if (lmv->tgts[0] == NULL)
980 			return -ENODATA;
981 		rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
982 		break;
983 	}
984 	case OBD_IOC_FID2PATH: {
985 		rc = lmv_fid2path(exp, len, karg, uarg);
986 		break;
987 	}
988 	case LL_IOC_HSM_STATE_GET:
989 	case LL_IOC_HSM_STATE_SET:
990 	case LL_IOC_HSM_ACTION: {
991 		struct md_op_data	*op_data = karg;
992 		struct lmv_tgt_desc	*tgt;
993 
994 		tgt = lmv_find_target(lmv, &op_data->op_fid1);
995 		if (IS_ERR(tgt))
996 				return PTR_ERR(tgt);
997 
998 		if (tgt->ltd_exp == NULL)
999 				return -EINVAL;
1000 
1001 		rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1002 		break;
1003 	}
1004 	case LL_IOC_HSM_PROGRESS: {
1005 		const struct hsm_progress_kernel *hpk = karg;
1006 		struct lmv_tgt_desc	*tgt;
1007 
1008 		tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1009 		if (IS_ERR(tgt))
1010 			return PTR_ERR(tgt);
1011 		rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1012 		break;
1013 	}
1014 	case LL_IOC_HSM_REQUEST: {
1015 		struct hsm_user_request *hur = karg;
1016 		struct lmv_tgt_desc	*tgt;
1017 		unsigned int reqcount = hur->hur_request.hr_itemcount;
1018 
1019 		if (reqcount == 0)
1020 			return 0;
1021 
1022 		/* if the request is about a single fid
1023 		 * or if there is a single MDS, no need to split
1024 		 * the request. */
1025 		if (reqcount == 1 || count == 1) {
1026 			tgt = lmv_find_target(lmv,
1027 					      &hur->hur_user_item[0].hui_fid);
1028 			if (IS_ERR(tgt))
1029 				return PTR_ERR(tgt);
1030 			rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1031 		} else {
1032 			/* split fid list to their respective MDS */
1033 			for (i = 0; i < count; i++) {
1034 				unsigned int		nr, reqlen;
1035 				int			rc1;
1036 				struct hsm_user_request *req;
1037 
1038 				nr = lmv_hsm_req_count(lmv, hur, lmv->tgts[i]);
1039 				if (nr == 0) /* nothing for this MDS */
1040 					continue;
1041 
1042 				/* build a request with fids for this MDS */
1043 				reqlen = offsetof(typeof(*hur),
1044 						  hur_user_item[nr])
1045 					 + hur->hur_request.hr_data_len;
1046 				req = libcfs_kvzalloc(reqlen, GFP_NOFS);
1047 				if (req == NULL)
1048 					return -ENOMEM;
1049 
1050 				lmv_hsm_req_build(lmv, hur, lmv->tgts[i], req);
1051 
1052 				rc1 = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
1053 						    reqlen, req, uarg);
1054 				if (rc1 != 0 && rc == 0)
1055 					rc = rc1;
1056 				kvfree(req);
1057 			}
1058 		}
1059 		break;
1060 	}
1061 	case LL_IOC_LOV_SWAP_LAYOUTS: {
1062 		struct md_op_data	*op_data = karg;
1063 		struct lmv_tgt_desc	*tgt1, *tgt2;
1064 
1065 		tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1066 		if (IS_ERR(tgt1))
1067 			return PTR_ERR(tgt1);
1068 
1069 		tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1070 		if (IS_ERR(tgt2))
1071 			return PTR_ERR(tgt2);
1072 
1073 		if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1074 			return -EINVAL;
1075 
1076 		/* only files on same MDT can have their layouts swapped */
1077 		if (tgt1->ltd_idx != tgt2->ltd_idx)
1078 			return -EPERM;
1079 
1080 		rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1081 		break;
1082 	}
1083 	case LL_IOC_HSM_CT_START: {
1084 		struct lustre_kernelcomm *lk = karg;
1085 
1086 		if (lk->lk_flags & LK_FLG_STOP)
1087 			rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1088 		else
1089 			rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1090 		break;
1091 	}
1092 	default:
1093 		for (i = 0; i < count; i++) {
1094 			struct obd_device *mdc_obd;
1095 			int err;
1096 
1097 			if (lmv->tgts[i] == NULL ||
1098 			    lmv->tgts[i]->ltd_exp == NULL)
1099 				continue;
1100 			/* ll_umount_begin() sets force flag but for lmv, not
1101 			 * mdc. Let's pass it through */
1102 			mdc_obd = class_exp2obd(lmv->tgts[i]->ltd_exp);
1103 			mdc_obd->obd_force = obddev->obd_force;
1104 			err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len,
1105 					    karg, uarg);
1106 			if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
1107 				return err;
1108 			} else if (err) {
1109 				if (lmv->tgts[i]->ltd_active) {
1110 					CERROR("error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
1111 					       lmv->tgts[i]->ltd_uuid.uuid,
1112 					       i, cmd, err);
1113 					if (!rc)
1114 						rc = err;
1115 				}
1116 			} else
1117 				set = 1;
1118 		}
1119 		if (!set && !rc)
1120 			rc = -EIO;
1121 	}
1122 	return rc;
1123 }
1124 
1125 #if 0
1126 static int lmv_all_chars_policy(int count, const char *name,
1127 				int len)
1128 {
1129 	unsigned int c = 0;
1130 
1131 	while (len > 0)
1132 		c += name[--len];
1133 	c = c % count;
1134 	return c;
1135 }
1136 
1137 static int lmv_nid_policy(struct lmv_obd *lmv)
1138 {
1139 	struct obd_import *imp;
1140 	__u32	      id;
1141 
1142 	/*
1143 	 * XXX: To get nid we assume that underlying obd device is mdc.
1144 	 */
1145 	imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
1146 	id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
1147 	return id % lmv->desc.ld_tgt_count;
1148 }
1149 
1150 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1151 			  enum placement_policy placement)
1152 {
1153 	switch (placement) {
1154 	case PLACEMENT_CHAR_POLICY:
1155 		return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
1156 					    op_data->op_name,
1157 					    op_data->op_namelen);
1158 	case PLACEMENT_NID_POLICY:
1159 		return lmv_nid_policy(lmv);
1160 
1161 	default:
1162 		break;
1163 	}
1164 
1165 	CERROR("Unsupported placement policy %x\n", placement);
1166 	return -EINVAL;
1167 }
1168 #endif
1169 
1170 /**
1171  * This is _inode_ placement policy function (not name).
1172  */
lmv_placement_policy(struct obd_device * obd,struct md_op_data * op_data,u32 * mds)1173 static int lmv_placement_policy(struct obd_device *obd,
1174 				struct md_op_data *op_data, u32 *mds)
1175 {
1176 	struct lmv_obd	  *lmv = &obd->u.lmv;
1177 
1178 	LASSERT(mds != NULL);
1179 
1180 	if (lmv->desc.ld_tgt_count == 1) {
1181 		*mds = 0;
1182 		return 0;
1183 	}
1184 
1185 	/**
1186 	 * If stripe_offset is provided during setdirstripe
1187 	 * (setdirstripe -i xx), xx MDS will be chosen.
1188 	 */
1189 	if (op_data->op_cli_flags & CLI_SET_MEA) {
1190 		struct lmv_user_md *lum;
1191 
1192 		lum = (struct lmv_user_md *)op_data->op_data;
1193 		if (lum->lum_type == LMV_STRIPE_TYPE &&
1194 		    lum->lum_stripe_offset != -1) {
1195 			if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
1196 				CERROR("%s: Stripe_offset %d > MDT count %d: rc = %d\n",
1197 				       obd->obd_name,
1198 				       lum->lum_stripe_offset,
1199 				       lmv->desc.ld_tgt_count, -ERANGE);
1200 				return -ERANGE;
1201 			}
1202 			*mds = lum->lum_stripe_offset;
1203 			return 0;
1204 		}
1205 	}
1206 
1207 	/* Allocate new fid on target according to operation type and parent
1208 	 * home mds. */
1209 	*mds = op_data->op_mds;
1210 	return 0;
1211 }
1212 
__lmv_fid_alloc(struct lmv_obd * lmv,struct lu_fid * fid,u32 mds)1213 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds)
1214 {
1215 	struct lmv_tgt_desc	*tgt;
1216 	int			 rc;
1217 
1218 	tgt = lmv_get_target(lmv, mds);
1219 	if (IS_ERR(tgt))
1220 		return PTR_ERR(tgt);
1221 
1222 	/*
1223 	 * New seq alloc and FLD setup should be atomic. Otherwise we may find
1224 	 * on server that seq in new allocated fid is not yet known.
1225 	 */
1226 	mutex_lock(&tgt->ltd_fid_mutex);
1227 
1228 	if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL) {
1229 		rc = -ENODEV;
1230 		goto out;
1231 	}
1232 
1233 	/*
1234 	 * Asking underlaying tgt layer to allocate new fid.
1235 	 */
1236 	rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
1237 	if (rc > 0) {
1238 		LASSERT(fid_is_sane(fid));
1239 		rc = 0;
1240 	}
1241 
1242 out:
1243 	mutex_unlock(&tgt->ltd_fid_mutex);
1244 	return rc;
1245 }
1246 
lmv_fid_alloc(struct obd_export * exp,struct lu_fid * fid,struct md_op_data * op_data)1247 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1248 		  struct md_op_data *op_data)
1249 {
1250 	struct obd_device     *obd = class_exp2obd(exp);
1251 	struct lmv_obd	*lmv = &obd->u.lmv;
1252 	u32		       mds = 0;
1253 	int		    rc;
1254 
1255 	LASSERT(op_data != NULL);
1256 	LASSERT(fid != NULL);
1257 
1258 	rc = lmv_placement_policy(obd, op_data, &mds);
1259 	if (rc) {
1260 		CERROR("Can't get target for allocating fid, rc %d\n",
1261 		       rc);
1262 		return rc;
1263 	}
1264 
1265 	rc = __lmv_fid_alloc(lmv, fid, mds);
1266 	if (rc) {
1267 		CERROR("Can't alloc new fid, rc %d\n", rc);
1268 		return rc;
1269 	}
1270 
1271 	return rc;
1272 }
1273 
lmv_setup(struct obd_device * obd,struct lustre_cfg * lcfg)1274 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1275 {
1276 	struct lmv_obd	     *lmv = &obd->u.lmv;
1277 	struct lprocfs_static_vars  lvars = { NULL };
1278 	struct lmv_desc	    *desc;
1279 	int			 rc;
1280 
1281 	if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1282 		CERROR("LMV setup requires a descriptor\n");
1283 		return -EINVAL;
1284 	}
1285 
1286 	desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1287 	if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1288 		CERROR("Lmv descriptor size wrong: %d > %d\n",
1289 		       (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1290 		return -EINVAL;
1291 	}
1292 
1293 	lmv->tgts = kcalloc(32, sizeof(*lmv->tgts), GFP_NOFS);
1294 	if (lmv->tgts == NULL)
1295 		return -ENOMEM;
1296 	lmv->tgts_size = 32;
1297 
1298 	obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1299 	lmv->desc.ld_tgt_count = 0;
1300 	lmv->desc.ld_active_tgt_count = 0;
1301 	lmv->max_cookiesize = 0;
1302 	lmv->max_def_easize = 0;
1303 	lmv->max_easize = 0;
1304 	lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1305 
1306 	spin_lock_init(&lmv->lmv_lock);
1307 	mutex_init(&lmv->init_mutex);
1308 
1309 	lprocfs_lmv_init_vars(&lvars);
1310 
1311 	lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
1312 	rc = ldebugfs_seq_create(obd->obd_debugfs_entry, "target_obd",
1313 				 0444, &lmv_proc_target_fops, obd);
1314 	if (rc)
1315 		CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1316 		      obd->obd_name, rc);
1317 	rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1318 			     LUSTRE_CLI_FLD_HASH_DHT);
1319 	if (rc) {
1320 		CERROR("Can't init FLD, err %d\n", rc);
1321 		goto out;
1322 	}
1323 
1324 	return 0;
1325 
1326 out:
1327 	return rc;
1328 }
1329 
lmv_cleanup(struct obd_device * obd)1330 static int lmv_cleanup(struct obd_device *obd)
1331 {
1332 	struct lmv_obd   *lmv = &obd->u.lmv;
1333 
1334 	fld_client_fini(&lmv->lmv_fld);
1335 	if (lmv->tgts != NULL) {
1336 		int i;
1337 
1338 		for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1339 			if (lmv->tgts[i] == NULL)
1340 				continue;
1341 			lmv_del_target(lmv, i);
1342 		}
1343 		kfree(lmv->tgts);
1344 		lmv->tgts_size = 0;
1345 	}
1346 	return 0;
1347 }
1348 
lmv_process_config(struct obd_device * obd,u32 len,void * buf)1349 static int lmv_process_config(struct obd_device *obd, u32 len, void *buf)
1350 {
1351 	struct lustre_cfg	*lcfg = buf;
1352 	struct obd_uuid		obd_uuid;
1353 	int			gen;
1354 	__u32			index;
1355 	int			rc;
1356 
1357 	switch (lcfg->lcfg_command) {
1358 	case LCFG_ADD_MDC:
1359 		/* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1360 		 * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1361 		if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid)) {
1362 			rc = -EINVAL;
1363 			goto out;
1364 		}
1365 
1366 		obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1367 
1368 		if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1369 			rc = -EINVAL;
1370 			goto out;
1371 		}
1372 		if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1) {
1373 			rc = -EINVAL;
1374 			goto out;
1375 		}
1376 		rc = lmv_add_target(obd, &obd_uuid, index, gen);
1377 		goto out;
1378 	default:
1379 		CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1380 		rc = -EINVAL;
1381 		goto out;
1382 	}
1383 out:
1384 	return rc;
1385 }
1386 
lmv_statfs(const struct lu_env * env,struct obd_export * exp,struct obd_statfs * osfs,__u64 max_age,__u32 flags)1387 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1388 		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1389 {
1390 	struct obd_device     *obd = class_exp2obd(exp);
1391 	struct lmv_obd	*lmv = &obd->u.lmv;
1392 	struct obd_statfs     *temp;
1393 	int		    rc = 0;
1394 	int		    i;
1395 
1396 	rc = lmv_check_connect(obd);
1397 	if (rc)
1398 		return rc;
1399 
1400 	temp = kzalloc(sizeof(*temp), GFP_NOFS);
1401 	if (!temp)
1402 		return -ENOMEM;
1403 
1404 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1405 		if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1406 			continue;
1407 
1408 		rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1409 				max_age, flags);
1410 		if (rc) {
1411 			CERROR("can't stat MDS #%d (%s), error %d\n", i,
1412 			       lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1413 			       rc);
1414 			goto out_free_temp;
1415 		}
1416 
1417 		if (i == 0) {
1418 			*osfs = *temp;
1419 			/* If the statfs is from mount, it will needs
1420 			 * retrieve necessary information from MDT0.
1421 			 * i.e. mount does not need the merged osfs
1422 			 * from all of MDT.
1423 			 * And also clients can be mounted as long as
1424 			 * MDT0 is in service*/
1425 			if (flags & OBD_STATFS_FOR_MDT0)
1426 				goto out_free_temp;
1427 		} else {
1428 			osfs->os_bavail += temp->os_bavail;
1429 			osfs->os_blocks += temp->os_blocks;
1430 			osfs->os_ffree += temp->os_ffree;
1431 			osfs->os_files += temp->os_files;
1432 		}
1433 	}
1434 
1435 out_free_temp:
1436 	kfree(temp);
1437 	return rc;
1438 }
1439 
lmv_getstatus(struct obd_export * exp,struct lu_fid * fid)1440 static int lmv_getstatus(struct obd_export *exp,
1441 			 struct lu_fid *fid)
1442 {
1443 	struct obd_device    *obd = exp->exp_obd;
1444 	struct lmv_obd       *lmv = &obd->u.lmv;
1445 	int		   rc;
1446 
1447 	rc = lmv_check_connect(obd);
1448 	if (rc)
1449 		return rc;
1450 
1451 	rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid);
1452 	return rc;
1453 }
1454 
lmv_getxattr(struct obd_export * exp,const struct lu_fid * fid,u64 valid,const char * name,const char * input,int input_size,int output_size,int flags,struct ptlrpc_request ** request)1455 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1456 			u64 valid, const char *name,
1457 			const char *input, int input_size, int output_size,
1458 			int flags, struct ptlrpc_request **request)
1459 {
1460 	struct obd_device      *obd = exp->exp_obd;
1461 	struct lmv_obd	 *lmv = &obd->u.lmv;
1462 	struct lmv_tgt_desc    *tgt;
1463 	int		     rc;
1464 
1465 	rc = lmv_check_connect(obd);
1466 	if (rc)
1467 		return rc;
1468 
1469 	tgt = lmv_find_target(lmv, fid);
1470 	if (IS_ERR(tgt))
1471 		return PTR_ERR(tgt);
1472 
1473 	rc = md_getxattr(tgt->ltd_exp, fid, valid, name, input,
1474 			 input_size, output_size, flags, request);
1475 
1476 	return rc;
1477 }
1478 
lmv_setxattr(struct obd_export * exp,const struct lu_fid * fid,u64 valid,const char * name,const char * input,int input_size,int output_size,int flags,__u32 suppgid,struct ptlrpc_request ** request)1479 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1480 			u64 valid, const char *name,
1481 			const char *input, int input_size, int output_size,
1482 			int flags, __u32 suppgid,
1483 			struct ptlrpc_request **request)
1484 {
1485 	struct obd_device      *obd = exp->exp_obd;
1486 	struct lmv_obd	 *lmv = &obd->u.lmv;
1487 	struct lmv_tgt_desc    *tgt;
1488 	int		     rc;
1489 
1490 	rc = lmv_check_connect(obd);
1491 	if (rc)
1492 		return rc;
1493 
1494 	tgt = lmv_find_target(lmv, fid);
1495 	if (IS_ERR(tgt))
1496 		return PTR_ERR(tgt);
1497 
1498 	rc = md_setxattr(tgt->ltd_exp, fid, valid, name, input,
1499 			 input_size, output_size, flags, suppgid,
1500 			 request);
1501 
1502 	return rc;
1503 }
1504 
lmv_getattr(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)1505 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1506 		       struct ptlrpc_request **request)
1507 {
1508 	struct obd_device       *obd = exp->exp_obd;
1509 	struct lmv_obd	  *lmv = &obd->u.lmv;
1510 	struct lmv_tgt_desc     *tgt;
1511 	int		      rc;
1512 
1513 	rc = lmv_check_connect(obd);
1514 	if (rc)
1515 		return rc;
1516 
1517 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
1518 	if (IS_ERR(tgt))
1519 		return PTR_ERR(tgt);
1520 
1521 	if (op_data->op_flags & MF_GET_MDT_IDX) {
1522 		op_data->op_mds = tgt->ltd_idx;
1523 		return 0;
1524 	}
1525 
1526 	rc = md_getattr(tgt->ltd_exp, op_data, request);
1527 
1528 	return rc;
1529 }
1530 
lmv_null_inode(struct obd_export * exp,const struct lu_fid * fid)1531 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1532 {
1533 	struct obd_device   *obd = exp->exp_obd;
1534 	struct lmv_obd      *lmv = &obd->u.lmv;
1535 	int		  i;
1536 	int		  rc;
1537 
1538 	rc = lmv_check_connect(obd);
1539 	if (rc)
1540 		return rc;
1541 
1542 	CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1543 
1544 	/*
1545 	 * With DNE every object can have two locks in different namespaces:
1546 	 * lookup lock in space of MDT storing direntry and update/open lock in
1547 	 * space of MDT storing inode.
1548 	 */
1549 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1550 		if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1551 			continue;
1552 		md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1553 	}
1554 
1555 	return 0;
1556 }
1557 
lmv_find_cbdata(struct obd_export * exp,const struct lu_fid * fid,ldlm_iterator_t it,void * data)1558 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1559 			   ldlm_iterator_t it, void *data)
1560 {
1561 	struct obd_device   *obd = exp->exp_obd;
1562 	struct lmv_obd      *lmv = &obd->u.lmv;
1563 	int		  i;
1564 	int		  rc;
1565 
1566 	rc = lmv_check_connect(obd);
1567 	if (rc)
1568 		return rc;
1569 
1570 	CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1571 
1572 	/*
1573 	 * With DNE every object can have two locks in different namespaces:
1574 	 * lookup lock in space of MDT storing direntry and update/open lock in
1575 	 * space of MDT storing inode.
1576 	 */
1577 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1578 		if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1579 			continue;
1580 		rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
1581 		if (rc)
1582 			return rc;
1583 	}
1584 
1585 	return rc;
1586 }
1587 
lmv_close(struct obd_export * exp,struct md_op_data * op_data,struct md_open_data * mod,struct ptlrpc_request ** request)1588 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1589 		     struct md_open_data *mod, struct ptlrpc_request **request)
1590 {
1591 	struct obd_device     *obd = exp->exp_obd;
1592 	struct lmv_obd	*lmv = &obd->u.lmv;
1593 	struct lmv_tgt_desc   *tgt;
1594 	int		    rc;
1595 
1596 	rc = lmv_check_connect(obd);
1597 	if (rc)
1598 		return rc;
1599 
1600 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
1601 	if (IS_ERR(tgt))
1602 		return PTR_ERR(tgt);
1603 
1604 	CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1605 	rc = md_close(tgt->ltd_exp, op_data, mod, request);
1606 	return rc;
1607 }
1608 
1609 struct lmv_tgt_desc
lmv_locate_mds(struct lmv_obd * lmv,struct md_op_data * op_data,struct lu_fid * fid)1610 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1611 		struct lu_fid *fid)
1612 {
1613 	struct lmv_tgt_desc *tgt;
1614 
1615 	tgt = lmv_find_target(lmv, fid);
1616 	if (IS_ERR(tgt))
1617 		return tgt;
1618 
1619 	op_data->op_mds = tgt->ltd_idx;
1620 
1621 	return tgt;
1622 }
1623 
lmv_create(struct obd_export * exp,struct md_op_data * op_data,const void * data,int datalen,int mode,__u32 uid,__u32 gid,cfs_cap_t cap_effective,__u64 rdev,struct ptlrpc_request ** request)1624 static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1625 		      const void *data, int datalen, int mode, __u32 uid,
1626 		      __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1627 		      struct ptlrpc_request **request)
1628 {
1629 	struct obd_device       *obd = exp->exp_obd;
1630 	struct lmv_obd	  *lmv = &obd->u.lmv;
1631 	struct lmv_tgt_desc     *tgt;
1632 	int		      rc;
1633 
1634 	rc = lmv_check_connect(obd);
1635 	if (rc)
1636 		return rc;
1637 
1638 	if (!lmv->desc.ld_active_tgt_count)
1639 		return -EIO;
1640 
1641 	tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1642 	if (IS_ERR(tgt))
1643 		return PTR_ERR(tgt);
1644 
1645 	rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1646 	if (rc)
1647 		return rc;
1648 
1649 	CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
1650 	       op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1651 	       op_data->op_mds);
1652 
1653 	op_data->op_flags |= MF_MDC_CANCEL_FID1;
1654 	rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1655 		       cap_effective, rdev, request);
1656 
1657 	if (rc == 0) {
1658 		if (*request == NULL)
1659 			return rc;
1660 		CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1661 	}
1662 	return rc;
1663 }
1664 
lmv_done_writing(struct obd_export * exp,struct md_op_data * op_data,struct md_open_data * mod)1665 static int lmv_done_writing(struct obd_export *exp,
1666 			    struct md_op_data *op_data,
1667 			    struct md_open_data *mod)
1668 {
1669 	struct obd_device     *obd = exp->exp_obd;
1670 	struct lmv_obd	*lmv = &obd->u.lmv;
1671 	struct lmv_tgt_desc   *tgt;
1672 	int		    rc;
1673 
1674 	rc = lmv_check_connect(obd);
1675 	if (rc)
1676 		return rc;
1677 
1678 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
1679 	if (IS_ERR(tgt))
1680 		return PTR_ERR(tgt);
1681 
1682 	rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1683 	return rc;
1684 }
1685 
1686 static int
lmv_enqueue_remote(struct obd_export * exp,struct ldlm_enqueue_info * einfo,struct lookup_intent * it,struct md_op_data * op_data,struct lustre_handle * lockh,void * lmm,int lmmsize,__u64 extra_lock_flags)1687 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1688 		   struct lookup_intent *it, struct md_op_data *op_data,
1689 		   struct lustre_handle *lockh, void *lmm, int lmmsize,
1690 		   __u64 extra_lock_flags)
1691 {
1692 	struct ptlrpc_request      *req = it->d.lustre.it_data;
1693 	struct obd_device	  *obd = exp->exp_obd;
1694 	struct lmv_obd	     *lmv = &obd->u.lmv;
1695 	struct lustre_handle	plock;
1696 	struct lmv_tgt_desc	*tgt;
1697 	struct md_op_data	  *rdata;
1698 	struct lu_fid	       fid1;
1699 	struct mdt_body	    *body;
1700 	int			 rc = 0;
1701 	int			 pmode;
1702 
1703 	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1704 	LASSERT(body != NULL);
1705 
1706 	if (!(body->valid & OBD_MD_MDS))
1707 		return 0;
1708 
1709 	CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1710 	       LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1711 
1712 	/*
1713 	 * We got LOOKUP lock, but we really need attrs.
1714 	 */
1715 	pmode = it->d.lustre.it_lock_mode;
1716 	LASSERT(pmode != 0);
1717 	memcpy(&plock, lockh, sizeof(plock));
1718 	it->d.lustre.it_lock_mode = 0;
1719 	it->d.lustre.it_data = NULL;
1720 	fid1 = body->fid1;
1721 
1722 	ptlrpc_req_finished(req);
1723 
1724 	tgt = lmv_find_target(lmv, &fid1);
1725 	if (IS_ERR(tgt)) {
1726 		rc = PTR_ERR(tgt);
1727 		goto out;
1728 	}
1729 
1730 	rdata = kzalloc(sizeof(*rdata), GFP_NOFS);
1731 	if (!rdata) {
1732 		rc = -ENOMEM;
1733 		goto out;
1734 	}
1735 
1736 	rdata->op_fid1 = fid1;
1737 	rdata->op_bias = MDS_CROSS_REF;
1738 
1739 	rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1740 			lmm, lmmsize, NULL, extra_lock_flags);
1741 	kfree(rdata);
1742 out:
1743 	ldlm_lock_decref(&plock, pmode);
1744 	return rc;
1745 }
1746 
1747 static int
lmv_enqueue(struct obd_export * exp,struct ldlm_enqueue_info * einfo,struct lookup_intent * it,struct md_op_data * op_data,struct lustre_handle * lockh,void * lmm,int lmmsize,struct ptlrpc_request ** req,__u64 extra_lock_flags)1748 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1749 	    struct lookup_intent *it, struct md_op_data *op_data,
1750 	    struct lustre_handle *lockh, void *lmm, int lmmsize,
1751 	    struct ptlrpc_request **req, __u64 extra_lock_flags)
1752 {
1753 	struct obd_device	*obd = exp->exp_obd;
1754 	struct lmv_obd	   *lmv = &obd->u.lmv;
1755 	struct lmv_tgt_desc      *tgt;
1756 	int		       rc;
1757 
1758 	rc = lmv_check_connect(obd);
1759 	if (rc)
1760 		return rc;
1761 
1762 	CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1763 	       LL_IT2STR(it), PFID(&op_data->op_fid1));
1764 
1765 	tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1766 	if (IS_ERR(tgt))
1767 		return PTR_ERR(tgt);
1768 
1769 	CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1770 	       LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1771 
1772 	rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1773 			lmm, lmmsize, req, extra_lock_flags);
1774 
1775 	if (rc == 0 && it && it->it_op == IT_OPEN) {
1776 		rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1777 					lmm, lmmsize, extra_lock_flags);
1778 	}
1779 	return rc;
1780 }
1781 
1782 static int
lmv_getattr_name(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)1783 lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
1784 		 struct ptlrpc_request **request)
1785 {
1786 	struct ptlrpc_request   *req = NULL;
1787 	struct obd_device       *obd = exp->exp_obd;
1788 	struct lmv_obd	  *lmv = &obd->u.lmv;
1789 	struct lmv_tgt_desc     *tgt;
1790 	struct mdt_body	 *body;
1791 	int		      rc;
1792 
1793 	rc = lmv_check_connect(obd);
1794 	if (rc)
1795 		return rc;
1796 
1797 	tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1798 	if (IS_ERR(tgt))
1799 		return PTR_ERR(tgt);
1800 
1801 	CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1802 	       op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1803 	       tgt->ltd_idx);
1804 
1805 	rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1806 	if (rc != 0)
1807 		return rc;
1808 
1809 	body = req_capsule_server_get(&(*request)->rq_pill,
1810 				      &RMF_MDT_BODY);
1811 	LASSERT(body != NULL);
1812 
1813 	if (body->valid & OBD_MD_MDS) {
1814 		struct lu_fid rid = body->fid1;
1815 
1816 		CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1817 		       PFID(&rid));
1818 
1819 		tgt = lmv_find_target(lmv, &rid);
1820 		if (IS_ERR(tgt)) {
1821 			ptlrpc_req_finished(*request);
1822 			return PTR_ERR(tgt);
1823 		}
1824 
1825 		op_data->op_fid1 = rid;
1826 		op_data->op_valid |= OBD_MD_FLCROSSREF;
1827 		op_data->op_namelen = 0;
1828 		op_data->op_name = NULL;
1829 		rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1830 		ptlrpc_req_finished(*request);
1831 		*request = req;
1832 	}
1833 
1834 	return rc;
1835 }
1836 
1837 #define md_op_data_fid(op_data, fl)		     \
1838 	(fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \
1839 	 fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \
1840 	 fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \
1841 	 fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
1842 	 NULL)
1843 
lmv_early_cancel(struct obd_export * exp,struct md_op_data * op_data,int op_tgt,ldlm_mode_t mode,int bits,int flag)1844 static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
1845 			    int op_tgt, ldlm_mode_t mode, int bits, int flag)
1846 {
1847 	struct lu_fid	  *fid = md_op_data_fid(op_data, flag);
1848 	struct obd_device      *obd = exp->exp_obd;
1849 	struct lmv_obd	 *lmv = &obd->u.lmv;
1850 	struct lmv_tgt_desc    *tgt;
1851 	ldlm_policy_data_t      policy = { {0} };
1852 	int		     rc = 0;
1853 
1854 	if (!fid_is_sane(fid))
1855 		return 0;
1856 
1857 	tgt = lmv_find_target(lmv, fid);
1858 	if (IS_ERR(tgt))
1859 		return PTR_ERR(tgt);
1860 
1861 	if (tgt->ltd_idx != op_tgt) {
1862 		CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1863 		policy.l_inodebits.bits = bits;
1864 		rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1865 				      mode, LCF_ASYNC, NULL);
1866 	} else {
1867 		CDEBUG(D_INODE,
1868 		       "EARLY_CANCEL skip operation target %d on "DFID"\n",
1869 		       op_tgt, PFID(fid));
1870 		op_data->op_flags |= flag;
1871 		rc = 0;
1872 	}
1873 
1874 	return rc;
1875 }
1876 
1877 /*
1878  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1879  * op_data->op_fid2
1880  */
lmv_link(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)1881 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1882 		    struct ptlrpc_request **request)
1883 {
1884 	struct obd_device       *obd = exp->exp_obd;
1885 	struct lmv_obd	  *lmv = &obd->u.lmv;
1886 	struct lmv_tgt_desc     *tgt;
1887 	int		      rc;
1888 
1889 	rc = lmv_check_connect(obd);
1890 	if (rc)
1891 		return rc;
1892 
1893 	LASSERT(op_data->op_namelen != 0);
1894 
1895 	CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1896 	       PFID(&op_data->op_fid2), op_data->op_namelen,
1897 	       op_data->op_name, PFID(&op_data->op_fid1));
1898 
1899 	op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1900 	op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1901 	op_data->op_cap = cfs_curproc_cap_pack();
1902 	tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1903 	if (IS_ERR(tgt))
1904 		return PTR_ERR(tgt);
1905 
1906 	/*
1907 	 * Cancel UPDATE lock on child (fid1).
1908 	 */
1909 	op_data->op_flags |= MF_MDC_CANCEL_FID2;
1910 	rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1911 			      MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1912 	if (rc != 0)
1913 		return rc;
1914 
1915 	rc = md_link(tgt->ltd_exp, op_data, request);
1916 
1917 	return rc;
1918 }
1919 
lmv_rename(struct obd_export * exp,struct md_op_data * op_data,const char * old,int oldlen,const char * new,int newlen,struct ptlrpc_request ** request)1920 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1921 		      const char *old, int oldlen, const char *new, int newlen,
1922 		      struct ptlrpc_request **request)
1923 {
1924 	struct obd_device       *obd = exp->exp_obd;
1925 	struct lmv_obd	  *lmv = &obd->u.lmv;
1926 	struct lmv_tgt_desc     *src_tgt;
1927 	struct lmv_tgt_desc     *tgt_tgt;
1928 	int			rc;
1929 
1930 	LASSERT(oldlen != 0);
1931 
1932 	CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
1933 	       oldlen, old, PFID(&op_data->op_fid1),
1934 	       newlen, new, PFID(&op_data->op_fid2));
1935 
1936 	rc = lmv_check_connect(obd);
1937 	if (rc)
1938 		return rc;
1939 
1940 	op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1941 	op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1942 	op_data->op_cap = cfs_curproc_cap_pack();
1943 	src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1944 	if (IS_ERR(src_tgt))
1945 		return PTR_ERR(src_tgt);
1946 
1947 	tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1948 	if (IS_ERR(tgt_tgt))
1949 		return PTR_ERR(tgt_tgt);
1950 	/*
1951 	 * LOOKUP lock on src child (fid3) should also be cancelled for
1952 	 * src_tgt in mdc_rename.
1953 	 */
1954 	op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1955 
1956 	/*
1957 	 * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1958 	 * own target.
1959 	 */
1960 	rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1961 			      LCK_EX, MDS_INODELOCK_UPDATE,
1962 			      MF_MDC_CANCEL_FID2);
1963 
1964 	/*
1965 	 * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
1966 	 */
1967 	if (rc == 0) {
1968 		rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1969 				      LCK_EX, MDS_INODELOCK_LOOKUP,
1970 				      MF_MDC_CANCEL_FID4);
1971 	}
1972 
1973 	/*
1974 	 * Cancel all the locks on tgt child (fid4).
1975 	 */
1976 	if (rc == 0)
1977 		rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1978 				      LCK_EX, MDS_INODELOCK_FULL,
1979 				      MF_MDC_CANCEL_FID4);
1980 
1981 	if (rc == 0)
1982 		rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
1983 			       new, newlen, request);
1984 	return rc;
1985 }
1986 
lmv_setattr(struct obd_export * exp,struct md_op_data * op_data,void * ea,int ealen,void * ea2,int ea2len,struct ptlrpc_request ** request,struct md_open_data ** mod)1987 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1988 		       void *ea, int ealen, void *ea2, int ea2len,
1989 		       struct ptlrpc_request **request,
1990 		       struct md_open_data **mod)
1991 {
1992 	struct obd_device       *obd = exp->exp_obd;
1993 	struct lmv_obd	  *lmv = &obd->u.lmv;
1994 	struct lmv_tgt_desc     *tgt;
1995 	int		      rc;
1996 
1997 	rc = lmv_check_connect(obd);
1998 	if (rc)
1999 		return rc;
2000 
2001 	CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2002 	       PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2003 
2004 	op_data->op_flags |= MF_MDC_CANCEL_FID1;
2005 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
2006 	if (IS_ERR(tgt))
2007 		return PTR_ERR(tgt);
2008 
2009 	rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
2010 			ea2len, request, mod);
2011 
2012 	return rc;
2013 }
2014 
lmv_sync(struct obd_export * exp,const struct lu_fid * fid,struct ptlrpc_request ** request)2015 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
2016 		    struct ptlrpc_request **request)
2017 {
2018 	struct obd_device	 *obd = exp->exp_obd;
2019 	struct lmv_obd	    *lmv = &obd->u.lmv;
2020 	struct lmv_tgt_desc       *tgt;
2021 	int			rc;
2022 
2023 	rc = lmv_check_connect(obd);
2024 	if (rc)
2025 		return rc;
2026 
2027 	tgt = lmv_find_target(lmv, fid);
2028 	if (IS_ERR(tgt))
2029 		return PTR_ERR(tgt);
2030 
2031 	rc = md_sync(tgt->ltd_exp, fid, request);
2032 	return rc;
2033 }
2034 
2035 /*
2036  * Adjust a set of pages, each page containing an array of lu_dirpages,
2037  * so that each page can be used as a single logical lu_dirpage.
2038  *
2039  * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
2040  * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
2041  * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
2042  * value is used as a cookie to request the next lu_dirpage in a
2043  * directory listing that spans multiple pages (two in this example):
2044  *   ________
2045  *  |	|
2046  * .|--------v-------   -----.
2047  * |s|e|f|p|ent|ent| ... |ent|
2048  * '--|--------------   -----'   Each CFS_PAGE contains a single
2049  *    '------.		   lu_dirpage.
2050  * .---------v-------   -----.
2051  * |s|e|f|p|ent| 0 | ... | 0 |
2052  * '-----------------   -----'
2053  *
2054  * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
2055  * larger than LU_PAGE_SIZE, a single host page may contain multiple
2056  * lu_dirpages. After reading the lu_dirpages from the MDS, the
2057  * ldp_hash_end of the first lu_dirpage refers to the one immediately
2058  * after it in the same CFS_PAGE (arrows simplified for brevity, but
2059  * in general e0==s1, e1==s2, etc.):
2060  *
2061  * .--------------------   -----.
2062  * |s0|e0|f0|p|ent|ent| ... |ent|
2063  * |---v----------------   -----|
2064  * |s1|e1|f1|p|ent|ent| ... |ent|
2065  * |---v----------------   -----|  Here, each CFS_PAGE contains
2066  *	     ...		 multiple lu_dirpages.
2067  * |---v----------------   -----|
2068  * |s'|e'|f'|p|ent|ent| ... |ent|
2069  * '---|----------------   -----'
2070  *     v
2071  * .----------------------------.
2072  * |	next CFS_PAGE       |
2073  *
2074  * This structure is transformed into a single logical lu_dirpage as follows:
2075  *
2076  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
2077  *   labeled 'next CFS_PAGE'.
2078  *
2079  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
2080  *   a hash collision with the next page exists.
2081  *
2082  * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
2083  *   to the first entry of the next lu_dirpage.
2084  */
2085 #if PAGE_CACHE_SIZE > LU_PAGE_SIZE
lmv_adjust_dirpages(struct page ** pages,int ncfspgs,int nlupgs)2086 static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
2087 {
2088 	int i;
2089 
2090 	for (i = 0; i < ncfspgs; i++) {
2091 		struct lu_dirpage	*dp = kmap(pages[i]);
2092 		struct lu_dirpage	*first = dp;
2093 		struct lu_dirent	*end_dirent = NULL;
2094 		struct lu_dirent	*ent;
2095 		__u64			hash_end = dp->ldp_hash_end;
2096 		__u32			flags = dp->ldp_flags;
2097 
2098 		while (--nlupgs > 0) {
2099 			ent = lu_dirent_start(dp);
2100 			for (end_dirent = ent; ent != NULL;
2101 			     end_dirent = ent, ent = lu_dirent_next(ent))
2102 				;
2103 
2104 			/* Advance dp to next lu_dirpage. */
2105 			dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2106 
2107 			/* Check if we've reached the end of the CFS_PAGE. */
2108 			if (!((unsigned long)dp & ~CFS_PAGE_MASK))
2109 				break;
2110 
2111 			/* Save the hash and flags of this lu_dirpage. */
2112 			hash_end = dp->ldp_hash_end;
2113 			flags = dp->ldp_flags;
2114 
2115 			/* Check if lu_dirpage contains no entries. */
2116 			if (!end_dirent)
2117 				break;
2118 
2119 			/* Enlarge the end entry lde_reclen from 0 to
2120 			 * first entry of next lu_dirpage. */
2121 			LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
2122 			end_dirent->lde_reclen =
2123 				cpu_to_le16((char *)(dp->ldp_entries) -
2124 					    (char *)end_dirent);
2125 		}
2126 
2127 		first->ldp_hash_end = hash_end;
2128 		first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
2129 		first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
2130 
2131 		kunmap(pages[i]);
2132 	}
2133 	LASSERTF(nlupgs == 0, "left = %d", nlupgs);
2134 }
2135 #else
2136 #define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
2137 #endif	/* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
2138 
lmv_readpage(struct obd_export * exp,struct md_op_data * op_data,struct page ** pages,struct ptlrpc_request ** request)2139 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
2140 			struct page **pages, struct ptlrpc_request **request)
2141 {
2142 	struct obd_device	*obd = exp->exp_obd;
2143 	struct lmv_obd		*lmv = &obd->u.lmv;
2144 	__u64			offset = op_data->op_offset;
2145 	int			rc;
2146 	int			ncfspgs; /* pages read in PAGE_CACHE_SIZE */
2147 	int			nlupgs; /* pages read in LU_PAGE_SIZE */
2148 	struct lmv_tgt_desc	*tgt;
2149 
2150 	rc = lmv_check_connect(obd);
2151 	if (rc)
2152 		return rc;
2153 
2154 	CDEBUG(D_INODE, "READPAGE at %#llx from "DFID"\n",
2155 	       offset, PFID(&op_data->op_fid1));
2156 
2157 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
2158 	if (IS_ERR(tgt))
2159 		return PTR_ERR(tgt);
2160 
2161 	rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
2162 	if (rc != 0)
2163 		return rc;
2164 
2165 	ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + PAGE_CACHE_SIZE - 1)
2166 		 >> PAGE_CACHE_SHIFT;
2167 	nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
2168 	LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
2169 	LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
2170 
2171 	CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
2172 	       op_data->op_npages);
2173 
2174 	lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
2175 
2176 	return rc;
2177 }
2178 
lmv_unlink(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)2179 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2180 		      struct ptlrpc_request **request)
2181 {
2182 	struct obd_device       *obd = exp->exp_obd;
2183 	struct lmv_obd	  *lmv = &obd->u.lmv;
2184 	struct lmv_tgt_desc     *tgt = NULL;
2185 	struct mdt_body		*body;
2186 	int		     rc;
2187 
2188 	rc = lmv_check_connect(obd);
2189 	if (rc)
2190 		return rc;
2191 retry:
2192 	/* Send unlink requests to the MDT where the child is located */
2193 	if (likely(!fid_is_zero(&op_data->op_fid2)))
2194 		tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2195 	else
2196 		tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2197 	if (IS_ERR(tgt))
2198 		return PTR_ERR(tgt);
2199 
2200 	op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2201 	op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2202 	op_data->op_cap = cfs_curproc_cap_pack();
2203 
2204 	/*
2205 	 * If child's fid is given, cancel unused locks for it if it is from
2206 	 * another export than parent.
2207 	 *
2208 	 * LOOKUP lock for child (fid3) should also be cancelled on parent
2209 	 * tgt_tgt in mdc_unlink().
2210 	 */
2211 	op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2212 
2213 	/*
2214 	 * Cancel FULL locks on child (fid3).
2215 	 */
2216 	rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
2217 			      MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2218 
2219 	if (rc != 0)
2220 		return rc;
2221 
2222 	CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
2223 	       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2224 
2225 	rc = md_unlink(tgt->ltd_exp, op_data, request);
2226 	if (rc != 0 && rc != -EREMOTE)
2227 		return rc;
2228 
2229 	body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2230 	if (body == NULL)
2231 		return -EPROTO;
2232 
2233 	/* Not cross-ref case, just get out of here. */
2234 	if (likely(!(body->valid & OBD_MD_MDS)))
2235 		return 0;
2236 
2237 	CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2238 	       exp->exp_obd->obd_name, PFID(&body->fid1));
2239 
2240 	/* This is a remote object, try remote MDT, Note: it may
2241 	 * try more than 1 time here, Considering following case
2242 	 * /mnt/lustre is root on MDT0, remote1 is on MDT1
2243 	 * 1. Initially A does not know where remote1 is, it send
2244 	 *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2245 	 *    resend unlink RPC to MDT1 (retry 1st time).
2246 	 *
2247 	 * 2. During the unlink RPC in flight,
2248 	 *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2249 	 *    and create new remote1, but on MDT0
2250 	 *
2251 	 * 3. MDT1 get unlink RPC(from A), then do remote lock on
2252 	 *    /mnt/lustre, then lookup get fid of remote1, and find
2253 	 *    it is remote dir again, and replay -EREMOTE again.
2254 	 *
2255 	 * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2256 	 *
2257 	 * In theory, it might try unlimited time here, but it should
2258 	 * be very rare case.  */
2259 	op_data->op_fid2 = body->fid1;
2260 	ptlrpc_req_finished(*request);
2261 	*request = NULL;
2262 
2263 	goto retry;
2264 }
2265 
lmv_precleanup(struct obd_device * obd,enum obd_cleanup_stage stage)2266 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2267 {
2268 	struct lmv_obd *lmv = &obd->u.lmv;
2269 
2270 	switch (stage) {
2271 	case OBD_CLEANUP_EARLY:
2272 		/* XXX: here should be calling obd_precleanup() down to
2273 		 * stack. */
2274 		break;
2275 	case OBD_CLEANUP_EXPORTS:
2276 		fld_client_debugfs_fini(&lmv->lmv_fld);
2277 		lprocfs_obd_cleanup(obd);
2278 		break;
2279 	default:
2280 		break;
2281 	}
2282 	return 0;
2283 }
2284 
lmv_get_info(const struct lu_env * env,struct obd_export * exp,__u32 keylen,void * key,__u32 * vallen,void * val,struct lov_stripe_md * lsm)2285 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2286 			__u32 keylen, void *key, __u32 *vallen, void *val,
2287 			struct lov_stripe_md *lsm)
2288 {
2289 	struct obd_device       *obd;
2290 	struct lmv_obd	  *lmv;
2291 	int		      rc = 0;
2292 
2293 	obd = class_exp2obd(exp);
2294 	if (obd == NULL) {
2295 		CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2296 		       exp->exp_handle.h_cookie);
2297 		return -EINVAL;
2298 	}
2299 
2300 	lmv = &obd->u.lmv;
2301 	if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2302 		struct lmv_tgt_desc *tgt;
2303 		int i;
2304 
2305 		rc = lmv_check_connect(obd);
2306 		if (rc)
2307 			return rc;
2308 
2309 		LASSERT(*vallen == sizeof(__u32));
2310 		for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2311 			tgt = lmv->tgts[i];
2312 			/*
2313 			 * All tgts should be connected when this gets called.
2314 			 */
2315 			if (tgt == NULL || tgt->ltd_exp == NULL)
2316 				continue;
2317 
2318 			if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2319 					  vallen, val, NULL))
2320 				return 0;
2321 		}
2322 		return -EINVAL;
2323 	} else if (KEY_IS(KEY_MAX_EASIZE) ||
2324 		   KEY_IS(KEY_DEFAULT_EASIZE) ||
2325 		   KEY_IS(KEY_CONN_DATA)) {
2326 		rc = lmv_check_connect(obd);
2327 		if (rc)
2328 			return rc;
2329 
2330 		/*
2331 		 * Forwarding this request to first MDS, it should know LOV
2332 		 * desc.
2333 		 */
2334 		rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2335 				  vallen, val, NULL);
2336 		if (!rc && KEY_IS(KEY_CONN_DATA))
2337 			exp->exp_connect_data = *(struct obd_connect_data *)val;
2338 		return rc;
2339 	} else if (KEY_IS(KEY_TGT_COUNT)) {
2340 		*((int *)val) = lmv->desc.ld_tgt_count;
2341 		return 0;
2342 	}
2343 
2344 	CDEBUG(D_IOCTL, "Invalid key\n");
2345 	return -EINVAL;
2346 }
2347 
lmv_set_info_async(const struct lu_env * env,struct obd_export * exp,u32 keylen,void * key,u32 vallen,void * val,struct ptlrpc_request_set * set)2348 static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2349 			      u32 keylen, void *key, u32 vallen,
2350 			      void *val, struct ptlrpc_request_set *set)
2351 {
2352 	struct lmv_tgt_desc    *tgt;
2353 	struct obd_device      *obd;
2354 	struct lmv_obd	 *lmv;
2355 	int rc = 0;
2356 
2357 	obd = class_exp2obd(exp);
2358 	if (obd == NULL) {
2359 		CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2360 		       exp->exp_handle.h_cookie);
2361 		return -EINVAL;
2362 	}
2363 	lmv = &obd->u.lmv;
2364 
2365 	if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2366 		int i, err = 0;
2367 
2368 		for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2369 			tgt = lmv->tgts[i];
2370 
2371 			if (tgt == NULL || tgt->ltd_exp == NULL)
2372 				continue;
2373 
2374 			err = obd_set_info_async(env, tgt->ltd_exp,
2375 						 keylen, key, vallen, val, set);
2376 			if (err && rc == 0)
2377 				rc = err;
2378 		}
2379 
2380 		return rc;
2381 	}
2382 
2383 	return -EINVAL;
2384 }
2385 
lmv_packmd(struct obd_export * exp,struct lov_mds_md ** lmmp,struct lov_stripe_md * lsm)2386 static int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2387 		      struct lov_stripe_md *lsm)
2388 {
2389 	struct obd_device	 *obd = class_exp2obd(exp);
2390 	struct lmv_obd	    *lmv = &obd->u.lmv;
2391 	struct lmv_stripe_md      *meap;
2392 	struct lmv_stripe_md      *lsmp;
2393 	int			mea_size;
2394 	int			i;
2395 
2396 	mea_size = lmv_get_easize(lmv);
2397 	if (!lmmp)
2398 		return mea_size;
2399 
2400 	if (*lmmp && !lsm) {
2401 		kvfree(*lmmp);
2402 		*lmmp = NULL;
2403 		return 0;
2404 	}
2405 
2406 	if (*lmmp == NULL) {
2407 		*lmmp = libcfs_kvzalloc(mea_size, GFP_NOFS);
2408 		if (*lmmp == NULL)
2409 			return -ENOMEM;
2410 	}
2411 
2412 	if (!lsm)
2413 		return mea_size;
2414 
2415 	lsmp = (struct lmv_stripe_md *)lsm;
2416 	meap = (struct lmv_stripe_md *)*lmmp;
2417 
2418 	if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2419 	    lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2420 		return -EINVAL;
2421 
2422 	meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2423 	meap->mea_count = cpu_to_le32(lsmp->mea_count);
2424 	meap->mea_master = cpu_to_le32(lsmp->mea_master);
2425 
2426 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2427 		meap->mea_ids[i] = lsmp->mea_ids[i];
2428 		fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
2429 	}
2430 
2431 	return mea_size;
2432 }
2433 
lmv_unpackmd(struct obd_export * exp,struct lov_stripe_md ** lsmp,struct lov_mds_md * lmm,int lmm_size)2434 static int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2435 			struct lov_mds_md *lmm, int lmm_size)
2436 {
2437 	struct obd_device	  *obd = class_exp2obd(exp);
2438 	struct lmv_stripe_md      **tmea = (struct lmv_stripe_md **)lsmp;
2439 	struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
2440 	struct lmv_obd	     *lmv = &obd->u.lmv;
2441 	int			 mea_size;
2442 	int			 i;
2443 	__u32		       magic;
2444 
2445 	mea_size = lmv_get_easize(lmv);
2446 	if (lsmp == NULL)
2447 		return mea_size;
2448 
2449 	if (*lsmp != NULL && lmm == NULL) {
2450 		kvfree(*tmea);
2451 		*lsmp = NULL;
2452 		return 0;
2453 	}
2454 
2455 	LASSERT(mea_size == lmm_size);
2456 
2457 	*tmea = libcfs_kvzalloc(mea_size, GFP_NOFS);
2458 	if (*tmea == NULL)
2459 		return -ENOMEM;
2460 
2461 	if (!lmm)
2462 		return mea_size;
2463 
2464 	if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2465 	    mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2466 	    mea->mea_magic == MEA_MAGIC_HASH_SEGMENT) {
2467 		magic = le32_to_cpu(mea->mea_magic);
2468 	} else {
2469 		/*
2470 		 * Old mea is not handled here.
2471 		 */
2472 		CERROR("Old not supportable EA is found\n");
2473 		LBUG();
2474 	}
2475 
2476 	(*tmea)->mea_magic = magic;
2477 	(*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2478 	(*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2479 
2480 	for (i = 0; i < (*tmea)->mea_count; i++) {
2481 		(*tmea)->mea_ids[i] = mea->mea_ids[i];
2482 		fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2483 	}
2484 	return mea_size;
2485 }
2486 
lmv_cancel_unused(struct obd_export * exp,const struct lu_fid * fid,ldlm_policy_data_t * policy,ldlm_mode_t mode,ldlm_cancel_flags_t flags,void * opaque)2487 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2488 			     ldlm_policy_data_t *policy, ldlm_mode_t mode,
2489 			     ldlm_cancel_flags_t flags, void *opaque)
2490 {
2491 	struct obd_device       *obd = exp->exp_obd;
2492 	struct lmv_obd	  *lmv = &obd->u.lmv;
2493 	int		      rc = 0;
2494 	int		      err;
2495 	int		      i;
2496 
2497 	LASSERT(fid != NULL);
2498 
2499 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2500 		if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL ||
2501 		    lmv->tgts[i]->ltd_active == 0)
2502 			continue;
2503 
2504 		err = md_cancel_unused(lmv->tgts[i]->ltd_exp, fid,
2505 				       policy, mode, flags, opaque);
2506 		if (!rc)
2507 			rc = err;
2508 	}
2509 	return rc;
2510 }
2511 
lmv_set_lock_data(struct obd_export * exp,__u64 * lockh,void * data,__u64 * bits)2512 static int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
2513 			     __u64 *bits)
2514 {
2515 	struct lmv_obd	  *lmv = &exp->exp_obd->u.lmv;
2516 	int		      rc;
2517 
2518 	rc =  md_set_lock_data(lmv->tgts[0]->ltd_exp, lockh, data, bits);
2519 	return rc;
2520 }
2521 
lmv_lock_match(struct obd_export * exp,__u64 flags,const struct lu_fid * fid,ldlm_type_t type,ldlm_policy_data_t * policy,ldlm_mode_t mode,struct lustre_handle * lockh)2522 static ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
2523 				  const struct lu_fid *fid, ldlm_type_t type,
2524 				  ldlm_policy_data_t *policy, ldlm_mode_t mode,
2525 				  struct lustre_handle *lockh)
2526 {
2527 	struct obd_device       *obd = exp->exp_obd;
2528 	struct lmv_obd	  *lmv = &obd->u.lmv;
2529 	ldlm_mode_t	      rc;
2530 	int		      i;
2531 
2532 	CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2533 
2534 	/*
2535 	 * With CMD every object can have two locks in different namespaces:
2536 	 * lookup lock in space of mds storing direntry and update/open lock in
2537 	 * space of mds storing inode. Thus we check all targets, not only that
2538 	 * one fid was created in.
2539 	 */
2540 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2541 		if (lmv->tgts[i] == NULL ||
2542 		    lmv->tgts[i]->ltd_exp == NULL ||
2543 		    lmv->tgts[i]->ltd_active == 0)
2544 			continue;
2545 
2546 		rc = md_lock_match(lmv->tgts[i]->ltd_exp, flags, fid,
2547 				   type, policy, mode, lockh);
2548 		if (rc)
2549 			return rc;
2550 	}
2551 
2552 	return 0;
2553 }
2554 
lmv_get_lustre_md(struct obd_export * exp,struct ptlrpc_request * req,struct obd_export * dt_exp,struct obd_export * md_exp,struct lustre_md * md)2555 static int lmv_get_lustre_md(struct obd_export *exp,
2556 			     struct ptlrpc_request *req,
2557 			     struct obd_export *dt_exp,
2558 			     struct obd_export *md_exp,
2559 			     struct lustre_md *md)
2560 {
2561 	struct lmv_obd	  *lmv = &exp->exp_obd->u.lmv;
2562 
2563 	return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2564 }
2565 
lmv_free_lustre_md(struct obd_export * exp,struct lustre_md * md)2566 static int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2567 {
2568 	struct obd_device       *obd = exp->exp_obd;
2569 	struct lmv_obd	  *lmv = &obd->u.lmv;
2570 
2571 	if (md->mea)
2572 		obd_free_memmd(exp, (void *)&md->mea);
2573 	return md_free_lustre_md(lmv->tgts[0]->ltd_exp, md);
2574 }
2575 
lmv_set_open_replay_data(struct obd_export * exp,struct obd_client_handle * och,struct lookup_intent * it)2576 static int lmv_set_open_replay_data(struct obd_export *exp,
2577 				    struct obd_client_handle *och,
2578 				    struct lookup_intent *it)
2579 {
2580 	struct obd_device       *obd = exp->exp_obd;
2581 	struct lmv_obd	  *lmv = &obd->u.lmv;
2582 	struct lmv_tgt_desc     *tgt;
2583 
2584 	tgt = lmv_find_target(lmv, &och->och_fid);
2585 	if (IS_ERR(tgt))
2586 		return PTR_ERR(tgt);
2587 
2588 	return md_set_open_replay_data(tgt->ltd_exp, och, it);
2589 }
2590 
lmv_clear_open_replay_data(struct obd_export * exp,struct obd_client_handle * och)2591 static int lmv_clear_open_replay_data(struct obd_export *exp,
2592 				      struct obd_client_handle *och)
2593 {
2594 	struct obd_device       *obd = exp->exp_obd;
2595 	struct lmv_obd	  *lmv = &obd->u.lmv;
2596 	struct lmv_tgt_desc     *tgt;
2597 
2598 	tgt = lmv_find_target(lmv, &och->och_fid);
2599 	if (IS_ERR(tgt))
2600 		return PTR_ERR(tgt);
2601 
2602 	return md_clear_open_replay_data(tgt->ltd_exp, och);
2603 }
2604 
lmv_get_remote_perm(struct obd_export * exp,const struct lu_fid * fid,__u32 suppgid,struct ptlrpc_request ** request)2605 static int lmv_get_remote_perm(struct obd_export *exp,
2606 			       const struct lu_fid *fid,
2607 			       __u32 suppgid, struct ptlrpc_request **request)
2608 {
2609 	struct obd_device       *obd = exp->exp_obd;
2610 	struct lmv_obd	  *lmv = &obd->u.lmv;
2611 	struct lmv_tgt_desc     *tgt;
2612 	int		      rc;
2613 
2614 	rc = lmv_check_connect(obd);
2615 	if (rc)
2616 		return rc;
2617 
2618 	tgt = lmv_find_target(lmv, fid);
2619 	if (IS_ERR(tgt))
2620 		return PTR_ERR(tgt);
2621 
2622 	rc = md_get_remote_perm(tgt->ltd_exp, fid, suppgid, request);
2623 	return rc;
2624 }
2625 
lmv_intent_getattr_async(struct obd_export * exp,struct md_enqueue_info * minfo,struct ldlm_enqueue_info * einfo)2626 static int lmv_intent_getattr_async(struct obd_export *exp,
2627 				    struct md_enqueue_info *minfo,
2628 				    struct ldlm_enqueue_info *einfo)
2629 {
2630 	struct md_op_data       *op_data = &minfo->mi_data;
2631 	struct obd_device       *obd = exp->exp_obd;
2632 	struct lmv_obd	  *lmv = &obd->u.lmv;
2633 	struct lmv_tgt_desc     *tgt = NULL;
2634 	int		      rc;
2635 
2636 	rc = lmv_check_connect(obd);
2637 	if (rc)
2638 		return rc;
2639 
2640 	tgt = lmv_find_target(lmv, &op_data->op_fid1);
2641 	if (IS_ERR(tgt))
2642 		return PTR_ERR(tgt);
2643 
2644 	rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
2645 	return rc;
2646 }
2647 
lmv_revalidate_lock(struct obd_export * exp,struct lookup_intent * it,struct lu_fid * fid,__u64 * bits)2648 static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2649 			       struct lu_fid *fid, __u64 *bits)
2650 {
2651 	struct obd_device       *obd = exp->exp_obd;
2652 	struct lmv_obd	  *lmv = &obd->u.lmv;
2653 	struct lmv_tgt_desc     *tgt;
2654 	int		      rc;
2655 
2656 	rc = lmv_check_connect(obd);
2657 	if (rc)
2658 		return rc;
2659 
2660 	tgt = lmv_find_target(lmv, fid);
2661 	if (IS_ERR(tgt))
2662 		return PTR_ERR(tgt);
2663 
2664 	rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2665 	return rc;
2666 }
2667 
2668 /**
2669  * For lmv, only need to send request to master MDT, and the master MDT will
2670  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2671  * we directly fetch data from the slave MDTs.
2672  */
lmv_quotactl(struct obd_device * unused,struct obd_export * exp,struct obd_quotactl * oqctl)2673 static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2674 			struct obd_quotactl *oqctl)
2675 {
2676 	struct obd_device   *obd = class_exp2obd(exp);
2677 	struct lmv_obd      *lmv = &obd->u.lmv;
2678 	struct lmv_tgt_desc *tgt = lmv->tgts[0];
2679 	int		  rc = 0, i;
2680 	__u64		curspace, curinodes;
2681 
2682 	if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
2683 		CERROR("master lmv inactive\n");
2684 		return -EIO;
2685 	}
2686 
2687 	if (oqctl->qc_cmd != Q_GETOQUOTA) {
2688 		rc = obd_quotactl(tgt->ltd_exp, oqctl);
2689 		return rc;
2690 	}
2691 
2692 	curspace = curinodes = 0;
2693 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2694 		int err;
2695 
2696 		tgt = lmv->tgts[i];
2697 
2698 		if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
2699 			continue;
2700 		if (!tgt->ltd_active) {
2701 			CDEBUG(D_HA, "mdt %d is inactive.\n", i);
2702 			continue;
2703 		}
2704 
2705 		err = obd_quotactl(tgt->ltd_exp, oqctl);
2706 		if (err) {
2707 			CERROR("getquota on mdt %d failed. %d\n", i, err);
2708 			if (!rc)
2709 				rc = err;
2710 		} else {
2711 			curspace += oqctl->qc_dqblk.dqb_curspace;
2712 			curinodes += oqctl->qc_dqblk.dqb_curinodes;
2713 		}
2714 	}
2715 	oqctl->qc_dqblk.dqb_curspace = curspace;
2716 	oqctl->qc_dqblk.dqb_curinodes = curinodes;
2717 
2718 	return rc;
2719 }
2720 
lmv_quotacheck(struct obd_device * unused,struct obd_export * exp,struct obd_quotactl * oqctl)2721 static int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
2722 			  struct obd_quotactl *oqctl)
2723 {
2724 	struct obd_device   *obd = class_exp2obd(exp);
2725 	struct lmv_obd      *lmv = &obd->u.lmv;
2726 	struct lmv_tgt_desc *tgt;
2727 	int		  i, rc = 0;
2728 
2729 	for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2730 		int err;
2731 
2732 		tgt = lmv->tgts[i];
2733 		if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
2734 			CERROR("lmv idx %d inactive\n", i);
2735 			return -EIO;
2736 		}
2737 
2738 		err = obd_quotacheck(tgt->ltd_exp, oqctl);
2739 		if (err && !rc)
2740 			rc = err;
2741 	}
2742 
2743 	return rc;
2744 }
2745 
2746 static struct obd_ops lmv_obd_ops = {
2747 	.o_owner		= THIS_MODULE,
2748 	.o_setup		= lmv_setup,
2749 	.o_cleanup	      = lmv_cleanup,
2750 	.o_precleanup	   = lmv_precleanup,
2751 	.o_process_config       = lmv_process_config,
2752 	.o_connect	      = lmv_connect,
2753 	.o_disconnect	   = lmv_disconnect,
2754 	.o_statfs	       = lmv_statfs,
2755 	.o_get_info	     = lmv_get_info,
2756 	.o_set_info_async       = lmv_set_info_async,
2757 	.o_packmd	       = lmv_packmd,
2758 	.o_unpackmd	     = lmv_unpackmd,
2759 	.o_notify	       = lmv_notify,
2760 	.o_get_uuid	     = lmv_get_uuid,
2761 	.o_iocontrol	    = lmv_iocontrol,
2762 	.o_quotacheck	   = lmv_quotacheck,
2763 	.o_quotactl	     = lmv_quotactl
2764 };
2765 
2766 static struct md_ops lmv_md_ops = {
2767 	.m_getstatus	    = lmv_getstatus,
2768 	.m_null_inode		= lmv_null_inode,
2769 	.m_find_cbdata	  = lmv_find_cbdata,
2770 	.m_close		= lmv_close,
2771 	.m_create	       = lmv_create,
2772 	.m_done_writing	 = lmv_done_writing,
2773 	.m_enqueue	      = lmv_enqueue,
2774 	.m_getattr	      = lmv_getattr,
2775 	.m_getxattr	     = lmv_getxattr,
2776 	.m_getattr_name	 = lmv_getattr_name,
2777 	.m_intent_lock	  = lmv_intent_lock,
2778 	.m_link		 = lmv_link,
2779 	.m_rename	       = lmv_rename,
2780 	.m_setattr	      = lmv_setattr,
2781 	.m_setxattr	     = lmv_setxattr,
2782 	.m_sync		 = lmv_sync,
2783 	.m_readpage	     = lmv_readpage,
2784 	.m_unlink	       = lmv_unlink,
2785 	.m_init_ea_size	 = lmv_init_ea_size,
2786 	.m_cancel_unused	= lmv_cancel_unused,
2787 	.m_set_lock_data	= lmv_set_lock_data,
2788 	.m_lock_match	   = lmv_lock_match,
2789 	.m_get_lustre_md	= lmv_get_lustre_md,
2790 	.m_free_lustre_md       = lmv_free_lustre_md,
2791 	.m_set_open_replay_data = lmv_set_open_replay_data,
2792 	.m_clear_open_replay_data = lmv_clear_open_replay_data,
2793 	.m_get_remote_perm      = lmv_get_remote_perm,
2794 	.m_intent_getattr_async = lmv_intent_getattr_async,
2795 	.m_revalidate_lock      = lmv_revalidate_lock
2796 };
2797 
lmv_init(void)2798 static int __init lmv_init(void)
2799 {
2800 	struct lprocfs_static_vars lvars;
2801 	int			rc;
2802 
2803 	lprocfs_lmv_init_vars(&lvars);
2804 
2805 	rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2806 				 LUSTRE_LMV_NAME, NULL);
2807 	return rc;
2808 }
2809 
lmv_exit(void)2810 static void lmv_exit(void)
2811 {
2812 	class_unregister_type(LUSTRE_LMV_NAME);
2813 }
2814 
2815 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2816 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2817 MODULE_LICENSE("GPL");
2818 
2819 module_init(lmv_init);
2820 module_exit(lmv_exit);
2821