1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_MDC
38 
39 # include <linux/module.h>
40 # include <linux/pagemap.h>
41 # include <linux/miscdevice.h>
42 # include <linux/init.h>
43 # include <linux/utsname.h>
44 
45 #include "../include/lustre_acl.h"
46 #include "../include/obd_class.h"
47 #include "../include/lustre_fid.h"
48 #include "../include/lprocfs_status.h"
49 #include "../include/lustre_param.h"
50 #include "../include/lustre_log.h"
51 
52 #include "mdc_internal.h"
53 
54 #define REQUEST_MINOR 244
55 
/*
 * Argument bundle pairing a capability (ra_oc) with a renewal callback
 * (ra_cb).
 * NOTE(review): not referenced in the visible part of this file; the purpose
 * is inferred from the type names only -- confirm against the capability
 * renewal code.
 */
struct mdc_renew_capa_args {
	struct obd_capa	*ra_oc;
	renew_capa_cb_t	 ra_cb;
};
60 
61 static int mdc_cleanup(struct obd_device *obd);
62 
mdc_unpack_capa(struct obd_export * exp,struct ptlrpc_request * req,const struct req_msg_field * field,struct obd_capa ** oc)63 static int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
64 		    const struct req_msg_field *field, struct obd_capa **oc)
65 {
66 	struct lustre_capa *capa;
67 	struct obd_capa *c;
68 
69 	/* swabbed already in mdc_enqueue */
70 	capa = req_capsule_server_get(&req->rq_pill, field);
71 	if (capa == NULL)
72 		return -EPROTO;
73 
74 	c = alloc_capa(CAPA_SITE_CLIENT);
75 	if (IS_ERR(c)) {
76 		CDEBUG(D_INFO, "alloc capa failed!\n");
77 		return PTR_ERR(c);
78 	} else {
79 		c->c_capa = *capa;
80 		*oc = c;
81 		return 0;
82 	}
83 }
84 
mdc_queue_wait(struct ptlrpc_request * req)85 static inline int mdc_queue_wait(struct ptlrpc_request *req)
86 {
87 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
88 	int rc;
89 
90 	/* mdc_enter_request() ensures that this client has no more
91 	 * than cl_max_rpcs_in_flight RPCs simultaneously inf light
92 	 * against an MDT. */
93 	rc = mdc_enter_request(cli);
94 	if (rc != 0)
95 		return rc;
96 
97 	rc = ptlrpc_queue_wait(req);
98 	mdc_exit_request(cli);
99 
100 	return rc;
101 }
102 
103 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
104 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
send_getstatus(struct obd_import * imp,struct lu_fid * rootfid,struct obd_capa ** pc,int level,int msg_flags)105 static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
106 			  struct obd_capa **pc, int level, int msg_flags)
107 {
108 	struct ptlrpc_request *req;
109 	struct mdt_body       *body;
110 	int		    rc;
111 
112 	req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_GETSTATUS,
113 					LUSTRE_MDS_VERSION, MDS_GETSTATUS);
114 	if (req == NULL)
115 		return -ENOMEM;
116 
117 	mdc_pack_body(req, NULL, NULL, 0, 0, -1, 0);
118 	lustre_msg_add_flags(req->rq_reqmsg, msg_flags);
119 	req->rq_send_state = level;
120 
121 	ptlrpc_request_set_replen(req);
122 
123 	rc = ptlrpc_queue_wait(req);
124 	if (rc)
125 		goto out;
126 
127 	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
128 	if (body == NULL) {
129 		rc = -EPROTO;
130 		goto out;
131 	}
132 
133 	if (body->valid & OBD_MD_FLMDSCAPA) {
134 		rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, pc);
135 		if (rc)
136 			goto out;
137 	}
138 
139 	*rootfid = body->fid1;
140 	CDEBUG(D_NET,
141 	       "root fid="DFID", last_committed=%llu\n",
142 	       PFID(rootfid),
143 	       lustre_msg_get_last_committed(req->rq_repmsg));
144 out:
145 	ptlrpc_req_finished(req);
146 	return rc;
147 }
148 
149 /* This should be mdc_get_info("rootfid") */
mdc_getstatus(struct obd_export * exp,struct lu_fid * rootfid,struct obd_capa ** pc)150 static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
151 		  struct obd_capa **pc)
152 {
153 	return send_getstatus(class_exp2cliimp(exp), rootfid, pc,
154 			      LUSTRE_IMP_FULL, 0);
155 }
156 
157 /*
158  * This function now is known to always saying that it will receive 4 buffers
159  * from server. Even for cases when acl_size and md_size is zero, RPC header
160  * will contain 4 fields and RPC itself will contain zero size fields. This is
161  * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
162  * and thus zero, it shrinks it, making zero size. The same story about
163  * md_size. And this is course of problem when client waits for smaller number
164  * of fields. This issue will be fixed later when client gets aware of RPC
165  * layouts.  --umka
166  */
mdc_getattr_common(struct obd_export * exp,struct ptlrpc_request * req)167 static int mdc_getattr_common(struct obd_export *exp,
168 			      struct ptlrpc_request *req)
169 {
170 	struct req_capsule *pill = &req->rq_pill;
171 	struct mdt_body    *body;
172 	void	       *eadata;
173 	int		 rc;
174 
175 	/* Request message already built. */
176 	rc = ptlrpc_queue_wait(req);
177 	if (rc != 0)
178 		return rc;
179 
180 	/* sanity check for the reply */
181 	body = req_capsule_server_get(pill, &RMF_MDT_BODY);
182 	if (body == NULL)
183 		return -EPROTO;
184 
185 	CDEBUG(D_NET, "mode: %o\n", body->mode);
186 
187 	if (body->eadatasize != 0) {
188 		mdc_update_max_ea_from_body(exp, body);
189 
190 		eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
191 						      body->eadatasize);
192 		if (eadata == NULL)
193 			return -EPROTO;
194 	}
195 
196 	if (body->valid & OBD_MD_FLRMTPERM) {
197 		struct mdt_remote_perm *perm;
198 
199 		LASSERT(client_is_remote(exp));
200 		perm = req_capsule_server_swab_get(pill, &RMF_ACL,
201 						lustre_swab_mdt_remote_perm);
202 		if (perm == NULL)
203 			return -EPROTO;
204 	}
205 
206 	if (body->valid & OBD_MD_FLMDSCAPA) {
207 		struct lustre_capa *capa;
208 
209 		capa = req_capsule_server_get(pill, &RMF_CAPA1);
210 		if (capa == NULL)
211 			return -EPROTO;
212 	}
213 
214 	return 0;
215 }
216 
mdc_getattr(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)217 static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
218 		struct ptlrpc_request **request)
219 {
220 	struct ptlrpc_request *req;
221 	int		    rc;
222 
223 	/* Single MDS without an LMV case */
224 	if (op_data->op_flags & MF_GET_MDT_IDX) {
225 		op_data->op_mds = 0;
226 		return 0;
227 	}
228 	*request = NULL;
229 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
230 	if (req == NULL)
231 		return -ENOMEM;
232 
233 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
234 
235 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
236 	if (rc) {
237 		ptlrpc_request_free(req);
238 		return rc;
239 	}
240 
241 	mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
242 		      op_data->op_valid, op_data->op_mode, -1, 0);
243 
244 	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
245 			     op_data->op_mode);
246 	if (op_data->op_valid & OBD_MD_FLRMTPERM) {
247 		LASSERT(client_is_remote(exp));
248 		req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
249 				     sizeof(struct mdt_remote_perm));
250 	}
251 	ptlrpc_request_set_replen(req);
252 
253 	rc = mdc_getattr_common(exp, req);
254 	if (rc)
255 		ptlrpc_req_finished(req);
256 	else
257 		*request = req;
258 	return rc;
259 }
260 
mdc_getattr_name(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)261 static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
262 		     struct ptlrpc_request **request)
263 {
264 	struct ptlrpc_request *req;
265 	int		    rc;
266 
267 	*request = NULL;
268 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
269 				   &RQF_MDS_GETATTR_NAME);
270 	if (req == NULL)
271 		return -ENOMEM;
272 
273 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
274 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
275 			     op_data->op_namelen + 1);
276 
277 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME);
278 	if (rc) {
279 		ptlrpc_request_free(req);
280 		return rc;
281 	}
282 
283 	mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
284 		      op_data->op_valid, op_data->op_mode,
285 		      op_data->op_suppgids[0], 0);
286 
287 	if (op_data->op_name) {
288 		char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
289 
290 		LASSERT(strnlen(op_data->op_name, op_data->op_namelen) ==
291 				op_data->op_namelen);
292 		memcpy(name, op_data->op_name, op_data->op_namelen);
293 	}
294 
295 	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
296 			     op_data->op_mode);
297 	ptlrpc_request_set_replen(req);
298 
299 	rc = mdc_getattr_common(exp, req);
300 	if (rc)
301 		ptlrpc_req_finished(req);
302 	else
303 		*request = req;
304 	return rc;
305 }
306 
mdc_is_subdir(struct obd_export * exp,const struct lu_fid * pfid,const struct lu_fid * cfid,struct ptlrpc_request ** request)307 static int mdc_is_subdir(struct obd_export *exp,
308 			 const struct lu_fid *pfid,
309 			 const struct lu_fid *cfid,
310 			 struct ptlrpc_request **request)
311 {
312 	struct ptlrpc_request  *req;
313 	int		     rc;
314 
315 	*request = NULL;
316 	req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
317 					&RQF_MDS_IS_SUBDIR, LUSTRE_MDS_VERSION,
318 					MDS_IS_SUBDIR);
319 	if (req == NULL)
320 		return -ENOMEM;
321 
322 	mdc_is_subdir_pack(req, pfid, cfid, 0);
323 	ptlrpc_request_set_replen(req);
324 
325 	rc = ptlrpc_queue_wait(req);
326 	if (rc && rc != -EREMOTE)
327 		ptlrpc_req_finished(req);
328 	else
329 		*request = req;
330 	return rc;
331 }
332 
/*
 * Common implementation behind mdc_setxattr() and mdc_getxattr().
 *
 * @fmt / @opcode	request format and RPC opcode to use
 * @fid, @oc		target object and its capability
 * @valid		valid bits placed in the request
 * @xattr_name		attribute name, copied with its NUL terminator (may be
 *			NULL)
 * @input, @input_size	attribute data sent to the server (@input must be
 *			non-NULL when @input_size is non-zero)
 * @output_size		stored in sx_size (MDS_REINT) or passed to
 *			mdc_pack_body(); also the reply size of RMF_EADATA
 *			when the format has that reply field
 * @flags, @suppgid	passed through to the request
 * @request		reset to NULL; receives the finished request on success
 *
 * For MDS_REINT the request is filled in as a REINT_SETXATTR record and the
 * RPC is sent under the client's cl_rpc_lock.  When additionally @fid is sane
 * and the export knows MDS_INODELOCK_XATTR, unused local XATTR locks are
 * collected and packed for cancellation together with the request.
 *
 * Returns 0 on success; -ENOMEM, the packing error or the RPC error
 * otherwise, in which case the request has been released.
 */
static int mdc_xattr_common(struct obd_export *exp,
			    const struct req_format *fmt,
			    const struct lu_fid *fid,
			    struct obd_capa *oc, int opcode, u64 valid,
			    const char *xattr_name, const char *input,
			    int input_size, int output_size, int flags,
			    __u32 suppgid, struct ptlrpc_request **request)
{
	const int is_reint = (opcode == MDS_REINT);
	struct ptlrpc_request *req;
	struct req_capsule *pill;
	int name_size = 0;
	int rc;

	*request = NULL;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt);
	if (!req)
		return -ENOMEM;
	pill = &req->rq_pill;

	/* Size the client-side buffers before the request is packed. */
	mdc_set_capa_size(req, &RMF_CAPA1, oc);
	if (xattr_name) {
		name_size = strlen(xattr_name) + 1;
		req_capsule_set_size(pill, &RMF_NAME, RCL_CLIENT, name_size);
	}
	if (input_size) {
		LASSERT(input);
		req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT,
				     input_size);
	}

	/* Flush local XATTR locks to get rid of a possible cancel RPC */
	if (is_reint && fid_is_sane(fid) &&
	    (exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR)) {
		LIST_HEAD(cancels);
		int count;

		/* Without that packing would fail */
		if (input_size == 0)
			req_capsule_set_size(pill, &RMF_EADATA,
					     RCL_CLIENT, 0);

		count = mdc_resource_get_unused(exp, fid,
						&cancels, LCK_EX,
						MDS_INODELOCK_XATTR);

		rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
	} else {
		rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
	}
	if (rc != 0) {
		ptlrpc_request_free(req);
		return rc;
	}

	if (is_reint) {
		struct mdt_rec_setxattr *rec;

		CLASSERT(sizeof(struct mdt_rec_setxattr) ==
			 sizeof(struct mdt_rec_reint));
		rec = req_capsule_client_get(pill, &RMF_REC_REINT);
		rec->sx_opcode = REINT_SETXATTR;
		rec->sx_fsuid  = from_kuid(&init_user_ns, current_fsuid());
		rec->sx_fsgid  = from_kgid(&init_user_ns, current_fsgid());
		rec->sx_cap    = cfs_curproc_cap_pack();
		rec->sx_suppgid1 = suppgid;
		rec->sx_suppgid2 = -1;
		rec->sx_fid    = *fid;
		rec->sx_valid  = valid | OBD_MD_FLCTIME;
		rec->sx_time   = get_seconds();
		rec->sx_size   = output_size;
		rec->sx_flags  = flags;

		mdc_pack_capa(req, &RMF_CAPA1, oc);
	} else {
		mdc_pack_body(req, fid, oc, valid, output_size, suppgid, flags);
	}

	/* Copy the variable-length payloads into the packed request. */
	if (xattr_name)
		memcpy(req_capsule_client_get(pill, &RMF_NAME),
		       xattr_name, name_size);
	if (input_size)
		memcpy(req_capsule_client_get(pill, &RMF_EADATA),
		       input, input_size);

	if (req_capsule_has_field(pill, &RMF_EADATA, RCL_SERVER))
		req_capsule_set_size(pill, &RMF_EADATA,
				     RCL_SERVER, output_size);
	ptlrpc_request_set_replen(req);

	/* make rpc */
	if (is_reint)
		mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);

	rc = ptlrpc_queue_wait(req);

	if (is_reint)
		mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);

	if (rc != 0) {
		ptlrpc_req_finished(req);
		return rc;
	}

	*request = req;
	return rc;
}
443 
/*
 * Set an extended attribute on @fid: forwards every argument to
 * mdc_xattr_common() using the RQF_MDS_REINT_SETXATTR format and the
 * MDS_REINT opcode.  Returns the result of mdc_xattr_common().
 */
static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
			struct obd_capa *oc, u64 valid, const char *xattr_name,
			const char *input, int input_size, int output_size,
			int flags, __u32 suppgid,
			struct ptlrpc_request **request)
{
	int rc;

	rc = mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR, fid, oc,
			      MDS_REINT, valid, xattr_name, input,
			      input_size, output_size, flags, suppgid,
			      request);
	return rc;
}
454 
/*
 * Read an extended attribute of @fid: forwards every argument to
 * mdc_xattr_common() using the RQF_MDS_GETXATTR format and the MDS_GETXATTR
 * opcode, with -1 as the supplementary group.  Returns the result of
 * mdc_xattr_common().
 */
static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
			struct obd_capa *oc, u64 valid, const char *xattr_name,
			const char *input, int input_size, int output_size,
			int flags, struct ptlrpc_request **request)
{
	int rc;

	rc = mdc_xattr_common(exp, &RQF_MDS_GETXATTR, fid, oc,
			      MDS_GETXATTR, valid, xattr_name, input,
			      input_size, output_size, flags, -1, request);
	return rc;
}
465 
466 #ifdef CONFIG_FS_POSIX_ACL
mdc_unpack_acl(struct ptlrpc_request * req,struct lustre_md * md)467 static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md)
468 {
469 	struct req_capsule     *pill = &req->rq_pill;
470 	struct mdt_body	*body = md->body;
471 	struct posix_acl       *acl;
472 	void		   *buf;
473 	int		     rc;
474 
475 	if (!body->aclsize)
476 		return 0;
477 
478 	buf = req_capsule_server_sized_get(pill, &RMF_ACL, body->aclsize);
479 
480 	if (!buf)
481 		return -EPROTO;
482 
483 	acl = posix_acl_from_xattr(&init_user_ns, buf, body->aclsize);
484 	if (acl == NULL)
485 		return 0;
486 
487 	if (IS_ERR(acl)) {
488 		rc = PTR_ERR(acl);
489 		CERROR("convert xattr to acl: %d\n", rc);
490 		return rc;
491 	}
492 
493 	rc = posix_acl_valid(acl);
494 	if (rc) {
495 		CERROR("validate acl: %d\n", rc);
496 		posix_acl_release(acl);
497 		return rc;
498 	}
499 
500 	md->posix_acl = acl;
501 	return 0;
502 }
503 #else
504 #define mdc_unpack_acl(req, md) 0
505 #endif
506 
mdc_get_lustre_md(struct obd_export * exp,struct ptlrpc_request * req,struct obd_export * dt_exp,struct obd_export * md_exp,struct lustre_md * md)507 int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
508 		      struct obd_export *dt_exp, struct obd_export *md_exp,
509 		      struct lustre_md *md)
510 {
511 	struct req_capsule *pill = &req->rq_pill;
512 	int rc;
513 
514 	LASSERT(md);
515 	memset(md, 0, sizeof(*md));
516 
517 	md->body = req_capsule_server_get(pill, &RMF_MDT_BODY);
518 	LASSERT(md->body != NULL);
519 
520 	if (md->body->valid & OBD_MD_FLEASIZE) {
521 		int lmmsize;
522 		struct lov_mds_md *lmm;
523 
524 		if (!S_ISREG(md->body->mode)) {
525 			CDEBUG(D_INFO,
526 			       "OBD_MD_FLEASIZE set, should be a regular file, but is not\n");
527 			rc = -EPROTO;
528 			goto out;
529 		}
530 
531 		if (md->body->eadatasize == 0) {
532 			CDEBUG(D_INFO,
533 			       "OBD_MD_FLEASIZE set, but eadatasize 0\n");
534 			rc = -EPROTO;
535 			goto out;
536 		}
537 		lmmsize = md->body->eadatasize;
538 		lmm = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmmsize);
539 		if (!lmm) {
540 			rc = -EPROTO;
541 			goto out;
542 		}
543 
544 		rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
545 		if (rc < 0)
546 			goto out;
547 
548 		if (rc < sizeof(*md->lsm)) {
549 			CDEBUG(D_INFO,
550 			       "lsm size too small: rc < sizeof (*md->lsm) (%d < %d)\n",
551 			       rc, (int)sizeof(*md->lsm));
552 			rc = -EPROTO;
553 			goto out;
554 		}
555 
556 	} else if (md->body->valid & OBD_MD_FLDIREA) {
557 		int lmvsize;
558 		struct lov_mds_md *lmv;
559 
560 		if (!S_ISDIR(md->body->mode)) {
561 			CDEBUG(D_INFO,
562 			       "OBD_MD_FLDIREA set, should be a directory, but is not\n");
563 			rc = -EPROTO;
564 			goto out;
565 		}
566 
567 		if (md->body->eadatasize == 0) {
568 			CDEBUG(D_INFO,
569 			       "OBD_MD_FLDIREA is set, but eadatasize 0\n");
570 			return -EPROTO;
571 		}
572 		if (md->body->valid & OBD_MD_MEA) {
573 			lmvsize = md->body->eadatasize;
574 			lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
575 							   lmvsize);
576 			if (!lmv) {
577 				rc = -EPROTO;
578 				goto out;
579 			}
580 
581 			rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv,
582 					  lmvsize);
583 			if (rc < 0)
584 				goto out;
585 
586 			if (rc < sizeof(*md->mea)) {
587 				CDEBUG(D_INFO,
588 				       "size too small: rc < sizeof(*md->mea) (%d < %d)\n",
589 					rc, (int)sizeof(*md->mea));
590 				rc = -EPROTO;
591 				goto out;
592 			}
593 		}
594 	}
595 	rc = 0;
596 
597 	if (md->body->valid & OBD_MD_FLRMTPERM) {
598 		/* remote permission */
599 		LASSERT(client_is_remote(exp));
600 		md->remote_perm = req_capsule_server_swab_get(pill, &RMF_ACL,
601 						lustre_swab_mdt_remote_perm);
602 		if (!md->remote_perm) {
603 			rc = -EPROTO;
604 			goto out;
605 		}
606 	} else if (md->body->valid & OBD_MD_FLACL) {
607 		/* for ACL, it's possible that FLACL is set but aclsize is zero.
608 		 * only when aclsize != 0 there's an actual segment for ACL
609 		 * in reply buffer.
610 		 */
611 		if (md->body->aclsize) {
612 			rc = mdc_unpack_acl(req, md);
613 			if (rc)
614 				goto out;
615 #ifdef CONFIG_FS_POSIX_ACL
616 		} else {
617 			md->posix_acl = NULL;
618 #endif
619 		}
620 	}
621 	if (md->body->valid & OBD_MD_FLMDSCAPA) {
622 		struct obd_capa *oc = NULL;
623 
624 		rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, &oc);
625 		if (rc)
626 			goto out;
627 		md->mds_capa = oc;
628 	}
629 
630 	if (md->body->valid & OBD_MD_FLOSSCAPA) {
631 		struct obd_capa *oc = NULL;
632 
633 		rc = mdc_unpack_capa(NULL, req, &RMF_CAPA2, &oc);
634 		if (rc)
635 			goto out;
636 		md->oss_capa = oc;
637 	}
638 
639 out:
640 	if (rc) {
641 		if (md->oss_capa) {
642 			capa_put(md->oss_capa);
643 			md->oss_capa = NULL;
644 		}
645 		if (md->mds_capa) {
646 			capa_put(md->mds_capa);
647 			md->mds_capa = NULL;
648 		}
649 #ifdef CONFIG_FS_POSIX_ACL
650 		posix_acl_release(md->posix_acl);
651 #endif
652 		if (md->lsm)
653 			obd_free_memmd(dt_exp, &md->lsm);
654 	}
655 	return rc;
656 }
657 
/*
 * No-op in this implementation: neither @exp nor @md is touched and 0 is
 * always returned.
 */
int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
{
	return 0;
}
662 
663 /**
664  * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
665  * RPC chains.
666  */
mdc_replay_open(struct ptlrpc_request * req)667 void mdc_replay_open(struct ptlrpc_request *req)
668 {
669 	struct md_open_data *mod = req->rq_cb_data;
670 	struct ptlrpc_request *close_req;
671 	struct obd_client_handle *och;
672 	struct lustre_handle old;
673 	struct mdt_body *body;
674 
675 	if (mod == NULL) {
676 		DEBUG_REQ(D_ERROR, req,
677 			  "Can't properly replay without open data.");
678 		return;
679 	}
680 
681 	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
682 	LASSERT(body != NULL);
683 
684 	och = mod->mod_och;
685 	if (och != NULL) {
686 		struct lustre_handle *file_fh;
687 
688 		LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
689 
690 		file_fh = &och->och_fh;
691 		CDEBUG(D_HA, "updating handle from %#llx to %#llx\n",
692 		       file_fh->cookie, body->handle.cookie);
693 		old = *file_fh;
694 		*file_fh = body->handle;
695 	}
696 	close_req = mod->mod_close_req;
697 	if (close_req != NULL) {
698 		__u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
699 		struct mdt_ioepoch *epoch;
700 
701 		LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
702 		epoch = req_capsule_client_get(&close_req->rq_pill,
703 					       &RMF_MDT_EPOCH);
704 		LASSERT(epoch);
705 
706 		if (och != NULL)
707 			LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
708 		DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
709 		epoch->handle = body->handle;
710 	}
711 }
712 
mdc_commit_open(struct ptlrpc_request * req)713 void mdc_commit_open(struct ptlrpc_request *req)
714 {
715 	struct md_open_data *mod = req->rq_cb_data;
716 
717 	if (mod == NULL)
718 		return;
719 
720 	/**
721 	 * No need to touch md_open_data::mod_och, it holds a reference on
722 	 * \var mod and will zero references to each other, \var mod will be
723 	 * freed after that when md_open_data::mod_och will put the reference.
724 	 */
725 
726 	/**
727 	 * Do not let open request to disappear as it still may be needed
728 	 * for close rpc to happen (it may happen on evict only, otherwise
729 	 * ptlrpc_request::rq_replay does not let mdc_commit_open() to be
730 	 * called), just mark this rpc as committed to distinguish these 2
731 	 * cases, see mdc_close() for details. The open request reference will
732 	 * be put along with freeing \var mod.
733 	 */
734 	ptlrpc_request_addref(req);
735 	spin_lock(&req->rq_lock);
736 	req->rq_committed = 1;
737 	spin_unlock(&req->rq_lock);
738 	req->rq_cb_data = NULL;
739 	obd_mod_put(mod);
740 }
741 
mdc_set_open_replay_data(struct obd_export * exp,struct obd_client_handle * och,struct lookup_intent * it)742 int mdc_set_open_replay_data(struct obd_export *exp,
743 			     struct obd_client_handle *och,
744 			     struct lookup_intent *it)
745 {
746 	struct md_open_data   *mod;
747 	struct mdt_rec_create *rec;
748 	struct mdt_body       *body;
749 	struct ptlrpc_request *open_req = it->d.lustre.it_data;
750 	struct obd_import     *imp = open_req->rq_import;
751 
752 	if (!open_req->rq_replay)
753 		return 0;
754 
755 	rec = req_capsule_client_get(&open_req->rq_pill, &RMF_REC_REINT);
756 	body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY);
757 	LASSERT(rec != NULL);
758 	/* Incoming message in my byte order (it's been swabbed). */
759 	/* Outgoing messages always in my byte order. */
760 	LASSERT(body != NULL);
761 
762 	/* Only if the import is replayable, we set replay_open data */
763 	if (och && imp->imp_replayable) {
764 		mod = obd_mod_alloc();
765 		if (mod == NULL) {
766 			DEBUG_REQ(D_ERROR, open_req,
767 				  "Can't allocate md_open_data");
768 			return 0;
769 		}
770 
771 		/**
772 		 * Take a reference on \var mod, to be freed on mdc_close().
773 		 * It protects \var mod from being freed on eviction (commit
774 		 * callback is called despite rq_replay flag).
775 		 * Another reference for \var och.
776 		 */
777 		obd_mod_get(mod);
778 		obd_mod_get(mod);
779 
780 		spin_lock(&open_req->rq_lock);
781 		och->och_mod = mod;
782 		mod->mod_och = och;
783 		mod->mod_is_create = it_disposition(it, DISP_OPEN_CREATE) ||
784 				     it_disposition(it, DISP_OPEN_STRIPE);
785 		mod->mod_open_req = open_req;
786 		open_req->rq_cb_data = mod;
787 		open_req->rq_commit_cb = mdc_commit_open;
788 		spin_unlock(&open_req->rq_lock);
789 	}
790 
791 	rec->cr_fid2 = body->fid1;
792 	rec->cr_ioepoch = body->ioepoch;
793 	rec->cr_old_handle.cookie = body->handle.cookie;
794 	open_req->rq_replay_cb = mdc_replay_open;
795 	if (!fid_is_sane(&body->fid1)) {
796 		DEBUG_REQ(D_ERROR, open_req,
797 			  "Saving replay request with insane fid");
798 		LBUG();
799 	}
800 
801 	DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
802 	return 0;
803 }
804 
mdc_free_open(struct md_open_data * mod)805 static void mdc_free_open(struct md_open_data *mod)
806 {
807 	int committed = 0;
808 
809 	if (mod->mod_is_create == 0 &&
810 	    imp_connect_disp_stripe(mod->mod_open_req->rq_import))
811 		committed = 1;
812 
813 	LASSERT(mod->mod_open_req->rq_replay == 0);
814 
815 	DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request\n");
816 
817 	ptlrpc_request_committed(mod->mod_open_req, committed);
818 	if (mod->mod_close_req)
819 		ptlrpc_request_committed(mod->mod_close_req, committed);
820 }
821 
mdc_clear_open_replay_data(struct obd_export * exp,struct obd_client_handle * och)822 int mdc_clear_open_replay_data(struct obd_export *exp,
823 			       struct obd_client_handle *och)
824 {
825 	struct md_open_data *mod = och->och_mod;
826 
827 	/**
828 	 * It is possible to not have \var mod in a case of eviction between
829 	 * lookup and ll_file_open().
830 	 **/
831 	if (mod == NULL)
832 		return 0;
833 
834 	LASSERT(mod != LP_POISON);
835 	LASSERT(mod->mod_open_req != NULL);
836 	mdc_free_open(mod);
837 
838 	mod->mod_och = NULL;
839 	och->och_mod = NULL;
840 	obd_mod_put(mod);
841 
842 	return 0;
843 }
844 
845 /* Prepares the request for the replay by the given reply */
mdc_close_handle_reply(struct ptlrpc_request * req,struct md_op_data * op_data,int rc)846 static void mdc_close_handle_reply(struct ptlrpc_request *req,
847 				   struct md_op_data *op_data, int rc) {
848 	struct mdt_body  *repbody;
849 	struct mdt_ioepoch *epoch;
850 
851 	if (req && rc == -EAGAIN) {
852 		repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
853 		epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
854 
855 		epoch->flags |= MF_SOM_AU;
856 		if (repbody->valid & OBD_MD_FLGETATTRLOCK)
857 			op_data->op_flags |= MF_GETATTR_LOCK;
858 	}
859 }
860 
/*
 * Send an MDS_CLOSE request described by @op_data.
 *
 * When op_data->op_bias has MDS_HSM_RELEASE set, the RQF_MDS_RELEASE_CLOSE
 * format is used and a FID is allocated into op_data->op_fid2 first; a
 * failure of that allocation is remembered and the close is still sent.
 *
 * @mod, if not NULL, is the open data of the matching open: the new request
 * is recorded as its close request, replay of the open is switched off, and
 * one reference on @mod is dropped before returning.
 *
 * Returns -ENOMEM or the packing error if the request cannot be built (with
 * *request left untouched or NULL).  Once the request has been sent,
 * *request is set to it whatever the outcome -- NOTE(review): presumably the
 * caller releases it; confirm.  The return value is then the (negative)
 * close error, or otherwise the saved FID allocation error, or 0.
 */
static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
		     struct md_open_data *mod, struct ptlrpc_request **request)
{
	struct obd_device     *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct req_format     *req_fmt;
	int                    rc;
	int		       saved_rc = 0;


	req_fmt = &RQF_MDS_CLOSE;
	if (op_data->op_bias & MDS_HSM_RELEASE) {
		req_fmt = &RQF_MDS_RELEASE_CLOSE;

		/* allocate a FID for volatile file */
		rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
		if (rc < 0) {
			CERROR("%s: "DFID" failed to allocate FID: %d\n",
			       obd->obd_name, PFID(&op_data->op_fid1), rc);
			/* save the errcode and proceed to close */
			saved_rc = rc;
		}
	}

	*request = NULL;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt);
	if (req == NULL)
		return -ENOMEM;

	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);

	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
	 * portal whose threads are not taking any DLM locks and are therefore
	 * always progressing */
	req->rq_request_portal = MDS_READPAGE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	/* Ensure that this close's handle is fixed up during replay. */
	if (likely(mod != NULL)) {
		LASSERTF(mod->mod_open_req != NULL &&
			 mod->mod_open_req->rq_type != LI_POISON,
			 "POISONED open %p!\n", mod->mod_open_req);

		mod->mod_close_req = req;

		DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
		/* We no longer want to preserve this open for replay even
		 * though the open was committed. b=3632, b=3633 */
		spin_lock(&mod->mod_open_req->rq_lock);
		mod->mod_open_req->rq_replay = 0;
		spin_unlock(&mod->mod_open_req->rq_lock);
	} else {
		 CDEBUG(D_HA,
			"couldn't find open req; expecting close error\n");
	}

	mdc_close_pack(req, op_data);

	/* Reply buffers are sized from the client's default EA and cookie
	 * sizes. */
	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obd->u.cli.cl_default_mds_easize);
	req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
			     obd->u.cli.cl_default_mds_cookiesize);

	ptlrpc_request_set_replen(req);

	/* The RPC itself is sent under the client's close lock. */
	mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
	rc = ptlrpc_queue_wait(req);
	mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);

	if (req->rq_repmsg == NULL) {
		/* No reply message at all: never report success. */
		CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
		       req->rq_status);
		if (rc == 0)
			rc = req->rq_status ?: -EIO;
	} else if (rc == 0 || rc == -EAGAIN) {
		struct mdt_body *body;

		/* Take the status from the reply; a positive status on an
		 * error-type reply is turned into a negative errno. */
		rc = lustre_msg_get_status(req->rq_repmsg);
		if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
			DEBUG_REQ(D_ERROR, req,
				  "type == PTL_RPC_MSG_ERR, err = %d", rc);
			if (rc > 0)
				rc = -rc;
		}
		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
		if (body == NULL)
			rc = -EPROTO;
	} else if (rc == -ESTALE) {
		/**
		 * it can be allowed error after 3633 if open was committed and
		 * server failed before close was sent. Let's check if mod
		 * exists and return no error in that case
		 */
		if (mod) {
			DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc);
			LASSERT(mod->mod_open_req != NULL);
			if (mod->mod_open_req->rq_committed)
				rc = 0;
		}
	}

	if (mod) {
		/* A failed close must not stay attached to the open data. */
		if (rc != 0)
			mod->mod_close_req = NULL;
		/* Since now, mod is accessed through open_req only,
		 * thus close req does not keep a reference on mod anymore. */
		obd_mod_put(mod);
	}
	*request = req;
	mdc_close_handle_reply(req, op_data, rc);
	/* A close failure wins over the saved FID allocation failure. */
	return rc < 0 ? rc : saved_rc;
}
979 
mdc_done_writing(struct obd_export * exp,struct md_op_data * op_data,struct md_open_data * mod)980 static int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
981 			    struct md_open_data *mod)
982 {
983 	struct obd_device     *obd = class_exp2obd(exp);
984 	struct ptlrpc_request *req;
985 	int		    rc;
986 
987 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
988 				   &RQF_MDS_DONE_WRITING);
989 	if (req == NULL)
990 		return -ENOMEM;
991 
992 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
993 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
994 	if (rc) {
995 		ptlrpc_request_free(req);
996 		return rc;
997 	}
998 
999 	if (mod != NULL) {
1000 		LASSERTF(mod->mod_open_req != NULL &&
1001 			 mod->mod_open_req->rq_type != LI_POISON,
1002 			 "POISONED setattr %p!\n", mod->mod_open_req);
1003 
1004 		mod->mod_close_req = req;
1005 		DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
1006 		/* We no longer want to preserve this setattr for replay even
1007 		 * though the open was committed. b=3632, b=3633 */
1008 		spin_lock(&mod->mod_open_req->rq_lock);
1009 		mod->mod_open_req->rq_replay = 0;
1010 		spin_unlock(&mod->mod_open_req->rq_lock);
1011 	}
1012 
1013 	mdc_close_pack(req, op_data);
1014 	ptlrpc_request_set_replen(req);
1015 
1016 	mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
1017 	rc = ptlrpc_queue_wait(req);
1018 	mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
1019 
1020 	if (rc == -ESTALE) {
1021 		/**
1022 		 * it can be allowed error after 3633 if open or setattr were
1023 		 * committed and server failed before close was sent.
1024 		 * Let's check if mod exists and return no error in that case
1025 		 */
1026 		if (mod) {
1027 			LASSERT(mod->mod_open_req != NULL);
1028 			if (mod->mod_open_req->rq_committed)
1029 				rc = 0;
1030 		}
1031 	}
1032 
1033 	if (mod) {
1034 		if (rc != 0)
1035 			mod->mod_close_req = NULL;
1036 		LASSERT(mod->mod_open_req != NULL);
1037 		mdc_free_open(mod);
1038 
1039 		/* Since now, mod is accessed through setattr req only,
1040 		 * thus DW req does not keep a reference on mod anymore. */
1041 		obd_mod_put(mod);
1042 	}
1043 
1044 	mdc_close_handle_reply(req, op_data, rc);
1045 	ptlrpc_req_finished(req);
1046 	return rc;
1047 }
1048 
1049 
mdc_readpage(struct obd_export * exp,struct md_op_data * op_data,struct page ** pages,struct ptlrpc_request ** request)1050 static int mdc_readpage(struct obd_export *exp, struct md_op_data *op_data,
1051 			struct page **pages, struct ptlrpc_request **request)
1052 {
1053 	struct ptlrpc_request   *req;
1054 	struct ptlrpc_bulk_desc *desc;
1055 	int		      i;
1056 	wait_queue_head_t	      waitq;
1057 	int		      resends = 0;
1058 	struct l_wait_info       lwi;
1059 	int		      rc;
1060 
1061 	*request = NULL;
1062 	init_waitqueue_head(&waitq);
1063 
1064 restart_bulk:
1065 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
1066 	if (req == NULL)
1067 		return -ENOMEM;
1068 
1069 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1070 
1071 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE);
1072 	if (rc) {
1073 		ptlrpc_request_free(req);
1074 		return rc;
1075 	}
1076 
1077 	req->rq_request_portal = MDS_READPAGE_PORTAL;
1078 	ptlrpc_at_set_req_timeout(req);
1079 
1080 	desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK,
1081 				    MDS_BULK_PORTAL);
1082 	if (desc == NULL) {
1083 		ptlrpc_request_free(req);
1084 		return -ENOMEM;
1085 	}
1086 
1087 	/* NB req now owns desc and will free it when it gets freed */
1088 	for (i = 0; i < op_data->op_npages; i++)
1089 		ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
1090 
1091 	mdc_readdir_pack(req, op_data->op_offset,
1092 			 PAGE_CACHE_SIZE * op_data->op_npages,
1093 			 &op_data->op_fid1, op_data->op_capa1);
1094 
1095 	ptlrpc_request_set_replen(req);
1096 	rc = ptlrpc_queue_wait(req);
1097 	if (rc) {
1098 		ptlrpc_req_finished(req);
1099 		if (rc != -ETIMEDOUT)
1100 			return rc;
1101 
1102 		resends++;
1103 		if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1104 			CERROR("too many resend retries, returning error\n");
1105 			return -EIO;
1106 		}
1107 		lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends),
1108 				       NULL, NULL, NULL);
1109 		l_wait_event(waitq, 0, &lwi);
1110 
1111 		goto restart_bulk;
1112 	}
1113 
1114 	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1115 					  req->rq_bulk->bd_nob_transferred);
1116 	if (rc < 0) {
1117 		ptlrpc_req_finished(req);
1118 		return rc;
1119 	}
1120 
1121 	if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
1122 		CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
1123 			req->rq_bulk->bd_nob_transferred,
1124 			PAGE_CACHE_SIZE * op_data->op_npages);
1125 		ptlrpc_req_finished(req);
1126 		return -EPROTO;
1127 	}
1128 
1129 	*request = req;
1130 	return 0;
1131 }
1132 
/**
 * Query filesystem statistics from the MDS with an MDS_STATFS request.
 *
 * @param env     not used by this function
 * @param exp     export whose client obd provides the import
 * @param osfs    filled with the server's obd_statfs on success
 * @param max_age not used by this function
 * @param flags   OBD_STATFS_NODELAY makes the request no-resend/no-delay
 * @return 0 on success, -ENODEV when the client has no import, other
 *         negative errno on failure
 */
static int mdc_statfs(const struct lu_env *env,
		      struct obd_export *exp, struct obd_statfs *osfs,
		      __u64 max_age, __u32 flags)
{
	struct client_obd *cli = &class_exp2obd(exp)->u.cli;
	struct obd_import *imp = NULL;
	struct ptlrpc_request *req;
	struct obd_statfs *msfs;
	int rc;

	/*
	 * The request might also come from lprocfs, so taking the import
	 * reference is synchronised with client_disconnect_export. Bug15684
	 */
	down_read(&cli->cl_sem);
	if (cli->cl_import)
		imp = class_import_get(cli->cl_import);
	up_read(&cli->cl_sem);
	if (imp == NULL)
		return -ENODEV;

	req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS,
					LUSTRE_MDS_VERSION, MDS_STATFS);
	if (!req) {
		rc = -ENOMEM;
		goto put_import;
	}

	ptlrpc_request_set_replen(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait, to avoid a deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc != 0) {
		/* a connection error on the import takes precedence */
		if (imp->imp_connect_error)
			rc = imp->imp_connect_error;
	} else {
		msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
		if (msfs != NULL)
			*osfs = *msfs;
		else
			rc = -EPROTO;
	}

	ptlrpc_req_finished(req);
put_import:
	class_import_put(imp);
	return rc;
}
1190 
mdc_ioc_fid2path(struct obd_export * exp,struct getinfo_fid2path * gf)1191 static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
1192 {
1193 	__u32 keylen, vallen;
1194 	void *key;
1195 	int rc;
1196 
1197 	if (gf->gf_pathlen > PATH_MAX)
1198 		return -ENAMETOOLONG;
1199 	if (gf->gf_pathlen < 2)
1200 		return -EOVERFLOW;
1201 
1202 	/* Key is KEY_FID2PATH + getinfo_fid2path description */
1203 	keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf);
1204 	OBD_ALLOC(key, keylen);
1205 	if (key == NULL)
1206 		return -ENOMEM;
1207 	memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH));
1208 	memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf));
1209 
1210 	CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n",
1211 	       PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno);
1212 
1213 	if (!fid_is_sane(&gf->gf_fid)) {
1214 		rc = -EINVAL;
1215 		goto out;
1216 	}
1217 
1218 	/* Val is struct getinfo_fid2path result plus path */
1219 	vallen = sizeof(*gf) + gf->gf_pathlen;
1220 
1221 	rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
1222 	if (rc != 0 && rc != -EREMOTE)
1223 		goto out;
1224 
1225 	if (vallen <= sizeof(*gf)) {
1226 		rc = -EPROTO;
1227 		goto out;
1228 	} else if (vallen > sizeof(*gf) + gf->gf_pathlen) {
1229 		rc = -EOVERFLOW;
1230 		goto out;
1231 	}
1232 
1233 	CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n%s\n",
1234 	       PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, gf->gf_path);
1235 
1236 out:
1237 	OBD_FREE(key, keylen);
1238 	return rc;
1239 }
1240 
mdc_ioc_hsm_progress(struct obd_export * exp,struct hsm_progress_kernel * hpk)1241 static int mdc_ioc_hsm_progress(struct obd_export *exp,
1242 				struct hsm_progress_kernel *hpk)
1243 {
1244 	struct obd_import		*imp = class_exp2cliimp(exp);
1245 	struct hsm_progress_kernel	*req_hpk;
1246 	struct ptlrpc_request		*req;
1247 	int				 rc;
1248 
1249 	req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_PROGRESS,
1250 					LUSTRE_MDS_VERSION, MDS_HSM_PROGRESS);
1251 	if (req == NULL) {
1252 		rc = -ENOMEM;
1253 		goto out;
1254 	}
1255 
1256 	mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1257 
1258 	/* Copy hsm_progress struct */
1259 	req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
1260 	if (req_hpk == NULL) {
1261 		rc = -EPROTO;
1262 		goto out;
1263 	}
1264 
1265 	*req_hpk = *hpk;
1266 	req_hpk->hpk_errval = lustre_errno_hton(hpk->hpk_errval);
1267 
1268 	ptlrpc_request_set_replen(req);
1269 
1270 	rc = mdc_queue_wait(req);
1271 	goto out;
1272 out:
1273 	ptlrpc_req_finished(req);
1274 	return rc;
1275 }
1276 
/**
 * Send an MDS_HSM_CT_REGISTER request carrying the archive mask @archives.
 *
 * @param imp      import the request is sent on
 * @param archives value stored in the request's RMF_MDS_HSM_ARCHIVE field
 * @return 0 on success, -ENOMEM if the request cannot be allocated, -EPROTO
 *         if the request buffer is missing, else the mdc_queue_wait() result
 */
static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
{
	__u32			*archive_mask;
	struct ptlrpc_request	*req;
	int			 rc;

	req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_REGISTER,
					LUSTRE_MDS_VERSION,
					MDS_HSM_CT_REGISTER);
	/* Nothing to release yet: do not hand a NULL request to cleanup. */
	if (req == NULL)
		return -ENOMEM;

	mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);

	/* Store the archive mask in the request */
	archive_mask = req_capsule_client_get(&req->rq_pill,
					      &RMF_MDS_HSM_ARCHIVE);
	if (archive_mask == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*archive_mask = archives;

	ptlrpc_request_set_replen(req);

	rc = mdc_queue_wait(req);
out:
	ptlrpc_req_finished(req);
	return rc;
}
1311 
mdc_ioc_hsm_current_action(struct obd_export * exp,struct md_op_data * op_data)1312 static int mdc_ioc_hsm_current_action(struct obd_export *exp,
1313 				      struct md_op_data *op_data)
1314 {
1315 	struct hsm_current_action	*hca = op_data->op_data;
1316 	struct hsm_current_action	*req_hca;
1317 	struct ptlrpc_request		*req;
1318 	int				 rc;
1319 
1320 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1321 				   &RQF_MDS_HSM_ACTION);
1322 	if (req == NULL)
1323 		return -ENOMEM;
1324 
1325 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1326 
1327 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION);
1328 	if (rc) {
1329 		ptlrpc_request_free(req);
1330 		return rc;
1331 	}
1332 
1333 	mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1334 		      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1335 
1336 	ptlrpc_request_set_replen(req);
1337 
1338 	rc = mdc_queue_wait(req);
1339 	if (rc)
1340 		goto out;
1341 
1342 	req_hca = req_capsule_server_get(&req->rq_pill,
1343 					 &RMF_MDS_HSM_CURRENT_ACTION);
1344 	if (req_hca == NULL) {
1345 		rc = -EPROTO;
1346 		goto out;
1347 	}
1348 
1349 	*hca = *req_hca;
1350 
1351 out:
1352 	ptlrpc_req_finished(req);
1353 	return rc;
1354 }
1355 
mdc_ioc_hsm_ct_unregister(struct obd_import * imp)1356 static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
1357 {
1358 	struct ptlrpc_request	*req;
1359 	int			 rc;
1360 
1361 	req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_UNREGISTER,
1362 					LUSTRE_MDS_VERSION,
1363 					MDS_HSM_CT_UNREGISTER);
1364 	if (req == NULL) {
1365 		rc = -ENOMEM;
1366 		goto out;
1367 	}
1368 
1369 	mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1370 
1371 	ptlrpc_request_set_replen(req);
1372 
1373 	rc = mdc_queue_wait(req);
1374 	goto out;
1375 out:
1376 	ptlrpc_req_finished(req);
1377 	return rc;
1378 }
1379 
mdc_ioc_hsm_state_get(struct obd_export * exp,struct md_op_data * op_data)1380 static int mdc_ioc_hsm_state_get(struct obd_export *exp,
1381 				 struct md_op_data *op_data)
1382 {
1383 	struct hsm_user_state	*hus = op_data->op_data;
1384 	struct hsm_user_state	*req_hus;
1385 	struct ptlrpc_request	*req;
1386 	int			 rc;
1387 
1388 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1389 				   &RQF_MDS_HSM_STATE_GET);
1390 	if (req == NULL)
1391 		return -ENOMEM;
1392 
1393 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1394 
1395 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET);
1396 	if (rc != 0) {
1397 		ptlrpc_request_free(req);
1398 		return rc;
1399 	}
1400 
1401 	mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1402 		      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1403 
1404 	ptlrpc_request_set_replen(req);
1405 
1406 	rc = mdc_queue_wait(req);
1407 	if (rc)
1408 		goto out;
1409 
1410 	req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE);
1411 	if (req_hus == NULL) {
1412 		rc = -EPROTO;
1413 		goto out;
1414 	}
1415 
1416 	*hus = *req_hus;
1417 
1418 out:
1419 	ptlrpc_req_finished(req);
1420 	return rc;
1421 }
1422 
mdc_ioc_hsm_state_set(struct obd_export * exp,struct md_op_data * op_data)1423 static int mdc_ioc_hsm_state_set(struct obd_export *exp,
1424 				 struct md_op_data *op_data)
1425 {
1426 	struct hsm_state_set	*hss = op_data->op_data;
1427 	struct hsm_state_set	*req_hss;
1428 	struct ptlrpc_request	*req;
1429 	int			 rc;
1430 
1431 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1432 				   &RQF_MDS_HSM_STATE_SET);
1433 	if (req == NULL)
1434 		return -ENOMEM;
1435 
1436 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1437 
1438 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET);
1439 	if (rc) {
1440 		ptlrpc_request_free(req);
1441 		return rc;
1442 	}
1443 
1444 	mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
1445 		      OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
1446 
1447 	/* Copy states */
1448 	req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
1449 	if (req_hss == NULL) {
1450 		rc = -EPROTO;
1451 		goto out;
1452 	}
1453 	*req_hss = *hss;
1454 
1455 	ptlrpc_request_set_replen(req);
1456 
1457 	rc = mdc_queue_wait(req);
1458 	goto out;
1459 
1460 out:
1461 	ptlrpc_req_finished(req);
1462 	return rc;
1463 }
1464 
mdc_ioc_hsm_request(struct obd_export * exp,struct hsm_user_request * hur)1465 static int mdc_ioc_hsm_request(struct obd_export *exp,
1466 			       struct hsm_user_request *hur)
1467 {
1468 	struct obd_import	*imp = class_exp2cliimp(exp);
1469 	struct ptlrpc_request	*req;
1470 	struct hsm_request	*req_hr;
1471 	struct hsm_user_item	*req_hui;
1472 	char			*req_opaque;
1473 	int			 rc;
1474 
1475 	req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST);
1476 	if (req == NULL) {
1477 		rc = -ENOMEM;
1478 		goto out;
1479 	}
1480 
1481 	req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT,
1482 			     hur->hur_request.hr_itemcount
1483 			     * sizeof(struct hsm_user_item));
1484 	req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT,
1485 			     hur->hur_request.hr_data_len);
1486 
1487 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST);
1488 	if (rc) {
1489 		ptlrpc_request_free(req);
1490 		return rc;
1491 	}
1492 
1493 	mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
1494 
1495 	/* Copy hsm_request struct */
1496 	req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
1497 	if (req_hr == NULL) {
1498 		rc = -EPROTO;
1499 		goto out;
1500 	}
1501 	*req_hr = hur->hur_request;
1502 
1503 	/* Copy hsm_user_item structs */
1504 	req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
1505 	if (req_hui == NULL) {
1506 		rc = -EPROTO;
1507 		goto out;
1508 	}
1509 	memcpy(req_hui, hur->hur_user_item,
1510 	       hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
1511 
1512 	/* Copy opaque field */
1513 	req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
1514 	if (req_opaque == NULL) {
1515 		rc = -EPROTO;
1516 		goto out;
1517 	}
1518 	memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
1519 
1520 	ptlrpc_request_set_replen(req);
1521 
1522 	rc = mdc_queue_wait(req);
1523 	goto out;
1524 
1525 out:
1526 	ptlrpc_req_finished(req);
1527 	return rc;
1528 }
1529 
changelog_kuc_hdr(char * buf,int len,int flags)1530 static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
1531 {
1532 	struct kuc_hdr *lh = (struct kuc_hdr *)buf;
1533 
1534 	LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE);
1535 
1536 	lh->kuc_magic = KUC_MAGIC;
1537 	lh->kuc_transport = KUC_TRANSPORT_CHANGELOG;
1538 	lh->kuc_flags = flags;
1539 	lh->kuc_msgtype = CL_RECORD;
1540 	lh->kuc_msglen = len;
1541 	return lh;
1542 }
1543 
1544 #define D_CHANGELOG 0
1545 
1546 struct changelog_show {
1547 	__u64		cs_startrec;
1548 	__u32		cs_flags;
1549 	struct file	*cs_fp;
1550 	char		*cs_buf;
1551 	struct obd_device *cs_obd;
1552 };
1553 
changelog_kkuc_cb(const struct lu_env * env,struct llog_handle * llh,struct llog_rec_hdr * hdr,void * data)1554 static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
1555 			     struct llog_rec_hdr *hdr, void *data)
1556 {
1557 	struct changelog_show *cs = data;
1558 	struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
1559 	struct kuc_hdr *lh;
1560 	int len, rc;
1561 
1562 	if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
1563 		rc = -EINVAL;
1564 		CERROR("%s: not a changelog rec %x/%d: rc = %d\n",
1565 		       cs->cs_obd->obd_name, rec->cr_hdr.lrh_type,
1566 		       rec->cr.cr_type, rc);
1567 		return rc;
1568 	}
1569 
1570 	if (rec->cr.cr_index < cs->cs_startrec) {
1571 		/* Skip entries earlier than what we are interested in */
1572 		CDEBUG(D_CHANGELOG, "rec=%llu start=%llu\n",
1573 		       rec->cr.cr_index, cs->cs_startrec);
1574 		return 0;
1575 	}
1576 
1577 	CDEBUG(D_CHANGELOG, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID
1578 		" %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
1579 		changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
1580 		rec->cr.cr_flags & CLF_FLAGMASK,
1581 		PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
1582 		rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
1583 
1584 	len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
1585 
1586 	/* Set up the message */
1587 	lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags);
1588 	memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
1589 
1590 	rc = libcfs_kkuc_msg_put(cs->cs_fp, lh);
1591 	CDEBUG(D_CHANGELOG, "kucmsg fp %p len %d rc %d\n", cs->cs_fp, len, rc);
1592 
1593 	return rc;
1594 }
1595 
mdc_changelog_send_thread(void * csdata)1596 static int mdc_changelog_send_thread(void *csdata)
1597 {
1598 	struct changelog_show *cs = csdata;
1599 	struct llog_ctxt *ctxt = NULL;
1600 	struct llog_handle *llh = NULL;
1601 	struct kuc_hdr *kuch;
1602 	int rc;
1603 
1604 	CDEBUG(D_CHANGELOG, "changelog to fp=%p start %llu\n",
1605 	       cs->cs_fp, cs->cs_startrec);
1606 
1607 	OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
1608 	if (cs->cs_buf == NULL) {
1609 		rc = -ENOMEM;
1610 		goto out;
1611 	}
1612 
1613 	/* Set up the remote catalog handle */
1614 	ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
1615 	if (ctxt == NULL) {
1616 		rc = -ENOENT;
1617 		goto out;
1618 	}
1619 	rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
1620 		       LLOG_OPEN_EXISTS);
1621 	if (rc) {
1622 		CERROR("%s: fail to open changelog catalog: rc = %d\n",
1623 		       cs->cs_obd->obd_name, rc);
1624 		goto out;
1625 	}
1626 	rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL);
1627 	if (rc) {
1628 		CERROR("llog_init_handle failed %d\n", rc);
1629 		goto out;
1630 	}
1631 
1632 	rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
1633 
1634 	/* Send EOF no matter what our result */
1635 	kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags);
1636 	if (kuch) {
1637 		kuch->kuc_msgtype = CL_EOF;
1638 		libcfs_kkuc_msg_put(cs->cs_fp, kuch);
1639 	}
1640 
1641 out:
1642 	fput(cs->cs_fp);
1643 	if (llh)
1644 		llog_cat_close(NULL, llh);
1645 	if (ctxt)
1646 		llog_ctxt_put(ctxt);
1647 	if (cs->cs_buf)
1648 		OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
1649 	OBD_FREE_PTR(cs);
1650 	return rc;
1651 }
1652 
mdc_ioc_changelog_send(struct obd_device * obd,struct ioc_changelog * icc)1653 static int mdc_ioc_changelog_send(struct obd_device *obd,
1654 				  struct ioc_changelog *icc)
1655 {
1656 	struct changelog_show *cs;
1657 	int rc;
1658 
1659 	/* Freed in mdc_changelog_send_thread */
1660 	OBD_ALLOC_PTR(cs);
1661 	if (!cs)
1662 		return -ENOMEM;
1663 
1664 	cs->cs_obd = obd;
1665 	cs->cs_startrec = icc->icc_recno;
1666 	/* matching fput in mdc_changelog_send_thread */
1667 	cs->cs_fp = fget(icc->icc_id);
1668 	cs->cs_flags = icc->icc_flags;
1669 
1670 	/*
1671 	 * New thread because we should return to user app before
1672 	 * writing into our pipe
1673 	 */
1674 	rc = PTR_ERR(kthread_run(mdc_changelog_send_thread, cs,
1675 				 "mdc_clg_send_thread"));
1676 	if (!IS_ERR_VALUE(rc)) {
1677 		CDEBUG(D_CHANGELOG, "start changelog thread\n");
1678 		return 0;
1679 	}
1680 
1681 	CERROR("Failed to start changelog thread: %d\n", rc);
1682 	OBD_FREE_PTR(cs);
1683 	return rc;
1684 }
1685 
/* Forward declaration: called from mdc_iocontrol(), defined further down. */
static int
mdc_ioc_hsm_ct_start(struct obd_export *exp, struct lustre_kernelcomm *lk);
1688 
mdc_quotacheck(struct obd_device * unused,struct obd_export * exp,struct obd_quotactl * oqctl)1689 static int mdc_quotacheck(struct obd_device *unused, struct obd_export *exp,
1690 			  struct obd_quotactl *oqctl)
1691 {
1692 	struct client_obd       *cli = &exp->exp_obd->u.cli;
1693 	struct ptlrpc_request   *req;
1694 	struct obd_quotactl     *body;
1695 	int		      rc;
1696 
1697 	req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1698 					&RQF_MDS_QUOTACHECK, LUSTRE_MDS_VERSION,
1699 					MDS_QUOTACHECK);
1700 	if (req == NULL)
1701 		return -ENOMEM;
1702 
1703 	body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1704 	*body = *oqctl;
1705 
1706 	ptlrpc_request_set_replen(req);
1707 
1708 	/* the next poll will find -ENODATA, that means quotacheck is
1709 	 * going on */
1710 	cli->cl_qchk_stat = -ENODATA;
1711 	rc = ptlrpc_queue_wait(req);
1712 	if (rc)
1713 		cli->cl_qchk_stat = rc;
1714 	ptlrpc_req_finished(req);
1715 	return rc;
1716 }
1717 
mdc_quota_poll_check(struct obd_export * exp,struct if_quotacheck * qchk)1718 static int mdc_quota_poll_check(struct obd_export *exp,
1719 				struct if_quotacheck *qchk)
1720 {
1721 	struct client_obd *cli = &exp->exp_obd->u.cli;
1722 	int rc;
1723 
1724 	qchk->obd_uuid = cli->cl_target_uuid;
1725 	memcpy(qchk->obd_type, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME));
1726 
1727 	rc = cli->cl_qchk_stat;
1728 	/* the client is not the previous one */
1729 	if (rc == CL_NOT_QUOTACHECKED)
1730 		rc = -EINTR;
1731 	return rc;
1732 }
1733 
mdc_quotactl(struct obd_device * unused,struct obd_export * exp,struct obd_quotactl * oqctl)1734 static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
1735 			struct obd_quotactl *oqctl)
1736 {
1737 	struct ptlrpc_request   *req;
1738 	struct obd_quotactl     *oqc;
1739 	int		      rc;
1740 
1741 	req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1742 					&RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION,
1743 					MDS_QUOTACTL);
1744 	if (req == NULL)
1745 		return -ENOMEM;
1746 
1747 	oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1748 	*oqc = *oqctl;
1749 
1750 	ptlrpc_request_set_replen(req);
1751 	ptlrpc_at_set_req_timeout(req);
1752 	req->rq_no_resend = 1;
1753 
1754 	rc = ptlrpc_queue_wait(req);
1755 	if (rc)
1756 		CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
1757 
1758 	if (req->rq_repmsg) {
1759 		oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1760 		if (oqc) {
1761 			*oqctl = *oqc;
1762 		} else if (!rc) {
1763 			CERROR("Can't unpack obd_quotactl\n");
1764 			rc = -EPROTO;
1765 		}
1766 	} else if (!rc) {
1767 		CERROR("Can't unpack obd_quotactl\n");
1768 		rc = -EPROTO;
1769 	}
1770 	ptlrpc_req_finished(req);
1771 
1772 	return rc;
1773 }
1774 
mdc_ioc_swap_layouts(struct obd_export * exp,struct md_op_data * op_data)1775 static int mdc_ioc_swap_layouts(struct obd_export *exp,
1776 				struct md_op_data *op_data)
1777 {
1778 	LIST_HEAD(cancels);
1779 	struct ptlrpc_request	*req;
1780 	int			 rc, count;
1781 	struct mdc_swap_layouts *msl, *payload;
1782 
1783 	msl = op_data->op_data;
1784 
1785 	/* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
1786 	 * first thing it will do is to cancel the 2 layout
1787 	 * locks hold by this client.
1788 	 * So the client must cancel its layout locks on the 2 fids
1789 	 * with the request RPC to avoid extra RPC round trips
1790 	 */
1791 	count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
1792 					LCK_CR, MDS_INODELOCK_LAYOUT);
1793 	count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
1794 					 LCK_CR, MDS_INODELOCK_LAYOUT);
1795 
1796 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1797 				   &RQF_MDS_SWAP_LAYOUTS);
1798 	if (req == NULL) {
1799 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
1800 		return -ENOMEM;
1801 	}
1802 
1803 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
1804 	mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
1805 
1806 	rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
1807 	if (rc) {
1808 		ptlrpc_request_free(req);
1809 		return rc;
1810 	}
1811 
1812 	mdc_swap_layouts_pack(req, op_data);
1813 
1814 	payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
1815 	LASSERT(payload);
1816 
1817 	*payload = *msl;
1818 
1819 	ptlrpc_request_set_replen(req);
1820 
1821 	rc = ptlrpc_queue_wait(req);
1822 	if (rc)
1823 		goto out;
1824 
1825 out:
1826 	ptlrpc_req_finished(req);
1827 	return rc;
1828 }
1829 
mdc_iocontrol(unsigned int cmd,struct obd_export * exp,int len,void * karg,void * uarg)1830 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1831 			 void *karg, void *uarg)
1832 {
1833 	struct obd_device *obd = exp->exp_obd;
1834 	struct obd_ioctl_data *data = karg;
1835 	struct obd_import *imp = obd->u.cli.cl_import;
1836 	int rc;
1837 
1838 	if (!try_module_get(THIS_MODULE)) {
1839 		CERROR("Can't get module. Is it alive?");
1840 		return -EINVAL;
1841 	}
1842 	switch (cmd) {
1843 	case OBD_IOC_CHANGELOG_SEND:
1844 		rc = mdc_ioc_changelog_send(obd, karg);
1845 		goto out;
1846 	case OBD_IOC_CHANGELOG_CLEAR: {
1847 		struct ioc_changelog *icc = karg;
1848 		struct changelog_setinfo cs = {
1849 			.cs_recno = icc->icc_recno,
1850 			.cs_id = icc->icc_id
1851 		};
1852 
1853 		rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR),
1854 					KEY_CHANGELOG_CLEAR, sizeof(cs), &cs,
1855 					NULL);
1856 		goto out;
1857 	}
1858 	case OBD_IOC_FID2PATH:
1859 		rc = mdc_ioc_fid2path(exp, karg);
1860 		goto out;
1861 	case LL_IOC_HSM_CT_START:
1862 		rc = mdc_ioc_hsm_ct_start(exp, karg);
1863 		/* ignore if it was already registered on this MDS. */
1864 		if (rc == -EEXIST)
1865 			rc = 0;
1866 		goto out;
1867 	case LL_IOC_HSM_PROGRESS:
1868 		rc = mdc_ioc_hsm_progress(exp, karg);
1869 		goto out;
1870 	case LL_IOC_HSM_STATE_GET:
1871 		rc = mdc_ioc_hsm_state_get(exp, karg);
1872 		goto out;
1873 	case LL_IOC_HSM_STATE_SET:
1874 		rc = mdc_ioc_hsm_state_set(exp, karg);
1875 		goto out;
1876 	case LL_IOC_HSM_ACTION:
1877 		rc = mdc_ioc_hsm_current_action(exp, karg);
1878 		goto out;
1879 	case LL_IOC_HSM_REQUEST:
1880 		rc = mdc_ioc_hsm_request(exp, karg);
1881 		goto out;
1882 	case OBD_IOC_CLIENT_RECOVER:
1883 		rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
1884 		if (rc < 0)
1885 			goto out;
1886 		rc = 0;
1887 		goto out;
1888 	case IOC_OSC_SET_ACTIVE:
1889 		rc = ptlrpc_set_import_active(imp, data->ioc_offset);
1890 		goto out;
1891 	case OBD_IOC_POLL_QUOTACHECK:
1892 		rc = mdc_quota_poll_check(exp, (struct if_quotacheck *)karg);
1893 		goto out;
1894 	case OBD_IOC_PING_TARGET:
1895 		rc = ptlrpc_obd_ping(obd);
1896 		goto out;
1897 	/*
1898 	 * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by
1899 	 * LMV instead of MDC. But when the cluster is upgraded from 1.8,
1900 	 * there'd be no LMV layer thus we might be called here. Eventually
1901 	 * this code should be removed.
1902 	 * bz20731, LU-592.
1903 	 */
1904 	case IOC_OBD_STATFS: {
1905 		struct obd_statfs stat_buf = {0};
1906 
1907 		if (*((__u32 *) data->ioc_inlbuf2) != 0) {
1908 			rc = -ENODEV;
1909 			goto out;
1910 		}
1911 
1912 		/* copy UUID */
1913 		if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
1914 				 min_t(size_t, data->ioc_plen2,
1915 					       sizeof(struct obd_uuid)))) {
1916 			rc = -EFAULT;
1917 			goto out;
1918 		}
1919 
1920 		rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf,
1921 				cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1922 				0);
1923 		if (rc != 0)
1924 			goto out;
1925 
1926 		if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1927 				 min_t(size_t, data->ioc_plen1,
1928 					       sizeof(stat_buf)))) {
1929 			rc = -EFAULT;
1930 			goto out;
1931 		}
1932 
1933 		rc = 0;
1934 		goto out;
1935 	}
1936 	case OBD_IOC_QUOTACTL: {
1937 		struct if_quotactl *qctl = karg;
1938 		struct obd_quotactl *oqctl;
1939 
1940 		OBD_ALLOC_PTR(oqctl);
1941 		if (oqctl == NULL) {
1942 			rc = -ENOMEM;
1943 			goto out;
1944 		}
1945 
1946 		QCTL_COPY(oqctl, qctl);
1947 		rc = obd_quotactl(exp, oqctl);
1948 		if (rc == 0) {
1949 			QCTL_COPY(qctl, oqctl);
1950 			qctl->qc_valid = QC_MDTIDX;
1951 			qctl->obd_uuid = obd->u.cli.cl_target_uuid;
1952 		}
1953 
1954 		OBD_FREE_PTR(oqctl);
1955 		goto out;
1956 	}
1957 	case LL_IOC_GET_CONNECT_FLAGS:
1958 		if (copy_to_user(uarg, exp_connect_flags_ptr(exp),
1959 				 sizeof(*exp_connect_flags_ptr(exp)))) {
1960 			rc = -EFAULT;
1961 			goto out;
1962 		}
1963 
1964 		rc = 0;
1965 		goto out;
1966 	case LL_IOC_LOV_SWAP_LAYOUTS:
1967 		rc = mdc_ioc_swap_layouts(exp, karg);
1968 		goto out;
1969 	default:
1970 		CERROR("unrecognised ioctl: cmd = %#x\n", cmd);
1971 		rc = -ENOTTY;
1972 		goto out;
1973 	}
1974 out:
1975 	module_put(THIS_MODULE);
1976 
1977 	return rc;
1978 }
1979 
/**
 * Fetch a value from the MDS with an MDS_GET_INFO request.
 *
 * @param exp    export whose import the request is sent on
 * @param keylen length of @key in bytes
 * @param key    key sent to the server
 * @param vallen size of @val; also sent to the server and used as the size
 *               of the reply value buffer
 * @param val    receives @vallen bytes of the reply value
 * @return 0 or -EREMOTE with @val filled in, -ENOMEM, a pack error, -EPROTO
 *         when the reply carries no value, or the ptlrpc_queue_wait() error
 */
static int mdc_get_info_rpc(struct obd_export *exp,
			    u32 keylen, void *key,
			    int vallen, void *val)
{
	struct obd_import      *imp = class_exp2cliimp(exp);
	struct ptlrpc_request  *req;
	char		   *tmp;
	int		     rc;

	req = ptlrpc_request_alloc(imp, &RQF_MDS_GET_INFO);
	if (req == NULL)
		return -ENOMEM;

	req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
			     RCL_CLIENT, keylen);
	req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
			     RCL_CLIENT, sizeof(__u32));

	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
	memcpy(tmp, key, keylen);
	tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
	memcpy(tmp, &vallen, sizeof(__u32));

	req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
			     RCL_SERVER, vallen);
	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	/* -EREMOTE means the get_info result is partial, and it needs to
	 * continue on another MDT, see fid2path part in lmv_iocontrol */
	if (rc == 0 || rc == -EREMOTE) {
		tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
		/* a reply without the value field must not be copied from */
		if (tmp == NULL) {
			rc = -EPROTO;
			goto out;
		}
		memcpy(val, tmp, vallen);
		if (ptlrpc_rep_need_swab(req)) {
			if (KEY_IS(KEY_FID2PATH))
				lustre_swab_fid2path(val);
		}
	}
out:
	ptlrpc_req_finished(req);

	return rc;
}
2028 
lustre_swab_hai(struct hsm_action_item * h)2029 static void lustre_swab_hai(struct hsm_action_item *h)
2030 {
2031 	__swab32s(&h->hai_len);
2032 	__swab32s(&h->hai_action);
2033 	lustre_swab_lu_fid(&h->hai_fid);
2034 	lustre_swab_lu_fid(&h->hai_dfid);
2035 	__swab64s(&h->hai_cookie);
2036 	__swab64s(&h->hai_extent.offset);
2037 	__swab64s(&h->hai_extent.length);
2038 	__swab64s(&h->hai_gid);
2039 }
2040 
lustre_swab_hal(struct hsm_action_list * h)2041 static void lustre_swab_hal(struct hsm_action_list *h)
2042 {
2043 	struct hsm_action_item	*hai;
2044 	int			 i;
2045 
2046 	__swab32s(&h->hal_version);
2047 	__swab32s(&h->hal_count);
2048 	__swab32s(&h->hal_archive_id);
2049 	__swab64s(&h->hal_flags);
2050 	hai = hai_zero(h);
2051 	for (i = 0; i < h->hal_count; i++, hai = hai_next(hai))
2052 		lustre_swab_hai(hai);
2053 }
2054 
lustre_swab_kuch(struct kuc_hdr * l)2055 static void lustre_swab_kuch(struct kuc_hdr *l)
2056 {
2057 	__swab16s(&l->kuc_magic);
2058 	/* __u8 l->kuc_transport */
2059 	__swab16s(&l->kuc_msgtype);
2060 	__swab16s(&l->kuc_msglen);
2061 }
2062 
mdc_ioc_hsm_ct_start(struct obd_export * exp,struct lustre_kernelcomm * lk)2063 static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
2064 				struct lustre_kernelcomm *lk)
2065 {
2066 	struct obd_import  *imp = class_exp2cliimp(exp);
2067 	__u32		    archive = lk->lk_data;
2068 	int		    rc = 0;
2069 
2070 	if (lk->lk_group != KUC_GRP_HSM) {
2071 		CERROR("Bad copytool group %d\n", lk->lk_group);
2072 		return -EINVAL;
2073 	}
2074 
2075 	CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd,
2076 	       lk->lk_uid, lk->lk_group, lk->lk_flags);
2077 
2078 	if (lk->lk_flags & LK_FLG_STOP) {
2079 		/* Unregister with the coordinator */
2080 		rc = mdc_ioc_hsm_ct_unregister(imp);
2081 	} else {
2082 		rc = mdc_ioc_hsm_ct_register(imp, archive);
2083 	}
2084 
2085 	return rc;
2086 }
2087 
2088 /**
2089  * Send a message to any listening copytools
2090  * @param val KUC message (kuc_hdr + hsm_action_list)
2091  * @param len total length of message
2092  */
mdc_hsm_copytool_send(int len,void * val)2093 static int mdc_hsm_copytool_send(int len, void *val)
2094 {
2095 	struct kuc_hdr		*lh = (struct kuc_hdr *)val;
2096 	struct hsm_action_list	*hal = (struct hsm_action_list *)(lh + 1);
2097 	int			 rc;
2098 
2099 	if (len < sizeof(*lh) + sizeof(*hal)) {
2100 		CERROR("Short HSM message %d < %d\n", len,
2101 		       (int) (sizeof(*lh) + sizeof(*hal)));
2102 		return -EPROTO;
2103 	}
2104 	if (lh->kuc_magic == __swab16(KUC_MAGIC)) {
2105 		lustre_swab_kuch(lh);
2106 		lustre_swab_hal(hal);
2107 	} else if (lh->kuc_magic != KUC_MAGIC) {
2108 		CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC);
2109 		return -EPROTO;
2110 	}
2111 
2112 	CDEBUG(D_HSM,
2113 	       "Received message mg=%x t=%d m=%d l=%d actions=%d on %s\n",
2114 	       lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype,
2115 	       lh->kuc_msglen, hal->hal_count, hal->hal_fsname);
2116 
2117 	/* Broadcast to HSM listeners */
2118 	rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh);
2119 
2120 	return rc;
2121 }
2122 
2123 /**
2124  * callback function passed to kuc for re-registering each HSM copytool
2125  * running on MDC, after MDT shutdown/recovery.
2126  * @param data archive id served by the copytool
2127  * @param cb_arg callback argument (obd_import)
2128  */
mdc_hsm_ct_reregister(__u32 data,void * cb_arg)2129 static int mdc_hsm_ct_reregister(__u32 data, void *cb_arg)
2130 {
2131 	struct obd_import	*imp = (struct obd_import *)cb_arg;
2132 	__u32			 archive = data;
2133 	int			 rc;
2134 
2135 	CDEBUG(D_HA, "recover copytool registration to MDT (archive=%#x)\n",
2136 	       archive);
2137 	rc = mdc_ioc_hsm_ct_register(imp, archive);
2138 
2139 	/* ignore error if the copytool is already registered */
2140 	return ((rc != 0) && (rc != -EEXIST)) ? rc : 0;
2141 }
2142 
2143 /**
2144  * Re-establish all kuc contexts with MDT
2145  * after MDT shutdown/recovery.
2146  */
mdc_kuc_reregister(struct obd_import * imp)2147 static int mdc_kuc_reregister(struct obd_import *imp)
2148 {
2149 	/* re-register HSM agents */
2150 	return libcfs_kkuc_group_foreach(KUC_GRP_HSM, mdc_hsm_ct_reregister,
2151 					 (void *)imp);
2152 }
2153 
/**
 * Handle a set_info request on the MDC, dispatching on @key.
 *
 * Keys are tested in a fixed order; the first match is handled and
 * returned from directly.
 *
 * @param env    not used by this function
 * @param exp    export the request applies to
 * @param keylen length of @key
 * @param key    key name
 * @param vallen length of @val
 * @param val    value associated with @key
 * @param set    request set handed to do_set_info_async() for the keys
 *               that are forwarded to the server
 *
 * @return -EINVAL for an unknown key or a KEY_READ_ONLY value of the wrong
 *         size, otherwise the result of the key-specific handler.
 */
static int mdc_set_info_async(const struct lu_env *env,
			      struct obd_export *exp,
			      u32 keylen, void *key,
			      u32 vallen, void *val,
			      struct ptlrpc_request_set *set)
{
	struct obd_import *imp = class_exp2cliimp(exp);

	if (KEY_IS(KEY_READ_ONLY)) {
		if (vallen != sizeof(int))
			return -EINVAL;

		/* mirror the read-only setting in both copies of the
		 * connect flags before telling the server */
		spin_lock(&imp->imp_lock);
		if (*((int *)val) != 0) {
			imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
			imp->imp_connect_data.ocd_connect_flags |=
							OBD_CONNECT_RDONLY;
		} else {
			imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
			imp->imp_connect_data.ocd_connect_flags &=
							~OBD_CONNECT_RDONLY;
		}
		spin_unlock(&imp->imp_lock);

		return do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
					 keylen, key, vallen, val, set);
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(exp->exp_obd);
		return 0;
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		return 0;
	}

	if (KEY_IS(KEY_CHANGELOG_CLEAR))
		return do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
					 keylen, key, vallen, val, set);

	if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
		return mdc_hsm_copytool_send(vallen, val);

	CERROR("Unknown key %s\n", (char *)key);
	return -EINVAL;
}
2204 
/**
 * Handle a get_info request on the MDC.
 *
 * Keys answered locally are tested first, in a fixed order; any other key
 * is forwarded to the server through mdc_get_info_rpc().
 *
 * @param env    not used by this function
 * @param exp    export the request applies to
 * @param keylen length of @key
 * @param key    key name
 * @param vallen in: size of the @val buffer
 * @param val    in/out value buffer; for KEY_MAX_EASIZE and
 *               KEY_MAX_COOKIESIZE the incoming int may raise the stored
 *               maximum before it is returned
 * @param lsm    not used by this function
 *
 * @return 0 for a locally answered key, -EINVAL when @vallen does not match
 *         the expected size, otherwise the result of mdc_get_info_rpc().
 */
static int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
			__u32 keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (KEY_IS(KEY_MAX_EASIZE)) {
		struct client_obd *cli;
		int *out = val;
		int wanted;

		if (*vallen != sizeof(int))
			return -EINVAL;

		cli = &exp->exp_obd->u.cli;
		wanted = *out;
		if (wanted > cli->cl_max_mds_easize)
			cli->cl_max_mds_easize = wanted;
		*out = cli->cl_max_mds_easize;
		return 0;
	}

	if (KEY_IS(KEY_DEFAULT_EASIZE)) {
		int *out = val;

		if (*vallen != sizeof(int))
			return -EINVAL;

		*out = exp->exp_obd->u.cli.cl_default_mds_easize;
		return 0;
	}

	if (KEY_IS(KEY_MAX_COOKIESIZE)) {
		struct client_obd *cli;
		int *out = val;
		int wanted;

		if (*vallen != sizeof(int))
			return -EINVAL;

		cli = &exp->exp_obd->u.cli;
		wanted = *out;
		if (wanted > cli->cl_max_mds_cookiesize)
			cli->cl_max_mds_cookiesize = wanted;
		*out = cli->cl_max_mds_cookiesize;
		return 0;
	}

	if (KEY_IS(KEY_DEFAULT_COOKIESIZE)) {
		int *out = val;

		if (*vallen != sizeof(int))
			return -EINVAL;

		*out = exp->exp_obd->u.cli.cl_default_mds_cookiesize;
		return 0;
	}

	if (KEY_IS(KEY_CONN_DATA)) {
		struct obd_import *imp = class_exp2cliimp(exp);
		struct obd_connect_data *ocd = val;

		if (*vallen != sizeof(*ocd))
			return -EINVAL;

		*ocd = imp->imp_connect_data;
		return 0;
	}

	if (KEY_IS(KEY_TGT_COUNT)) {
		/* the target count reported here is always 1 */
		*((int *)val) = 1;
		return 0;
	}

	/* not a local key: ask the server */
	return mdc_get_info_rpc(exp, keylen, key, *vallen, val);
}
2268 
mdc_sync(struct obd_export * exp,const struct lu_fid * fid,struct obd_capa * oc,struct ptlrpc_request ** request)2269 static int mdc_sync(struct obd_export *exp, const struct lu_fid *fid,
2270 		    struct obd_capa *oc, struct ptlrpc_request **request)
2271 {
2272 	struct ptlrpc_request *req;
2273 	int		    rc;
2274 
2275 	*request = NULL;
2276 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SYNC);
2277 	if (req == NULL)
2278 		return -ENOMEM;
2279 
2280 	mdc_set_capa_size(req, &RMF_CAPA1, oc);
2281 
2282 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC);
2283 	if (rc) {
2284 		ptlrpc_request_free(req);
2285 		return rc;
2286 	}
2287 
2288 	mdc_pack_body(req, fid, oc, 0, 0, -1, 0);
2289 
2290 	ptlrpc_request_set_replen(req);
2291 
2292 	rc = ptlrpc_queue_wait(req);
2293 	if (rc)
2294 		ptlrpc_req_finished(req);
2295 	else
2296 		*request = req;
2297 	return rc;
2298 }
2299 
mdc_import_event(struct obd_device * obd,struct obd_import * imp,enum obd_import_event event)2300 static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
2301 			    enum obd_import_event event)
2302 {
2303 	int rc = 0;
2304 
2305 	LASSERT(imp->imp_obd == obd);
2306 
2307 	switch (event) {
2308 	case IMP_EVENT_DISCON: {
2309 #if 0
2310 		/* XXX Pass event up to OBDs stack. used only for FLD now */
2311 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL);
2312 #endif
2313 		break;
2314 	}
2315 	case IMP_EVENT_INACTIVE: {
2316 		struct client_obd *cli = &obd->u.cli;
2317 		/*
2318 		 * Flush current sequence to make client obtain new one
2319 		 * from server in case of disconnect/reconnect.
2320 		 */
2321 		if (cli->cl_seq != NULL)
2322 			seq_client_flush(cli->cl_seq);
2323 
2324 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2325 		break;
2326 	}
2327 	case IMP_EVENT_INVALIDATE: {
2328 		struct ldlm_namespace *ns = obd->obd_namespace;
2329 
2330 		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2331 
2332 		break;
2333 	}
2334 	case IMP_EVENT_ACTIVE:
2335 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2336 		/* redo the kuc registration after reconnecting */
2337 		if (rc == 0)
2338 			rc = mdc_kuc_reregister(imp);
2339 		break;
2340 	case IMP_EVENT_OCD:
2341 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2342 		break;
2343 	case IMP_EVENT_DEACTIVATE:
2344 	case IMP_EVENT_ACTIVATE:
2345 		break;
2346 	default:
2347 		CERROR("Unknown import event %x\n", event);
2348 		LBUG();
2349 	}
2350 	return rc;
2351 }
2352 
mdc_fid_alloc(struct obd_export * exp,struct lu_fid * fid,struct md_op_data * op_data)2353 int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
2354 		  struct md_op_data *op_data)
2355 {
2356 	struct client_obd *cli = &exp->exp_obd->u.cli;
2357 	struct lu_client_seq *seq = cli->cl_seq;
2358 
2359 	return seq_client_alloc_fid(NULL, seq, fid);
2360 }
2361 
mdc_get_uuid(struct obd_export * exp)2362 static struct obd_uuid *mdc_get_uuid(struct obd_export *exp)
2363 {
2364 	struct client_obd *cli = &exp->exp_obd->u.cli;
2365 
2366 	return &cli->cl_target_uuid;
2367 }
2368 
2369 /**
2370  * Determine whether the lock can be canceled before replaying it during
2371  * recovery, non zero value will be return if the lock can be canceled,
2372  * or zero returned for not
2373  */
mdc_cancel_for_recovery(struct ldlm_lock * lock)2374 static int mdc_cancel_for_recovery(struct ldlm_lock *lock)
2375 {
2376 	if (lock->l_resource->lr_type != LDLM_IBITS)
2377 		return 0;
2378 
2379 	/* FIXME: if we ever get into a situation where there are too many
2380 	 * opened files with open locks on a single node, then we really
2381 	 * should replay these open locks to reget it */
2382 	if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN)
2383 		return 0;
2384 
2385 	return 1;
2386 }
2387 
mdc_resource_inode_free(struct ldlm_resource * res)2388 static int mdc_resource_inode_free(struct ldlm_resource *res)
2389 {
2390 	if (res->lr_lvb_inode)
2391 		res->lr_lvb_inode = NULL;
2392 
2393 	return 0;
2394 }
2395 
2396 static struct ldlm_valblock_ops inode_lvbo = {
2397 	.lvbo_free = mdc_resource_inode_free,
2398 };
2399 
mdc_llog_init(struct obd_device * obd)2400 static int mdc_llog_init(struct obd_device *obd)
2401 {
2402 	struct obd_llog_group	*olg = &obd->obd_olg;
2403 	struct llog_ctxt	*ctxt;
2404 	int			 rc;
2405 
2406 	rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, obd,
2407 			&llog_client_ops);
2408 	if (rc)
2409 		return rc;
2410 
2411 	ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
2412 	llog_initiator_connect(ctxt);
2413 	llog_ctxt_put(ctxt);
2414 
2415 	return 0;
2416 }
2417 
mdc_llog_finish(struct obd_device * obd)2418 static void mdc_llog_finish(struct obd_device *obd)
2419 {
2420 	struct llog_ctxt *ctxt;
2421 
2422 	ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
2423 	if (ctxt)
2424 		llog_cleanup(NULL, ctxt);
2425 }
2426 
mdc_setup(struct obd_device * obd,struct lustre_cfg * cfg)2427 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
2428 {
2429 	struct client_obd *cli = &obd->u.cli;
2430 	struct lprocfs_static_vars lvars = { NULL };
2431 	int rc;
2432 
2433 	OBD_ALLOC(cli->cl_rpc_lock, sizeof(*cli->cl_rpc_lock));
2434 	if (!cli->cl_rpc_lock)
2435 		return -ENOMEM;
2436 	mdc_init_rpc_lock(cli->cl_rpc_lock);
2437 
2438 	ptlrpcd_addref();
2439 
2440 	OBD_ALLOC(cli->cl_close_lock, sizeof(*cli->cl_close_lock));
2441 	if (!cli->cl_close_lock) {
2442 		rc = -ENOMEM;
2443 		goto err_rpc_lock;
2444 	}
2445 	mdc_init_rpc_lock(cli->cl_close_lock);
2446 
2447 	rc = client_obd_setup(obd, cfg);
2448 	if (rc)
2449 		goto err_close_lock;
2450 	lprocfs_mdc_init_vars(&lvars);
2451 	lprocfs_obd_setup(obd, lvars.obd_vars);
2452 	sptlrpc_lprocfs_cliobd_attach(obd);
2453 	ptlrpc_lprocfs_register_obd(obd);
2454 
2455 	ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery);
2456 
2457 	obd->obd_namespace->ns_lvbo = &inode_lvbo;
2458 
2459 	rc = mdc_llog_init(obd);
2460 	if (rc) {
2461 		mdc_cleanup(obd);
2462 		CERROR("failed to setup llogging subsystems\n");
2463 	}
2464 
2465 	return rc;
2466 
2467 err_close_lock:
2468 	OBD_FREE(cli->cl_close_lock, sizeof(*cli->cl_close_lock));
2469 err_rpc_lock:
2470 	OBD_FREE(cli->cl_rpc_lock, sizeof(*cli->cl_rpc_lock));
2471 	ptlrpcd_decref();
2472 	return rc;
2473 }
2474 
2475 /* Initialize the default and maximum LOV EA and cookie sizes.  This allows
2476  * us to make MDS RPCs with large enough reply buffers to hold a default
2477  * sized EA and cookie without having to calculate this (via a call into the
2478  * LOV + OSCs) each time we make an RPC.  The maximum size is also tracked
2479  * but not used to avoid wastefully vmalloc()'ing large reply buffers when
2480  * a large number of stripes is possible.  If a larger reply buffer is
2481  * required it will be reallocated in the ptlrpc layer due to overflow.
2482  */
mdc_init_ea_size(struct obd_export * exp,int easize,int def_easize,int cookiesize,int def_cookiesize)2483 static int mdc_init_ea_size(struct obd_export *exp, int easize,
2484 			    int def_easize, int cookiesize, int def_cookiesize)
2485 {
2486 	struct obd_device *obd = exp->exp_obd;
2487 	struct client_obd *cli = &obd->u.cli;
2488 
2489 	if (cli->cl_max_mds_easize < easize)
2490 		cli->cl_max_mds_easize = easize;
2491 
2492 	if (cli->cl_default_mds_easize < def_easize)
2493 		cli->cl_default_mds_easize = def_easize;
2494 
2495 	if (cli->cl_max_mds_cookiesize < cookiesize)
2496 		cli->cl_max_mds_cookiesize = cookiesize;
2497 
2498 	if (cli->cl_default_mds_cookiesize < def_cookiesize)
2499 		cli->cl_default_mds_cookiesize = def_cookiesize;
2500 
2501 	return 0;
2502 }
2503 
mdc_precleanup(struct obd_device * obd,enum obd_cleanup_stage stage)2504 static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2505 {
2506 	switch (stage) {
2507 	case OBD_CLEANUP_EARLY:
2508 		break;
2509 	case OBD_CLEANUP_EXPORTS:
2510 		/* Failsafe, ok if racy */
2511 		if (obd->obd_type->typ_refcnt <= 1)
2512 			libcfs_kkuc_group_rem(0, KUC_GRP_HSM);
2513 
2514 		obd_cleanup_client_import(obd);
2515 		ptlrpc_lprocfs_unregister_obd(obd);
2516 		lprocfs_obd_cleanup(obd);
2517 
2518 		mdc_llog_finish(obd);
2519 		break;
2520 	}
2521 	return 0;
2522 }
2523 
mdc_cleanup(struct obd_device * obd)2524 static int mdc_cleanup(struct obd_device *obd)
2525 {
2526 	struct client_obd *cli = &obd->u.cli;
2527 
2528 	OBD_FREE(cli->cl_rpc_lock, sizeof(*cli->cl_rpc_lock));
2529 	OBD_FREE(cli->cl_close_lock, sizeof(*cli->cl_close_lock));
2530 
2531 	ptlrpcd_decref();
2532 
2533 	return client_obd_cleanup(obd);
2534 }
2535 
/**
 * Process a configuration record for the MDC device.
 *
 * Every command is handed to class_process_proc_param() with the MDC
 * proc variable table.
 *
 * @param obd device the record applies to
 * @param len not used by this function
 * @param buf the lustre_cfg record
 *
 * @return 0 when the parameter handler returns a positive value or 0,
 *         otherwise its negative error.
 */
static int mdc_process_config(struct obd_device *obd, u32 len, void *buf)
{
	struct lprocfs_static_vars lvars = { NULL };
	struct lustre_cfg *lcfg = buf;
	int rc;

	lprocfs_mdc_init_vars(&lvars);

	rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars,
				      lcfg, obd);

	/* a positive result is reported as plain success */
	return rc > 0 ? 0 : rc;
}
2553 
2554 
2555 /* get remote permission for current user on fid */
mdc_get_remote_perm(struct obd_export * exp,const struct lu_fid * fid,struct obd_capa * oc,__u32 suppgid,struct ptlrpc_request ** request)2556 static int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
2557 			       struct obd_capa *oc, __u32 suppgid,
2558 			       struct ptlrpc_request **request)
2559 {
2560 	struct ptlrpc_request  *req;
2561 	int		    rc;
2562 
2563 	LASSERT(client_is_remote(exp));
2564 
2565 	*request = NULL;
2566 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
2567 	if (req == NULL)
2568 		return -ENOMEM;
2569 
2570 	mdc_set_capa_size(req, &RMF_CAPA1, oc);
2571 
2572 	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
2573 	if (rc) {
2574 		ptlrpc_request_free(req);
2575 		return rc;
2576 	}
2577 
2578 	mdc_pack_body(req, fid, oc, OBD_MD_FLRMTPERM, 0, suppgid, 0);
2579 
2580 	req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
2581 			     sizeof(struct mdt_remote_perm));
2582 
2583 	ptlrpc_request_set_replen(req);
2584 
2585 	rc = ptlrpc_queue_wait(req);
2586 	if (rc)
2587 		ptlrpc_req_finished(req);
2588 	else
2589 		*request = req;
2590 	return rc;
2591 }
2592 
mdc_interpret_renew_capa(const struct lu_env * env,struct ptlrpc_request * req,void * args,int status)2593 static int mdc_interpret_renew_capa(const struct lu_env *env,
2594 				    struct ptlrpc_request *req, void *args,
2595 				    int status)
2596 {
2597 	struct mdc_renew_capa_args *ra = args;
2598 	struct mdt_body *body = NULL;
2599 	struct lustre_capa *capa;
2600 
2601 	if (status) {
2602 		capa = ERR_PTR(status);
2603 		goto out;
2604 	}
2605 
2606 	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2607 	if (body == NULL) {
2608 		capa = ERR_PTR(-EFAULT);
2609 		goto out;
2610 	}
2611 
2612 	if ((body->valid & OBD_MD_FLOSSCAPA) == 0) {
2613 		capa = ERR_PTR(-ENOENT);
2614 		goto out;
2615 	}
2616 
2617 	capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA2);
2618 	if (!capa) {
2619 		capa = ERR_PTR(-EFAULT);
2620 		goto out;
2621 	}
2622 out:
2623 	ra->ra_cb(ra->ra_oc, capa);
2624 	return 0;
2625 }
2626 
/**
 * Queue an MDS_GETATTR request that renews capability @oc; @cb is invoked
 * from the reply interpreter with the outcome.
 *
 * @param exp export whose client import the request is sent on
 * @param oc  capability to renew
 * @param cb  callback receiving @oc and the renewed capability or an
 *            ERR_PTR()
 *
 * @return 0 once the request has been handed to ptlrpcd, -ENOMEM if it
 *         cannot be allocated.
 */
static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
			  renew_capa_cb_t cb)
{
	struct mdc_renew_capa_args *renew;
	struct ptlrpc_request *req;

	req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_GETATTR,
					LUSTRE_MDS_VERSION, MDS_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	/* NB, OBD_MD_FLOSSCAPA is set here, but it doesn't necessarily mean the
	 * capa to renew is oss capa.
	 */
	mdc_pack_body(req, &oc->c_capa.lc_fid, oc, OBD_MD_FLOSSCAPA, 0, -1, 0);
	ptlrpc_request_set_replen(req);

	/* the callback context lives in the request's async-args area */
	CLASSERT(sizeof(*renew) <= sizeof(req->rq_async_args));
	renew = ptlrpc_req_async_args(req);
	renew->ra_cb = cb;
	renew->ra_oc = oc;

	req->rq_interpret_reply = mdc_interpret_renew_capa;
	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);

	return 0;
}
2652 
2653 static struct obd_ops mdc_obd_ops = {
2654 	.o_owner	    = THIS_MODULE,
2655 	.o_setup	    = mdc_setup,
2656 	.o_precleanup       = mdc_precleanup,
2657 	.o_cleanup	  = mdc_cleanup,
2658 	.o_add_conn	 = client_import_add_conn,
2659 	.o_del_conn	 = client_import_del_conn,
2660 	.o_connect          = client_connect_import,
2661 	.o_disconnect       = client_disconnect_export,
2662 	.o_iocontrol	= mdc_iocontrol,
2663 	.o_set_info_async   = mdc_set_info_async,
2664 	.o_statfs	   = mdc_statfs,
2665 	.o_fid_init	    = client_fid_init,
2666 	.o_fid_fini	    = client_fid_fini,
2667 	.o_fid_alloc	= mdc_fid_alloc,
2668 	.o_import_event     = mdc_import_event,
2669 	.o_get_info	 = mdc_get_info,
2670 	.o_process_config   = mdc_process_config,
2671 	.o_get_uuid	 = mdc_get_uuid,
2672 	.o_quotactl	 = mdc_quotactl,
2673 	.o_quotacheck       = mdc_quotacheck
2674 };
2675 
2676 static struct md_ops mdc_md_ops = {
2677 	.m_getstatus	= mdc_getstatus,
2678 	.m_null_inode	    = mdc_null_inode,
2679 	.m_find_cbdata      = mdc_find_cbdata,
2680 	.m_close	    = mdc_close,
2681 	.m_create	   = mdc_create,
2682 	.m_done_writing     = mdc_done_writing,
2683 	.m_enqueue	  = mdc_enqueue,
2684 	.m_getattr	  = mdc_getattr,
2685 	.m_getattr_name     = mdc_getattr_name,
2686 	.m_intent_lock      = mdc_intent_lock,
2687 	.m_link	     = mdc_link,
2688 	.m_is_subdir	= mdc_is_subdir,
2689 	.m_rename	   = mdc_rename,
2690 	.m_setattr	  = mdc_setattr,
2691 	.m_setxattr	 = mdc_setxattr,
2692 	.m_getxattr	 = mdc_getxattr,
2693 	.m_sync	     = mdc_sync,
2694 	.m_readpage	 = mdc_readpage,
2695 	.m_unlink	   = mdc_unlink,
2696 	.m_cancel_unused    = mdc_cancel_unused,
2697 	.m_init_ea_size     = mdc_init_ea_size,
2698 	.m_set_lock_data    = mdc_set_lock_data,
2699 	.m_lock_match       = mdc_lock_match,
2700 	.m_get_lustre_md    = mdc_get_lustre_md,
2701 	.m_free_lustre_md   = mdc_free_lustre_md,
2702 	.m_set_open_replay_data = mdc_set_open_replay_data,
2703 	.m_clear_open_replay_data = mdc_clear_open_replay_data,
2704 	.m_renew_capa       = mdc_renew_capa,
2705 	.m_unpack_capa      = mdc_unpack_capa,
2706 	.m_get_remote_perm  = mdc_get_remote_perm,
2707 	.m_intent_getattr_async = mdc_intent_getattr_async,
2708 	.m_revalidate_lock      = mdc_revalidate_lock
2709 };
2710 
mdc_init(void)2711 static int __init mdc_init(void)
2712 {
2713 	struct lprocfs_static_vars lvars = { NULL };
2714 
2715 	lprocfs_mdc_init_vars(&lvars);
2716 
2717 	return class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars,
2718 				 LUSTRE_MDC_NAME, NULL);
2719 }
2720 
mdc_exit(void)2721 static void /*__exit*/ mdc_exit(void)
2722 {
2723 	class_unregister_type(LUSTRE_MDC_NAME);
2724 }
2725 
2726 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2727 MODULE_DESCRIPTION("Lustre Metadata Client");
2728 MODULE_LICENSE("GPL");
2729 
2730 module_init(mdc_init);
2731 module_exit(mdc_exit);
2732