/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_MDC

#include <linux/module.h>

#include "../include/lustre_intent.h"
#include "../include/obd.h"
#include "../include/obd_class.h"
#include "../include/lustre_dlm.h"
#include "../include/lustre_fid.h"	/* fid_res_name_eq() */
#include "../include/lustre_mdc.h"
#include "../include/lustre_net.h"
#include "../include/lustre_req_layout.h"
#include "mdc_internal.h"

struct mdc_getattr_args {
	struct obd_export	   *ga_exp;
	struct md_enqueue_info      *ga_minfo;
	struct ldlm_enqueue_info    *ga_einfo;
};

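/*
 * Helpers to test and update the intent disposition bits returned by the
 * MDS in it->d.lustre.it_disposition.
 */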
int it_disposition(struct lookup_intent *it, int flag)
{
	return it->d.lustre.it_disposition & flag;
}
EXPORT_SYMBOL(it_disposition);

void it_set_disposition(struct lookup_intent *it, int flag)
{
	it->d.lustre.it_disposition |= flag;
}
EXPORT_SYMBOL(it_set_disposition);

void it_clear_disposition(struct lookup_intent *it, int flag)
{
	it->d.lustre.it_disposition &= ~flag;
}
EXPORT_SYMBOL(it_clear_disposition);

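/*
 * Return the saved intent status if the request progressed at least to the
 * given phase, otherwise 0.
 */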
int it_open_error(int phase, struct lookup_intent *it)
{
	if (it_disposition(it, DISP_OPEN_LEASE)) {
		if (phase >= DISP_OPEN_LEASE)
			return it->d.lustre.it_status;
		else
			return 0;
	}
	if (it_disposition(it, DISP_OPEN_OPEN)) {
		if (phase >= DISP_OPEN_OPEN)
			return it->d.lustre.it_status;
		else
			return 0;
	}

	if (it_disposition(it, DISP_OPEN_CREATE)) {
		if (phase >= DISP_OPEN_CREATE)
			return it->d.lustre.it_status;
		else
			return 0;
	}

	if (it_disposition(it, DISP_LOOKUP_EXECD)) {
		if (phase >= DISP_LOOKUP_EXECD)
			return it->d.lustre.it_status;
		else
			return 0;
	}

	if (it_disposition(it, DISP_IT_EXECD)) {
		if (phase >= DISP_IT_EXECD)
			return it->d.lustre.it_status;
		else
			return 0;
	}
	CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
	       it->d.lustre.it_status);
	LBUG();
	return 0;
}
EXPORT_SYMBOL(it_open_error);

/* This must be called on a lockh that is known to have a referenced lock. */
int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
		      __u64 *bits)
{
	struct ldlm_lock *lock;
	struct inode *new_inode = data;

	if (bits)
		*bits = 0;

	if (!*lockh)
		return 0;

	lock = ldlm_handle2lock((struct lustre_handle *)lockh);

	LASSERT(lock != NULL);
	lock_res_and_lock(lock);
	if (lock->l_resource->lr_lvb_inode &&
	    lock->l_resource->lr_lvb_inode != data) {
		struct inode *old_inode = lock->l_resource->lr_lvb_inode;

		LASSERTF(old_inode->i_state & I_FREEING,
			 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
			 old_inode, old_inode->i_ino, old_inode->i_generation,
			 old_inode->i_state, new_inode, new_inode->i_ino,
			 new_inode->i_generation);
	}
	lock->l_resource->lr_lvb_inode = new_inode;
	if (bits)
		*bits = lock->l_policy_data.l_inodebits.bits;

	unlock_res_and_lock(lock);
	LDLM_LOCK_PUT(lock);

	return 0;
}

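/* Look for a granted lock on the resource derived from @fid that matches
 * @type/@policy/@mode; on success the matched mode is returned and @lockh
 * is filled in. */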
ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
			   const struct lu_fid *fid, ldlm_type_t type,
			   ldlm_policy_data_t *policy, ldlm_mode_t mode,
			   struct lustre_handle *lockh)
{
	struct ldlm_res_id res_id;
	ldlm_mode_t rc;

	fid_build_reg_res_name(fid, &res_id);
	/* LU-4405: Clear bits not supported by server */
	policy->l_inodebits.bits &= exp_connect_ibits(exp);
	rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
			     &res_id, type, policy, mode, lockh, 0);
	return rc;
}

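/* Cancel unused locks on the resource derived from @fid that match
 * @policy and @mode. */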
int mdc_cancel_unused(struct obd_export *exp,
		      const struct lu_fid *fid,
		      ldlm_policy_data_t *policy,
		      ldlm_mode_t mode,
		      ldlm_cancel_flags_t flags,
		      void *opaque)
{
	struct ldlm_res_id res_id;
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	fid_build_reg_res_name(fid, &res_id);
	rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
					     policy, mode, flags, opaque);
	return rc;
}

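/* Clear the inode pointer cached in the LVB of the inode's ldlm resource,
 * if such a resource exists. */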
int mdc_null_inode(struct obd_export *exp,
		   const struct lu_fid *fid)
{
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;

	LASSERTF(ns != NULL, "no namespace passed\n");

	fid_build_reg_res_name(fid, &res_id);

	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		return 0;

	lock_res(res);
	res->lr_lvb_inode = NULL;
	unlock_res(res);

	ldlm_resource_putref(res);
	return 0;
}

/* Find any ldlm lock of the inode in mdc.
 * Return 0 if no lock is found,
 *	  1 if a lock is found,
 *	< 0 on error. */
int mdc_find_cbdata(struct obd_export *exp,
		    const struct lu_fid *fid,
		    ldlm_iterator_t it, void *data)
{
	struct ldlm_res_id res_id;
	int rc = 0;

	fid_build_reg_res_name((struct lu_fid *)fid, &res_id);
	rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
				   it, data);
	if (rc == LDLM_ITER_STOP)
		return 1;
	else if (rc == LDLM_ITER_CONTINUE)
		return 0;
	return rc;
}

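/* Drop the replay flag on @req; an error reply must not carry a transno. */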
static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
{
	/* Don't hold error requests for replay. */
	if (req->rq_replay) {
		spin_lock(&req->rq_lock);
		req->rq_replay = 0;
		spin_unlock(&req->rq_lock);
	}
	if (rc && req->rq_transno != 0) {
		DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
		LBUG();
	}
}

/* Save a large LOV EA into the request buffer so that it is available
 * for replay.  We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md); it is a waste of RAM/bandwidth to send an empty buffer,
 * and it may also be difficult to allocate and save a very large
 * request buffer for each open. (bug 5707)
 *
 * OOM here may cause recovery failure if lmm is needed (only for the
 * original open if the MDS crashed just when this client also OOM'd)
 * but this is incredibly unlikely, and it is questionable whether the
 * client could do MDS recovery under OOM anyway... */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
				struct mdt_body *body)
{
	int     rc;

	/* FIXME: remove this explicit offset. */
	rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
					body->eadatasize);
	if (rc) {
		CERROR("Can't enlarge segment %d size to %d\n",
		       DLM_INTENT_REC_OFF + 4, body->eadatasize);
		body->valid &= ~OBD_MD_FLEASIZE;
		body->eadatasize = 0;
	}
}

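/* Allocate and pack an LDLM_INTENT_OPEN request: cancel conflicting child
 * OPEN locks and the parent UPDATE lock, then pack the ldlm intent together
 * with the open request body. */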
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
						   struct lookup_intent *it,
						   struct md_op_data *op_data,
						   void *lmm, int lmmsize,
						   void *cb_data)
{
	struct ptlrpc_request *req;
	struct obd_device     *obddev = class_exp2obd(exp);
	struct ldlm_intent    *lit;
	LIST_HEAD(cancels);
	int		    count = 0;
	int		    mode;
	int		    rc;

	it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

	/* XXX: openlock is not cancelled for cross-refs. */
	/* If inode is known, cancel conflicting OPEN locks. */
	if (fid_is_sane(&op_data->op_fid2)) {
		if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
			if (it->it_flags & FMODE_WRITE)
				mode = LCK_EX;
			else
				mode = LCK_PR;
		} else {
			if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
				mode = LCK_CW;
			else if (it->it_flags & __FMODE_EXEC)
				mode = LCK_PR;
			else
				mode = LCK_CR;
		}
		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
						&cancels, mode,
						MDS_INODELOCK_OPEN);
	}

	/* If CREATE, cancel parent's UPDATE lock. */
	if (it->it_op & IT_CREAT)
		mode = LCK_EX;
	else
		mode = LCK_CR;
	count += mdc_resource_get_unused(exp, &op_data->op_fid1,
					 &cancels, mode,
					 MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_OPEN);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return ERR_PTR(-ENOMEM);
	}

	/* parent capability */
	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
	/* child capability; reserve the size according to the parent capa,
	 * it will be filled in after we get the reply */
	mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     max(lmmsize, obddev->u.cli.cl_default_mds_easize));

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc < 0) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	spin_lock(&req->rq_lock);
	req->rq_replay = req->rq_import->imp_replayable;
	spin_unlock(&req->rq_lock);

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
		      lmmsize);

	/* for remote client, fetch remote perm for current user */
	if (client_is_remote(exp))
		req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
				     sizeof(struct mdt_remote_perm));
	ptlrpc_request_set_replen(req);
	return req;
}

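/* Pack an LDLM_INTENT_GETXATTR request; reply buffers are sized to the
 * maximum EA size negotiated with the server. */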
static struct ptlrpc_request *
mdc_intent_getxattr_pack(struct obd_export *exp,
			 struct lookup_intent *it,
			 struct md_op_data *op_data)
{
	struct ptlrpc_request	*req;
	struct ldlm_intent	*lit;
	int			rc, count = 0, maxdata;
	LIST_HEAD(cancels);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					&RQF_LDLM_INTENT_GETXATTR);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = IT_GETXATTR;

	maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;

	/* pack the intended request */
	mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
			op_data->op_valid, maxdata, -1, 0);

	req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
				RCL_SERVER, maxdata);

	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
				RCL_SERVER, maxdata);

	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
				RCL_SERVER, maxdata);

	ptlrpc_request_set_replen(req);

	return req;
}

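/* Pack an LDLM_INTENT_UNLINK request together with the unlink body. */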
static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device     *obddev = class_exp2obd(exp);
	struct ldlm_intent    *lit;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_UNLINK);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_unlink_pack(req, op_data);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_default_mds_easize);
	req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
			     obddev->u.cli.cl_default_mds_cookiesize);
	ptlrpc_request_set_replen(req);
	return req;
}

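/* Pack an LDLM_INTENT_GETATTR request asking for attributes, EA and ACL
 * (or remote permissions for a remote client). */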
static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
						    struct lookup_intent *it,
						    struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device     *obddev = class_exp2obd(exp);
	u64		       valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
				       OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
				       OBD_MD_FLMDSCAPA | OBD_MD_MEA |
				       (client_is_remote(exp) ?
					       OBD_MD_FLRMTPERM : OBD_MD_FLACL);
	struct ldlm_intent    *lit;
	int		    rc;
	int		    easize;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETATTR);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	if (obddev->u.cli.cl_default_mds_easize > 0)
		easize = obddev->u.cli.cl_default_mds_easize;
	else
		easize = obddev->u.cli.cl_max_mds_easize;

	/* pack the intended request */
	mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
	if (client_is_remote(exp))
		req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
				     sizeof(struct mdt_remote_perm));
	ptlrpc_request_set_replen(req);
	return req;
}

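/* Pack an LDLM_INTENT_LAYOUT request; the layout itself is returned in the
 * DLM LVB buffer of the reply. */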
static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *unused)
{
	struct obd_device     *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct ldlm_intent    *lit;
	struct layout_intent  *layout;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				&RQF_LDLM_INTENT_LAYOUT);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the layout intent request */
	layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
	/* LAYOUT_INTENT_ACCESS is generic; a specific operation will be
	 * set for replication */
	layout->li_opc = LAYOUT_INTENT_ACCESS;

	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     obd->u.cli.cl_default_mds_easize);
	ptlrpc_request_set_replen(req);
	return req;
}

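/* Pack a plain LDLM_ENQUEUE request (no intent), reserving @lvb_len bytes
 * for the reply LVB. */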
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
{
	struct ptlrpc_request *req;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
	ptlrpc_request_set_replen(req);
	return req;
}

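/* Post-process an intent enqueue reply: fix up the lock mode if the server
 * granted a different one, extract the disposition/status into the intent,
 * clear the replay flag for failed opens and save any LVB/layout data. */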
static int mdc_finish_enqueue(struct obd_export *exp,
			      struct ptlrpc_request *req,
			      struct ldlm_enqueue_info *einfo,
			      struct lookup_intent *it,
			      struct lustre_handle *lockh,
			      int rc)
{
	struct req_capsule  *pill = &req->rq_pill;
	struct ldlm_request *lockreq;
	struct ldlm_reply   *lockrep;
	struct lustre_intent_data *intent = &it->d.lustre;
	struct ldlm_lock    *lock;
	void		*lvb_data = NULL;
	int		  lvb_len = 0;

	LASSERT(rc >= 0);
	/* Similarly, if we're going to replay this request, we don't want to
	 * actually get a lock, just perform the intent. */
	if (req->rq_transno || req->rq_replay) {
		lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
		lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
	}

	if (rc == ELDLM_LOCK_ABORTED) {
		einfo->ei_mode = 0;
		memset(lockh, 0, sizeof(*lockh));
		rc = 0;
	} else { /* rc = 0 */
		lock = ldlm_handle2lock(lockh);
		LASSERT(lock != NULL);

		/* If the server gave us back a different lock mode, we should
		 * fix up our variables. */
		if (lock->l_req_mode != einfo->ei_mode) {
			ldlm_lock_addref(lockh, lock->l_req_mode);
			ldlm_lock_decref(lockh, einfo->ei_mode);
			einfo->ei_mode = lock->l_req_mode;
		}
		LDLM_LOCK_PUT(lock);
	}

	lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */

	intent->it_disposition = (int)lockrep->lock_policy_res1;
	intent->it_status = (int)lockrep->lock_policy_res2;
	intent->it_lock_mode = einfo->ei_mode;
	intent->it_lock_handle = lockh->cookie;
	intent->it_data = req;

	/* Technically speaking rq_transno must already be zero if
	 * it_status is in error, so the check is a bit redundant */
	if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
		mdc_clear_replay_flag(req, intent->it_status);

	/* If we're doing an IT_OPEN which did not result in an actual
	 * successful open, then we need to remove the bit which saves
	 * this request for unconditional replay.
	 *
	 * It's important that we do this first!  Otherwise we might exit the
	 * function without doing so, and try to replay a failed create
	 * (bug 3440) */
	if (it->it_op & IT_OPEN && req->rq_replay &&
	    (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
		mdc_clear_replay_flag(req, intent->it_status);

	DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
		  it->it_op, intent->it_disposition, intent->it_status);

	/* We know what to expect, so we do any byte flipping required here */
	if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
		struct mdt_body *body;

		body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		if (body == NULL) {
			CERROR("Can't swab mdt_body\n");
			return -EPROTO;
		}

		if (it_disposition(it, DISP_OPEN_OPEN) &&
		    !it_open_error(DISP_OPEN_OPEN, it)) {
			/*
			 * If this is a successful OPEN request, we need to set
			 * replay handler and data early, so that if replay
			 * happens immediately after swabbing below, new reply
			 * is swabbed by that handler correctly.
			 */
			mdc_set_open_replay_data(NULL, NULL, it);
		}

		if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
			void *eadata;

			mdc_update_max_ea_from_body(exp, body);

			/*
			 * The eadata is opaque; just check that it is there.
			 * Eventually, obd_unpackmd() will check the contents.
			 */
			eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
							      body->eadatasize);
			if (eadata == NULL)
				return -EPROTO;

			/* save lvb data and length in case this is for layout
			 * lock */
			lvb_data = eadata;
			lvb_len = body->eadatasize;

			/*
			 * We save the reply LOV EA in case we have to replay a
			 * create for recovery.  If we didn't allocate a large
			 * enough request buffer above we need to reallocate it
			 * here to hold the actual LOV EA.
			 *
			 * Do not save the LOV EA if the request is not going
			 * to be replayed (for example an error request).
			 */
			if ((it->it_op & IT_OPEN) && req->rq_replay) {
				void *lmm;

				if (req_capsule_get_size(pill, &RMF_EADATA,
							 RCL_CLIENT) <
				    body->eadatasize)
					mdc_realloc_openmsg(req, body);
				else
					req_capsule_shrink(pill, &RMF_EADATA,
							   body->eadatasize,
							   RCL_CLIENT);

				req_capsule_set_size(pill, &RMF_EADATA,
						     RCL_CLIENT,
						     body->eadatasize);

				lmm = req_capsule_client_get(pill, &RMF_EADATA);
				if (lmm)
					memcpy(lmm, eadata, body->eadatasize);
			}
		}

		if (body->valid & OBD_MD_FLRMTPERM) {
			struct mdt_remote_perm *perm;

			LASSERT(client_is_remote(exp));
			perm = req_capsule_server_swab_get(pill, &RMF_ACL,
						lustre_swab_mdt_remote_perm);
			if (perm == NULL)
				return -EPROTO;
		}
		if (body->valid & OBD_MD_FLMDSCAPA) {
			struct lustre_capa *capa, *p;

			capa = req_capsule_server_get(pill, &RMF_CAPA1);
			if (capa == NULL)
				return -EPROTO;

			if (it->it_op & IT_OPEN) {
				/* client fid capa will be checked in replay */
				p = req_capsule_client_get(pill, &RMF_CAPA2);
				LASSERT(p);
				*p = *capa;
			}
		}
		if (body->valid & OBD_MD_FLOSSCAPA) {
			struct lustre_capa *capa;

			capa = req_capsule_server_get(pill, &RMF_CAPA2);
			if (capa == NULL)
				return -EPROTO;
		}
	} else if (it->it_op & IT_LAYOUT) {
		/* maybe the lock was granted right away and layout
		 * is packed into RMF_DLM_LVB of req */
		lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
		if (lvb_len > 0) {
			lvb_data = req_capsule_server_sized_get(pill,
							&RMF_DLM_LVB, lvb_len);
			if (lvb_data == NULL)
				return -EPROTO;
		}
	}

	/* fill in stripe data for layout lock */
	lock = ldlm_handle2lock(lockh);
	if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
		void *lmm;

		LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
			ldlm_it2str(it->it_op), lvb_len);

		OBD_ALLOC_LARGE(lmm, lvb_len);
		if (lmm == NULL) {
			LDLM_LOCK_PUT(lock);
			return -ENOMEM;
		}
		memcpy(lmm, lvb_data, lvb_len);

		/* install lvb_data */
		lock_res_and_lock(lock);
		if (lock->l_lvb_data == NULL) {
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lmm;
			lock->l_lvb_len = lvb_len;
			lmm = NULL;
		}
		unlock_res_and_lock(lock);
		if (lmm != NULL)
			OBD_FREE_LARGE(lmm, lvb_len);
	}
	if (lock != NULL)
		LDLM_LOCK_PUT(lock);

	return rc;
}

/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
		struct lookup_intent *it, struct md_op_data *op_data,
		struct lustre_handle *lockh, void *lmm, int lmmsize,
		struct ptlrpc_request **reqp, u64 extra_lock_flags)
{
	static const ldlm_policy_data_t lookup_policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP }
	};
	static const ldlm_policy_data_t update_policy = {
		.l_inodebits = { MDS_INODELOCK_UPDATE }
	};
	static const ldlm_policy_data_t layout_policy = {
		.l_inodebits = { MDS_INODELOCK_LAYOUT }
	};
	static const ldlm_policy_data_t getxattr_policy = {
		.l_inodebits = { MDS_INODELOCK_XATTR }
	};
	ldlm_policy_data_t const *policy = &lookup_policy;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req;
	u64 flags, saved_flags = extra_lock_flags;
	struct ldlm_res_id res_id;
	int generation, resends = 0;
	struct ldlm_reply *lockrep;
	enum lvb_type lvb_type = LVB_T_NONE;
	int rc;

	LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
		 einfo->ei_type);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);

	if (it) {
		saved_flags |= LDLM_FL_HAS_INTENT;
		if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
			policy = &update_policy;
		else if (it->it_op & IT_LAYOUT)
			policy = &layout_policy;
		else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
			policy = &getxattr_policy;
	}

	LASSERT(reqp == NULL);

	generation = obddev->u.cli.cl_import->imp_generation;
resend:
	flags = saved_flags;
	if (!it) {
		/* The only way right now is FLOCK; in this case we hide the
		   flock policy as lmm, but lmmsize is 0 */
		LASSERT(lmm && lmmsize == 0);
		LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
			 einfo->ei_type);
		policy = (ldlm_policy_data_t *)lmm;
		res_id.name[3] = LDLM_FLOCK;
		req = NULL;
	} else if (it->it_op & IT_OPEN) {
		req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
					   einfo->ei_cbdata);
		policy = &update_policy;
		einfo->ei_cbdata = NULL;
		lmm = NULL;
	} else if (it->it_op & IT_UNLINK) {
		req = mdc_intent_unlink_pack(exp, it, op_data);
	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
		req = mdc_intent_getattr_pack(exp, it, op_data);
	} else if (it->it_op & IT_READDIR) {
		req = mdc_enqueue_pack(exp, 0);
	} else if (it->it_op & IT_LAYOUT) {
		if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
			return -EOPNOTSUPP;
		req = mdc_intent_layout_pack(exp, it, op_data);
		lvb_type = LVB_T_LAYOUT;
	} else if (it->it_op & IT_GETXATTR) {
		req = mdc_intent_getxattr_pack(exp, it, op_data);
	} else {
		LBUG();
		return -EINVAL;
	}

	if (IS_ERR(req))
		return PTR_ERR(req);

	if (req != NULL && it && it->it_op & IT_CREAT)
		/* ask ptlrpc not to resend on EINPROGRESS since we have our own
		 * retry logic */
		req->rq_no_retry_einprogress = 1;

	if (resends) {
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = get_seconds() + resends;
	}

	/* It is important to obtain rpc_lock first (if applicable), so that
	 * threads that are serialised with rpc_lock are not polluting our
	 * rpcs in flight counter. We do not do flock request limiting, though. */
	if (it) {
		mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
		rc = mdc_enter_request(&obddev->u.cli);
		if (rc != 0) {
			mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
			mdc_clear_replay_flag(req, 0);
			ptlrpc_req_finished(req);
			return rc;
		}
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
			      0, lvb_type, lockh, 0);
	if (!it) {
		/* For flock requests we immediately return without further
		   delay and let the caller deal with the rest, since the
		   metadata processing in the rest of this function makes no
		   sense for flock requests anyway. But if there is a problem
		   during communication with the server (ETIMEDOUT) or any
		   signal/kill attempt (EINTR), we cannot rely on the caller;
		   this mainly matters for F_UNLCKs (explicit, or generated by
		   the kernel to clean up flocks on process exit) that must not
		   be lost */
		if ((rc == -EINTR) || (rc == -ETIMEDOUT))
			goto resend;
		return rc;
	}

	mdc_exit_request(&obddev->u.cli);
	mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

	if (rc < 0) {
		CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
			     "%s: ldlm_cli_enqueue failed: rc = %d\n",
			     obddev->obd_name, rc);

		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		return rc;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);

	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	/* Retry the create infinitely when we get -EINPROGRESS from
	 * server. This is required by the new quota design. */
	if (it && it->it_op & IT_CREAT &&
	    (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
		       obddev->obd_name, resends, it->it_op,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		if (generation == obddev->u.cli.cl_import->imp_generation) {
			goto resend;
		} else {
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}
	}

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc < 0) {
		if (lustre_handle_is_used(lockh)) {
			ldlm_lock_decref(lockh, einfo->ei_mode);
			memset(lockh, 0, sizeof(*lockh));
		}
		ptlrpc_req_finished(req);

		it->d.lustre.it_lock_handle = 0;
		it->d.lustre.it_lock_mode = 0;
		it->d.lustre.it_data = NULL;
	}

	return rc;
}

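/* Finish an intent lock: check the intent disposition/status, detect stale
 * fid/name revalidation, take extra request references for a successful
 * open/create, and reuse an already-matching lock if one is held. */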
static int mdc_finish_intent_lock(struct obd_export *exp,
				  struct ptlrpc_request *request,
				  struct md_op_data *op_data,
				  struct lookup_intent *it,
				  struct lustre_handle *lockh)
{
	struct lustre_handle old_lock;
	struct mdt_body *mdt_body;
	struct ldlm_lock *lock;
	int rc;

	LASSERT(request != NULL);
	LASSERT(request != LP_POISON);
	LASSERT(request->rq_repmsg != LP_POISON);

	if (!it_disposition(it, DISP_IT_EXECD)) {
		/* The server failed before it even started executing the
		 * intent, i.e. because it couldn't unpack the request. */
		LASSERT(it->d.lustre.it_status != 0);
		return it->d.lustre.it_status;
	}
	rc = it_open_error(DISP_IT_EXECD, it);
	if (rc)
		return rc;

	mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
	LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */

	/* If we were revalidating a fid/name pair, mark the intent in
	 * case we fail and get called again from lookup */
	if (fid_is_sane(&op_data->op_fid2) &&
	    it->it_create_mode & M_CHECK_STALE &&
	    it->it_op != IT_GETATTR) {

		/* Also: did we find the same inode? */
		/* The server can return one of two fids:
		 * op_fid2 - the newly allocated fid, if the file was created;
		 * op_fid3 - the existing fid, if the file was only opened.
		 * op_fid3 is saved in lmv_intent_open */
		if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
		    (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
			CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
			       "\n", PFID(&op_data->op_fid2),
			       PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
			return -ESTALE;
		}
	}

	rc = it_open_error(DISP_LOOKUP_EXECD, it);
	if (rc)
		return rc;

	/* Keep the request around for the multiple phases of the call;
	 * the DISP_XX bits must guarantee that we make it into the call.
	 */
	if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
	    it_disposition(it, DISP_OPEN_CREATE) &&
	    !it_open_error(DISP_OPEN_CREATE, it)) {
		it_set_disposition(it, DISP_ENQ_CREATE_REF);
		ptlrpc_request_addref(request); /* balanced in ll_create_node */
	}
	if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
	    it_disposition(it, DISP_OPEN_OPEN) &&
	    !it_open_error(DISP_OPEN_OPEN, it)) {
		it_set_disposition(it, DISP_ENQ_OPEN_REF);
		ptlrpc_request_addref(request); /* balanced in ll_file_open */
		/* BUG 11546 - eviction in the middle of open rpc processing */
		OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
	}

	if (it->it_op & IT_CREAT) {
		/* XXX this belongs in ll_create_it */
	} else if (it->it_op == IT_OPEN) {
		LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
	} else {
		LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
	}

	/* If we already have a matching lock, then cancel the new
	 * one.  We have to set the data here instead of in
	 * mdc_enqueue, because we need to use the child's inode as
	 * the l_ast_data to match, and that's not available until
	 * intent_finish has performed the iget(). */
	lock = ldlm_handle2lock(lockh);
	if (lock) {
		ldlm_policy_data_t policy = lock->l_policy_data;

		LDLM_DEBUG(lock, "matching against this");

		LASSERTF(fid_res_name_eq(&mdt_body->fid1,
					 &lock->l_resource->lr_name),
			 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
			 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
		LDLM_LOCK_PUT(lock);

		memcpy(&old_lock, lockh, sizeof(*lockh));
		if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
				    LDLM_IBITS, &policy, LCK_NL,
				    &old_lock, 0)) {
			ldlm_lock_decref_and_cancel(lockh,
						    it->d.lustre.it_lock_mode);
			memcpy(lockh, &old_lock, sizeof(old_lock));
			it->d.lustre.it_lock_handle = lockh->cookie;
		}
	}
	CDEBUG(D_DENTRY,
	       "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
	       op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
	       it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
	return rc;
}

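/* Check whether we already hold a lock covering the bits required by @it;
 * if so, store its handle and mode in the intent. Returns 1 if a lock was
 * found, 0 otherwise. */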
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
			struct lu_fid *fid, __u64 *bits)
{
	/* We could just return 1 immediately, but since we should only
	 * be called in revalidate_it if we already have a lock, let's
	 * verify that. */
	struct ldlm_res_id res_id;
	struct lustre_handle lockh;
	ldlm_policy_data_t policy;
	ldlm_mode_t mode;

	if (it->d.lustre.it_lock_handle) {
		lockh.cookie = it->d.lustre.it_lock_handle;
		mode = ldlm_revalidate_lock_handle(&lockh, bits);
	} else {
		fid_build_reg_res_name(fid, &res_id);
		switch (it->it_op) {
		case IT_GETATTR:
			/* File attributes are held under multiple bits:
			 * nlink is under lookup lock, size and times are
			 * under UPDATE lock and recently we've also got
			 * a separate permissions lock for owner/group/acl that
			 * were protected by lookup lock before.
			 * Getattr must provide all of that information,
			 * so we need to ensure we have all of those locks.
			 * Unfortunately, if the bits are split across multiple
			 * locks, there's no easy way to match all of them here,
			 * so an extra RPC would be performed to fetch all
			 * of those bits at once for now. */
			/* For new MDTs (> 2.4), UPDATE|PERM should be enough,
			 * but for old MDTs (< 2.4), permission is covered
			 * by LOOKUP lock, so it needs to match all bits here. */
			policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
						  MDS_INODELOCK_LOOKUP |
						  MDS_INODELOCK_PERM;
			break;
		case IT_LAYOUT:
			policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
			break;
		default:
			policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
			break;
		}

		mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
				      LDLM_IBITS, &policy,
				      LCK_CR | LCK_CW | LCK_PR | LCK_PW,
				      &lockh);
	}

	if (mode) {
		it->d.lustre.it_lock_handle = lockh.cookie;
		it->d.lustre.it_lock_mode = mode;
	} else {
		it->d.lustre.it_lock_handle = 0;
		it->d.lustre.it_lock_mode = 0;
	}

	return !!mode;
}

/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want the lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what d.lustre.it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 * child lookup.
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    void *lmm, int lmmsize, struct lookup_intent *it,
		    int lookup_flags, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking,
		    __u64 extra_lock_flags)
{
	struct ldlm_enqueue_info einfo = {
		.ei_type	= LDLM_IBITS,
		.ei_mode	= it_to_lock_mode(it),
		.ei_cb_bl	= cb_blocking,
		.ei_cb_cp	= ldlm_completion_ast,
	};
	struct lustre_handle lockh;
	int rc = 0;

	LASSERT(it);

	CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
		", intent: %s flags %#Lo\n", op_data->op_namelen,
		op_data->op_name, PFID(&op_data->op_fid2),
		PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
		it->it_flags);

	lockh.cookie = 0;
	if (fid_is_sane(&op_data->op_fid2) &&
	    (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
		/* We could just return 1 immediately, but since we should only
		 * be called in revalidate_it if we already have a lock, let's
		 * verify that. */
		it->d.lustre.it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only return failure if it was not GETATTR by cfid
		   (from inode_revalidate) */
		if (rc || op_data->op_namelen != 0)
			return rc;
	}

	/* In case the upper layer did not allocate the fid, do it now. */
	if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
		rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
		if (rc < 0) {
			CERROR("Can't alloc new fid, rc %d\n", rc);
			return rc;
		}
	}
	rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
			 extra_lock_flags);
	if (rc < 0)
		return rc;

	*reqp = it->d.lustre.it_data;
	rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
	return rc;
}

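/* Interpret callback for an async intent getattr enqueue: finish the
 * enqueue, complete the intent lock, then invoke the caller's callback. */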
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
					      void *args, int rc)
{
	struct mdc_getattr_args  *ga = args;
	struct obd_export	*exp = ga->ga_exp;
	struct md_enqueue_info   *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = ga->ga_einfo;
	struct lookup_intent     *it;
	struct lustre_handle     *lockh;
	struct obd_device	*obddev;
	struct ldlm_reply	 *lockrep;
	__u64		     flags = LDLM_FL_HAS_INTENT;

	it    = &minfo->mi_it;
	lockh = &minfo->mi_lockh;

	obddev = class_exp2obd(exp);

	mdc_exit_request(&obddev->u.cli);
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
		rc = -ETIMEDOUT;

	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
	if (rc < 0) {
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		mdc_clear_replay_flag(req, rc);
		goto out;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);

	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc)
		goto out;

	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);

out:
	OBD_FREE_PTR(einfo);
	minfo->mi_cb(req, minfo, rc);
	return 0;
}

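/* Send an intent getattr enqueue asynchronously (used by statahead); the
 * reply is handled in mdc_intent_getattr_async_interpret(). */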
int mdc_intent_getattr_async(struct obd_export *exp,
			     struct md_enqueue_info *minfo,
			     struct ldlm_enqueue_info *einfo)
{
	struct md_op_data       *op_data = &minfo->mi_data;
	struct lookup_intent    *it = &minfo->mi_it;
	struct ptlrpc_request   *req;
	struct mdc_getattr_args *ga;
	struct obd_device       *obddev = class_exp2obd(exp);
	struct ldlm_res_id       res_id;
	/* XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
	 *      for statahead currently. If CMD is supported in the future,
	 *      these two bits may be managed by different MDSes and should
	 *      be adjusted then. */
	ldlm_policy_data_t       policy = {
					.l_inodebits = { MDS_INODELOCK_LOOKUP |
							 MDS_INODELOCK_UPDATE }
				 };
	int		      rc = 0;
	__u64		    flags = LDLM_FL_HAS_INTENT;

	CDEBUG(D_DLMTRACE,
		"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
		op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
		ldlm_it2str(it->it_op), it->it_flags);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	req = mdc_intent_getattr_pack(exp, it, op_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	rc = mdc_enter_request(&obddev->u.cli);
	if (rc != 0) {
		ptlrpc_req_finished(req);
		return rc;
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
			      0, LVB_T_NONE, &minfo->mi_lockh, 1);
	if (rc < 0) {
		mdc_exit_request(&obddev->u.cli);
		ptlrpc_req_finished(req);
		return rc;
	}

	CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
	ga = ptlrpc_req_async_args(req);
	ga->ga_exp = exp;
	ga->ga_minfo = minfo;
	ga->ga_einfo = einfo;

	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);

	return 0;
}