1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_MDC
38 
39 # include <linux/module.h>
40 # include <linux/kernel.h>
41 
42 #include "../include/obd_class.h"
43 #include "mdc_internal.h"
44 #include "../include/lustre_fid.h"
45 
46 /* mdc_setattr does its own semaphore handling */
mdc_reint(struct ptlrpc_request * request,struct mdc_rpc_lock * rpc_lock,int level)47 static int mdc_reint(struct ptlrpc_request *request,
48 		     struct mdc_rpc_lock *rpc_lock,
49 		     int level)
50 {
51 	int rc;
52 
53 	request->rq_send_state = level;
54 
55 	mdc_get_rpc_lock(rpc_lock, NULL);
56 	rc = ptlrpc_queue_wait(request);
57 	mdc_put_rpc_lock(rpc_lock, NULL);
58 	if (rc)
59 		CDEBUG(D_INFO, "error in handling %d\n", rc);
60 	else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY))
61 		rc = -EPROTO;
62 
63 	return rc;
64 }
65 
66 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
67  * found by @fid. Found locks are added into @cancel list. Returns the amount of
68  * locks added to @cancels list. */
mdc_resource_get_unused(struct obd_export * exp,const struct lu_fid * fid,struct list_head * cancels,ldlm_mode_t mode,__u64 bits)69 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
70 			    struct list_head *cancels, ldlm_mode_t mode,
71 			    __u64 bits)
72 {
73 	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
74 	ldlm_policy_data_t policy = {};
75 	struct ldlm_res_id res_id;
76 	struct ldlm_resource *res;
77 	int count;
78 
79 	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
80 	 * export) but disabled through procfs (flag in NS).
81 	 *
82 	 * This distinguishes from a case when ELC is not supported originally,
83 	 * when we still want to cancel locks in advance and just cancel them
84 	 * locally, without sending any RPC. */
85 	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
86 		return 0;
87 
88 	fid_build_reg_res_name(fid, &res_id);
89 	res = ldlm_resource_get(exp->exp_obd->obd_namespace,
90 				NULL, &res_id, 0, 0);
91 	if (res == NULL)
92 		return 0;
93 	LDLM_RESOURCE_ADDREF(res);
94 	/* Initialize ibits lock policy. */
95 	policy.l_inodebits.bits = bits;
96 	count = ldlm_cancel_resource_local(res, cancels, &policy,
97 					   mode, 0, 0, NULL);
98 	LDLM_RESOURCE_DELREF(res);
99 	ldlm_resource_putref(res);
100 	return count;
101 }
102 
mdc_setattr(struct obd_export * exp,struct md_op_data * op_data,void * ea,int ealen,void * ea2,int ea2len,struct ptlrpc_request ** request,struct md_open_data ** mod)103 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
104 		void *ea, int ealen, void *ea2, int ea2len,
105 		struct ptlrpc_request **request, struct md_open_data **mod)
106 {
107 	LIST_HEAD(cancels);
108 	struct ptlrpc_request *req;
109 	struct mdc_rpc_lock *rpc_lock;
110 	struct obd_device *obd = exp->exp_obd;
111 	int count = 0, rc;
112 	__u64 bits;
113 
114 	LASSERT(op_data != NULL);
115 
116 	bits = MDS_INODELOCK_UPDATE;
117 	if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
118 		bits |= MDS_INODELOCK_LOOKUP;
119 	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
120 	    (fid_is_sane(&op_data->op_fid1)) &&
121 	    !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
122 		count = mdc_resource_get_unused(exp, &op_data->op_fid1,
123 						&cancels, LCK_EX, bits);
124 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
125 				   &RQF_MDS_REINT_SETATTR);
126 	if (req == NULL) {
127 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
128 		return -ENOMEM;
129 	}
130 	if ((op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) == 0)
131 		req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT,
132 				     0);
133 	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
134 	req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
135 			     ea2len);
136 
137 	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
138 	if (rc) {
139 		ptlrpc_request_free(req);
140 		return rc;
141 	}
142 
143 	rpc_lock = obd->u.cli.cl_rpc_lock;
144 
145 	if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
146 		CDEBUG(D_INODE, "setting mtime %ld, ctime %ld\n",
147 		       LTIME_S(op_data->op_attr.ia_mtime),
148 		       LTIME_S(op_data->op_attr.ia_ctime));
149 	mdc_setattr_pack(req, op_data, ea, ealen, ea2, ea2len);
150 
151 	ptlrpc_request_set_replen(req);
152 	if (mod && (op_data->op_flags & MF_EPOCH_OPEN) &&
153 	    req->rq_import->imp_replayable) {
154 		LASSERT(*mod == NULL);
155 
156 		*mod = obd_mod_alloc();
157 		if (*mod == NULL) {
158 			DEBUG_REQ(D_ERROR, req, "Can't allocate md_open_data");
159 		} else {
160 			req->rq_replay = 1;
161 			req->rq_cb_data = *mod;
162 			(*mod)->mod_open_req = req;
163 			req->rq_commit_cb = mdc_commit_open;
164 			(*mod)->mod_is_create = true;
165 			/**
166 			 * Take an extra reference on \var mod, it protects \var
167 			 * mod from being freed on eviction (commit callback is
168 			 * called despite rq_replay flag).
169 			 * Will be put on mdc_done_writing().
170 			 */
171 			obd_mod_get(*mod);
172 		}
173 	}
174 
175 	rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL);
176 
177 	/* Save the obtained info in the original RPC for the replay case. */
178 	if (rc == 0 && (op_data->op_flags & MF_EPOCH_OPEN)) {
179 		struct mdt_ioepoch *epoch;
180 		struct mdt_body  *body;
181 
182 		epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
183 		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
184 		LASSERT(epoch != NULL);
185 		LASSERT(body != NULL);
186 		epoch->handle = body->handle;
187 		epoch->ioepoch = body->ioepoch;
188 		req->rq_replay_cb = mdc_replay_open;
189 	/** bug 3633, open may be committed and estale answer is not error */
190 	} else if (rc == -ESTALE && (op_data->op_flags & MF_SOM_CHANGE)) {
191 		rc = 0;
192 	} else if (rc == -ERESTARTSYS) {
193 		rc = 0;
194 	}
195 	*request = req;
196 	if (rc && req->rq_commit_cb) {
197 		/* Put an extra reference on \var mod on error case. */
198 		if (mod != NULL && *mod != NULL)
199 			obd_mod_put(*mod);
200 		req->rq_commit_cb(req);
201 	}
202 	return rc;
203 }
204 
/*
 * Build and send an MDS_REINT create request for the object described by
 * @op_data.
 *
 * @data/@datalen size the RMF_EADATA client field and, together with @mode,
 * @uid, @gid, @cap_effective and @rdev, are passed to mdc_create_pack().
 *
 * The request is resent at recovery level while mdc_reint() returns
 * -ERESTARTSYS, and rebuilt from scratch on -EINPROGRESS for as long as the
 * import generation does not change.
 *
 * @request receives the request only on the final return path; it is not
 * written on the early error returns.
 *
 * Returns a negative mdc_fid_alloc() error, -ENOMEM if the request cannot
 * be allocated, the error of mdc_prep_elc_req(), -EIO when the import
 * generation changed between -EINPROGRESS retries, otherwise the result of
 * mdc_reint().
 */
int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
	       const void *data, int datalen, int mode, __u32 uid, __u32 gid,
	       cfs_cap_t cap_effective, __u64 rdev,
	       struct ptlrpc_request **request)
{
	struct obd_import *import = exp->exp_obd->u.cli.cl_import;
	int generation = import->imp_generation;
	struct ptlrpc_request *req;
	int resends = 0;
	int count;
	int level;
	int rc;
	LIST_HEAD(cancels);

	/* For case if upper layer did not alloc fid, do it now. */
	if (!fid_is_sane(&op_data->op_fid2)) {
		/*
		 * mdc_fid_alloc() may return errno 1 in case of switch to new
		 * sequence, handle this.
		 */
		rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
		if (rc < 0) {
			CERROR("Can't alloc new fid, rc %d\n", rc);
			return rc;
		}
	}

rebuild:
	count = 0;
	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
	    fid_is_sane(&op_data->op_fid1))
		count = mdc_resource_get_unused(exp, &op_data->op_fid1,
						&cancels, LCK_EX,
						MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_MDS_REINT_CREATE_RMT_ACL);
	if (!req) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return -ENOMEM;
	}

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     (data && datalen) ? datalen : 0);

	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/*
	 * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
	 * tgt, for symlinks or lov MD data.
	 */
	mdc_create_pack(req, op_data, data, datalen, mode, uid,
			gid, cap_effective, rdev);

	ptlrpc_request_set_replen(req);

	/*
	 * Ask ptlrpc not to resend on EINPROGRESS since we have our own
	 * retry logic here.
	 */
	req->rq_no_retry_einprogress = 1;

	if (resends) {
		/* Pin the generation and push the send time out per retry. */
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = ktime_get_real_seconds() + resends;
	}

	/* First attempt at full level; resend, if told to, at recovery level. */
	level = LUSTRE_IMP_FULL;
	do {
		rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
		level = LUSTRE_IMP_RECOVER;
	} while (rc == -ERESTARTSYS);

	if (rc == -EINPROGRESS) {
		/*
		 * Retry create infinitely until succeed or get other
		 * error code.
		 */
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
		       exp->exp_obd->obd_name, resends,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		if (generation != import->imp_generation) {
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}

		goto rebuild;
	}

	*request = req;
	return rc;
}
302 
mdc_unlink(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)303 int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
304 	       struct ptlrpc_request **request)
305 {
306 	LIST_HEAD(cancels);
307 	struct obd_device *obd = class_exp2obd(exp);
308 	struct ptlrpc_request *req = *request;
309 	int count = 0, rc;
310 
311 	LASSERT(req == NULL);
312 
313 	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
314 	    (fid_is_sane(&op_data->op_fid1)) &&
315 	    !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
316 		count = mdc_resource_get_unused(exp, &op_data->op_fid1,
317 						&cancels, LCK_EX,
318 						MDS_INODELOCK_UPDATE);
319 	if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
320 	    (fid_is_sane(&op_data->op_fid3)) &&
321 	    !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
322 		count += mdc_resource_get_unused(exp, &op_data->op_fid3,
323 						 &cancels, LCK_EX,
324 						 MDS_INODELOCK_FULL);
325 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
326 				   &RQF_MDS_REINT_UNLINK);
327 	if (req == NULL) {
328 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
329 		return -ENOMEM;
330 	}
331 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
332 			     op_data->op_namelen + 1);
333 
334 	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
335 	if (rc) {
336 		ptlrpc_request_free(req);
337 		return rc;
338 	}
339 
340 	mdc_unlink_pack(req, op_data);
341 
342 	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
343 			     obd->u.cli.cl_default_mds_easize);
344 	req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
345 			     obd->u.cli.cl_default_mds_cookiesize);
346 	ptlrpc_request_set_replen(req);
347 
348 	*request = req;
349 
350 	rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
351 	if (rc == -ERESTARTSYS)
352 		rc = 0;
353 	return rc;
354 }
355 
mdc_link(struct obd_export * exp,struct md_op_data * op_data,struct ptlrpc_request ** request)356 int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
357 	     struct ptlrpc_request **request)
358 {
359 	LIST_HEAD(cancels);
360 	struct obd_device *obd = exp->exp_obd;
361 	struct ptlrpc_request *req;
362 	int count = 0, rc;
363 
364 	if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
365 	    (fid_is_sane(&op_data->op_fid2)))
366 		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
367 						&cancels, LCK_EX,
368 						MDS_INODELOCK_UPDATE);
369 	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
370 	    (fid_is_sane(&op_data->op_fid1)))
371 		count += mdc_resource_get_unused(exp, &op_data->op_fid1,
372 						 &cancels, LCK_EX,
373 						 MDS_INODELOCK_UPDATE);
374 
375 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
376 	if (req == NULL) {
377 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
378 		return -ENOMEM;
379 	}
380 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
381 			     op_data->op_namelen + 1);
382 
383 	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
384 	if (rc) {
385 		ptlrpc_request_free(req);
386 		return rc;
387 	}
388 
389 	mdc_link_pack(req, op_data);
390 	ptlrpc_request_set_replen(req);
391 
392 	rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
393 	*request = req;
394 	if (rc == -ERESTARTSYS)
395 		rc = 0;
396 
397 	return rc;
398 }
399 
mdc_rename(struct obd_export * exp,struct md_op_data * op_data,const char * old,int oldlen,const char * new,int newlen,struct ptlrpc_request ** request)400 int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
401 	       const char *old, int oldlen, const char *new, int newlen,
402 	       struct ptlrpc_request **request)
403 {
404 	LIST_HEAD(cancels);
405 	struct obd_device *obd = exp->exp_obd;
406 	struct ptlrpc_request *req;
407 	int count = 0, rc;
408 
409 	if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
410 	    (fid_is_sane(&op_data->op_fid1)))
411 		count = mdc_resource_get_unused(exp, &op_data->op_fid1,
412 						&cancels, LCK_EX,
413 						MDS_INODELOCK_UPDATE);
414 	if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
415 	    (fid_is_sane(&op_data->op_fid2)))
416 		count += mdc_resource_get_unused(exp, &op_data->op_fid2,
417 						 &cancels, LCK_EX,
418 						 MDS_INODELOCK_UPDATE);
419 	if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
420 	    (fid_is_sane(&op_data->op_fid3)))
421 		count += mdc_resource_get_unused(exp, &op_data->op_fid3,
422 						 &cancels, LCK_EX,
423 						 MDS_INODELOCK_LOOKUP);
424 	if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
425 	     (fid_is_sane(&op_data->op_fid4)))
426 		count += mdc_resource_get_unused(exp, &op_data->op_fid4,
427 						 &cancels, LCK_EX,
428 						 MDS_INODELOCK_FULL);
429 
430 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
431 				   &RQF_MDS_REINT_RENAME);
432 	if (req == NULL) {
433 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
434 		return -ENOMEM;
435 	}
436 
437 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
438 	req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
439 
440 	rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
441 	if (rc) {
442 		ptlrpc_request_free(req);
443 		return rc;
444 	}
445 
446 	if (exp_connect_cancelset(exp) && req)
447 		ldlm_cli_cancel_list(&cancels, count, req, 0);
448 
449 	mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
450 
451 	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
452 			     obd->u.cli.cl_default_mds_easize);
453 	req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
454 			     obd->u.cli.cl_default_mds_cookiesize);
455 	ptlrpc_request_set_replen(req);
456 
457 	rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
458 	*request = req;
459 	if (rc == -ERESTARTSYS)
460 		rc = 0;
461 
462 	return rc;
463 }
464