/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

struct osc_brw_async_args {
	struct obdo		 *aa_oa;
	int			  aa_requested_nob;
	int			  aa_nio_count;
	u32			  aa_page_count;
	int			  aa_resends;
	struct brw_page		**aa_ppga;
	struct client_obd	 *aa_cli;
	struct list_head	  aa_oaps;
	struct list_head	  aa_exts;
	struct obd_capa		 *aa_ocapa;
	struct cl_req		 *aa_clerq;
};

struct osc_async_args {
	struct obd_info *aa_oi;
};

struct osc_setattr_args {
	struct obdo		*sa_oa;
	obd_enqueue_update_f	 sa_upcall;
	void			*sa_cookie;
};

struct osc_fsync_args {
	struct obd_info		*fa_oi;
	obd_enqueue_update_f	 fa_upcall;
	void			*fa_cookie;
};

struct osc_enqueue_args {
	struct obd_export	*oa_exp;
	__u64			*oa_flags;
	obd_enqueue_update_f	 oa_upcall;
	void			*oa_cookie;
	struct ost_lvb		*oa_lvb;
	struct lustre_handle	*oa_lockh;
	struct ldlm_enqueue_info *oa_ei;
	unsigned int		 oa_agl:1;
};
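
/*
 * All of the *_args structures above are stored directly in the request's
 * rq_async_args scratch space via ptlrpc_req_async_args(), rather than
 * being allocated separately.  The CLASSERT(sizeof(*aa) <=
 * sizeof(req->rq_async_args)) checks sprinkled through this file enforce,
 * at compile time, that each struct actually fits in that space, so no
 * extra allocation or free is needed for per-RPC callback state.
 */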

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
		      struct lov_stripe_md *lsm)
{
	int lmm_size;

	lmm_size = sizeof(**lmmp);
	if (lmmp == NULL)
		return lmm_size;

	if (*lmmp != NULL && lsm == NULL) {
		OBD_FREE(*lmmp, lmm_size);
		*lmmp = NULL;
		return 0;
	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (*lmmp == NULL) {
		OBD_ALLOC(*lmmp, lmm_size);
		if (*lmmp == NULL)
			return -ENOMEM;
	}

	if (lsm)
		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

	return lmm_size;
}
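
/*
 * Calling convention sketch (hypothetical caller, for illustration only):
 *
 *	int size = osc_packmd(exp, NULL, NULL);	// size query only
 *	struct lov_mds_md *lmm = NULL;
 *	size = osc_packmd(exp, &lmm, lsm);	// allocates *lmm and packs
 *	...
 *	osc_packmd(exp, &lmm, NULL);		// frees *lmm again
 *
 * The same NULL-driven size/alloc/free protocol is used by osc_unpackmd()
 * below, with lsmp/lmm in the corresponding roles.
 */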

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);

	if (lmm != NULL) {
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			return -EINVAL;
		}
		/* XXX LOV_MAGIC etc check? */

		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			return -EINVAL;
		}
	}

	lsm_size = lov_stripe_md_size(1);
	if (lsmp == NULL)
		return lsm_size;

	if (*lsmp != NULL && lmm == NULL) {
		OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		OBD_FREE(*lsmp, lsm_size);
		*lsmp = NULL;
		return 0;
	}

	if (*lsmp == NULL) {
		OBD_ALLOC(*lsmp, lsm_size);
		if (unlikely(*lsmp == NULL))
			return -ENOMEM;
		OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
			OBD_FREE(*lsmp, lsm_size);
			return -ENOMEM;
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (lmm != NULL)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	if (imp != NULL &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	return lsm_size;
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
				 struct ost_body *body, void *capa)
{
	struct obd_capa *oc = (struct obd_capa *)capa;
	struct lustre_capa *c;

	if (!capa)
		return;

	c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
	LASSERT(c);
	capa_cpy(c, oc);
	body->oa.o_valid |= OBD_MD_FLOSSCAPA;
	DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
				     const struct req_msg_field *field,
				     struct obd_capa *oc)
{
	if (oc == NULL)
		req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
	else
		/* it is already calculated as sizeof struct obd_capa */
		;
}
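
/*
 * When no capability is attached, osc_set_capa_size() shrinks the
 * RMF_CAPA1 field to zero bytes before the request is packed, so requests
 * without a capa carry no wire overhead for it; with a capa the capsule's
 * precomputed default size is left in place and the field is filled in
 * later by osc_pack_capa().
 */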

static int osc_getattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_async_args *aa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	return 0;
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	return rc;
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	return 0;
}
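
/*
 * The rqset argument above selects the dispatch mode: NULL means
 * fire-and-forget through the ptlrpcd daemon with no reply callback;
 * PTLRPCD_SET also dispatches via ptlrpcd but runs
 * osc_setattr_interpret() (and through it the caller's upcall) on
 * completion; any other set simply has the request added to it, leaving
 * the caller to drive and wait for it.
 */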

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct lov_stripe_md *lsm;
	int rc;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			return rc;
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		goto out;
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out_req;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out_req;
	}

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti != NULL) {
		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

		if (oa->o_valid & OBD_MD_FLCOOKIE) {
			if (!oti->oti_logcookies)
				oti_alloc_cookies(oti, 1);
			*oti->oti_logcookies = oa->o_lcookie;
		}
	}

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	return rc;
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;

	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		rc = -EPROTO;
		goto out;
	}

	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	return rc;
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

/* Find and locally cancel locks matched by @mode in the resource found by
 * @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   ldlm_mode_t mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		return 0;

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		return 0;

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
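
/*
 * osc_can_send_destroy() optimistically bumps cl_destroy_in_flight and
 * keeps the slot if the limit is still respected; otherwise it gives the
 * slot back.  If the counter changed between the increment and the
 * decrement (another destroy completed meanwhile), a waiter could have
 * missed its wakeup, so one is reissued.  Callers that get 0 back sleep
 * on cl_destroy_waitq and retry -- see the l_wait_event_exclusive() call
 * in osc_destroy() below.
 */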

int osc_create(const struct lu_env *env, struct obd_export *exp,
	       struct obdo *oa, struct lov_stripe_md **ea,
	       struct obd_trans_info *oti)
{
	int rc = 0;

	LASSERT(oa);
	LASSERT(ea);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
		return osc_real_create(exp, oa, ea, oti);
	}

	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
		return osc_real_create(exp, oa, ea, oti);

	/* we should not get here anymore */
	LBUG();

	return rc;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and synced to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa, struct lov_stripe_md *ea,
		       struct obd_trans_info *oti, struct obd_export *md_export,
		       void *capa)
{
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	LIST_HEAD(cancels);
	int rc, count;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		return -EINVAL;
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return -ENOMEM;
	}

	osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
		oa->o_lcookie = *oti->oti_logcookies;
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	osc_pack_capa(req, body, (struct obd_capa *)capa);
	ptlrpc_request_set_replen(req);

	/* If osc_destroy is for destroying an unlink orphan sent from MDT
	 * to OST, it should not be blocked here, because the process might
	 * be triggered by ptlrpcd, and it is not good to block the ptlrpcd
	 * thread (b=16006) */
	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
		req->rq_interpret_reply = osc_destroy_interpret;
		if (!osc_can_send_destroy(cli)) {
			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
							  NULL);

			/*
			 * Wait until the number of on-going destroy RPCs drops
			 * under max_rpc_in_flight
			 */
			l_wait_event_exclusive(cli->cl_destroy_waitq,
					       osc_can_send_destroy(cli), &lwi);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	return 0;
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read()s and the atomic_inc()s they allow are
		 * not covered by a lock, thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT) *
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		goto out;
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}
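
/*
 * Worked example with hypothetical tuning values (not taken from this
 * file): with 4 KiB pages, cl_max_pages_per_rpc = 256 (1 MiB per RPC) and
 * cl_max_rpcs_in_flight = 8, the first-stage target above is
 * (8 + 1) * 1 MiB = 9 MiB.  If the available grant is already at or below
 * that, the target drops to a single RPC's worth, i.e. 1 MiB.
 */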

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int rc = 0;
	struct ost_body *body;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		return 0;
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		return -ENOMEM;

	osc_announce_cached(cli, &body->oa, 0);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	unsigned long time = cfs_time_current();
	unsigned long next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;
		else
			osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list,
			    cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n",
		       client->cl_import->imp_obd->obd_name, rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s\n",
	       client->cl_import->imp_obd->obd_name);
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we expect to hold: if we have
	 * been evicted, it's the new avail_grant amount, and cl_dirty will
	 * drop to 0 as in-flight RPCs fail out; otherwise, it's
	 * avail_grant + dirty.
	 *
	 * The race is tolerable here: if we're evicted, but imp_state has
	 * already left EVICTED state, then cl_dirty must be 0 already.
	 */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
	       cli->cl_import->imp_obd->obd_name,
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
			      (pga[i]->off & ~CFS_PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}
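
/*
 * Illustration with hypothetical sizes: for a 3-page read (4 KiB each,
 * 12 KiB requested) where the OST returned nob_read = 5 KiB, the first
 * page is left untouched, the tail 3 KiB of the second page is zeroed,
 * and the third page is zeroed entirely -- so the caller always sees a
 * fully initialized buffer even on a short read.
 */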

static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   u32 page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
				  OBD_BRW_NOQUOTA);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}
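
/*
 * Two brw_pages merge into one remote niobuf only when their flags match
 * exactly and the second page starts exactly where the first one ends
 * (p1->off + p1->count == p2->off).  The mask above only controls whether
 * a flag mismatch is worth a warning: differences confined to the
 * masked-out flags (grant, cache, sync mode, quota) are expected and just
 * prevent merging silently, while any other difference is reported.
 */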

static u32 osc_checksum_bulk(int nob, u32 pg_count,
			     struct brw_page **pga, int opc,
			     cksum_type_t cksum_type)
{
	__u32 cksum;
	int i = 0;
	struct cfs_crypto_hash_desc *hdesc;
	unsigned int bufsize;
	int err;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;

			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~CFS_PAGE_MASK,
					    count);
		CDEBUG(D_PAGE,
		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
		       page_private(pga[i]->pg),
		       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	if (err)
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli,
				struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				struct obd_capa *ocapa, int reserve,
				int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		return -ENOMEM;

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number
	 * of bulks that might be sent for this request. The actual number is
	 * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
	 * sends "max - 1" for old client compatibility sending "0", and also
	 * so the actual maximum is a power-of-two number, not one less.
	 * LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len = pg->count;
			niobuf->flags = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	return 0;

out:
	ptlrpc_req_finished(req);
	return rc;
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				u32 page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent [%llu-%llu]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count - 1]->off +
			   pga[page_count - 1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
	       client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}
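
/*
 * The diagnosis above works by recomputing the checksum over the pages as
 * they look right now and comparing that against both original values: a
 * match with the server's checksum means the pages were modified locally
 * after the original checksum was taken (e.g. by mmap IO), a match with
 * the original client checksum means the data changed on the wire before
 * reaching the OST, and no match at all means the pages changed locally
 * AND differ from what the server saw.
 */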

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
		&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		return rc;
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		return -EPROTO;
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		return rc;

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			return -EPROTO;
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			return -EAGAIN;

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			return -EAGAIN;

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		goto out;
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0) {
		rc = -EAGAIN;
		goto out;
	}

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		return -EPROTO;
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("Unexpected rc %d (%d transferred)\n",
		       rc, req->rq_bulk->bd_nob_transferred);
		return -EPROTO;
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32 server_cksum = body->oa.o_cksum;
		char *via;
		char *router;
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		if (peer->nid == req->rq_bulk->bd_sender) {
			via = router = "";
		} else {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count - 1]->off +
					   aa->aa_ppga[aa->aa_page_count - 1]->count -
					   1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	return rc;
}
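
/*
 * Summary of the conventions above: rc arrives as the number of bytes
 * transferred (per the note before the function); a positive rc is
 * rejected for writes, while reads may legitimately come back short
 * (handled by zero-filling via handle_short_read()).  -EAGAIN signals the
 * caller, brw_interpret(), that the request is worth resending (checksum
 * mismatch or bulk unwrap failure), while -EPROTO marks replies that are
 * malformed beyond retry.
 */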
1644
osc_brw_redo_request(struct ptlrpc_request * request,struct osc_brw_async_args * aa,int rc)1645 static int osc_brw_redo_request(struct ptlrpc_request *request,
1646 struct osc_brw_async_args *aa, int rc)
1647 {
1648 struct ptlrpc_request *new_req;
1649 struct osc_brw_async_args *new_aa;
1650 struct osc_async_page *oap;
1651
1652 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1653 "redo for recoverable error %d", rc);
1654
1655 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1656 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1657 aa->aa_cli, aa->aa_oa,
1658 NULL /* lsm unused by osc currently */,
1659 aa->aa_page_count, aa->aa_ppga,
1660 &new_req, aa->aa_ocapa, 0, 1);
1661 if (rc)
1662 return rc;
1663
1664 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1665 if (oap->oap_request != NULL) {
1666 LASSERTF(request == oap->oap_request,
1667 "request %p != oap_request %p\n",
1668 request, oap->oap_request);
1669 if (oap->oap_interrupted) {
1670 ptlrpc_req_finished(new_req);
1671 return -EINTR;
1672 }
1673 }
1674 }
1675 /* New request takes over pga and oaps from old request.
1676 * Note that copying a list_head doesn't work, need to move it... */
1677 aa->aa_resends++;
1678 new_req->rq_interpret_reply = request->rq_interpret_reply;
1679 new_req->rq_async_args = request->rq_async_args;
1680 /* cap resend delay to the current request timeout, this is similar to
1681 * what ptlrpc does (see after_reply()) */
1682 if (aa->aa_resends > new_req->rq_timeout)
1683 new_req->rq_sent = get_seconds() + new_req->rq_timeout;
1684 else
1685 new_req->rq_sent = get_seconds() + aa->aa_resends;
1686 new_req->rq_generation_set = 1;
1687 new_req->rq_import_generation = request->rq_import_generation;
1688
1689 new_aa = ptlrpc_req_async_args(new_req);
1690
1691 INIT_LIST_HEAD(&new_aa->aa_oaps);
1692 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1693 INIT_LIST_HEAD(&new_aa->aa_exts);
1694 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1695 new_aa->aa_resends = aa->aa_resends;
1696
1697 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1698 if (oap->oap_request) {
1699 ptlrpc_req_finished(oap->oap_request);
1700 oap->oap_request = ptlrpc_request_addref(new_req);
1701 }
1702 }
1703
1704 new_aa->aa_ocapa = aa->aa_ocapa;
1705 aa->aa_ocapa = NULL;
1706
1707 /* XXX: This code will run into problems if we ever support adding a
1708  * series of BRW RPCs into a self-defined ptlrpc_request_set and waiting
1709  * for all of them to finish. We should inherit the request set from the
1710  * old request. */
1711 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1712
1713 DEBUG_REQ(D_INFO, new_req, "new request");
1714 return 0;
1715 }
1716
1717 /*
1718  * Ugh, we want disk allocation on the target to happen in offset order.
1719  * We'll follow Sedgewick's advice and stick to the dead simple shellsort --
1720  * it'll do fine for our small page arrays and doesn't require allocation.
1721  * It's an insertion sort that swaps elements that are strides apart,
1722  * shrinking the stride down until it's 1 and the array is sorted.
1723  */
1724 static void sort_brw_pages(struct brw_page **array, int num)
1725 {
1726 int stride, i, j;
1727 struct brw_page *tmp;
1728
1729 if (num == 1)
1730 return;
1731 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1732 ;
1733
1734 do {
1735 stride /= 3;
1736 for (i = stride ; i < num ; i++) {
1737 tmp = array[i];
1738 j = i;
1739 while (j >= stride && array[j - stride]->off > tmp->off) {
1740 array[j] = array[j - stride];
1741 j -= stride;
1742 }
1743 array[j] = tmp;
1744 }
1745 } while (stride > 1);
1746 }
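
/*
 * Illustrative sketch (not part of the build): the first loop grows the
 * stride through Knuth's 3x+1 sequence (1, 4, 13, 40, ...) until it
 * reaches or passes @num; the do-while then sorts with shrinking strides.
 * For example:
 *
 *	struct brw_page *pga[5];	// ->off values: 5, 3, 4, 1, 2
 *	sort_brw_pages(pga, 5);		// stride grows to 13, sorts at 4, 1
 *	// pga is now ordered by ->off: 1, 2, 3, 4, 5
 */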
1747
1748 static void osc_release_ppga(struct brw_page **ppga, u32 count)
1749 {
1750 LASSERT(ppga != NULL);
1751 OBD_FREE(ppga, sizeof(*ppga) * count);
1752 }
1753
1754 static int brw_interpret(const struct lu_env *env,
1755 struct ptlrpc_request *req, void *data, int rc)
1756 {
1757 struct osc_brw_async_args *aa = data;
1758 struct osc_extent *ext;
1759 struct osc_extent *tmp;
1760 struct cl_object *obj = NULL;
1761 struct client_obd *cli = aa->aa_cli;
1762
1763 rc = osc_brw_fini_request(req, rc);
1764 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1765 /* When the server returns -EINPROGRESS, the client should always retry
1766  * regardless of how many times the bulk has already been resent. */
1767 if (osc_recoverable_error(rc)) {
1768 if (req->rq_import_generation !=
1769 req->rq_import->imp_generation) {
1770 CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
1771 req->rq_import->imp_obd->obd_name,
1772 POSTID(&aa->aa_oa->o_oi), rc);
1773 } else if (rc == -EINPROGRESS ||
1774 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1775 rc = osc_brw_redo_request(req, aa, rc);
1776 } else {
1777 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1778 req->rq_import->imp_obd->obd_name,
1779 POSTID(&aa->aa_oa->o_oi), rc);
1780 }
1781
1782 if (rc == 0)
1783 return 0;
1784 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1785 rc = -EIO;
1786 }
1787
1788 if (aa->aa_ocapa) {
1789 capa_put(aa->aa_ocapa);
1790 aa->aa_ocapa = NULL;
1791 }
1792
1793 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1794 if (obj == NULL && rc == 0) {
1795 obj = osc2cl(ext->oe_obj);
1796 cl_object_get(obj);
1797 }
1798
1799 list_del_init(&ext->oe_link);
1800 osc_extent_finish(env, ext, 1, rc);
1801 }
1802 LASSERT(list_empty(&aa->aa_exts));
1803 LASSERT(list_empty(&aa->aa_oaps));
1804
1805 if (obj != NULL) {
1806 struct obdo *oa = aa->aa_oa;
1807 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1808 unsigned long valid = 0;
1809
1810 LASSERT(rc == 0);
1811 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1812 attr->cat_blocks = oa->o_blocks;
1813 valid |= CAT_BLOCKS;
1814 }
1815 if (oa->o_valid & OBD_MD_FLMTIME) {
1816 attr->cat_mtime = oa->o_mtime;
1817 valid |= CAT_MTIME;
1818 }
1819 if (oa->o_valid & OBD_MD_FLATIME) {
1820 attr->cat_atime = oa->o_atime;
1821 valid |= CAT_ATIME;
1822 }
1823 if (oa->o_valid & OBD_MD_FLCTIME) {
1824 attr->cat_ctime = oa->o_ctime;
1825 valid |= CAT_CTIME;
1826 }
1827 if (valid != 0) {
1828 cl_object_attr_lock(obj);
1829 cl_object_attr_set(env, obj, attr, valid);
1830 cl_object_attr_unlock(obj);
1831 }
1832 cl_object_put(env, obj);
1833 }
1834 OBDO_FREE(aa->aa_oa);
1835
1836 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1837 req->rq_bulk->bd_nob_transferred);
1838 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1839 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1840
1841 client_obd_list_lock(&cli->cl_loi_list_lock);
1842 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1843 * is called so we know whether to go to sync BRWs or wait for more
1844 * RPCs to complete */
1845 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1846 cli->cl_w_in_flight--;
1847 else
1848 cli->cl_r_in_flight--;
1849 osc_wake_cache_waiters(cli);
1850 client_obd_list_unlock(&cli->cl_loi_list_lock);
1851
1852 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1853 return rc;
1854 }
1855
1856 /**
1857  * Build an RPC from the list of extents @ext_list. The caller must ensure
1858  * that the total number of pages in this list does not exceed the maximum
1859  * number of pages per RPC. Extents in the list must be in OES_RPC state.
1860  */
1861 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1862 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1863 {
1864 struct ptlrpc_request *req = NULL;
1865 struct osc_extent *ext;
1866 struct brw_page **pga = NULL;
1867 struct osc_brw_async_args *aa = NULL;
1868 struct obdo *oa = NULL;
1869 struct osc_async_page *oap;
1870 struct osc_async_page *tmp;
1871 struct cl_req *clerq = NULL;
1872 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1873 CRT_READ;
1874 struct ldlm_lock *lock = NULL;
1875 struct cl_req_attr *crattr = NULL;
1876 u64 starting_offset = OBD_OBJECT_EOF;
1877 u64 ending_offset = 0;
1878 int mpflag = 0;
1879 int mem_tight = 0;
1880 int page_count = 0;
1881 int i;
1882 int rc;
1883 struct ost_body *body;
1884 LIST_HEAD(rpc_list);
1885
1886 LASSERT(!list_empty(ext_list));
1887
1888 /* add pages into rpc_list to build BRW rpc */
1889 list_for_each_entry(ext, ext_list, oe_link) {
1890 LASSERT(ext->oe_state == OES_RPC);
1891 mem_tight |= ext->oe_memalloc;
1892 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1893 ++page_count;
1894 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1895 if (starting_offset > oap->oap_obj_off)
1896 starting_offset = oap->oap_obj_off;
1897 else
1898 LASSERT(oap->oap_page_off == 0);
1899 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1900 ending_offset = oap->oap_obj_off +
1901 oap->oap_count;
1902 else
1903 LASSERT(oap->oap_page_off + oap->oap_count ==
1904 PAGE_CACHE_SIZE);
1905 }
1906 }
1907
1908 if (mem_tight)
1909 mpflag = cfs_memory_pressure_get_and_set();
1910
1911 OBD_ALLOC(crattr, sizeof(*crattr));
1912 if (crattr == NULL) {
1913 rc = -ENOMEM;
1914 goto out;
1915 }
1916
1917 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1918 if (pga == NULL) {
1919 rc = -ENOMEM;
1920 goto out;
1921 }
1922
1923 OBDO_ALLOC(oa);
1924 if (oa == NULL) {
1925 rc = -ENOMEM;
1926 goto out;
1927 }
1928
1929 i = 0;
1930 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1931 struct cl_page *page = oap2cl_page(oap);
1932 if (clerq == NULL) {
1933 clerq = cl_req_alloc(env, page, crt,
1934 1 /* only 1-object rpcs for now */);
1935 if (IS_ERR(clerq)) {
1936 rc = PTR_ERR(clerq);
1937 goto out;
1938 }
1939 lock = oap->oap_ldlm_lock;
1940 }
1941 if (mem_tight)
1942 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1943 pga[i] = &oap->oap_brw_page;
1944 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1945 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1946 pga[i]->pg, page_index(oap->oap_page), oap,
1947 pga[i]->flag);
1948 i++;
1949 cl_req_page_add(env, clerq, page);
1950 }
1951
1952 /* always get the data for the obdo for the rpc */
1953 LASSERT(clerq != NULL);
1954 crattr->cra_oa = oa;
1955 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1956 if (lock) {
1957 oa->o_handle = lock->l_remote_handle;
1958 oa->o_valid |= OBD_MD_FLHANDLE;
1959 }
1960
1961 rc = cl_req_prep(env, clerq);
1962 if (rc != 0) {
1963 CERROR("cl_req_prep failed: %d\n", rc);
1964 goto out;
1965 }
1966
1967 sort_brw_pages(pga, page_count);
1968 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1969 pga, &req, crattr->cra_capa, 1, 0);
1970 if (rc != 0) {
1971 CERROR("prep_req failed: %d\n", rc);
1972 goto out;
1973 }
1974
1975 req->rq_interpret_reply = brw_interpret;
1976
1977 if (mem_tight != 0)
1978 req->rq_memalloc = 1;
1979
1980 /* Need to update the timestamps after the request is built in case
1981 * we race with setattr (locally or in queue at OST). If OST gets
1982 * later setattr before earlier BRW (as determined by the request xid),
1983 * the OST will not use BRW timestamps. Sadly, there is no obvious
1984 * way to do this in a single call. bug 10150 */
1985 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1986 crattr->cra_oa = &body->oa;
1987 cl_req_attr_set(env, clerq, crattr,
1988 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1989
1990 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1991
1992 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1993 aa = ptlrpc_req_async_args(req);
1994 INIT_LIST_HEAD(&aa->aa_oaps);
1995 list_splice_init(&rpc_list, &aa->aa_oaps);
1996 INIT_LIST_HEAD(&aa->aa_exts);
1997 list_splice_init(ext_list, &aa->aa_exts);
1998 aa->aa_clerq = clerq;
1999
2000 /* Queued sync pages can be torn down while the pages
2001  * are between the pending list and the RPC. */
2002 tmp = NULL;
2003 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2004 /* only one oap gets a request reference */
2005 if (tmp == NULL)
2006 tmp = oap;
2007 if (oap->oap_interrupted && !req->rq_intr) {
2008 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2009 oap, req);
2010 ptlrpc_mark_interrupted(req);
2011 }
2012 }
2013 if (tmp != NULL)
2014 tmp->oap_request = ptlrpc_request_addref(req);
2015
2016 client_obd_list_lock(&cli->cl_loi_list_lock);
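	/* Convert the starting byte offset to a page index for the
	 * per-client offset histogram below. */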
2017 starting_offset >>= PAGE_CACHE_SHIFT;
2018 if (cmd == OBD_BRW_READ) {
2019 cli->cl_r_in_flight++;
2020 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2021 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2022 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2023 starting_offset + 1);
2024 } else {
2025 cli->cl_w_in_flight++;
2026 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2027 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2028 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2029 starting_offset + 1);
2030 }
2031 client_obd_list_unlock(&cli->cl_loi_list_lock);
2032
2033 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2034 page_count, aa, cli->cl_r_in_flight,
2035 cli->cl_w_in_flight);
2036
2037 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2038 * see which CPU/NUMA node the majority of pages were allocated
2039 * on, and try to assign the async RPC to the CPU core
2040 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2041 *
2042 * But on the other hand, we expect that multiple ptlrpcd
2043 * threads and the initial write sponsor can run in parallel,
2044 * especially when data checksum is enabled, which is CPU-bound
2045 * operation and single ptlrpcd thread cannot process in time.
2046 * So more ptlrpcd threads sharing BRW load
2047 * (with PDL_POLICY_ROUND) seems better.
2048 */
2049 ptlrpcd_add_req(req, pol, -1);
2050 rc = 0;
2051
2052 out:
2053 if (mem_tight != 0)
2054 cfs_memory_pressure_restore(mpflag);
2055
2056 if (crattr != NULL) {
2057 capa_put(crattr->cra_capa);
2058 OBD_FREE(crattr, sizeof(*crattr));
2059 }
2060
2061 if (rc != 0) {
2062 LASSERT(req == NULL);
2063
2064 if (oa)
2065 OBDO_FREE(oa);
2066 if (pga)
2067 OBD_FREE(pga, sizeof(*pga) * page_count);
2068 /* This should happen rarely and is pretty bad; it makes the
2069  * pending list not follow the dirty order. */
2070 while (!list_empty(ext_list)) {
2071 ext = list_entry(ext_list->next, struct osc_extent,
2072 oe_link);
2073 list_del_init(&ext->oe_link);
2074 osc_extent_finish(env, ext, 0, rc);
2075 }
2076 if (clerq && !IS_ERR(clerq))
2077 cl_req_completion(env, clerq, rc);
2078 }
2079 return rc;
2080 }
2081
2082 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2083 struct ldlm_enqueue_info *einfo)
2084 {
2085 void *data = einfo->ei_cbdata;
2086 int set = 0;
2087
2088 LASSERT(lock != NULL);
2089 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2090 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2091 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2092 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2093
2094 lock_res_and_lock(lock);
2095 spin_lock(&osc_ast_guard);
2096
2097 if (lock->l_ast_data == NULL)
2098 lock->l_ast_data = data;
2099 if (lock->l_ast_data == data)
2100 set = 1;
2101
2102 spin_unlock(&osc_ast_guard);
2103 unlock_res_and_lock(lock);
2104
2105 return set;
2106 }
2107
2108 static int osc_set_data_with_check(struct lustre_handle *lockh,
2109 struct ldlm_enqueue_info *einfo)
2110 {
2111 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2112 int set = 0;
2113
2114 if (lock != NULL) {
2115 set = osc_set_lock_data_with_check(lock, einfo);
2116 LDLM_LOCK_PUT(lock);
2117 } else
2118 CERROR("lockh %p, data %p - client evicted?\n",
2119 lockh, einfo->ei_cbdata);
2120 return set;
2121 }
2122
2123 /* Find any ldlm lock of the inode in osc.
2124  * Return 0 if none is found,
2125  *        1 if one is found,
2126  *        < 0 on error. */
2127 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2128 ldlm_iterator_t replace, void *data)
2129 {
2130 struct ldlm_res_id res_id;
2131 struct obd_device *obd = class_exp2obd(exp);
2132 int rc = 0;
2133
2134 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2135 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2136 if (rc == LDLM_ITER_STOP)
2137 return 1;
2138 if (rc == LDLM_ITER_CONTINUE)
2139 return 0;
2140 return rc;
2141 }
2142
2143 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2144 obd_enqueue_update_f upcall, void *cookie,
2145 __u64 *flags, int agl, int rc)
2146 {
2147 int intent = *flags & LDLM_FL_HAS_INTENT;
2148
2149 if (intent) {
2150 /* The request was created before ldlm_cli_enqueue call. */
2151 if (rc == ELDLM_LOCK_ABORTED) {
2152 struct ldlm_reply *rep;
2153 rep = req_capsule_server_get(&req->rq_pill,
2154 &RMF_DLM_REP);
2155
2156 LASSERT(rep != NULL);
2157 rep->lock_policy_res1 =
2158 ptlrpc_status_ntoh(rep->lock_policy_res1);
2159 if (rep->lock_policy_res1)
2160 rc = rep->lock_policy_res1;
2161 }
2162 }
2163
2164 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2165 (rc == 0)) {
2166 *flags |= LDLM_FL_LVB_READY;
2167 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2168 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2169 }
2170
2171 /* Call the update callback. */
2172 rc = (*upcall)(cookie, rc);
2173 return rc;
2174 }
2175
2176 static int osc_enqueue_interpret(const struct lu_env *env,
2177 struct ptlrpc_request *req,
2178 struct osc_enqueue_args *aa, int rc)
2179 {
2180 struct ldlm_lock *lock;
2181 struct lustre_handle handle;
2182 __u32 mode;
2183 struct ost_lvb *lvb;
2184 __u32 lvb_len;
2185 __u64 *flags = aa->oa_flags;
2186
2187 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2188 * might be freed anytime after lock upcall has been called. */
2189 lustre_handle_copy(&handle, aa->oa_lockh);
2190 mode = aa->oa_ei->ei_mode;
2191
2192 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2193 * be valid. */
2194 lock = ldlm_handle2lock(&handle);
2195
2196 /* Take an additional reference so that a blocking AST that
2197 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2198 * to arrive after an upcall has been executed by
2199 * osc_enqueue_fini(). */
2200 ldlm_lock_addref(&handle, mode);
2201
2202 /* Let the CP AST grant the lock first. */
2203 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2204
2205 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2206 lvb = NULL;
2207 lvb_len = 0;
2208 } else {
2209 lvb = aa->oa_lvb;
2210 lvb_len = sizeof(*aa->oa_lvb);
2211 }
2212
2213 /* Complete the procedure for obtaining the lock. */
2214 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2215 mode, flags, lvb, lvb_len, &handle, rc);
2216 /* Complete osc stuff. */
2217 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2218 flags, aa->oa_agl, rc);
2219
2220 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2221
2222 /* Release the lock for async request. */
2223 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2224 /*
2225 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2226 * not already released by
2227 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2228 */
2229 ldlm_lock_decref(&handle, mode);
2230
2231 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2232 aa->oa_lockh, req, aa);
2233 ldlm_lock_decref(&handle, mode);
2234 LDLM_LOCK_PUT(lock);
2235 return rc;
2236 }
2237
2238 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2239
2240 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2241  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2242  * other synchronous requests, but keeping some locks while trying to obtain
2243  * others may take a considerable amount of time in the case of an OST failure;
2244  * and when a client does not release a lock that other sync requests are
2245  * waiting on, the client is evicted from the cluster -- such scenarios make
2246  * life difficult, so release locks just after they are obtained. */
2247 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2248 __u64 *flags, ldlm_policy_data_t *policy,
2249 struct ost_lvb *lvb, int kms_valid,
2250 obd_enqueue_update_f upcall, void *cookie,
2251 struct ldlm_enqueue_info *einfo,
2252 struct lustre_handle *lockh,
2253 struct ptlrpc_request_set *rqset, int async, int agl)
2254 {
2255 struct obd_device *obd = exp->exp_obd;
2256 struct ptlrpc_request *req = NULL;
2257 int intent = *flags & LDLM_FL_HAS_INTENT;
2258 __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2259 ldlm_mode_t mode;
2260 int rc;
2261
2262 /* Filesystem lock extents are extended to page boundaries so that
2263 * dealing with the page cache is a little smoother. */
2264 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2265 policy->l_extent.end |= ~CFS_PAGE_MASK;
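	/* Illustrative (assuming 4 KiB pages, so CFS_PAGE_MASK covers bits
	 * above 4095): an extent [5000, 9000] is widened to [4096, 12287],
	 * i.e. the start is rounded down and the end up to page boundaries. */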
2266
2267 /*
2268 * kms is not valid when either object is completely fresh (so that no
2269 * locks are cached), or object was evicted. In the latter case cached
2270 * lock cannot be used, because it would prime inode state with
2271 * potentially stale LVB.
2272 */
2273 if (!kms_valid)
2274 goto no_match;
2275
2276 /* Next, search for already existing extent locks that will cover us */
2277 /* If we're trying to read, we also search for an existing PW lock. The
2278 * VFS and page cache already protect us locally, so lots of readers/
2279 * writers can share a single PW lock.
2280 *
2281 * There are problems with conversion deadlocks, so instead of
2282 * converting a read lock to a write lock, we'll just enqueue a new
2283 * one.
2284 *
2285 * At some point we should cancel the read lock instead of making them
2286 * send us a blocking callback, but there are problems with canceling
2287 * locks out from other users right now, too. */
2288 mode = einfo->ei_mode;
2289 if (einfo->ei_mode == LCK_PR)
2290 mode |= LCK_PW;
2291 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2292 einfo->ei_type, policy, mode, lockh, 0);
2293 if (mode) {
2294 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2295
2296 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2297 /* For AGL, if the enqueue RPC was sent but the lock is not
2298  * granted, then skip processing this stripe.
2299  * Return -ECANCELED to tell the caller. */
2300 ldlm_lock_decref(lockh, mode);
2301 LDLM_LOCK_PUT(matched);
2302 return -ECANCELED;
2303 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2304 *flags |= LDLM_FL_LVB_READY;
2305 /* Addref the lock only for non-async requests when a PW
2306  * lock was matched although we asked for PR. */
2307 if (!rqset && einfo->ei_mode != mode)
2308 ldlm_lock_addref(lockh, LCK_PR);
2309 if (intent) {
2310 /* I would like to be able to ASSERT here that
2311 * rss <= kms, but I can't, for reasons which
2312 * are explained in lov_enqueue() */
2313 }
2314
2315 /* We already have a lock, and it's referenced.
2316 *
2317 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2318 * AGL upcall may change it to CLS_HELD directly. */
2319 (*upcall)(cookie, ELDLM_OK);
2320
2321 if (einfo->ei_mode != mode)
2322 ldlm_lock_decref(lockh, LCK_PW);
2323 else if (rqset)
2324 /* For async requests, decref the lock. */
2325 ldlm_lock_decref(lockh, einfo->ei_mode);
2326 LDLM_LOCK_PUT(matched);
2327 return ELDLM_OK;
2328 } else {
2329 ldlm_lock_decref(lockh, mode);
2330 LDLM_LOCK_PUT(matched);
2331 }
2332 }
2333
2334 no_match:
2335 if (intent) {
2336 LIST_HEAD(cancels);
2337 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2338 &RQF_LDLM_ENQUEUE_LVB);
2339 if (req == NULL)
2340 return -ENOMEM;
2341
2342 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2343 if (rc) {
2344 ptlrpc_request_free(req);
2345 return rc;
2346 }
2347
2348 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2349 sizeof(*lvb));
2350 ptlrpc_request_set_replen(req);
2351 }
2352
2353 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2354 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2355
2356 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2357 sizeof(*lvb), LVB_T_OST, lockh, async);
2358 if (rqset) {
2359 if (!rc) {
2360 struct osc_enqueue_args *aa;
2361 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2362 aa = ptlrpc_req_async_args(req);
2363 aa->oa_ei = einfo;
2364 aa->oa_exp = exp;
2365 aa->oa_flags = flags;
2366 aa->oa_upcall = upcall;
2367 aa->oa_cookie = cookie;
2368 aa->oa_lvb = lvb;
2369 aa->oa_lockh = lockh;
2370 aa->oa_agl = !!agl;
2371
2372 req->rq_interpret_reply =
2373 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2374 if (rqset == PTLRPCD_SET)
2375 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2376 else
2377 ptlrpc_set_add_req(rqset, req);
2378 } else if (intent) {
2379 ptlrpc_req_finished(req);
2380 }
2381 return rc;
2382 }
2383
2384 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2385 if (intent)
2386 ptlrpc_req_finished(req);
2387
2388 return rc;
2389 }
2390
2391 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2392 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2393 __u64 *flags, void *data, struct lustre_handle *lockh,
2394 int unref)
2395 {
2396 struct obd_device *obd = exp->exp_obd;
2397 __u64 lflags = *flags;
2398 ldlm_mode_t rc;
2399
2400 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2401 return -EIO;
2402
2403 /* Filesystem lock extents are extended to page boundaries so that
2404 * dealing with the page cache is a little smoother */
2405 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2406 policy->l_extent.end |= ~CFS_PAGE_MASK;
2407
2408 /* Next, search for already existing extent locks that will cover us */
2409 /* If we're trying to read, we also search for an existing PW lock. The
2410 * VFS and page cache already protect us locally, so lots of readers/
2411 * writers can share a single PW lock. */
2412 rc = mode;
2413 if (mode == LCK_PR)
2414 rc |= LCK_PW;
2415 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2416 res_id, type, policy, rc, lockh, unref);
2417 if (rc) {
2418 if (data != NULL) {
2419 if (!osc_set_data_with_check(lockh, data)) {
2420 if (!(lflags & LDLM_FL_TEST_LOCK))
2421 ldlm_lock_decref(lockh, rc);
2422 return 0;
2423 }
2424 }
2425 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2426 ldlm_lock_addref(lockh, LCK_PR);
2427 ldlm_lock_decref(lockh, LCK_PW);
2428 }
2429 return rc;
2430 }
2431 return rc;
2432 }
2433
2434 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2435 {
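	/* Group locks are not kept cached, so the last reference also
	 * cancels the lock; other modes just drop the reference and let
	 * the LRU cancel the lock later. */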
2436 if (unlikely(mode == LCK_GROUP))
2437 ldlm_lock_decref_and_cancel(lockh, mode);
2438 else
2439 ldlm_lock_decref(lockh, mode);
2440
2441 return 0;
2442 }
2443
2444 static int osc_statfs_interpret(const struct lu_env *env,
2445 struct ptlrpc_request *req,
2446 struct osc_async_args *aa, int rc)
2447 {
2448 struct obd_statfs *msfs;
2449
2450 if (rc == -EBADR)
2451 /* The request has in fact never been sent
2452 * due to issues at a higher level (LOV).
2453 * Exit immediately since the caller is
2454 * aware of the problem and takes care
2455 * of the cleanup */
2456 return rc;
2457
2458 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2459 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2460 rc = 0;
2461 goto out;
2462 }
2463
2464 if (rc != 0)
2465 goto out;
2466
2467 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2468 if (msfs == NULL) {
2469 rc = -EPROTO;
2470 goto out;
2471 }
2472
2473 *aa->aa_oi->oi_osfs = *msfs;
2474 out:
2475 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2476 return rc;
2477 }
2478
2479 static int osc_statfs_async(struct obd_export *exp,
2480 struct obd_info *oinfo, __u64 max_age,
2481 struct ptlrpc_request_set *rqset)
2482 {
2483 struct obd_device *obd = class_exp2obd(exp);
2484 struct ptlrpc_request *req;
2485 struct osc_async_args *aa;
2486 int rc;
2487
2488 /* We could possibly pass max_age in the request (as an absolute
2489 * timestamp or a "seconds.usec ago") so the target can avoid doing
2490 * extra calls into the filesystem if that isn't necessary (e.g.
2491 * during mount that would help a bit). Having relative timestamps
2492 * is not so great if request processing is slow, while absolute
2493 * timestamps are not ideal because they need time synchronization. */
2494 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2495 if (req == NULL)
2496 return -ENOMEM;
2497
2498 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2499 if (rc) {
2500 ptlrpc_request_free(req);
2501 return rc;
2502 }
2503 ptlrpc_request_set_replen(req);
2504 req->rq_request_portal = OST_CREATE_PORTAL;
2505 ptlrpc_at_set_req_timeout(req);
2506
2507 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2508 /* procfs requests should not wait for stat results, to avoid deadlock */
2509 req->rq_no_resend = 1;
2510 req->rq_no_delay = 1;
2511 }
2512
2513 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2514 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2515 aa = ptlrpc_req_async_args(req);
2516 aa->aa_oi = oinfo;
2517
2518 ptlrpc_set_add_req(rqset, req);
2519 return 0;
2520 }
2521
2522 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2523 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2524 {
2525 struct obd_device *obd = class_exp2obd(exp);
2526 struct obd_statfs *msfs;
2527 struct ptlrpc_request *req;
2528 struct obd_import *imp = NULL;
2529 int rc;
2530
2531 /* Since the request might also come from lprocfs, we need to
2532  * sync this with client_disconnect_export() (bug 15684). */
2533 down_read(&obd->u.cli.cl_sem);
2534 if (obd->u.cli.cl_import)
2535 imp = class_import_get(obd->u.cli.cl_import);
2536 up_read(&obd->u.cli.cl_sem);
2537 if (!imp)
2538 return -ENODEV;
2539
2540 /* We could possibly pass max_age in the request (as an absolute
2541 * timestamp or a "seconds.usec ago") so the target can avoid doing
2542 * extra calls into the filesystem if that isn't necessary (e.g.
2543 * during mount that would help a bit). Having relative timestamps
2544 * is not so great if request processing is slow, while absolute
2545 * timestamps are not ideal because they need time synchronization. */
2546 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2547
2548 class_import_put(imp);
2549
2550 if (req == NULL)
2551 return -ENOMEM;
2552
2553 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2554 if (rc) {
2555 ptlrpc_request_free(req);
2556 return rc;
2557 }
2558 ptlrpc_request_set_replen(req);
2559 req->rq_request_portal = OST_CREATE_PORTAL;
2560 ptlrpc_at_set_req_timeout(req);
2561
2562 if (flags & OBD_STATFS_NODELAY) {
2563 /* procfs requests should not wait for stat results, to avoid deadlock */
2564 req->rq_no_resend = 1;
2565 req->rq_no_delay = 1;
2566 }
2567
2568 rc = ptlrpc_queue_wait(req);
2569 if (rc)
2570 goto out;
2571
2572 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2573 if (msfs == NULL) {
2574 rc = -EPROTO;
2575 goto out;
2576 }
2577
2578 *osfs = *msfs;
2579
2580 out:
2581 ptlrpc_req_finished(req);
2582 return rc;
2583 }
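
/*
 * Illustrative call sketch (hypothetical caller, not part of this file;
 * note that osc_statfs() itself always queries the OST and does not
 * consult max_age -- caching against max_age happens in the obd_statfs()
 * wrapper):
 *
 *	struct obd_statfs osfs;
 *	int rc = osc_statfs(env, exp, &osfs, cfs_time_shift_64(-1), 0);
 *	if (rc == 0)
 *		CDEBUG(D_SUPER, "blocks: %llu free of %llu\n",
 *		       osfs.os_bfree, osfs.os_blocks);
 */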
2584
2585 /* Retrieve object striping information.
2586  *
2587  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2588  * the maximum number of OST indices which will fit in the user buffer.
2589  * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we only use
2590  * 1 slot here). */
2591 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2592 {
2593 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2594 struct lov_user_md_v3 lum, *lumk;
2595 struct lov_user_ost_data_v1 *lmm_objects;
2596 int rc = 0, lum_size;
2597
2598 if (!lsm)
2599 return -ENODATA;
2600
2601 /* we only need the header part from user space to get lmm_magic and
2602  * lmm_stripe_count (the header part is common to v1 and v3) */
2603 lum_size = sizeof(struct lov_user_md_v1);
2604 if (copy_from_user(&lum, lump, lum_size))
2605 return -EFAULT;
2606
2607 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2608 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2609 return -EINVAL;
2610
2611 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2612 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2613 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2614 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2615
2616 /* we can use lov_mds_md_size() to compute lum_size
2617 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2618 if (lum.lmm_stripe_count > 0) {
2619 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2620 OBD_ALLOC(lumk, lum_size);
2621 if (!lumk)
2622 return -ENOMEM;
2623
2624 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2625 lmm_objects =
2626 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2627 else
2628 lmm_objects = &(lumk->lmm_objects[0]);
2629 lmm_objects->l_ost_oi = lsm->lsm_oi;
2630 } else {
2631 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2632 lumk = &lum;
2633 }
2634
2635 lumk->lmm_oi = lsm->lsm_oi;
2636 lumk->lmm_stripe_count = 1;
2637
2638 if (copy_to_user(lump, lumk, lum_size))
2639 rc = -EFAULT;
2640
2641 if (lumk != &lum)
2642 OBD_FREE(lumk, lum_size);
2643
2644 return rc;
2645 }
2646
2647
2648 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2649 void *karg, void *uarg)
2650 {
2651 struct obd_device *obd = exp->exp_obd;
2652 struct obd_ioctl_data *data = karg;
2653 int err = 0;
2654
2655 if (!try_module_get(THIS_MODULE)) {
2656 CERROR("Can't get module. Is it alive?");
2657 return -EINVAL;
2658 }
2659 switch (cmd) {
2660 case OBD_IOC_LOV_GET_CONFIG: {
2661 char *buf;
2662 struct lov_desc *desc;
2663 struct obd_uuid uuid;
2664
2665 buf = NULL;
2666 len = 0;
2667 if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
2668 err = -EINVAL;
2669 goto out;
2670 }
2671
2672 data = (struct obd_ioctl_data *)buf;
2673
2674 if (sizeof(*desc) > data->ioc_inllen1) {
2675 obd_ioctl_freedata(buf, len);
2676 err = -EINVAL;
2677 goto out;
2678 }
2679
2680 if (data->ioc_inllen2 < sizeof(uuid)) {
2681 obd_ioctl_freedata(buf, len);
2682 err = -EINVAL;
2683 goto out;
2684 }
2685
2686 desc = (struct lov_desc *)data->ioc_inlbuf1;
2687 desc->ld_tgt_count = 1;
2688 desc->ld_active_tgt_count = 1;
2689 desc->ld_default_stripe_count = 1;
2690 desc->ld_default_stripe_size = 0;
2691 desc->ld_default_stripe_offset = 0;
2692 desc->ld_pattern = 0;
2693 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2694
2695 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2696
2697 err = copy_to_user((void *)uarg, buf, len);
2698 if (err)
2699 err = -EFAULT;
2700 obd_ioctl_freedata(buf, len);
2701 goto out;
2702 }
2703 case LL_IOC_LOV_SETSTRIPE:
2704 err = obd_alloc_memmd(exp, karg);
2705 if (err > 0)
2706 err = 0;
2707 goto out;
2708 case LL_IOC_LOV_GETSTRIPE:
2709 err = osc_getstripe(karg, uarg);
2710 goto out;
2711 case OBD_IOC_CLIENT_RECOVER:
2712 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2713 data->ioc_inlbuf1, 0);
2714 if (err > 0)
2715 err = 0;
2716 goto out;
2717 case IOC_OSC_SET_ACTIVE:
2718 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2719 data->ioc_offset);
2720 goto out;
2721 case OBD_IOC_POLL_QUOTACHECK:
2722 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2723 goto out;
2724 case OBD_IOC_PING_TARGET:
2725 err = ptlrpc_obd_ping(obd);
2726 goto out;
2727 default:
2728 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2729 cmd, current_comm());
2730 err = -ENOTTY;
2731 goto out;
2732 }
2733 out:
2734 module_put(THIS_MODULE);
2735 return err;
2736 }
2737
2738 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2739 u32 keylen, void *key, __u32 *vallen, void *val,
2740 struct lov_stripe_md *lsm)
2741 {
2742 if (!vallen || !val)
2743 return -EFAULT;
2744
2745 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2746 __u32 *stripe = val;
2747 *vallen = sizeof(*stripe);
2748 *stripe = 0;
2749 return 0;
2750 } else if (KEY_IS(KEY_LAST_ID)) {
2751 struct ptlrpc_request *req;
2752 u64 *reply;
2753 char *tmp;
2754 int rc;
2755
2756 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2757 &RQF_OST_GET_INFO_LAST_ID);
2758 if (req == NULL)
2759 return -ENOMEM;
2760
2761 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2762 RCL_CLIENT, keylen);
2763 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2764 if (rc) {
2765 ptlrpc_request_free(req);
2766 return rc;
2767 }
2768
2769 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2770 memcpy(tmp, key, keylen);
2771
2772 req->rq_no_delay = req->rq_no_resend = 1;
2773 ptlrpc_request_set_replen(req);
2774 rc = ptlrpc_queue_wait(req);
2775 if (rc)
2776 goto out;
2777
2778 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2779 if (reply == NULL) {
2780 rc = -EPROTO;
2781 goto out;
2782 }
2783
2784 *((u64 *)val) = *reply;
2785 out:
2786 ptlrpc_req_finished(req);
2787 return rc;
2788 } else if (KEY_IS(KEY_FIEMAP)) {
2789 struct ll_fiemap_info_key *fm_key =
2790 (struct ll_fiemap_info_key *)key;
2791 struct ldlm_res_id res_id;
2792 ldlm_policy_data_t policy;
2793 struct lustre_handle lockh;
2794 ldlm_mode_t mode = 0;
2795 struct ptlrpc_request *req;
2796 struct ll_user_fiemap *reply;
2797 char *tmp;
2798 int rc;
2799
2800 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2801 goto skip_locking;
2802
2803 policy.l_extent.start = fm_key->fiemap.fm_start &
2804 CFS_PAGE_MASK;
2805
2806 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2807 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2808 policy.l_extent.end = OBD_OBJECT_EOF;
2809 else
2810 policy.l_extent.end = (fm_key->fiemap.fm_start +
2811 fm_key->fiemap.fm_length +
2812 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2813
2814 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2815 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2816 LDLM_FL_BLOCK_GRANTED |
2817 LDLM_FL_LVB_READY,
2818 &res_id, LDLM_EXTENT, &policy,
2819 LCK_PR | LCK_PW, &lockh, 0);
2820 if (mode) { /* lock is cached on client */
2821 if (mode != LCK_PR) {
2822 ldlm_lock_addref(&lockh, LCK_PR);
2823 ldlm_lock_decref(&lockh, LCK_PW);
2824 }
2825 } else { /* no cached lock, need to acquire the lock on the server side */
2826 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2827 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2828 }
2829
2830 skip_locking:
2831 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2832 &RQF_OST_GET_INFO_FIEMAP);
2833 if (req == NULL) {
2834 rc = -ENOMEM;
2835 goto drop_lock;
2836 }
2837
2838 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2839 RCL_CLIENT, keylen);
2840 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2841 RCL_CLIENT, *vallen);
2842 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2843 RCL_SERVER, *vallen);
2844
2845 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2846 if (rc) {
2847 ptlrpc_request_free(req);
2848 goto drop_lock;
2849 }
2850
2851 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2852 memcpy(tmp, key, keylen);
2853 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2854 memcpy(tmp, val, *vallen);
2855
2856 ptlrpc_request_set_replen(req);
2857 rc = ptlrpc_queue_wait(req);
2858 if (rc)
2859 goto fini_req;
2860
2861 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2862 if (reply == NULL) {
2863 rc = -EPROTO;
2864 goto fini_req;
2865 }
2866
2867 memcpy(val, reply, *vallen);
2868 fini_req:
2869 ptlrpc_req_finished(req);
2870 drop_lock:
2871 if (mode)
2872 ldlm_lock_decref(&lockh, LCK_PR);
2873 return rc;
2874 }
2875
2876 return -EINVAL;
2877 }
2878
2879 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2880 u32 keylen, void *key, u32 vallen,
2881 void *val, struct ptlrpc_request_set *set)
2882 {
2883 struct ptlrpc_request *req;
2884 struct obd_device *obd = exp->exp_obd;
2885 struct obd_import *imp = class_exp2cliimp(exp);
2886 char *tmp;
2887 int rc;
2888
2889 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2890
2891 if (KEY_IS(KEY_CHECKSUM)) {
2892 if (vallen != sizeof(int))
2893 return -EINVAL;
2894 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2895 return 0;
2896 }
2897
2898 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2899 sptlrpc_conf_client_adapt(obd);
2900 return 0;
2901 }
2902
2903 if (KEY_IS(KEY_FLUSH_CTX)) {
2904 sptlrpc_import_flush_my_ctx(imp);
2905 return 0;
2906 }
2907
2908 if (KEY_IS(KEY_CACHE_SET)) {
2909 struct client_obd *cli = &obd->u.cli;
2910
2911 LASSERT(cli->cl_cache == NULL); /* only once */
2912 cli->cl_cache = (struct cl_client_cache *)val;
2913 atomic_inc(&cli->cl_cache->ccc_users);
2914 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2915
2916 /* add this osc into entity list */
2917 LASSERT(list_empty(&cli->cl_lru_osc));
2918 spin_lock(&cli->cl_cache->ccc_lru_lock);
2919 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2920 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2921
2922 return 0;
2923 }
2924
2925 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2926 struct client_obd *cli = &obd->u.cli;
2927 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2928 int target = *(int *)val;
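
		/* Shrink at most half of the pages currently on the LRU,
		 * bounded by the caller's target; the decrement below
		 * reports back how many pages remain to be freed. */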
2929
2930 nr = osc_lru_shrink(cli, min(nr, target));
2931 *(int *)val -= nr;
2932 return 0;
2933 }
2934
2935 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2936 return -EINVAL;
2937
2938 /* We pass all other commands directly to OST. Since nobody calls osc
2939  * methods directly and everybody is supposed to go through LOV, we
2940  * assume lov checked invalid values for us.
2941  * The only recognised values so far are evict_by_nid and mds_conn.
2942  * Even if something bad goes through, we'd get a -EINVAL from OST
2943  * anyway. */
2944
2945 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2946 &RQF_OST_SET_GRANT_INFO :
2947 &RQF_OBD_SET_INFO);
2948 if (req == NULL)
2949 return -ENOMEM;
2950
2951 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2952 RCL_CLIENT, keylen);
2953 if (!KEY_IS(KEY_GRANT_SHRINK))
2954 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2955 RCL_CLIENT, vallen);
2956 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2957 if (rc) {
2958 ptlrpc_request_free(req);
2959 return rc;
2960 }
2961
2962 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2963 memcpy(tmp, key, keylen);
2964 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2965 &RMF_OST_BODY :
2966 &RMF_SETINFO_VAL);
2967 memcpy(tmp, val, vallen);
2968
2969 if (KEY_IS(KEY_GRANT_SHRINK)) {
2970 struct osc_brw_async_args *aa;
2971 struct obdo *oa;
2972
2973 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2974 aa = ptlrpc_req_async_args(req);
2975 OBDO_ALLOC(oa);
2976 if (!oa) {
2977 ptlrpc_req_finished(req);
2978 return -ENOMEM;
2979 }
2980 *oa = ((struct ost_body *)val)->oa;
2981 aa->aa_oa = oa;
2982 req->rq_interpret_reply = osc_shrink_grant_interpret;
2983 }
2984
2985 ptlrpc_request_set_replen(req);
2986 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2987 LASSERT(set != NULL);
2988 ptlrpc_set_add_req(set, req);
2989 ptlrpc_check_set(NULL, set);
2990 } else
2991 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2992
2993 return 0;
2994 }
2995
2996 static int osc_reconnect(const struct lu_env *env,
2997 struct obd_export *exp, struct obd_device *obd,
2998 struct obd_uuid *cluuid,
2999 struct obd_connect_data *data,
3000 void *localdata)
3001 {
3002 struct client_obd *cli = &obd->u.cli;
3003
3004 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3005 long lost_grant;
3006
3007 client_obd_list_lock(&cli->cl_loi_list_lock);
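		/* Ask the server to restore our previous grant (whatever is
		 * still available plus what covers dirty pages); if we held
		 * none, start with two full BRW RPCs worth. */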
3008 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3009 2 * cli_brw_size(obd);
3010 lost_grant = cli->cl_lost_grant;
3011 cli->cl_lost_grant = 0;
3012 client_obd_list_unlock(&cli->cl_loi_list_lock);
3013
3014 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3015 data->ocd_connect_flags,
3016 data->ocd_version, data->ocd_grant, lost_grant);
3017 }
3018
3019 return 0;
3020 }
3021
3022 static int osc_disconnect(struct obd_export *exp)
3023 {
3024 struct obd_device *obd = class_exp2obd(exp);
3025 int rc;
3026
3027 rc = client_disconnect_export(exp);
3028 /**
3029 * Initially we put del_shrink_grant before disconnect_export, but it
3030 * causes the following problem if setup (connect) and cleanup
3031 * (disconnect) are tangled together.
3032 * connect p1 disconnect p2
3033 * ptlrpc_connect_import
3034 * ............... class_manual_cleanup
3035 * osc_disconnect
3036 * del_shrink_grant
3037 * ptlrpc_connect_interrupt
3038 * init_grant_shrink
3039 * add this client to shrink list
3040 * cleanup_osc
3041  * Bang! The pinger triggers the shrink.
3042  * So the osc should be disconnected from the shrink list only after we
3043  * are sure the import has been destroyed. Bug 18662.
3044 */
3045 if (obd->u.cli.cl_import == NULL)
3046 osc_del_shrink_grant(&obd->u.cli);
3047 return rc;
3048 }
3049
3050 static int osc_import_event(struct obd_device *obd,
3051 struct obd_import *imp,
3052 enum obd_import_event event)
3053 {
3054 struct client_obd *cli;
3055 int rc = 0;
3056
3057 LASSERT(imp->imp_obd == obd);
3058
3059 switch (event) {
3060 case IMP_EVENT_DISCON: {
3061 cli = &obd->u.cli;
3062 client_obd_list_lock(&cli->cl_loi_list_lock);
3063 cli->cl_avail_grant = 0;
3064 cli->cl_lost_grant = 0;
3065 client_obd_list_unlock(&cli->cl_loi_list_lock);
3066 break;
3067 }
3068 case IMP_EVENT_INACTIVE: {
3069 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3070 break;
3071 }
3072 case IMP_EVENT_INVALIDATE: {
3073 struct ldlm_namespace *ns = obd->obd_namespace;
3074 struct lu_env *env;
3075 int refcheck;
3076
3077 env = cl_env_get(&refcheck);
3078 if (!IS_ERR(env)) {
3079 /* Reset grants */
3080 cli = &obd->u.cli;
3081 /* all pages go to failing rpcs due to the invalid
3082 * import */
3083 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3084
3085 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3086 cl_env_put(env, &refcheck);
3087 } else
3088 rc = PTR_ERR(env);
3089 break;
3090 }
3091 case IMP_EVENT_ACTIVE: {
3092 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3093 break;
3094 }
3095 case IMP_EVENT_OCD: {
3096 struct obd_connect_data *ocd = &imp->imp_connect_data;
3097
3098 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3099 osc_init_grant(&obd->u.cli, ocd);
3100
3101 /* See bug 7198 */
3102 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3103 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3104
3105 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3106 break;
3107 }
3108 case IMP_EVENT_DEACTIVATE: {
3109 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3110 break;
3111 }
3112 case IMP_EVENT_ACTIVATE: {
3113 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3114 break;
3115 }
3116 default:
3117 CERROR("Unknown import event %d\n", event);
3118 LBUG();
3119 }
3120 return rc;
3121 }
3122
3123 /**
3124 * Determine whether the lock can be canceled before replaying the lock
3125 * during recovery, see bug16774 for detailed information.
3126 *
3127 * \retval zero the lock can't be canceled
3128 * \retval other ok to cancel
3129 */
3130 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3131 {
3132 check_res_locked(lock->l_resource);
3133
3134 /*
3135  * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
3136  *
3137  * XXX as a future improvement, we could also cancel an unused write lock
3138  * if it has neither dirty data nor active mmaps.
3139 */
3140 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3141 (lock->l_granted_mode == LCK_PR ||
3142 lock->l_granted_mode == LCK_CR) &&
3143 (osc_dlm_lock_pageref(lock) == 0))
3144 return 1;
3145
3146 return 0;
3147 }
3148
3149 static int brw_queue_work(const struct lu_env *env, void *data)
3150 {
3151 struct client_obd *cli = data;
3152
3153 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3154
3155 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3156 return 0;
3157 }
3158
3159 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3160 {
3161 struct lprocfs_static_vars lvars = { NULL };
3162 struct client_obd *cli = &obd->u.cli;
3163 void *handler;
3164 int rc;
3165
3166 rc = ptlrpcd_addref();
3167 if (rc)
3168 return rc;
3169
3170 rc = client_obd_setup(obd, lcfg);
3171 if (rc)
3172 goto out_ptlrpcd;
3173
3174 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3175 if (IS_ERR(handler)) {
3176 rc = PTR_ERR(handler);
3177 goto out_client_setup;
3178 }
3179 cli->cl_writeback_work = handler;
3180
3181 rc = osc_quota_setup(obd);
3182 if (rc)
3183 goto out_ptlrpcd_work;
3184
3185 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3186 lprocfs_osc_init_vars(&lvars);
3187 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3188 lproc_osc_attach_seqstat(obd);
3189 sptlrpc_lprocfs_cliobd_attach(obd);
3190 ptlrpc_lprocfs_register_obd(obd);
3191 }
3192
3193 /* We need to allocate a few requests more, because
3194  * brw_interpret tries to create new requests before freeing
3195  * previous ones. Ideally we want to have 2x max_rpcs_in_flight
3196  * reserved, but I'm afraid that might be too much wasted RAM
3197  * in fact, so +2 is just my guess and should still work. */
3198 cli->cl_import->imp_rq_pool =
3199 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3200 OST_MAXREQSIZE,
3201 ptlrpc_add_rqs_to_pool);
3202
3203 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3204 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3205 return rc;
3206
3207 out_ptlrpcd_work:
3208 ptlrpcd_destroy_work(handler);
3209 out_client_setup:
3210 client_obd_cleanup(obd);
3211 out_ptlrpcd:
3212 ptlrpcd_decref();
3213 return rc;
3214 }
3215
3216 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3217 {
3218 switch (stage) {
3219 case OBD_CLEANUP_EARLY: {
3220 struct obd_import *imp;
3221 imp = obd->u.cli.cl_import;
3222 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3223 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3224 ptlrpc_deactivate_import(imp);
3225 spin_lock(&imp->imp_lock);
3226 imp->imp_pingable = 0;
3227 spin_unlock(&imp->imp_lock);
3228 break;
3229 }
3230 case OBD_CLEANUP_EXPORTS: {
3231 struct client_obd *cli = &obd->u.cli;
3232 /* LU-464
3233 * for echo client, export may be on zombie list, wait for
3234 * zombie thread to cull it, because cli.cl_import will be
3235 * cleared in client_disconnect_export():
3236 * class_export_destroy() -> obd_cleanup() ->
3237 * echo_device_free() -> echo_client_cleanup() ->
3238 * obd_disconnect() -> osc_disconnect() ->
3239 * client_disconnect_export()
3240 */
3241 obd_zombie_barrier();
3242 if (cli->cl_writeback_work) {
3243 ptlrpcd_destroy_work(cli->cl_writeback_work);
3244 cli->cl_writeback_work = NULL;
3245 }
3246 obd_cleanup_client_import(obd);
3247 ptlrpc_lprocfs_unregister_obd(obd);
3248 lprocfs_obd_cleanup(obd);
3249 break;
3250 }
3251 }
3252 return 0;
3253 }
3254
3255 int osc_cleanup(struct obd_device *obd)
3256 {
3257 struct client_obd *cli = &obd->u.cli;
3258 int rc;
3259
3260 /* lru cleanup */
3261 if (cli->cl_cache != NULL) {
3262 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3263 spin_lock(&cli->cl_cache->ccc_lru_lock);
3264 list_del_init(&cli->cl_lru_osc);
3265 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3266 cli->cl_lru_left = NULL;
3267 atomic_dec(&cli->cl_cache->ccc_users);
3268 cli->cl_cache = NULL;
3269 }
3270
3271 /* free memory of osc quota cache */
3272 osc_quota_cleanup(obd);
3273
3274 rc = client_obd_cleanup(obd);
3275
3276 ptlrpcd_decref();
3277 return rc;
3278 }
3279
3280 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3281 {
3282 struct lprocfs_static_vars lvars = { NULL };
3283 int rc = 0;
3284
3285 lprocfs_osc_init_vars(&lvars);
3286
3287 switch (lcfg->lcfg_command) {
3288 default:
3289 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3290 lcfg, obd);
3291 if (rc > 0)
3292 rc = 0;
3293 break;
3294 }
3295
3296 return rc;
3297 }
3298
3299 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3300 {
3301 return osc_process_config_base(obd, buf);
3302 }
3303
3304 struct obd_ops osc_obd_ops = {
3305 .o_owner = THIS_MODULE,
3306 .o_setup = osc_setup,
3307 .o_precleanup = osc_precleanup,
3308 .o_cleanup = osc_cleanup,
3309 .o_add_conn = client_import_add_conn,
3310 .o_del_conn = client_import_del_conn,
3311 .o_connect = client_connect_import,
3312 .o_reconnect = osc_reconnect,
3313 .o_disconnect = osc_disconnect,
3314 .o_statfs = osc_statfs,
3315 .o_statfs_async = osc_statfs_async,
3316 .o_packmd = osc_packmd,
3317 .o_unpackmd = osc_unpackmd,
3318 .o_create = osc_create,
3319 .o_destroy = osc_destroy,
3320 .o_getattr = osc_getattr,
3321 .o_getattr_async = osc_getattr_async,
3322 .o_setattr = osc_setattr,
3323 .o_setattr_async = osc_setattr_async,
3324 .o_find_cbdata = osc_find_cbdata,
3325 .o_iocontrol = osc_iocontrol,
3326 .o_get_info = osc_get_info,
3327 .o_set_info_async = osc_set_info_async,
3328 .o_import_event = osc_import_event,
3329 .o_process_config = osc_process_config,
3330 .o_quotactl = osc_quotactl,
3331 .o_quotacheck = osc_quotacheck,
3332 };
3333
3334 extern struct lu_kmem_descr osc_caches[];
3335 extern spinlock_t osc_ast_guard;
3336 extern struct lock_class_key osc_ast_guard_class;
3337
3338 static int __init osc_init(void)
3339 {
3340 struct lprocfs_static_vars lvars = { NULL };
3341 int rc;
3342
3343 /* print an address of _any_ initialized kernel symbol from this
3344  * module, to allow debugging with a gdb that doesn't support data
3345  * symbols from modules. */
3346 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3347
3348 rc = lu_kmem_init(osc_caches);
3349 if (rc)
3350 return rc;
3351
3352 lprocfs_osc_init_vars(&lvars);
3353
3354 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3355 LUSTRE_OSC_NAME, &osc_device_type);
3356 if (rc) {
3357 lu_kmem_fini(osc_caches);
3358 return rc;
3359 }
3360
3361 spin_lock_init(&osc_ast_guard);
3362 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3363
3364 return rc;
3365 }
3366
3367 static void /*__exit*/ osc_exit(void)
3368 {
3369 class_unregister_type(LUSTRE_OSC_NAME);
3370 lu_kmem_fini(osc_caches);
3371 }
3372
3373 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3374 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3375 MODULE_LICENSE("GPL");
3376 MODULE_VERSION(LUSTRE_VERSION_STRING);
3377
3378 module_init(osc_init);
3379 module_exit(osc_exit);
3380