1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_MDC
38 
39 # include <linux/module.h>
40 
41 #include "../include/lustre_intent.h"
42 #include "../include/obd.h"
43 #include "../include/obd_class.h"
44 #include "../include/lustre_dlm.h"
45 #include "../include/lustre_fid.h"	/* fid_res_name_eq() */
46 #include "../include/lustre_mdc.h"
47 #include "../include/lustre_net.h"
48 #include "../include/lustre_req_layout.h"
49 #include "mdc_internal.h"
50 
/* Bundle of state for a getattr intent enqueue.
 * NOTE(review): callers are not visible in this chunk — presumably used to
 * carry context into an async interpret callback; confirm against the
 * functions that populate it. */
struct mdc_getattr_args {
	struct obd_export	   *ga_exp;	/* export the enqueue is issued on */
	struct md_enqueue_info      *ga_minfo;	/* caller-supplied enqueue info */
	struct ldlm_enqueue_info    *ga_einfo;	/* DLM enqueue parameters */
};
56 
it_disposition(struct lookup_intent * it,int flag)57 int it_disposition(struct lookup_intent *it, int flag)
58 {
59 	return it->d.lustre.it_disposition & flag;
60 }
61 EXPORT_SYMBOL(it_disposition);
62 
it_set_disposition(struct lookup_intent * it,int flag)63 void it_set_disposition(struct lookup_intent *it, int flag)
64 {
65 	it->d.lustre.it_disposition |= flag;
66 }
67 EXPORT_SYMBOL(it_set_disposition);
68 
it_clear_disposition(struct lookup_intent * it,int flag)69 void it_clear_disposition(struct lookup_intent *it, int flag)
70 {
71 	it->d.lustre.it_disposition &= ~flag;
72 }
73 EXPORT_SYMBOL(it_clear_disposition);
74 
it_open_error(int phase,struct lookup_intent * it)75 int it_open_error(int phase, struct lookup_intent *it)
76 {
77 	if (it_disposition(it, DISP_OPEN_LEASE)) {
78 		if (phase >= DISP_OPEN_LEASE)
79 			return it->d.lustre.it_status;
80 		else
81 			return 0;
82 	}
83 	if (it_disposition(it, DISP_OPEN_OPEN)) {
84 		if (phase >= DISP_OPEN_OPEN)
85 			return it->d.lustre.it_status;
86 		else
87 			return 0;
88 	}
89 
90 	if (it_disposition(it, DISP_OPEN_CREATE)) {
91 		if (phase >= DISP_OPEN_CREATE)
92 			return it->d.lustre.it_status;
93 		else
94 			return 0;
95 	}
96 
97 	if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98 		if (phase >= DISP_LOOKUP_EXECD)
99 			return it->d.lustre.it_status;
100 		else
101 			return 0;
102 	}
103 
104 	if (it_disposition(it, DISP_IT_EXECD)) {
105 		if (phase >= DISP_IT_EXECD)
106 			return it->d.lustre.it_status;
107 		else
108 			return 0;
109 	}
110 	CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111 	       it->d.lustre.it_status);
112 	LBUG();
113 	return 0;
114 }
115 EXPORT_SYMBOL(it_open_error);
116 
/* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach @data (an inode) to the resource of the lock named by @lockh and,
 * optionally, report the lock's inodebits through @bits.  Returns 0.
 */
int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
		      __u64 *bits)
{
	struct ldlm_lock *lock;
	struct inode *new_inode = data;

	/* Zero the out-parameter up front so early exits report no bits. */
	if (bits)
		*bits = 0;

	/* A zero handle means there is no lock to update. */
	if (!*lockh)
		return 0;

	lock = ldlm_handle2lock((struct lustre_handle *)lockh);

	LASSERT(lock != NULL);
	lock_res_and_lock(lock);
	if (lock->l_resource->lr_lvb_inode &&
	    lock->l_resource->lr_lvb_inode != data) {
		struct inode *old_inode = lock->l_resource->lr_lvb_inode;

		/* Overwriting a live inode pointer would indicate a bug;
		 * only an inode that is already being freed may be
		 * superseded here. */
		LASSERTF(old_inode->i_state & I_FREEING,
			 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
			 old_inode, old_inode->i_ino, old_inode->i_generation,
			 old_inode->i_state, new_inode, new_inode->i_ino,
			 new_inode->i_generation);
	}
	lock->l_resource->lr_lvb_inode = new_inode;
	if (bits)
		*bits = lock->l_policy_data.l_inodebits.bits;

	unlock_res_and_lock(lock);
	LDLM_LOCK_PUT(lock);

	return 0;
}
153 
/*
 * Look for an already-granted MDC lock matching @fid/@type/@policy/@mode.
 * Inodebits the server does not support are masked out of @policy first
 * (LU-4405).  On a hit the handle is stored in @lockh and the granted
 * mode is returned; 0 means no match.
 */
ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
			   const struct lu_fid *fid, ldlm_type_t type,
			   ldlm_policy_data_t *policy, ldlm_mode_t mode,
			   struct lustre_handle *lockh)
{
	struct ldlm_res_id res_id;

	fid_build_reg_res_name(fid, &res_id);
	/* LU-4405: Clear bits not supported by server */
	policy->l_inodebits.bits &= exp_connect_ibits(exp);
	return ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
			       &res_id, type, policy, mode, lockh, 0);
}
169 
/*
 * Cancel all unused locks of mode @mode matching @policy on the resource
 * named by @fid.  @opaque is passed through to the cancel machinery.
 */
int mdc_cancel_unused(struct obd_export *exp,
		      const struct lu_fid *fid,
		      ldlm_policy_data_t *policy,
		      ldlm_mode_t mode,
		      ldlm_cancel_flags_t flags,
		      void *opaque)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ldlm_res_id res_id;

	fid_build_reg_res_name(fid, &res_id);
	return ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
					       policy, mode, flags, opaque);
}
186 
mdc_null_inode(struct obd_export * exp,const struct lu_fid * fid)187 int mdc_null_inode(struct obd_export *exp,
188 		   const struct lu_fid *fid)
189 {
190 	struct ldlm_res_id res_id;
191 	struct ldlm_resource *res;
192 	struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
193 
194 	LASSERTF(ns != NULL, "no namespace passed\n");
195 
196 	fid_build_reg_res_name(fid, &res_id);
197 
198 	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
199 	if (res == NULL)
200 		return 0;
201 
202 	lock_res(res);
203 	res->lr_lvb_inode = NULL;
204 	unlock_res(res);
205 
206 	ldlm_resource_putref(res);
207 	return 0;
208 }
209 
210 /* find any ldlm lock of the inode in mdc
211  * return 0    not find
212  *	1    find one
213  *      < 0    error */
mdc_find_cbdata(struct obd_export * exp,const struct lu_fid * fid,ldlm_iterator_t it,void * data)214 int mdc_find_cbdata(struct obd_export *exp,
215 		    const struct lu_fid *fid,
216 		    ldlm_iterator_t it, void *data)
217 {
218 	struct ldlm_res_id res_id;
219 	int rc = 0;
220 
221 	fid_build_reg_res_name((struct lu_fid *)fid, &res_id);
222 	rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223 				   it, data);
224 	if (rc == LDLM_ITER_STOP)
225 		return 1;
226 	else if (rc == LDLM_ITER_CONTINUE)
227 		return 0;
228 	return rc;
229 }
230 
mdc_clear_replay_flag(struct ptlrpc_request * req,int rc)231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232 {
233 	/* Don't hold error requests for replay. */
234 	if (req->rq_replay) {
235 		spin_lock(&req->rq_lock);
236 		req->rq_replay = 0;
237 		spin_unlock(&req->rq_lock);
238 	}
239 	if (rc && req->rq_transno != 0) {
240 		DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
241 		LBUG();
242 	}
243 }
244 
245 /* Save a large LOV EA into the request buffer so that it is available
246  * for replay.  We don't do this in the initial request because the
247  * original request doesn't need this buffer (at most it sends just the
248  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249  * buffer and may also be difficult to allocate and save a very large
250  * request buffer for each open. (bug 5707)
251  *
252  * OOM here may cause recovery failure if lmm is needed (only for the
253  * original open if the MDS crashed just when this client also OOM'd)
254  * but this is incredibly unlikely, and questionable whether the client
255  * could do MDS recovery under OOM anyways... */
mdc_realloc_openmsg(struct ptlrpc_request * req,struct mdt_body * body)256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257 				struct mdt_body *body)
258 {
259 	int     rc;
260 
261 	/* FIXME: remove this explicit offset. */
262 	rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
263 					body->eadatasize);
264 	if (rc) {
265 		CERROR("Can't enlarge segment %d size to %d\n",
266 		       DLM_INTENT_REC_OFF + 4, body->eadatasize);
267 		body->valid &= ~OBD_MD_FLEASIZE;
268 		body->eadatasize = 0;
269 	}
270 }
271 
/*
 * Build an LDLM_INTENT_OPEN enqueue request for @it/@op_data.
 *
 * Conflicting locks on the child (op_fid2, if known) and on the parent's
 * UPDATE bits (op_fid1) are gathered into a cancel list and piggy-backed
 * on the enqueue.  Returns the prepared request, or an ERR_PTR() on
 * failure (in which case the cancel list has been released).
 */
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
						   struct lookup_intent *it,
						   struct md_op_data *op_data,
						   void *lmm, int lmmsize,
						   void *cb_data)
{
	struct ptlrpc_request *req;
	struct obd_device     *obddev = class_exp2obd(exp);
	struct ldlm_intent    *lit;
	LIST_HEAD(cancels);
	int		    count = 0;
	int		    mode;
	int		    rc;

	/* Force a regular-file type in the create mode. */
	it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

	/* XXX: openlock is not cancelled for cross-refs. */
	/* If inode is known, cancel conflicting OPEN locks. */
	if (fid_is_sane(&op_data->op_fid2)) {
		if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
			if (it->it_flags & FMODE_WRITE)
				mode = LCK_EX;
			else
				mode = LCK_PR;
		} else {
			if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
				mode = LCK_CW;
			else if (it->it_flags & __FMODE_EXEC)
				mode = LCK_PR;
			else
				mode = LCK_CR;
		}
		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
						&cancels, mode,
						MDS_INODELOCK_OPEN);
	}

	/* If CREATE, cancel parent's UPDATE lock. */
	if (it->it_op & IT_CREAT)
		mode = LCK_EX;
	else
		mode = LCK_CR;
	count += mdc_resource_get_unused(exp, &op_data->op_fid1,
					 &cancels, mode,
					 MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_OPEN);
	if (req == NULL) {
		/* Allocation failed: drop the collected cancel references. */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return ERR_PTR(-ENOMEM);
	}

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     max(lmmsize, obddev->u.cli.cl_default_mds_easize));

	/* From here on ldlm_prep_enqueue_req() owns the cancel list. */
	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc < 0) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* Opens are replayed if the import supports recovery. */
	spin_lock(&req->rq_lock);
	req->rq_replay = req->rq_import->imp_replayable;
	spin_unlock(&req->rq_lock);

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
		      lmmsize);

	/* for remote client, fetch remote perm for current user */
	if (client_is_remote(exp))
		req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
				     sizeof(struct mdt_remote_perm));
	ptlrpc_request_set_replen(req);
	return req;
}
355 
356 static struct ptlrpc_request *
mdc_intent_getxattr_pack(struct obd_export * exp,struct lookup_intent * it,struct md_op_data * op_data)357 mdc_intent_getxattr_pack(struct obd_export *exp,
358 			 struct lookup_intent *it,
359 			 struct md_op_data *op_data)
360 {
361 	struct ptlrpc_request	*req;
362 	struct ldlm_intent	*lit;
363 	int			rc, count = 0, maxdata;
364 	LIST_HEAD(cancels);
365 
366 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
367 					&RQF_LDLM_INTENT_GETXATTR);
368 	if (req == NULL)
369 		return ERR_PTR(-ENOMEM);
370 
371 	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
372 	if (rc) {
373 		ptlrpc_request_free(req);
374 		return ERR_PTR(rc);
375 	}
376 
377 	/* pack the intent */
378 	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
379 	lit->opc = IT_GETXATTR;
380 
381 	maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
382 
383 	/* pack the intended request */
384 	mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
385 		      0);
386 
387 	req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
388 				RCL_SERVER, maxdata);
389 
390 	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
391 				RCL_SERVER, maxdata);
392 
393 	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
394 				RCL_SERVER, maxdata);
395 
396 	ptlrpc_request_set_replen(req);
397 
398 	return req;
399 }
400 
mdc_intent_unlink_pack(struct obd_export * exp,struct lookup_intent * it,struct md_op_data * op_data)401 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
402 						     struct lookup_intent *it,
403 						     struct md_op_data *op_data)
404 {
405 	struct ptlrpc_request *req;
406 	struct obd_device     *obddev = class_exp2obd(exp);
407 	struct ldlm_intent    *lit;
408 	int		    rc;
409 
410 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
411 				   &RQF_LDLM_INTENT_UNLINK);
412 	if (req == NULL)
413 		return ERR_PTR(-ENOMEM);
414 
415 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
416 			     op_data->op_namelen + 1);
417 
418 	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
419 	if (rc) {
420 		ptlrpc_request_free(req);
421 		return ERR_PTR(rc);
422 	}
423 
424 	/* pack the intent */
425 	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
426 	lit->opc = (__u64)it->it_op;
427 
428 	/* pack the intended request */
429 	mdc_unlink_pack(req, op_data);
430 
431 	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432 			     obddev->u.cli.cl_default_mds_easize);
433 	req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
434 			     obddev->u.cli.cl_default_mds_cookiesize);
435 	ptlrpc_request_set_replen(req);
436 	return req;
437 }
438 
mdc_intent_getattr_pack(struct obd_export * exp,struct lookup_intent * it,struct md_op_data * op_data)439 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
440 						    struct lookup_intent *it,
441 						    struct md_op_data *op_data)
442 {
443 	struct ptlrpc_request *req;
444 	struct obd_device     *obddev = class_exp2obd(exp);
445 	u64		       valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
446 				       OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
447 				       OBD_MD_MEA |
448 				       (client_is_remote(exp) ?
449 					       OBD_MD_FLRMTPERM : OBD_MD_FLACL);
450 	struct ldlm_intent    *lit;
451 	int		    rc;
452 	int		    easize;
453 
454 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
455 				   &RQF_LDLM_INTENT_GETATTR);
456 	if (req == NULL)
457 		return ERR_PTR(-ENOMEM);
458 
459 	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
460 			     op_data->op_namelen + 1);
461 
462 	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
463 	if (rc) {
464 		ptlrpc_request_free(req);
465 		return ERR_PTR(rc);
466 	}
467 
468 	/* pack the intent */
469 	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
470 	lit->opc = (__u64)it->it_op;
471 
472 	if (obddev->u.cli.cl_default_mds_easize > 0)
473 		easize = obddev->u.cli.cl_default_mds_easize;
474 	else
475 		easize = obddev->u.cli.cl_max_mds_easize;
476 
477 	/* pack the intended request */
478 	mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
479 
480 	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
481 	if (client_is_remote(exp))
482 		req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
483 				     sizeof(struct mdt_remote_perm));
484 	ptlrpc_request_set_replen(req);
485 	return req;
486 }
487 
mdc_intent_layout_pack(struct obd_export * exp,struct lookup_intent * it,struct md_op_data * unused)488 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
489 						     struct lookup_intent *it,
490 						     struct md_op_data *unused)
491 {
492 	struct obd_device     *obd = class_exp2obd(exp);
493 	struct ptlrpc_request *req;
494 	struct ldlm_intent    *lit;
495 	struct layout_intent  *layout;
496 	int rc;
497 
498 	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
499 				&RQF_LDLM_INTENT_LAYOUT);
500 	if (req == NULL)
501 		return ERR_PTR(-ENOMEM);
502 
503 	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
504 	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
505 	if (rc) {
506 		ptlrpc_request_free(req);
507 		return ERR_PTR(rc);
508 	}
509 
510 	/* pack the intent */
511 	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
512 	lit->opc = (__u64)it->it_op;
513 
514 	/* pack the layout intent request */
515 	layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
516 	/* LAYOUT_INTENT_ACCESS is generic, specific operation will be
517 	 * set for replication */
518 	layout->li_opc = LAYOUT_INTENT_ACCESS;
519 
520 	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
521 			     obd->u.cli.cl_default_mds_easize);
522 	ptlrpc_request_set_replen(req);
523 	return req;
524 }
525 
526 static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export * exp,int lvb_len)527 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
528 {
529 	struct ptlrpc_request *req;
530 	int rc;
531 
532 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
533 	if (req == NULL)
534 		return ERR_PTR(-ENOMEM);
535 
536 	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
537 	if (rc) {
538 		ptlrpc_request_free(req);
539 		return ERR_PTR(rc);
540 	}
541 
542 	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
543 	ptlrpc_request_set_replen(req);
544 	return req;
545 }
546 
/*
 * Post-process a completed intent enqueue reply.
 *
 * Copies disposition/status/lock info from the reply into @it, clears
 * the replay flag for failed requests, swabs and validates the reply
 * body for the intent type, and for layout locks installs the LVB data
 * into the lock.  @rc is the (non-negative) enqueue result; returns @rc,
 * 0 for an aborted lock, or a negative errno on reply-parsing failure.
 */
static int mdc_finish_enqueue(struct obd_export *exp,
			      struct ptlrpc_request *req,
			      struct ldlm_enqueue_info *einfo,
			      struct lookup_intent *it,
			      struct lustre_handle *lockh,
			      int rc)
{
	struct req_capsule  *pill = &req->rq_pill;
	struct ldlm_request *lockreq;
	struct ldlm_reply   *lockrep;
	struct lustre_intent_data *intent = &it->d.lustre;
	struct ldlm_lock    *lock;
	void		*lvb_data = NULL;
	int		  lvb_len = 0;

	LASSERT(rc >= 0);
	/* Similarly, if we're going to replay this request, we don't want to
	 * actually get a lock, just perform the intent. */
	if (req->rq_transno || req->rq_replay) {
		lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
		lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
	}

	if (rc == ELDLM_LOCK_ABORTED) {
		/* Intent executed but no lock granted: report no lock. */
		einfo->ei_mode = 0;
		memset(lockh, 0, sizeof(*lockh));
		rc = 0;
	} else { /* rc = 0 */
		lock = ldlm_handle2lock(lockh);
		LASSERT(lock != NULL);

		/* If the server gave us back a different lock mode, we should
		 * fix up our variables. */
		if (lock->l_req_mode != einfo->ei_mode) {
			ldlm_lock_addref(lockh, lock->l_req_mode);
			ldlm_lock_decref(lockh, einfo->ei_mode);
			einfo->ei_mode = lock->l_req_mode;
		}
		LDLM_LOCK_PUT(lock);
	}

	lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */

	/* Mirror the reply's disposition/status into the intent. */
	intent->it_disposition = (int)lockrep->lock_policy_res1;
	intent->it_status = (int)lockrep->lock_policy_res2;
	intent->it_lock_mode = einfo->ei_mode;
	intent->it_lock_handle = lockh->cookie;
	intent->it_data = req;

	/* Technically speaking rq_transno must already be zero if
	 * it_status is in error, so the check is a bit redundant */
	if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
		mdc_clear_replay_flag(req, intent->it_status);

	/* If we're doing an IT_OPEN which did not result in an actual
	 * successful open, then we need to remove the bit which saves
	 * this request for unconditional replay.
	 *
	 * It's important that we do this first!  Otherwise we might exit the
	 * function without doing so, and try to replay a failed create
	 * (bug 3440) */
	if (it->it_op & IT_OPEN && req->rq_replay &&
	    (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
		mdc_clear_replay_flag(req, intent->it_status);

	DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
		  it->it_op, intent->it_disposition, intent->it_status);

	/* We know what to expect, so we do any byte flipping required here */
	if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
		struct mdt_body *body;

		body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		if (body == NULL) {
			CERROR("Can't swab mdt_body\n");
			return -EPROTO;
		}

		if (it_disposition(it, DISP_OPEN_OPEN) &&
		    !it_open_error(DISP_OPEN_OPEN, it)) {
			/*
			 * If this is a successful OPEN request, we need to set
			 * replay handler and data early, so that if replay
			 * happens immediately after swabbing below, new reply
			 * is swabbed by that handler correctly.
			 */
			mdc_set_open_replay_data(NULL, NULL, it);
		}

		if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
			void *eadata;

			mdc_update_max_ea_from_body(exp, body);

			/*
			 * The eadata is opaque; just check that it is there.
			 * Eventually, obd_unpackmd() will check the contents.
			 */
			eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
							      body->eadatasize);
			if (eadata == NULL)
				return -EPROTO;

			/* save lvb data and length in case this is for layout
			 * lock */
			lvb_data = eadata;
			lvb_len = body->eadatasize;

			/*
			 * We save the reply LOV EA in case we have to replay a
			 * create for recovery.  If we didn't allocate a large
			 * enough request buffer above we need to reallocate it
			 * here to hold the actual LOV EA.
			 *
			 * To not save LOV EA if request is not going to replay
			 * (for example error one).
			 */
			if ((it->it_op & IT_OPEN) && req->rq_replay) {
				void *lmm;

				if (req_capsule_get_size(pill, &RMF_EADATA,
							 RCL_CLIENT) <
				    body->eadatasize)
					mdc_realloc_openmsg(req, body);
				else
					req_capsule_shrink(pill, &RMF_EADATA,
							   body->eadatasize,
							   RCL_CLIENT);

				req_capsule_set_size(pill, &RMF_EADATA,
						     RCL_CLIENT,
						     body->eadatasize);

				lmm = req_capsule_client_get(pill, &RMF_EADATA);
				if (lmm)
					memcpy(lmm, eadata, body->eadatasize);
			}
		}

		if (body->valid & OBD_MD_FLRMTPERM) {
			struct mdt_remote_perm *perm;

			LASSERT(client_is_remote(exp));
			perm = req_capsule_server_swab_get(pill, &RMF_ACL,
						lustre_swab_mdt_remote_perm);
			if (perm == NULL)
				return -EPROTO;
		}
	} else if (it->it_op & IT_LAYOUT) {
		/* maybe the lock was granted right away and layout
		 * is packed into RMF_DLM_LVB of req */
		lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
		if (lvb_len > 0) {
			lvb_data = req_capsule_server_sized_get(pill,
							&RMF_DLM_LVB, lvb_len);
			if (lvb_data == NULL)
				return -EPROTO;
		}
	}

	/* fill in stripe data for layout lock */
	lock = ldlm_handle2lock(lockh);
	if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
		void *lmm;

		LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
			ldlm_it2str(it->it_op), lvb_len);

		/* Copy outside the lock: allocation must not sleep under it. */
		lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
		if (lmm == NULL) {
			LDLM_LOCK_PUT(lock);
			return -ENOMEM;
		}
		memcpy(lmm, lvb_data, lvb_len);

		/* install lvb_data */
		lock_res_and_lock(lock);
		if (lock->l_lvb_data == NULL) {
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lmm;
			lock->l_lvb_len = lvb_len;
			lmm = NULL;
		}
		unlock_res_and_lock(lock);
		/* Someone else installed an LVB first; drop our copy. */
		if (lmm != NULL)
			kvfree(lmm);
	}
	if (lock != NULL)
		LDLM_LOCK_PUT(lock);

	return rc;
}
740 
/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
/*
 * Enqueue an MDC DLM lock, optionally carrying an intent @it.
 *
 * Packs the intent-specific request (open/unlink/getattr/layout/getxattr,
 * or a plain flock enqueue when @it is NULL), serialises on the MDC
 * rpc_lock and in-flight counter, sends the enqueue, and retries
 * infinitely on -EINPROGRESS for creates (quota design) while the import
 * generation is unchanged.  On success the lock state is copied into @it
 * by mdc_finish_enqueue(); on failure any granted lock is dropped and
 * the intent lock fields are cleared.
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
		struct lookup_intent *it, struct md_op_data *op_data,
		struct lustre_handle *lockh, void *lmm, int lmmsize,
		struct ptlrpc_request **reqp, u64 extra_lock_flags)
{
	static const ldlm_policy_data_t lookup_policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP }
	};
	static const ldlm_policy_data_t update_policy = {
		.l_inodebits = { MDS_INODELOCK_UPDATE }
	};
	static const ldlm_policy_data_t layout_policy = {
		.l_inodebits = { MDS_INODELOCK_LAYOUT }
	};
	static const ldlm_policy_data_t getxattr_policy = {
		.l_inodebits = { MDS_INODELOCK_XATTR }
	};
	ldlm_policy_data_t const *policy = &lookup_policy;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req;
	u64 flags, saved_flags = extra_lock_flags;
	struct ldlm_res_id res_id;
	int generation, resends = 0;
	struct ldlm_reply *lockrep;
	enum lvb_type lvb_type = LVB_T_NONE;
	int rc;

	LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
		 einfo->ei_type);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);

	/* Pick the inodebits policy matching the intent operation. */
	if (it) {
		saved_flags |= LDLM_FL_HAS_INTENT;
		if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
			policy = &update_policy;
		else if (it->it_op & IT_LAYOUT)
			policy = &layout_policy;
		else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
			policy = &getxattr_policy;
	}

	LASSERT(reqp == NULL);

	/* Remember the import generation so a resend can detect eviction. */
	generation = obddev->u.cli.cl_import->imp_generation;
resend:
	flags = saved_flags;
	if (!it) {
		/* The only way right now is FLOCK, in this case we hide flock
		   policy as lmm, but lmmsize is 0 */
		LASSERT(lmm && lmmsize == 0);
		LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
			 einfo->ei_type);
		policy = lmm;
		res_id.name[3] = LDLM_FLOCK;
		req = NULL;
	} else if (it->it_op & IT_OPEN) {
		req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
					   einfo->ei_cbdata);
		policy = &update_policy;
		einfo->ei_cbdata = NULL;
		lmm = NULL;
	} else if (it->it_op & IT_UNLINK) {
		req = mdc_intent_unlink_pack(exp, it, op_data);
	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
		req = mdc_intent_getattr_pack(exp, it, op_data);
	} else if (it->it_op & IT_READDIR) {
		req = mdc_enqueue_pack(exp, 0);
	} else if (it->it_op & IT_LAYOUT) {
		if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
			return -EOPNOTSUPP;
		req = mdc_intent_layout_pack(exp, it, op_data);
		lvb_type = LVB_T_LAYOUT;
	} else if (it->it_op & IT_GETXATTR) {
		req = mdc_intent_getxattr_pack(exp, it, op_data);
	} else {
		LBUG();
		return -EINVAL;
	}

	if (IS_ERR(req))
		return PTR_ERR(req);

	if (req != NULL && it && it->it_op & IT_CREAT)
		/* ask ptlrpc not to resend on EINPROGRESS since we have our own
		 * retry logic */
		req->rq_no_retry_einprogress = 1;

	if (resends) {
		/* Delay the resend to back off under -EINPROGRESS pressure. */
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = ktime_get_real_seconds() + resends;
	}

	/* It is important to obtain rpc_lock first (if applicable), so that
	 * threads that are serialised with rpc_lock are not polluting our
	 * rpcs in flight counter. We do not do flock request limiting, though*/
	if (it) {
		mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
		rc = mdc_enter_request(&obddev->u.cli);
		if (rc != 0) {
			mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
			mdc_clear_replay_flag(req, 0);
			ptlrpc_req_finished(req);
			return rc;
		}
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
			      0, lvb_type, lockh, 0);
	if (!it) {
		/* For flock requests we immediately return without further
		   delay and let caller deal with the rest, since rest of
		   this function metadata processing makes no sense for flock
		   requests anyway. But in case of problem during comms with
		   Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
		   can not rely on caller and this mainly for F_UNLCKs
		   (explicits or automatically generated by Kernel to clean
		   current FLocks upon exit) that can't be trashed */
		if ((rc == -EINTR) || (rc == -ETIMEDOUT))
			goto resend;
		return rc;
	}

	mdc_exit_request(&obddev->u.cli);
	mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

	if (rc < 0) {
		CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
			     "%s: ldlm_cli_enqueue failed: rc = %d\n",
			     obddev->obd_name, rc);

		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		return rc;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);

	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	/* Retry the create infinitely when we get -EINPROGRESS from
	 * server. This is required by the new quota design. */
	if (it->it_op & IT_CREAT &&
	    (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
		       obddev->obd_name, resends, it->it_op,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		if (generation == obddev->u.cli.cl_import->imp_generation) {
			goto resend;
		} else {
			/* The import was evicted meanwhile; give up. */
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}
	}

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc < 0) {
		/* Reply processing failed: release any granted lock and
		 * scrub the intent so the caller sees no stale lock state. */
		if (lustre_handle_is_used(lockh)) {
			ldlm_lock_decref(lockh, einfo->ei_mode);
			memset(lockh, 0, sizeof(*lockh));
		}
		ptlrpc_req_finished(req);

		it->d.lustre.it_lock_handle = 0;
		it->d.lustre.it_lock_mode = 0;
		it->d.lustre.it_data = NULL;
	}

	return rc;
}
921 
mdc_finish_intent_lock(struct obd_export * exp,struct ptlrpc_request * request,struct md_op_data * op_data,struct lookup_intent * it,struct lustre_handle * lockh)922 static int mdc_finish_intent_lock(struct obd_export *exp,
923 				  struct ptlrpc_request *request,
924 				  struct md_op_data *op_data,
925 				  struct lookup_intent *it,
926 				  struct lustre_handle *lockh)
927 {
928 	struct lustre_handle old_lock;
929 	struct mdt_body *mdt_body;
930 	struct ldlm_lock *lock;
931 	int rc;
932 
933 	LASSERT(request != NULL);
934 	LASSERT(request != LP_POISON);
935 	LASSERT(request->rq_repmsg != LP_POISON);
936 
937 	if (!it_disposition(it, DISP_IT_EXECD)) {
938 		/* The server failed before it even started executing the
939 		 * intent, i.e. because it couldn't unpack the request. */
940 		LASSERT(it->d.lustre.it_status != 0);
941 		return it->d.lustre.it_status;
942 	}
943 	rc = it_open_error(DISP_IT_EXECD, it);
944 	if (rc)
945 		return rc;
946 
947 	mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
948 	LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
949 
950 	/* If we were revalidating a fid/name pair, mark the intent in
951 	 * case we fail and get called again from lookup */
952 	if (fid_is_sane(&op_data->op_fid2) &&
953 	    it->it_create_mode & M_CHECK_STALE &&
954 	    it->it_op != IT_GETATTR) {
955 
956 		/* Also: did we find the same inode? */
957 		/* sever can return one of two fids:
958 		 * op_fid2 - new allocated fid - if file is created.
959 		 * op_fid3 - existent fid - if file only open.
960 		 * op_fid3 is saved in lmv_intent_open */
961 		if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
962 		    (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
963 			CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
964 			       "\n", PFID(&op_data->op_fid2),
965 			       PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
966 			return -ESTALE;
967 		}
968 	}
969 
970 	rc = it_open_error(DISP_LOOKUP_EXECD, it);
971 	if (rc)
972 		return rc;
973 
974 	/* keep requests around for the multiple phases of the call
975 	 * this shows the DISP_XX must guarantee we make it into the call
976 	 */
977 	if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
978 	    it_disposition(it, DISP_OPEN_CREATE) &&
979 	    !it_open_error(DISP_OPEN_CREATE, it)) {
980 		it_set_disposition(it, DISP_ENQ_CREATE_REF);
981 		ptlrpc_request_addref(request); /* balanced in ll_create_node */
982 	}
983 	if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
984 	    it_disposition(it, DISP_OPEN_OPEN) &&
985 	    !it_open_error(DISP_OPEN_OPEN, it)) {
986 		it_set_disposition(it, DISP_ENQ_OPEN_REF);
987 		ptlrpc_request_addref(request); /* balanced in ll_file_open */
988 		/* BUG 11546 - eviction in the middle of open rpc processing */
989 		OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
990 	}
991 
992 	if (it->it_op & IT_CREAT) {
993 		/* XXX this belongs in ll_create_it */
994 	} else if (it->it_op == IT_OPEN) {
995 		LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
996 	} else {
997 		LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
998 	}
999 
1000 	/* If we already have a matching lock, then cancel the new
1001 	 * one.  We have to set the data here instead of in
1002 	 * mdc_enqueue, because we need to use the child's inode as
1003 	 * the l_ast_data to match, and that's not available until
1004 	 * intent_finish has performed the iget().) */
1005 	lock = ldlm_handle2lock(lockh);
1006 	if (lock) {
1007 		ldlm_policy_data_t policy = lock->l_policy_data;
1008 
1009 		LDLM_DEBUG(lock, "matching against this");
1010 
1011 		LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1012 					 &lock->l_resource->lr_name),
1013 			 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1014 			 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1015 		LDLM_LOCK_PUT(lock);
1016 
1017 		memcpy(&old_lock, lockh, sizeof(*lockh));
1018 		if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1019 				    LDLM_IBITS, &policy, LCK_NL,
1020 				    &old_lock, 0)) {
1021 			ldlm_lock_decref_and_cancel(lockh,
1022 						    it->d.lustre.it_lock_mode);
1023 			memcpy(lockh, &old_lock, sizeof(old_lock));
1024 			it->d.lustre.it_lock_handle = lockh->cookie;
1025 		}
1026 	}
1027 	CDEBUG(D_DENTRY,
1028 	       "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1029 	       op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1030 	       it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1031 	return rc;
1032 }
1033 
mdc_revalidate_lock(struct obd_export * exp,struct lookup_intent * it,struct lu_fid * fid,__u64 * bits)1034 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1035 			struct lu_fid *fid, __u64 *bits)
1036 {
1037 	/* We could just return 1 immediately, but since we should only
1038 	 * be called in revalidate_it if we already have a lock, let's
1039 	 * verify that. */
1040 	struct ldlm_res_id res_id;
1041 	struct lustre_handle lockh;
1042 	ldlm_policy_data_t policy;
1043 	ldlm_mode_t mode;
1044 
1045 	if (it->d.lustre.it_lock_handle) {
1046 		lockh.cookie = it->d.lustre.it_lock_handle;
1047 		mode = ldlm_revalidate_lock_handle(&lockh, bits);
1048 	} else {
1049 		fid_build_reg_res_name(fid, &res_id);
1050 		switch (it->it_op) {
1051 		case IT_GETATTR:
1052 			/* File attributes are held under multiple bits:
1053 			 * nlink is under lookup lock, size and times are
1054 			 * under UPDATE lock and recently we've also got
1055 			 * a separate permissions lock for owner/group/acl that
1056 			 * were protected by lookup lock before.
1057 			 * Getattr must provide all of that information,
1058 			 * so we need to ensure we have all of those locks.
1059 			 * Unfortunately, if the bits are split across multiple
1060 			 * locks, there's no easy way to match all of them here,
1061 			 * so an extra RPC would be performed to fetch all
1062 			 * of those bits at once for now. */
1063 			/* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1064 			 * but for old MDTs (< 2.4), permission is covered
1065 			 * by LOOKUP lock, so it needs to match all bits here.*/
1066 			policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1067 						  MDS_INODELOCK_LOOKUP |
1068 						  MDS_INODELOCK_PERM;
1069 			break;
1070 		case IT_LAYOUT:
1071 			policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1072 			break;
1073 		default:
1074 			policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1075 			break;
1076 		}
1077 
1078 		mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1079 				       LDLM_IBITS, &policy,
1080 				      LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1081 				      &lockh);
1082 	}
1083 
1084 	if (mode) {
1085 		it->d.lustre.it_lock_handle = lockh.cookie;
1086 		it->d.lustre.it_lock_mode = mode;
1087 	} else {
1088 		it->d.lustre.it_lock_handle = 0;
1089 		it->d.lustre.it_lock_mode = 0;
1090 	}
1091 
1092 	return !!mode;
1093 }
1094 
1095 /*
1096  * This long block is all about fixing up the lock and request state
1097  * so that it is correct as of the moment _before_ the operation was
1098  * applied; that way, the VFS will think that everything is normal and
1099  * call Lustre's regular VFS methods.
1100  *
1101  * If we're performing a creation, that means that unless the creation
1102  * failed with EEXIST, we should fake up a negative dentry.
1103  *
 * For everything else, we want the lookup to succeed.
1105  *
1106  * One additional note: if CREATE or OPEN succeeded, we add an extra
1107  * reference to the request because we need to keep it around until
1108  * ll_create/ll_open gets called.
1109  *
1110  * The server will return to us, in it_disposition, an indication of
1111  * exactly what d.lustre.it_status refers to.
1112  *
1113  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1115  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1116  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1117  * was successful.
1118  *
1119  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1120  * child lookup.
1121  */
mdc_intent_lock(struct obd_export * exp,struct md_op_data * op_data,void * lmm,int lmmsize,struct lookup_intent * it,int lookup_flags,struct ptlrpc_request ** reqp,ldlm_blocking_callback cb_blocking,__u64 extra_lock_flags)1122 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1123 		    void *lmm, int lmmsize, struct lookup_intent *it,
1124 		    int lookup_flags, struct ptlrpc_request **reqp,
1125 		    ldlm_blocking_callback cb_blocking,
1126 		    __u64 extra_lock_flags)
1127 {
1128 	struct ldlm_enqueue_info einfo = {
1129 		.ei_type	= LDLM_IBITS,
1130 		.ei_mode	= it_to_lock_mode(it),
1131 		.ei_cb_bl	= cb_blocking,
1132 		.ei_cb_cp	= ldlm_completion_ast,
1133 	};
1134 	struct lustre_handle lockh;
1135 	int rc = 0;
1136 
1137 	LASSERT(it);
1138 
1139 	CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1140 		", intent: %s flags %#Lo\n", op_data->op_namelen,
1141 		op_data->op_name, PFID(&op_data->op_fid2),
1142 		PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1143 		it->it_flags);
1144 
1145 	lockh.cookie = 0;
1146 	if (fid_is_sane(&op_data->op_fid2) &&
1147 	    (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1148 		/* We could just return 1 immediately, but since we should only
1149 		 * be called in revalidate_it if we already have a lock, let's
1150 		 * verify that. */
1151 		it->d.lustre.it_lock_handle = 0;
1152 		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1153 		/* Only return failure if it was not GETATTR by cfid
1154 		   (from inode_revalidate) */
1155 		if (rc || op_data->op_namelen != 0)
1156 			return rc;
1157 	}
1158 
1159 	/* For case if upper layer did not alloc fid, do it now. */
1160 	if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1161 		rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1162 		if (rc < 0) {
1163 			CERROR("Can't alloc new fid, rc %d\n", rc);
1164 			return rc;
1165 		}
1166 	}
1167 	rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1168 			 extra_lock_flags);
1169 	if (rc < 0)
1170 		return rc;
1171 
1172 	*reqp = it->d.lustre.it_data;
1173 	rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1174 	return rc;
1175 }
1176 
mdc_intent_getattr_async_interpret(const struct lu_env * env,struct ptlrpc_request * req,void * args,int rc)1177 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1178 					      struct ptlrpc_request *req,
1179 					      void *args, int rc)
1180 {
1181 	struct mdc_getattr_args  *ga = args;
1182 	struct obd_export	*exp = ga->ga_exp;
1183 	struct md_enqueue_info   *minfo = ga->ga_minfo;
1184 	struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1185 	struct lookup_intent     *it;
1186 	struct lustre_handle     *lockh;
1187 	struct obd_device	*obddev;
1188 	struct ldlm_reply	 *lockrep;
1189 	__u64		     flags = LDLM_FL_HAS_INTENT;
1190 
1191 	it    = &minfo->mi_it;
1192 	lockh = &minfo->mi_lockh;
1193 
1194 	obddev = class_exp2obd(exp);
1195 
1196 	mdc_exit_request(&obddev->u.cli);
1197 	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1198 		rc = -ETIMEDOUT;
1199 
1200 	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1201 				   &flags, NULL, 0, lockh, rc);
1202 	if (rc < 0) {
1203 		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1204 		mdc_clear_replay_flag(req, rc);
1205 		goto out;
1206 	}
1207 
1208 	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1209 	LASSERT(lockrep != NULL);
1210 
1211 	lockrep->lock_policy_res2 =
1212 		ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1213 
1214 	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1215 	if (rc)
1216 		goto out;
1217 
1218 	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1219 
1220 out:
1221 	kfree(einfo);
1222 	minfo->mi_cb(req, minfo, rc);
1223 	return 0;
1224 }
1225 
mdc_intent_getattr_async(struct obd_export * exp,struct md_enqueue_info * minfo,struct ldlm_enqueue_info * einfo)1226 int mdc_intent_getattr_async(struct obd_export *exp,
1227 			     struct md_enqueue_info *minfo,
1228 			     struct ldlm_enqueue_info *einfo)
1229 {
1230 	struct md_op_data       *op_data = &minfo->mi_data;
1231 	struct lookup_intent    *it = &minfo->mi_it;
1232 	struct ptlrpc_request   *req;
1233 	struct mdc_getattr_args *ga;
1234 	struct obd_device       *obddev = class_exp2obd(exp);
1235 	struct ldlm_res_id       res_id;
1236 	/*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1237 	 *     for statahead currently. Consider CMD in future, such two bits
1238 	 *     maybe managed by different MDS, should be adjusted then. */
1239 	ldlm_policy_data_t       policy = {
1240 					.l_inodebits = { MDS_INODELOCK_LOOKUP |
1241 							 MDS_INODELOCK_UPDATE }
1242 				 };
1243 	int		      rc = 0;
1244 	__u64		    flags = LDLM_FL_HAS_INTENT;
1245 
1246 	CDEBUG(D_DLMTRACE,
1247 		"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1248 		op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1249 		ldlm_it2str(it->it_op), it->it_flags);
1250 
1251 	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1252 	req = mdc_intent_getattr_pack(exp, it, op_data);
1253 	if (IS_ERR(req))
1254 		return PTR_ERR(req);
1255 
1256 	rc = mdc_enter_request(&obddev->u.cli);
1257 	if (rc != 0) {
1258 		ptlrpc_req_finished(req);
1259 		return rc;
1260 	}
1261 
1262 	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1263 			      0, LVB_T_NONE, &minfo->mi_lockh, 1);
1264 	if (rc < 0) {
1265 		mdc_exit_request(&obddev->u.cli);
1266 		ptlrpc_req_finished(req);
1267 		return rc;
1268 	}
1269 
1270 	CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1271 	ga = ptlrpc_req_async_args(req);
1272 	ga->ga_exp = exp;
1273 	ga->ga_minfo = minfo;
1274 	ga->ga_einfo = einfo;
1275 
1276 	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1277 	ptlrpcd_add_req(req);
1278 
1279 	return 0;
1280 }
1281