1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_MDC
38 
39 # include <linux/module.h>
40 # include <linux/kernel.h>
41 
42 #include "../include/obd_class.h"
43 #include "mdc_internal.h"
44 #include "../include/lustre_fid.h"
45 
46 /* mdc_setattr does its own semaphore handling */
mdc_reint(struct ptlrpc_request * request,struct mdc_rpc_lock * rpc_lock,int level)47 static int mdc_reint(struct ptlrpc_request *request,
48 		     struct mdc_rpc_lock *rpc_lock,
49 		     int level)
50 {
51 	int rc;
52 
53 	request->rq_send_state = level;
54 
55 	mdc_get_rpc_lock(rpc_lock, NULL);
56 	rc = ptlrpc_queue_wait(request);
57 	mdc_put_rpc_lock(rpc_lock, NULL);
58 	if (rc)
59 		CDEBUG(D_INFO, "error in handling %d\n", rc);
60 	else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY))
61 		rc = -EPROTO;
62 
63 	return rc;
64 }
65 
66 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
67  * found by @fid. Found locks are added into @cancel list. Returns the amount of
68  * locks added to @cancels list. */
mdc_resource_get_unused(struct obd_export * exp,const struct lu_fid * fid,struct list_head * cancels,ldlm_mode_t mode,__u64 bits)69 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
70 			    struct list_head *cancels, ldlm_mode_t mode,
71 			    __u64 bits)
72 {
73 	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
74 	ldlm_policy_data_t policy = {};
75 	struct ldlm_res_id res_id;
76 	struct ldlm_resource *res;
77 	int count;
78 
79 	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
80 	 * export) but disabled through procfs (flag in NS).
81 	 *
82 	 * This distinguishes from a case when ELC is not supported originally,
83 	 * when we still want to cancel locks in advance and just cancel them
84 	 * locally, without sending any RPC. */
85 	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
86 		return 0;
87 
88 	fid_build_reg_res_name(fid, &res_id);
89 	res = ldlm_resource_get(exp->exp_obd->obd_namespace,
90 				NULL, &res_id, 0, 0);
91 	if (res == NULL)
92 		return 0;
93 	LDLM_RESOURCE_ADDREF(res);
94 	/* Initialize ibits lock policy. */
95 	policy.l_inodebits.bits = bits;
96 	count = ldlm_cancel_resource_local(res, cancels, &policy,
97 					   mode, 0, 0, NULL);
98 	LDLM_RESOURCE_DELREF(res);
99 	ldlm_resource_putref(res);
100 	return count;
101 }
102 
mdc_setattr(struct obd_export * exp,struct md_op_data * op_data,void * ea,int ealen,void * ea2,int ea2len,struct ptlrpc_request ** request,struct md_open_data ** mod)103 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
104 		void *ea, int ealen, void *ea2, int ea2len,
105 		struct ptlrpc_request **request, struct md_open_data **mod)
106 {
107 	LIST_HEAD(cancels);
108 	struct ptlrpc_request *req;
109 	struct mdc_rpc_lock *rpc_lock;
110 	struct obd_device *obd = exp->exp_obd;
111 	int count = 0, rc;
112 	__u64 bits;
113 
114 	LASSERT(op_data != NULL);
115 
116 	bits = MDS_INODELOCK_UPDATE;
117 	if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
118 		bits |= MDS_INODELOCK_LOOKUP;
119 	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
120 	    (fid_is_sane(&op_data->op_fid1)) &&
121 	    !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
122 		count = mdc_resource_get_unused(exp, &op_data->op_fid1,
123 						&cancels, LCK_EX, bits);
124 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
125 				   &RQF_MDS_REINT_SETATTR);
126 	if (req == NULL) {
127 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
128 		return -ENOMEM;
129 	}
130 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
131 	if ((op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) == 0)
132 		req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT,
133 				     0);
134 	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
135 	req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
136 			     ea2len);
137 
138 	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
139 	if (rc) {
140 		ptlrpc_request_free(req);
141 		return rc;
142 	}
143 
144 	rpc_lock = obd->u.cli.cl_rpc_lock;
145 
146 	if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
147 		CDEBUG(D_INODE, "setting mtime "CFS_TIME_T
148 		       ", ctime "CFS_TIME_T"\n",
149 		       LTIME_S(op_data->op_attr.ia_mtime),
150 		       LTIME_S(op_data->op_attr.ia_ctime));
151 	mdc_setattr_pack(req, op_data, ea, ealen, ea2, ea2len);
152 
153 	ptlrpc_request_set_replen(req);
154 	if (mod && (op_data->op_flags & MF_EPOCH_OPEN) &&
155 	    req->rq_import->imp_replayable) {
156 		LASSERT(*mod == NULL);
157 
158 		*mod = obd_mod_alloc();
159 		if (*mod == NULL) {
160 			DEBUG_REQ(D_ERROR, req, "Can't allocate md_open_data");
161 		} else {
162 			req->rq_replay = 1;
163 			req->rq_cb_data = *mod;
164 			(*mod)->mod_open_req = req;
165 			req->rq_commit_cb = mdc_commit_open;
166 			(*mod)->mod_is_create = true;
167 			/**
168 			 * Take an extra reference on \var mod, it protects \var
169 			 * mod from being freed on eviction (commit callback is
170 			 * called despite rq_replay flag).
171 			 * Will be put on mdc_done_writing().
172 			 */
173 			obd_mod_get(*mod);
174 		}
175 	}
176 
177 	rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL);
178 
179 	/* Save the obtained info in the original RPC for the replay case. */
180 	if (rc == 0 && (op_data->op_flags & MF_EPOCH_OPEN)) {
181 		struct mdt_ioepoch *epoch;
182 		struct mdt_body  *body;
183 
184 		epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
185 		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
186 		LASSERT(epoch != NULL);
187 		LASSERT(body != NULL);
188 		epoch->handle = body->handle;
189 		epoch->ioepoch = body->ioepoch;
190 		req->rq_replay_cb = mdc_replay_open;
191 	/** bug 3633, open may be committed and estale answer is not error */
192 	} else if (rc == -ESTALE && (op_data->op_flags & MF_SOM_CHANGE)) {
193 		rc = 0;
194 	} else if (rc == -ERESTARTSYS) {
195 		rc = 0;
196 	}
197 	*request = req;
198 	if (rc && req->rq_commit_cb) {
199 		/* Put an extra reference on \var mod on error case. */
200 		if (mod != NULL && *mod != NULL)
201 			obd_mod_put(*mod);
202 		req->rq_commit_cb(req);
203 	}
204 	return rc;
205 }
206 
/*
 * Build and send an MDS_REINT create request for @op_data.
 *
 * If op_fid2 is not sane, a fid is allocated for it first with
 * mdc_fid_alloc(). @data/@datalen size the RMF_EADATA client buffer; they are
 * passed to mdc_create_pack() together with @mode, @uid, @gid,
 * @cap_effective and @rdev.
 *
 * The request is re-sent at LUSTRE_IMP_RECOVER level for as long as
 * mdc_reint() returns -ERESTARTSYS. On -EINPROGRESS the request is finished
 * and rebuilt from scratch, unless the import generation changed since entry.
 *
 * Returns:
 *   a negative mdc_fid_alloc() result if fid allocation fails;
 *   -ENOMEM if the request cannot be allocated;
 *   the mdc_prep_elc_req() error if preparing the request fails;
 *   -EIO if an -EINPROGRESS retry would cross an import generation change;
 *   -EPROTO if the reply body has OBD_MD_FLMDSCAPA set but no RMF_CAPA1;
 *   otherwise the mdc_reint() result.
 * *@request is written only in the last two cases.
 */
int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
	       const void *data, int datalen, int mode, __u32 uid, __u32 gid,
	       cfs_cap_t cap_effective, __u64 rdev,
	       struct ptlrpc_request **request)
{
	struct obd_import *import = exp->exp_obd->u.cli.cl_import;
	int generation = import->imp_generation;
	struct ptlrpc_request *req;
	int resends = 0;
	int count;
	int level;
	int rc;
	LIST_HEAD(cancels);

	/* The upper layer may not have allocated a fid; do it now if so. */
	if (!fid_is_sane(&op_data->op_fid2)) {
		/*
		 * mdc_fid_alloc() may return errno 1 in case of switch to new
		 * sequence, so only negative values are treated as failure.
		 */
		rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
		if (rc < 0) {
			CERROR("Can't alloc new fid, rc %d\n", rc);
			return rc;
		}
	}

rebuild:
	count = 0;
	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
	    fid_is_sane(&op_data->op_fid1))
		count = mdc_resource_get_unused(exp, &op_data->op_fid1,
						&cancels, LCK_EX,
						MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_MDS_REINT_CREATE_RMT_ACL);
	if (!req) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return -ENOMEM;
	}

	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     data && datalen ? datalen : 0);

	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
	if (rc != 0) {
		ptlrpc_request_free(req);
		return rc;
	}

	/*
	 * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
	 * tgt, for symlinks or lov MD data.
	 */
	mdc_create_pack(req, op_data, data, datalen, mode, uid,
			gid, cap_effective, rdev);

	ptlrpc_request_set_replen(req);

	/*
	 * Ask ptlrpc not to resend on EINPROGRESS: the retry logic for that
	 * case lives in this function.
	 */
	req->rq_no_retry_einprogress = 1;

	if (resends) {
		/* Rebuilt request: pin the generation seen on entry. */
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = get_seconds() + resends;
	}

	/* Resend the same request for as long as we are told to. */
	level = LUSTRE_IMP_FULL;
	for (;;) {
		rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
		if (rc != -ERESTARTSYS)
			break;
		level = LUSTRE_IMP_RECOVER;
	}

	if (rc == -EINPROGRESS) {
		/*
		 * Retry create infinitely until it succeeds or fails with
		 * another error code.
		 */
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
		       exp->exp_obd->obd_name, resends,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		if (generation != import->imp_generation) {
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}
		goto rebuild;
	}

	if (rc == 0) {
		struct mdt_body *body;

		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
		LASSERT(body);
		/* A reply announcing an MDS capa must actually carry one. */
		if ((body->valid & OBD_MD_FLMDSCAPA) &&
		    !req_capsule_server_get(&req->rq_pill, &RMF_CAPA1))
			rc = -EPROTO;
	}

	*request = req;
	return rc;
}
317 
mdc_unlink(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)318 int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
319 	       struct ptlrpc_request **request)
320 {
321 	LIST_HEAD(cancels);
322 	struct obd_device *obd = class_exp2obd(exp);
323 	struct ptlrpc_request *req = *request;
324 	int count = 0, rc;
325 
326 	LASSERT(req == NULL);
327 
328 	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
329 	    (fid_is_sane(&op_data->op_fid1)) &&
330 	    !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
331 		count = mdc_resource_get_unused(exp, &op_data->op_fid1,
332 						&cancels, LCK_EX,
333 						MDS_INODELOCK_UPDATE);
334 	if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
335 	    (fid_is_sane(&op_data->op_fid3)) &&
336 	    !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
337 		count += mdc_resource_get_unused(exp, &op_data->op_fid3,
338 						 &cancels, LCK_EX,
339 						 MDS_INODELOCK_FULL);
340 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
341 				   &RQF_MDS_REINT_UNLINK);
342 	if (req == NULL) {
343 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
344 		return -ENOMEM;
345 	}
346 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
347 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
348 			     op_data->op_namelen + 1);
349 
350 	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
351 	if (rc) {
352 		ptlrpc_request_free(req);
353 		return rc;
354 	}
355 
356 	mdc_unlink_pack(req, op_data);
357 
358 	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
359 			     obd->u.cli.cl_default_mds_easize);
360 	req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
361 			     obd->u.cli.cl_default_mds_cookiesize);
362 	ptlrpc_request_set_replen(req);
363 
364 	*request = req;
365 
366 	rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
367 	if (rc == -ERESTARTSYS)
368 		rc = 0;
369 	return rc;
370 }
371 
mdc_link(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)372 int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
373 	     struct ptlrpc_request **request)
374 {
375 	LIST_HEAD(cancels);
376 	struct obd_device *obd = exp->exp_obd;
377 	struct ptlrpc_request *req;
378 	int count = 0, rc;
379 
380 	if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
381 	    (fid_is_sane(&op_data->op_fid2)))
382 		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
383 						&cancels, LCK_EX,
384 						MDS_INODELOCK_UPDATE);
385 	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
386 	    (fid_is_sane(&op_data->op_fid1)))
387 		count += mdc_resource_get_unused(exp, &op_data->op_fid1,
388 						 &cancels, LCK_EX,
389 						 MDS_INODELOCK_UPDATE);
390 
391 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
392 	if (req == NULL) {
393 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
394 		return -ENOMEM;
395 	}
396 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
397 	mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
398 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
399 			     op_data->op_namelen + 1);
400 
401 	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
402 	if (rc) {
403 		ptlrpc_request_free(req);
404 		return rc;
405 	}
406 
407 	mdc_link_pack(req, op_data);
408 	ptlrpc_request_set_replen(req);
409 
410 	rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
411 	*request = req;
412 	if (rc == -ERESTARTSYS)
413 		rc = 0;
414 
415 	return rc;
416 }
417 
mdc_rename(struct obd_export * exp,struct md_op_data * op_data,const char * old,int oldlen,const char * new,int newlen,struct ptlrpc_request ** request)418 int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
419 	       const char *old, int oldlen, const char *new, int newlen,
420 	       struct ptlrpc_request **request)
421 {
422 	LIST_HEAD(cancels);
423 	struct obd_device *obd = exp->exp_obd;
424 	struct ptlrpc_request *req;
425 	int count = 0, rc;
426 
427 	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
428 	    (fid_is_sane(&op_data->op_fid1)))
429 		count = mdc_resource_get_unused(exp, &op_data->op_fid1,
430 						&cancels, LCK_EX,
431 						MDS_INODELOCK_UPDATE);
432 	if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
433 	    (fid_is_sane(&op_data->op_fid2)))
434 		count += mdc_resource_get_unused(exp, &op_data->op_fid2,
435 						 &cancels, LCK_EX,
436 						 MDS_INODELOCK_UPDATE);
437 	if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
438 	    (fid_is_sane(&op_data->op_fid3)))
439 		count += mdc_resource_get_unused(exp, &op_data->op_fid3,
440 						 &cancels, LCK_EX,
441 						 MDS_INODELOCK_LOOKUP);
442 	if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
443 	     (fid_is_sane(&op_data->op_fid4)))
444 		count += mdc_resource_get_unused(exp, &op_data->op_fid4,
445 						 &cancels, LCK_EX,
446 						 MDS_INODELOCK_FULL);
447 
448 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
449 				   &RQF_MDS_REINT_RENAME);
450 	if (req == NULL) {
451 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
452 		return -ENOMEM;
453 	}
454 
455 	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
456 	mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
457 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
458 	req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
459 
460 	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
461 	if (rc) {
462 		ptlrpc_request_free(req);
463 		return rc;
464 	}
465 
466 	if (exp_connect_cancelset(exp) && req)
467 		ldlm_cli_cancel_list(&cancels, count, req, 0);
468 
469 	mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
470 
471 	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
472 			     obd->u.cli.cl_default_mds_easize);
473 	req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
474 			     obd->u.cli.cl_default_mds_cookiesize);
475 	ptlrpc_request_set_replen(req);
476 
477 	rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
478 	*request = req;
479 	if (rc == -ERESTARTSYS)
480 		rc = 0;
481 
482 	return rc;
483 }
484