1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40
41 #include "../include/lustre_intent.h"
42 #include "../include/obd.h"
43 #include "../include/obd_class.h"
44 #include "../include/lustre_dlm.h"
45 #include "../include/lustre_fid.h" /* fid_res_name_eq() */
46 #include "../include/lustre_mdc.h"
47 #include "../include/lustre_net.h"
48 #include "../include/lustre_req_layout.h"
49 #include "mdc_internal.h"
50
/*
 * Argument bundle for an asynchronous getattr enqueue.
 * NOTE(review): the consumer is not visible in this chunk; presumably an
 * enqueue interpret callback unpacks these fields — confirm against the
 * rest of the file.
 */
struct mdc_getattr_args {
	struct obd_export *ga_exp;	/* export the enqueue was issued on */
	struct md_enqueue_info *ga_minfo;	/* caller-supplied enqueue info */
	struct ldlm_enqueue_info *ga_einfo;	/* DLM enqueue parameters */
};
56
it_disposition(struct lookup_intent * it,int flag)57 int it_disposition(struct lookup_intent *it, int flag)
58 {
59 return it->d.lustre.it_disposition & flag;
60 }
61 EXPORT_SYMBOL(it_disposition);
62
it_set_disposition(struct lookup_intent * it,int flag)63 void it_set_disposition(struct lookup_intent *it, int flag)
64 {
65 it->d.lustre.it_disposition |= flag;
66 }
67 EXPORT_SYMBOL(it_set_disposition);
68
it_clear_disposition(struct lookup_intent * it,int flag)69 void it_clear_disposition(struct lookup_intent *it, int flag)
70 {
71 it->d.lustre.it_disposition &= ~flag;
72 }
73 EXPORT_SYMBOL(it_clear_disposition);
74
it_open_error(int phase,struct lookup_intent * it)75 int it_open_error(int phase, struct lookup_intent *it)
76 {
77 if (it_disposition(it, DISP_OPEN_LEASE)) {
78 if (phase >= DISP_OPEN_LEASE)
79 return it->d.lustre.it_status;
80 else
81 return 0;
82 }
83 if (it_disposition(it, DISP_OPEN_OPEN)) {
84 if (phase >= DISP_OPEN_OPEN)
85 return it->d.lustre.it_status;
86 else
87 return 0;
88 }
89
90 if (it_disposition(it, DISP_OPEN_CREATE)) {
91 if (phase >= DISP_OPEN_CREATE)
92 return it->d.lustre.it_status;
93 else
94 return 0;
95 }
96
97 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98 if (phase >= DISP_LOOKUP_EXECD)
99 return it->d.lustre.it_status;
100 else
101 return 0;
102 }
103
104 if (it_disposition(it, DISP_IT_EXECD)) {
105 if (phase >= DISP_IT_EXECD)
106 return it->d.lustre.it_status;
107 else
108 return 0;
109 }
110 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111 it->d.lustre.it_status);
112 LBUG();
113 return 0;
114 }
115 EXPORT_SYMBOL(it_open_error);
116
117 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach @data (an inode) to the lock identified by @lockh as its LVB
 * inode, optionally returning the lock's inodebits in @bits.
 *
 * The caller must hold a reference on the lock behind @lockh (see the
 * comment above).  Returns 0; a zero handle is treated as a no-op.
 */
int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
		      __u64 *bits)
{
	struct ldlm_lock *lock;
	struct inode *new_inode = data;

	if (bits)
		*bits = 0;

	/* Nothing to do for a null lock handle. */
	if (!*lockh)
		return 0;

	lock = ldlm_handle2lock((struct lustre_handle *)lockh);

	LASSERT(lock != NULL);
	lock_res_and_lock(lock);
	if (lock->l_resource->lr_lvb_inode &&
	    lock->l_resource->lr_lvb_inode != data) {
		struct inode *old_inode = lock->l_resource->lr_lvb_inode;

		/* Replacing a live inode would mean two in-core inodes for
		 * one resource; only a dying (I_FREEING) inode may be
		 * displaced. */
		LASSERTF(old_inode->i_state & I_FREEING,
			 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
			 old_inode, old_inode->i_ino, old_inode->i_generation,
			 old_inode->i_state, new_inode, new_inode->i_ino,
			 new_inode->i_generation);
	}
	lock->l_resource->lr_lvb_inode = new_inode;
	if (bits)
		*bits = lock->l_policy_data.l_inodebits.bits;

	unlock_res_and_lock(lock);
	LDLM_LOCK_PUT(lock);

	return 0;
}
153
/*
 * Look for an already-granted MDC lock on @fid matching @type/@policy/
 * @mode.  Returns the matched mode (0 if none) and fills @lockh.
 */
ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
			   const struct lu_fid *fid, ldlm_type_t type,
			   ldlm_policy_data_t *policy, ldlm_mode_t mode,
			   struct lustre_handle *lockh)
{
	struct ldlm_res_id res_id;

	fid_build_reg_res_name(fid, &res_id);
	/* LU-4405: Clear bits not supported by server */
	policy->l_inodebits.bits &= exp_connect_ibits(exp);
	return ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
			       &res_id, type, policy, mode, lockh, 0);
}
169
/*
 * Cancel all unused locks on the resource derived from @fid that match
 * @policy/@mode, passing @flags and @opaque through to the DLM.
 */
int mdc_cancel_unused(struct obd_export *exp,
		      const struct lu_fid *fid,
		      ldlm_policy_data_t *policy,
		      ldlm_mode_t mode,
		      ldlm_cancel_flags_t flags,
		      void *opaque)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ldlm_res_id res_id;

	fid_build_reg_res_name(fid, &res_id);
	return ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
					       policy, mode, flags, opaque);
}
186
mdc_null_inode(struct obd_export * exp,const struct lu_fid * fid)187 int mdc_null_inode(struct obd_export *exp,
188 const struct lu_fid *fid)
189 {
190 struct ldlm_res_id res_id;
191 struct ldlm_resource *res;
192 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
193
194 LASSERTF(ns != NULL, "no namespace passed\n");
195
196 fid_build_reg_res_name(fid, &res_id);
197
198 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
199 if (res == NULL)
200 return 0;
201
202 lock_res(res);
203 res->lr_lvb_inode = NULL;
204 unlock_res(res);
205
206 ldlm_resource_putref(res);
207 return 0;
208 }
209
/* Find any ldlm lock of the inode in mdc.
 * Return 0 if no lock is found,
 * 1 if a lock is found,
 * < 0 on error. */
/*
 * Iterate over all locks of @fid's resource with @it/@data, reporting
 * whether the iterator found a match.
 *
 * Returns 1 when the iterator stopped (match found), 0 when it ran to
 * completion (no match), or a negative error from the iteration itself.
 */
int mdc_find_cbdata(struct obd_export *exp,
		    const struct lu_fid *fid,
		    ldlm_iterator_t it, void *data)
{
	struct ldlm_res_id res_id;
	int rc;

	/* fid_build_reg_res_name() accepts a const fid — no cast needed,
	 * matching the other callers in this file. */
	fid_build_reg_res_name(fid, &res_id);
	rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
				   it, data);
	if (rc == LDLM_ITER_STOP)
		return 1;
	if (rc == LDLM_ITER_CONTINUE)
		return 0;
	return rc;
}
230
/*
 * Drop the replay flag from @req (error requests must not be replayed)
 * and sanity-check that a failed request never carries a transno.
 */
static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
{
	/* Don't hold error requests for replay. */
	if (req->rq_replay) {
		spin_lock(&req->rq_lock);
		req->rq_replay = 0;
		spin_unlock(&req->rq_lock);
	}
	/* A transno on an error reply would mean the server committed
	 * state for an operation the client considers failed — fatal. */
	if (rc && req->rq_transno != 0) {
		DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
		LBUG();
	}
}
244
245 /* Save a large LOV EA into the request buffer so that it is available
246 * for replay. We don't do this in the initial request because the
247 * original request doesn't need this buffer (at most it sends just the
248 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249 * buffer and may also be difficult to allocate and save a very large
250 * request buffer for each open. (bug 5707)
251 *
252 * OOM here may cause recovery failure if lmm is needed (only for the
253 * original open if the MDS crashed just when this client also OOM'd)
254 * but this is incredibly unlikely, and questionable whether the client
255 * could do MDS recovery under OOM anyways... */
/*
 * Enlarge the EA segment of an open request so the reply's LOV EA can be
 * stashed for replay (see the block comment above).  On allocation
 * failure the EA is simply dropped from the replayed body rather than
 * failing the open.
 */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
				struct mdt_body *body)
{
	int rc;

	/* FIXME: remove this explicit offset. */
	rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
					body->eadatasize);
	if (rc) {
		CERROR("Can't enlarge segment %d size to %d\n",
		       DLM_INTENT_REC_OFF + 4, body->eadatasize);
		/* Degrade gracefully: replay without the EA. */
		body->valid &= ~OBD_MD_FLEASIZE;
		body->eadatasize = 0;
	}
}
271
/*
 * Build an LDLM_INTENT_OPEN request for @it/@op_data, cancelling locally
 * any locks that would conflict with the open (OPEN locks on the child,
 * UPDATE lock on the parent for create) and piggy-backing those cancels
 * on the enqueue.
 *
 * @lmm/@lmmsize: optional striping EA to send with the create.
 * @cb_data: unused here; callers pass einfo->ei_cbdata, which this
 *           function does not consume.
 *
 * Returns the prepared request or an ERR_PTR.  On the alloc-failure
 * path the collected cancel list must be released by hand.
 */
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
						   struct lookup_intent *it,
						   struct md_op_data *op_data,
						   void *lmm, int lmmsize,
						   void *cb_data)
{
	struct ptlrpc_request *req;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_intent *lit;
	LIST_HEAD(cancels);
	int count = 0;
	int mode;
	int rc;

	/* Opens always target regular files. */
	it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

	/* XXX: openlock is not cancelled for cross-refs. */
	/* If inode is known, cancel conflicting OPEN locks. */
	if (fid_is_sane(&op_data->op_fid2)) {
		if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
			if (it->it_flags & FMODE_WRITE)
				mode = LCK_EX;
			else
				mode = LCK_PR;
		} else {
			if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
				mode = LCK_CW;
			else if (it->it_flags & __FMODE_EXEC)
				mode = LCK_PR;
			else
				mode = LCK_CR;
		}
		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
						&cancels, mode,
						MDS_INODELOCK_OPEN);
	}

	/* If CREATE, cancel parent's UPDATE lock. */
	if (it->it_op & IT_CREAT)
		mode = LCK_EX;
	else
		mode = LCK_CR;
	count += mdc_resource_get_unused(exp, &op_data->op_fid1,
					 &cancels, mode,
					 MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_OPEN);
	if (req == NULL) {
		/* The cancel references collected above are ours to drop. */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return ERR_PTR(-ENOMEM);
	}

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	/* Reserve at least the default EA size so the reply LOV EA can be
	 * saved for replay without reallocating in the common case. */
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     max(lmmsize, obddev->u.cli.cl_default_mds_easize));

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc < 0) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* Open requests are replayed if the import supports recovery. */
	spin_lock(&req->rq_lock);
	req->rq_replay = req->rq_import->imp_replayable;
	spin_unlock(&req->rq_lock);

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
		      lmmsize);

	/* for remote client, fetch remote perm for current user */
	if (client_is_remote(exp))
		req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
				     sizeof(struct mdt_remote_perm));
	ptlrpc_request_set_replen(req);
	return req;
}
355
/*
 * Build an LDLM_INTENT_GETXATTR request for @op_data->op_fid1, sizing
 * every xattr reply buffer to the server's advertised maximum EA size.
 * Returns the prepared request or an ERR_PTR.
 */
static struct ptlrpc_request *
mdc_intent_getxattr_pack(struct obd_export *exp,
			 struct lookup_intent *it,
			 struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct ldlm_intent *lit;
	int rc, count = 0, maxdata;
	LIST_HEAD(cancels);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETXATTR);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	/* No locks to cancel for getxattr; count stays 0. */
	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = IT_GETXATTR;

	/* Upper bound for names, values, and lengths alike. */
	maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;

	/* pack the intended request */
	mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
		      0);

	req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
			     RCL_SERVER, maxdata);

	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
			     RCL_SERVER, maxdata);

	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
			     RCL_SERVER, maxdata);

	ptlrpc_request_set_replen(req);

	return req;
}
400
/*
 * Build an LDLM_INTENT_UNLINK request for @it/@op_data and reserve reply
 * room for the victim's MD and cookies.  Returns the prepared request or
 * an ERR_PTR.
 */
static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_intent *lit;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_UNLINK);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	/* +1 for the terminating NUL of the name. */
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_unlink_pack(req, op_data);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_default_mds_easize);
	/* NOTE(review): cookiesize used to size the ACL field — looks
	 * intentional in this tree, but verify against the reply layout. */
	req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
			     obddev->u.cli.cl_default_mds_cookiesize);
	ptlrpc_request_set_replen(req);
	return req;
}
438
/*
 * Build an LDLM_INTENT_GETATTR request for @it/@op_data, asking for the
 * attribute set in @valid and reserving reply room for the EA (and, for
 * remote clients, the remote permission block).  Returns the prepared
 * request or an ERR_PTR.
 */
static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
						      struct lookup_intent *it,
						      struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device *obddev = class_exp2obd(exp);
	/* Remote clients get remote perms instead of a local ACL. */
	u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
		    OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
		    OBD_MD_MEA |
		    (client_is_remote(exp) ?
		     OBD_MD_FLRMTPERM : OBD_MD_FLACL);
	struct ldlm_intent *lit;
	int rc;
	int easize;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETATTR);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* Prefer the tuned default EA size; fall back to the maximum. */
	if (obddev->u.cli.cl_default_mds_easize > 0)
		easize = obddev->u.cli.cl_default_mds_easize;
	else
		easize = obddev->u.cli.cl_max_mds_easize;

	/* pack the intended request */
	mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
	if (client_is_remote(exp))
		req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
				     sizeof(struct mdt_remote_perm));
	ptlrpc_request_set_replen(req);
	return req;
}
487
mdc_intent_layout_pack(struct obd_export * exp,struct lookup_intent * it,struct md_op_data * unused)488 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
489 struct lookup_intent *it,
490 struct md_op_data *unused)
491 {
492 struct obd_device *obd = class_exp2obd(exp);
493 struct ptlrpc_request *req;
494 struct ldlm_intent *lit;
495 struct layout_intent *layout;
496 int rc;
497
498 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
499 &RQF_LDLM_INTENT_LAYOUT);
500 if (req == NULL)
501 return ERR_PTR(-ENOMEM);
502
503 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
504 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
505 if (rc) {
506 ptlrpc_request_free(req);
507 return ERR_PTR(rc);
508 }
509
510 /* pack the intent */
511 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
512 lit->opc = (__u64)it->it_op;
513
514 /* pack the layout intent request */
515 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
516 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
517 * set for replication */
518 layout->li_opc = LAYOUT_INTENT_ACCESS;
519
520 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
521 obd->u.cli.cl_default_mds_easize);
522 ptlrpc_request_set_replen(req);
523 return req;
524 }
525
526 static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export * exp,int lvb_len)527 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
528 {
529 struct ptlrpc_request *req;
530 int rc;
531
532 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
533 if (req == NULL)
534 return ERR_PTR(-ENOMEM);
535
536 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
537 if (rc) {
538 ptlrpc_request_free(req);
539 return ERR_PTR(rc);
540 }
541
542 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
543 ptlrpc_request_set_replen(req);
544 return req;
545 }
546
/*
 * Post-process a completed intent enqueue: transfer the server's
 * disposition/status into @it, fix up lock mode/handle bookkeeping,
 * save the reply LOV EA for open replay, and install the layout LVB on
 * the lock when this was a layout intent.
 *
 * @rc is the enqueue result (>= 0 on entry); ELDLM_LOCK_ABORTED means
 * the intent executed but no lock was granted.  Returns @rc, or a
 * negative errno on reply-unpacking/allocation failure.
 */
static int mdc_finish_enqueue(struct obd_export *exp,
			      struct ptlrpc_request *req,
			      struct ldlm_enqueue_info *einfo,
			      struct lookup_intent *it,
			      struct lustre_handle *lockh,
			      int rc)
{
	struct req_capsule *pill = &req->rq_pill;
	struct ldlm_request *lockreq;
	struct ldlm_reply *lockrep;
	struct lustre_intent_data *intent = &it->d.lustre;
	struct ldlm_lock *lock;
	void *lvb_data = NULL;
	int lvb_len = 0;

	LASSERT(rc >= 0);
	/* Similarly, if we're going to replay this request, we don't want to
	 * actually get a lock, just perform the intent. */
	if (req->rq_transno || req->rq_replay) {
		lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
		lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
	}

	if (rc == ELDLM_LOCK_ABORTED) {
		/* Intent ran but no lock came back; clear our handle. */
		einfo->ei_mode = 0;
		memset(lockh, 0, sizeof(*lockh));
		rc = 0;
	} else { /* rc = 0 */
		lock = ldlm_handle2lock(lockh);
		LASSERT(lock != NULL);

		/* If the server gave us back a different lock mode, we should
		 * fix up our variables. */
		if (lock->l_req_mode != einfo->ei_mode) {
			ldlm_lock_addref(lockh, lock->l_req_mode);
			ldlm_lock_decref(lockh, einfo->ei_mode);
			einfo->ei_mode = lock->l_req_mode;
		}
		LDLM_LOCK_PUT(lock);
	}

	lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */

	/* Publish the server's verdict into the intent. */
	intent->it_disposition = (int)lockrep->lock_policy_res1;
	intent->it_status = (int)lockrep->lock_policy_res2;
	intent->it_lock_mode = einfo->ei_mode;
	intent->it_lock_handle = lockh->cookie;
	intent->it_data = req;

	/* Technically speaking rq_transno must already be zero if
	 * it_status is in error, so the check is a bit redundant */
	if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
		mdc_clear_replay_flag(req, intent->it_status);

	/* If we're doing an IT_OPEN which did not result in an actual
	 * successful open, then we need to remove the bit which saves
	 * this request for unconditional replay.
	 *
	 * It's important that we do this first!  Otherwise we might exit the
	 * function without doing so, and try to replay a failed create
	 * (bug 3440) */
	if (it->it_op & IT_OPEN && req->rq_replay &&
	    (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
		mdc_clear_replay_flag(req, intent->it_status);

	DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
		  it->it_op, intent->it_disposition, intent->it_status);

	/* We know what to expect, so we do any byte flipping required here */
	if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
		struct mdt_body *body;

		body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		if (body == NULL) {
			CERROR("Can't swab mdt_body\n");
			return -EPROTO;
		}

		if (it_disposition(it, DISP_OPEN_OPEN) &&
		    !it_open_error(DISP_OPEN_OPEN, it)) {
			/*
			 * If this is a successful OPEN request, we need to set
			 * replay handler and data early, so that if replay
			 * happens immediately after swabbing below, new reply
			 * is swabbed by that handler correctly.
			 */
			mdc_set_open_replay_data(NULL, NULL, it);
		}

		if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
			void *eadata;

			mdc_update_max_ea_from_body(exp, body);

			/*
			 * The eadata is opaque; just check that it is there.
			 * Eventually, obd_unpackmd() will check the contents.
			 */
			eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
							      body->eadatasize);
			if (eadata == NULL)
				return -EPROTO;

			/* save lvb data and length in case this is for layout
			 * lock */
			lvb_data = eadata;
			lvb_len = body->eadatasize;

			/*
			 * We save the reply LOV EA in case we have to replay a
			 * create for recovery.  If we didn't allocate a large
			 * enough request buffer above we need to reallocate it
			 * here to hold the actual LOV EA.
			 *
			 * To not save LOV EA if request is not going to replay
			 * (for example error one).
			 */
			if ((it->it_op & IT_OPEN) && req->rq_replay) {
				void *lmm;

				if (req_capsule_get_size(pill, &RMF_EADATA,
							 RCL_CLIENT) <
				    body->eadatasize)
					mdc_realloc_openmsg(req, body);
				else
					req_capsule_shrink(pill, &RMF_EADATA,
							   body->eadatasize,
							   RCL_CLIENT);

				req_capsule_set_size(pill, &RMF_EADATA,
						     RCL_CLIENT,
						     body->eadatasize);

				lmm = req_capsule_client_get(pill, &RMF_EADATA);
				if (lmm)
					memcpy(lmm, eadata, body->eadatasize);
			}
		}

		if (body->valid & OBD_MD_FLRMTPERM) {
			struct mdt_remote_perm *perm;

			LASSERT(client_is_remote(exp));
			perm = req_capsule_server_swab_get(pill, &RMF_ACL,
						lustre_swab_mdt_remote_perm);
			if (perm == NULL)
				return -EPROTO;
		}
	} else if (it->it_op & IT_LAYOUT) {
		/* maybe the lock was granted right away and layout
		 * is packed into RMF_DLM_LVB of req */
		lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
		if (lvb_len > 0) {
			lvb_data = req_capsule_server_sized_get(pill,
							&RMF_DLM_LVB, lvb_len);
			if (lvb_data == NULL)
				return -EPROTO;
		}
	}

	/* fill in stripe data for layout lock */
	lock = ldlm_handle2lock(lockh);
	if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
		void *lmm;

		LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
			   ldlm_it2str(it->it_op), lvb_len);

		/* Copy out of the reply buffer: the lock outlives the req. */
		lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
		if (lmm == NULL) {
			LDLM_LOCK_PUT(lock);
			return -ENOMEM;
		}
		memcpy(lmm, lvb_data, lvb_len);

		/* install lvb_data */
		lock_res_and_lock(lock);
		if (lock->l_lvb_data == NULL) {
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lmm;
			lock->l_lvb_len = lvb_len;
			lmm = NULL;
		}
		unlock_res_and_lock(lock);
		/* Someone raced us installing an LVB; drop our copy. */
		if (lmm != NULL)
			kvfree(lmm);
	}
	if (lock != NULL)
		LDLM_LOCK_PUT(lock);

	return rc;
}
740
741 /* We always reserve enough space in the reply packet for a stripe MD, because
742 * we don't know in advance the file type. */
/*
 * Enqueue a metadata lock, optionally carrying an intent @it (open,
 * unlink, getattr/lookup, readdir, layout, getxattr).  Without an
 * intent this path is used only for flock, where @lmm smuggles the
 * flock policy and @lmmsize is 0.
 *
 * Handles request packing per intent type, RPC concurrency limits via
 * the rpc_lock/in-flight counters, infinite resend of creates on
 * -EINPROGRESS (quota design), and cleanup of the lock/intent state on
 * failure.  On success the granted lock is in @lockh and the reply is
 * referenced from it->d.lustre.it_data.
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
		struct lookup_intent *it, struct md_op_data *op_data,
		struct lustre_handle *lockh, void *lmm, int lmmsize,
		struct ptlrpc_request **reqp, u64 extra_lock_flags)
{
	static const ldlm_policy_data_t lookup_policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP }
	};
	static const ldlm_policy_data_t update_policy = {
		.l_inodebits = { MDS_INODELOCK_UPDATE }
	};
	static const ldlm_policy_data_t layout_policy = {
		.l_inodebits = { MDS_INODELOCK_LAYOUT }
	};
	static const ldlm_policy_data_t getxattr_policy = {
		.l_inodebits = { MDS_INODELOCK_XATTR }
	};
	ldlm_policy_data_t const *policy = &lookup_policy;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req;
	u64 flags, saved_flags = extra_lock_flags;
	struct ldlm_res_id res_id;
	int generation, resends = 0;
	struct ldlm_reply *lockrep;
	enum lvb_type lvb_type = LVB_T_NONE;
	int rc;

	LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
		 einfo->ei_type);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);

	/* Select the inodebits policy matching the intent operation. */
	if (it) {
		saved_flags |= LDLM_FL_HAS_INTENT;
		if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
			policy = &update_policy;
		else if (it->it_op & IT_LAYOUT)
			policy = &layout_policy;
		else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
			policy = &getxattr_policy;
	}

	LASSERT(reqp == NULL);

	/* Remember the import generation so a resend after eviction can
	 * be detected and aborted. */
	generation = obddev->u.cli.cl_import->imp_generation;
resend:
	flags = saved_flags;
	if (!it) {
		/* The only way right now is FLOCK, in this case we hide flock
		   policy as lmm, but lmmsize is 0 */
		LASSERT(lmm && lmmsize == 0);
		LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
			 einfo->ei_type);
		policy = lmm;
		res_id.name[3] = LDLM_FLOCK;
		req = NULL;
	} else if (it->it_op & IT_OPEN) {
		req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
					   einfo->ei_cbdata);
		policy = &update_policy;
		einfo->ei_cbdata = NULL;
		lmm = NULL;
	} else if (it->it_op & IT_UNLINK) {
		req = mdc_intent_unlink_pack(exp, it, op_data);
	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
		req = mdc_intent_getattr_pack(exp, it, op_data);
	} else if (it->it_op & IT_READDIR) {
		req = mdc_enqueue_pack(exp, 0);
	} else if (it->it_op & IT_LAYOUT) {
		if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
			return -EOPNOTSUPP;
		req = mdc_intent_layout_pack(exp, it, op_data);
		lvb_type = LVB_T_LAYOUT;
	} else if (it->it_op & IT_GETXATTR) {
		req = mdc_intent_getxattr_pack(exp, it, op_data);
	} else {
		LBUG();
		return -EINVAL;
	}

	if (IS_ERR(req))
		return PTR_ERR(req);

	if (req != NULL && it && it->it_op & IT_CREAT)
		/* ask ptlrpc not to resend on EINPROGRESS since we have our own
		 * retry logic */
		req->rq_no_retry_einprogress = 1;

	if (resends) {
		/* Delay the resend and pin it to the recorded generation. */
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = ktime_get_real_seconds() + resends;
	}

	/* It is important to obtain rpc_lock first (if applicable), so that
	 * threads that are serialised with rpc_lock are not polluting our
	 * rpcs in flight counter.  We do not do flock request limiting, though*/
	if (it) {
		mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
		rc = mdc_enter_request(&obddev->u.cli);
		if (rc != 0) {
			mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
			mdc_clear_replay_flag(req, 0);
			ptlrpc_req_finished(req);
			return rc;
		}
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
			      0, lvb_type, lockh, 0);
	if (!it) {
		/* For flock requests we immediately return without further
		   delay and let caller deal with the rest, since rest of
		   this function metadata processing makes no sense for flock
		   requests anyway.  But in case of problem during comms with
		   Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
		   can not rely on caller and this mainly for F_UNLCKs
		   (explicits or automatically generated by Kernel to clean
		   current FLocks upon exit) that can't be trashed */
		if ((rc == -EINTR) || (rc == -ETIMEDOUT))
			goto resend;
		return rc;
	}

	/* Release the in-flight slot and rpc_lock taken above. */
	mdc_exit_request(&obddev->u.cli);
	mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

	if (rc < 0) {
		CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
			     "%s: ldlm_cli_enqueue failed: rc = %d\n",
			     obddev->obd_name, rc);

		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		return rc;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);

	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	/* Retry the create infinitely when we get -EINPROGRESS from
	 * server.  This is required by the new quota design. */
	if (it->it_op & IT_CREAT &&
	    (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
		       obddev->obd_name, resends, it->it_op,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		if (generation == obddev->u.cli.cl_import->imp_generation) {
			goto resend;
		} else {
			/* Import was reconnected/evicted; give up. */
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}
	}

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc < 0) {
		/* Undo the lock reference and scrub the intent state so the
		 * caller does not see stale handles. */
		if (lustre_handle_is_used(lockh)) {
			ldlm_lock_decref(lockh, einfo->ei_mode);
			memset(lockh, 0, sizeof(*lockh));
		}
		ptlrpc_req_finished(req);

		it->d.lustre.it_lock_handle = 0;
		it->d.lustre.it_lock_mode = 0;
		it->d.lustre.it_data = NULL;
	}

	return rc;
}
921
mdc_finish_intent_lock(struct obd_export * exp,struct ptlrpc_request * request,struct md_op_data * op_data,struct lookup_intent * it,struct lustre_handle * lockh)922 static int mdc_finish_intent_lock(struct obd_export *exp,
923 struct ptlrpc_request *request,
924 struct md_op_data *op_data,
925 struct lookup_intent *it,
926 struct lustre_handle *lockh)
927 {
928 struct lustre_handle old_lock;
929 struct mdt_body *mdt_body;
930 struct ldlm_lock *lock;
931 int rc;
932
933 LASSERT(request != NULL);
934 LASSERT(request != LP_POISON);
935 LASSERT(request->rq_repmsg != LP_POISON);
936
937 if (!it_disposition(it, DISP_IT_EXECD)) {
938 /* The server failed before it even started executing the
939 * intent, i.e. because it couldn't unpack the request. */
940 LASSERT(it->d.lustre.it_status != 0);
941 return it->d.lustre.it_status;
942 }
943 rc = it_open_error(DISP_IT_EXECD, it);
944 if (rc)
945 return rc;
946
947 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
948 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
949
950 /* If we were revalidating a fid/name pair, mark the intent in
951 * case we fail and get called again from lookup */
952 if (fid_is_sane(&op_data->op_fid2) &&
953 it->it_create_mode & M_CHECK_STALE &&
954 it->it_op != IT_GETATTR) {
955
956 /* Also: did we find the same inode? */
/* server can return one of two fids:
958 * op_fid2 - new allocated fid - if file is created.
959 * op_fid3 - existent fid - if file only open.
960 * op_fid3 is saved in lmv_intent_open */
961 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
962 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
963 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
964 "\n", PFID(&op_data->op_fid2),
965 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
966 return -ESTALE;
967 }
968 }
969
970 rc = it_open_error(DISP_LOOKUP_EXECD, it);
971 if (rc)
972 return rc;
973
974 /* keep requests around for the multiple phases of the call
975 * this shows the DISP_XX must guarantee we make it into the call
976 */
977 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
978 it_disposition(it, DISP_OPEN_CREATE) &&
979 !it_open_error(DISP_OPEN_CREATE, it)) {
980 it_set_disposition(it, DISP_ENQ_CREATE_REF);
981 ptlrpc_request_addref(request); /* balanced in ll_create_node */
982 }
983 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
984 it_disposition(it, DISP_OPEN_OPEN) &&
985 !it_open_error(DISP_OPEN_OPEN, it)) {
986 it_set_disposition(it, DISP_ENQ_OPEN_REF);
987 ptlrpc_request_addref(request); /* balanced in ll_file_open */
988 /* BUG 11546 - eviction in the middle of open rpc processing */
989 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
990 }
991
992 if (it->it_op & IT_CREAT) {
993 /* XXX this belongs in ll_create_it */
994 } else if (it->it_op == IT_OPEN) {
995 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
996 } else {
997 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
998 }
999
1000 /* If we already have a matching lock, then cancel the new
1001 * one. We have to set the data here instead of in
1002 * mdc_enqueue, because we need to use the child's inode as
1003 * the l_ast_data to match, and that's not available until
1004 * intent_finish has performed the iget().) */
1005 lock = ldlm_handle2lock(lockh);
1006 if (lock) {
1007 ldlm_policy_data_t policy = lock->l_policy_data;
1008
1009 LDLM_DEBUG(lock, "matching against this");
1010
1011 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1012 &lock->l_resource->lr_name),
1013 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1014 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1015 LDLM_LOCK_PUT(lock);
1016
1017 memcpy(&old_lock, lockh, sizeof(*lockh));
1018 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1019 LDLM_IBITS, &policy, LCK_NL,
1020 &old_lock, 0)) {
1021 ldlm_lock_decref_and_cancel(lockh,
1022 it->d.lustre.it_lock_mode);
1023 memcpy(lockh, &old_lock, sizeof(old_lock));
1024 it->d.lustre.it_lock_handle = lockh->cookie;
1025 }
1026 }
1027 CDEBUG(D_DENTRY,
1028 "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1029 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1030 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1031 return rc;
1032 }
1033
/*
 * Check whether the client already holds a DLM lock covering the inodebits
 * required by intent @it on @fid.
 *
 * If the intent already carries a lock handle, revalidate it directly;
 * otherwise try to match an existing granted LDLM_IBITS lock whose policy
 * bits are derived from the intent type.  On success the intent's lock
 * handle and mode are filled in; on failure both are cleared.
 *
 * NOTE: a dead local (struct ldlm_res_id filled by fid_build_reg_res_name()
 * but never read — mdc_lock_match() takes @fid directly) has been removed.
 *
 * Returns 1 if a usable lock was found, 0 otherwise.  When @bits is
 * non-NULL, ldlm_revalidate_lock_handle() stores the covered bits there.
 */
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
			struct lu_fid *fid, __u64 *bits)
{
	/* We could just return 1 immediately, but since we should only
	 * be called in revalidate_it if we already have a lock, let's
	 * verify that. */
	struct lustre_handle lockh;
	ldlm_policy_data_t policy;
	ldlm_mode_t mode;

	if (it->d.lustre.it_lock_handle) {
		lockh.cookie = it->d.lustre.it_lock_handle;
		mode = ldlm_revalidate_lock_handle(&lockh, bits);
	} else {
		switch (it->it_op) {
		case IT_GETATTR:
			/* File attributes are held under multiple bits:
			 * nlink is under lookup lock, size and times are
			 * under UPDATE lock and recently we've also got
			 * a separate permissions lock for owner/group/acl that
			 * were protected by lookup lock before.
			 * Getattr must provide all of that information,
			 * so we need to ensure we have all of those locks.
			 * Unfortunately, if the bits are split across multiple
			 * locks, there's no easy way to match all of them here,
			 * so an extra RPC would be performed to fetch all
			 * of those bits at once for now. */
			/* For new MDTs(> 2.4), UPDATE|PERM should be enough,
			 * but for old MDTs (< 2.4), permission is covered
			 * by LOOKUP lock, so it needs to match all bits here.*/
			policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
						  MDS_INODELOCK_LOOKUP |
						  MDS_INODELOCK_PERM;
			break;
		case IT_LAYOUT:
			policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
			break;
		default:
			policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
			break;
		}

		mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
				      LDLM_IBITS, &policy,
				      LCK_CR | LCK_CW | LCK_PR | LCK_PW,
				      &lockh);
	}

	if (mode) {
		it->d.lustre.it_lock_handle = lockh.cookie;
		it->d.lustre.it_lock_mode = mode;
	} else {
		it->d.lustre.it_lock_handle = 0;
		it->d.lustre.it_lock_mode = 0;
	}

	return !!mode;
}
1094
1095 /*
1096 * This long block is all about fixing up the lock and request state
1097 * so that it is correct as of the moment _before_ the operation was
1098 * applied; that way, the VFS will think that everything is normal and
1099 * call Lustre's regular VFS methods.
1100 *
1101 * If we're performing a creation, that means that unless the creation
1102 * failed with EEXIST, we should fake up a negative dentry.
1103 *
 * For everything else, we want the lookup to succeed.
1105 *
1106 * One additional note: if CREATE or OPEN succeeded, we add an extra
1107 * reference to the request because we need to keep it around until
1108 * ll_create/ll_open gets called.
1109 *
1110 * The server will return to us, in it_disposition, an indication of
1111 * exactly what d.lustre.it_status refers to.
1112 *
1113 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1114 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1115 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1116 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1117 * was successful.
1118 *
1119 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1120 * child lookup.
1121 */
/*
 * Acquire an intent lock for @op_data according to @it: first try to
 * revalidate an already-held lock for lookup/getattr intents, then (if
 * needed) allocate a fid for a create and issue a fresh enqueue RPC,
 * finishing with the intent/lock bookkeeping.
 *
 * Returns 0 or 1 from the revalidation path, 0 on successful enqueue, or a
 * negative errno on failure.  On success *reqp points at the reply request.
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    void *lmm, int lmmsize, struct lookup_intent *it,
		    int lookup_flags, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking,
		    __u64 extra_lock_flags)
{
	struct ldlm_enqueue_info einfo = {
		.ei_type	= LDLM_IBITS,
		.ei_mode	= it_to_lock_mode(it),
		.ei_cb_bl	= cb_blocking,
		.ei_cb_cp	= ldlm_completion_ast,
	};
	struct lustre_handle lockh = { .cookie = 0 };
	int rc = 0;

	LASSERT(it);

	CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
	       ", intent: %s flags %#Lo\n", op_data->op_namelen,
	       op_data->op_name, PFID(&op_data->op_fid2),
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
	       it->it_flags);

	if ((it->it_op & (IT_LOOKUP | IT_GETATTR)) &&
	    fid_is_sane(&op_data->op_fid2)) {
		/* Revalidation path: we could just return 1, but since this
		 * should only run when a lock already exists, verify it. */
		it->d.lustre.it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only a GETATTR by fid with no name (coming from
		 * inode_revalidate) may fall through on a miss. */
		if (rc || op_data->op_namelen != 0)
			return rc;
	}

	/* The upper layer may not have allocated a fid for a new file yet. */
	if ((it->it_op & IT_CREAT) && !fid_is_sane(&op_data->op_fid2)) {
		rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
		if (rc < 0) {
			CERROR("Can't alloc new fid, rc %d\n", rc);
			return rc;
		}
	}

	rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
			 extra_lock_flags);
	if (rc < 0)
		return rc;

	*reqp = it->d.lustre.it_data;
	return mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
}
1176
/*
 * ptlrpcd interpret callback for the async getattr enqueue issued by
 * mdc_intent_getattr_async().  Completes the DLM enqueue, performs the
 * intent/lock bookkeeping, and finally hands the result to the caller's
 * mi_cb callback.  Always returns 0; errors travel through the callback's
 * rc argument.
 */
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
					      void *args, int rc)
{
	struct mdc_getattr_args *ga = args;
	struct obd_export *exp = ga->ga_exp;
	struct md_enqueue_info *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = ga->ga_einfo;
	struct lookup_intent *it;
	struct lustre_handle *lockh;
	struct obd_device *obddev;
	struct ldlm_reply *lockrep;
	__u64 flags = LDLM_FL_HAS_INTENT;

	it = &minfo->mi_it;
	lockh = &minfo->mi_lockh;

	obddev = class_exp2obd(exp);

	/* Release the rpcs-in-flight slot taken via mdc_enter_request() in
	 * mdc_intent_getattr_async(). */
	mdc_exit_request(&obddev->u.cli);
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
		rc = -ETIMEDOUT;	/* fault injection for testing */

	/* Finish the enqueue; incoming rc carries the RPC status. */
	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
	if (rc < 0) {
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		/* Don't let a failed request be replayed on recovery. */
		mdc_clear_replay_flag(req, rc);
		goto out;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);

	/* Intent execution status arrives in lock_policy_res2 in wire
	 * (network) byte order; convert before it is consumed below. */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc)
		goto out;

	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);

out:
	/* einfo ownership transfers to us on interpret — presumably
	 * allocated by the statahead caller; verify against callers. */
	kfree(einfo);
	minfo->mi_cb(req, minfo, rc);
	return 0;
}
1225
mdc_intent_getattr_async(struct obd_export * exp,struct md_enqueue_info * minfo,struct ldlm_enqueue_info * einfo)1226 int mdc_intent_getattr_async(struct obd_export *exp,
1227 struct md_enqueue_info *minfo,
1228 struct ldlm_enqueue_info *einfo)
1229 {
1230 struct md_op_data *op_data = &minfo->mi_data;
1231 struct lookup_intent *it = &minfo->mi_it;
1232 struct ptlrpc_request *req;
1233 struct mdc_getattr_args *ga;
1234 struct obd_device *obddev = class_exp2obd(exp);
1235 struct ldlm_res_id res_id;
1236 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1237 * for statahead currently. Consider CMD in future, such two bits
1238 * maybe managed by different MDS, should be adjusted then. */
1239 ldlm_policy_data_t policy = {
1240 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1241 MDS_INODELOCK_UPDATE }
1242 };
1243 int rc = 0;
1244 __u64 flags = LDLM_FL_HAS_INTENT;
1245
1246 CDEBUG(D_DLMTRACE,
1247 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1248 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1249 ldlm_it2str(it->it_op), it->it_flags);
1250
1251 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1252 req = mdc_intent_getattr_pack(exp, it, op_data);
1253 if (IS_ERR(req))
1254 return PTR_ERR(req);
1255
1256 rc = mdc_enter_request(&obddev->u.cli);
1257 if (rc != 0) {
1258 ptlrpc_req_finished(req);
1259 return rc;
1260 }
1261
1262 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1263 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1264 if (rc < 0) {
1265 mdc_exit_request(&obddev->u.cli);
1266 ptlrpc_req_finished(req);
1267 return rc;
1268 }
1269
1270 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1271 ga = ptlrpc_req_async_args(req);
1272 ga->ga_exp = exp;
1273 ga->ga_minfo = minfo;
1274 ga->ga_einfo = einfo;
1275
1276 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1277 ptlrpcd_add_req(req);
1278
1279 return 0;
1280 }
1281