1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_OSC
38 
39 #include "../../include/linux/libcfs/libcfs.h"
40 
41 
42 #include "../include/lustre_dlm.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre/lustre_user.h"
45 #include "../include/obd_cksum.h"
46 
47 #include "../include/lustre_ha.h"
48 #include "../include/lprocfs_status.h"
49 #include "../include/lustre_debug.h"
50 #include "../include/lustre_param.h"
51 #include "../include/lustre_fid.h"
52 #include "../include/obd_class.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55 
/* Per-RPC state for an in-flight bulk read/write (BRW) request; stored in
 * rq_async_args and consumed by brw_interpret() when the reply arrives. */
struct osc_brw_async_args {
	struct obdo       *aa_oa;	/* object attributes sent with the RPC */
	int		aa_requested_nob;	/* bytes requested in this RPC */
	int		aa_nio_count;	/* number of niobuf entries */
	u32		aa_page_count;	/* entries in aa_ppga */
	int		aa_resends;	/* resend attempts so far */
	struct brw_page  **aa_ppga;	/* pages backing the bulk transfer */
	struct client_obd *aa_cli;	/* owning client obd */
	struct list_head	 aa_oaps;	/* pages queued on this RPC */
	struct list_head	 aa_exts;	/* extents covered by this RPC */
	struct obd_capa   *aa_ocapa;	/* capability, if any */
	struct cl_req     *aa_clerq;	/* cl_req transfer descriptor */
};
69 
/* Minimal async-args: just the obd_info whose oi_cb_up is called on reply
 * (see osc_getattr_interpret()). */
struct osc_async_args {
	struct obd_info   *aa_oi;
};
73 
/* Async-args for OST_SETATTR/OST_PUNCH; osc_setattr_interpret() copies the
 * reply attributes into sa_oa and then invokes sa_upcall(sa_cookie, rc). */
struct osc_setattr_args {
	struct obdo	 *sa_oa;
	obd_enqueue_update_f sa_upcall;
	void		*sa_cookie;
};
79 
/* Async-args for OST_SYNC; osc_sync_interpret() copies the reply obdo into
 * fa_oi->oi_oa and then invokes fa_upcall(fa_cookie, rc). */
struct osc_fsync_args {
	struct obd_info     *fa_oi;
	obd_enqueue_update_f fa_upcall;
	void		*fa_cookie;
};
85 
/* Async-args for LDLM enqueue requests issued by the OSC.  Fields mirror the
 * osc_enqueue_base() parameters so the interpret callback can complete the
 * lock and call oa_upcall.  oa_agl marks an AGL (async glimpse) request. */
struct osc_enqueue_args {
	struct obd_export	*oa_exp;
	__u64		    *oa_flags;
	obd_enqueue_update_f      oa_upcall;
	void		     *oa_cookie;
	struct ost_lvb	   *oa_lvb;
	struct lustre_handle     *oa_lockh;
	struct ldlm_enqueue_info *oa_ei;
	unsigned int	      oa_agl:1;
};
96 
97 static void osc_release_ppga(struct brw_page **ppga, u32 count);
98 static int brw_interpret(const struct lu_env *env,
99 			 struct ptlrpc_request *req, void *data, int rc);
100 int osc_cleanup(struct obd_device *obd);
101 
102 /* Pack OSC object metadata for disk storage (LE byte order). */
osc_packmd(struct obd_export * exp,struct lov_mds_md ** lmmp,struct lov_stripe_md * lsm)103 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
104 		      struct lov_stripe_md *lsm)
105 {
106 	int lmm_size;
107 
108 	lmm_size = sizeof(**lmmp);
109 	if (lmmp == NULL)
110 		return lmm_size;
111 
112 	if (*lmmp != NULL && lsm == NULL) {
113 		OBD_FREE(*lmmp, lmm_size);
114 		*lmmp = NULL;
115 		return 0;
116 	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
117 		return -EBADF;
118 	}
119 
120 	if (*lmmp == NULL) {
121 		OBD_ALLOC(*lmmp, lmm_size);
122 		if (*lmmp == NULL)
123 			return -ENOMEM;
124 	}
125 
126 	if (lsm)
127 		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
128 
129 	return lmm_size;
130 }
131 
132 /* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);

	if (lmm != NULL) {
		/* Reject on-disk blobs too small to hold a lov_mds_md. */
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			return -EINVAL;
		}
		/* XXX LOV_MAGIC etc check? */

		/* Object id 0 is never valid on the wire. */
		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			return -EINVAL;
		}
	}

	/* Size of an in-memory md for a single stripe. */
	lsm_size = lov_stripe_md_size(1);
	if (lsmp == NULL)
		/* Size query only. */
		return lsm_size;

	if (*lsmp != NULL && lmm == NULL) {
		/* Free request: release the oinfo then the md itself. */
		OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		OBD_FREE(*lsmp, lsm_size);
		*lsmp = NULL;
		return 0;
	}

	if (*lsmp == NULL) {
		OBD_ALLOC(*lsmp, lsm_size);
		if (unlikely(*lsmp == NULL))
			return -ENOMEM;
		OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
			/* Unwind the partial allocation. */
			OBD_FREE(*lsmp, lsm_size);
			return -ENOMEM;
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (lmm != NULL)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	/* Max object size: the server's negotiated limit when the import
	 * supports OBD_CONNECT_MAXBYTES, else the legacy static cap. */
	if (imp != NULL &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	return lsm_size;
}
192 
osc_pack_capa(struct ptlrpc_request * req,struct ost_body * body,void * capa)193 static inline void osc_pack_capa(struct ptlrpc_request *req,
194 				 struct ost_body *body, void *capa)
195 {
196 	struct obd_capa *oc = (struct obd_capa *)capa;
197 	struct lustre_capa *c;
198 
199 	if (!capa)
200 		return;
201 
202 	c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
203 	LASSERT(c);
204 	capa_cpy(c, oc);
205 	body->oa.o_valid |= OBD_MD_FLOSSCAPA;
206 	DEBUG_CAPA(D_SEC, c, "pack");
207 }
208 
/* Fill the request's ost_body from @oinfo: convert the obdo to wire format
 * and append the capability, if any. */
static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);
}
221 
osc_set_capa_size(struct ptlrpc_request * req,const struct req_msg_field * field,struct obd_capa * oc)222 static inline void osc_set_capa_size(struct ptlrpc_request *req,
223 				     const struct req_msg_field *field,
224 				     struct obd_capa *oc)
225 {
226 	if (oc == NULL)
227 		req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
228 	else
229 		/* it is already calculated as sizeof struct obd_capa */
230 		;
231 }
232 
osc_getattr_interpret(const struct lu_env * env,struct ptlrpc_request * req,struct osc_async_args * aa,int rc)233 static int osc_getattr_interpret(const struct lu_env *env,
234 				 struct ptlrpc_request *req,
235 				 struct osc_async_args *aa, int rc)
236 {
237 	struct ost_body *body;
238 
239 	if (rc != 0)
240 		goto out;
241 
242 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
243 	if (body) {
244 		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245 		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
246 				     aa->aa_oi->oi_oa, &body->oa);
247 
248 		/* This should really be sent by the OST */
249 		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
250 		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
251 	} else {
252 		CDEBUG(D_INFO, "can't unpack ost_body\n");
253 		rc = -EPROTO;
254 		aa->aa_oi->oi_oa->o_valid = 0;
255 	}
256 out:
257 	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
258 	return rc;
259 }
260 
/* Queue an asynchronous OST_GETATTR on @set; completion is delivered via
 * osc_getattr_interpret() -> oinfo->oi_cb_up.  Returns 0 on successful
 * queuing or a negative errno. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	/* Per-request scratch space must fit in rq_async_args. */
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	return 0;
}
291 
/* Synchronous OST_GETATTR: send the request, wait for the reply, and copy
 * the returned attributes into oinfo->oi_oa.  Returns 0 or negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	/* NOTE(review): the async path (osc_getattr_interpret) fakes the
	 * blocksize as DT_MAX_BRW_SIZE while this path uses cli_brw_size();
	 * confirm the discrepancy is intentional. */
	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

 out:
	ptlrpc_req_finished(req);
	return rc;
}
335 
/* Synchronous OST_SETATTR: send the new attributes and copy the server's
 * view back into oinfo->oi_oa.  Requires a valid group in the obdo.
 * Returns 0 or negative errno. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		    rc;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

out:
	ptlrpc_req_finished(req);
	return rc;
}
377 
osc_setattr_interpret(const struct lu_env * env,struct ptlrpc_request * req,struct osc_setattr_args * sa,int rc)378 static int osc_setattr_interpret(const struct lu_env *env,
379 				 struct ptlrpc_request *req,
380 				 struct osc_setattr_args *sa, int rc)
381 {
382 	struct ost_body *body;
383 
384 	if (rc != 0)
385 		goto out;
386 
387 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
388 	if (body == NULL) {
389 		rc = -EPROTO;
390 		goto out;
391 	}
392 
393 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
394 			     &body->oa);
395 out:
396 	rc = sa->sa_upcall(sa->sa_cookie, rc);
397 	return rc;
398 }
399 
/* Send an asynchronous OST_SETATTR.
 *
 * If @rqset is NULL the request is fired and forgotten via ptlrpcd;
 * otherwise completion is relayed through osc_setattr_interpret() to
 * @upcall(@cookie, rc).  PTLRPCD_SET routes through ptlrpcd as well.
 * Returns 0 on successful queuing or a negative errno. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	int		      rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* Carry the llog cookie so the server can cancel the unlink record. */
	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	return 0;
}
449 
/* obd_ops entry point: async setattr with oinfo->oi_cb_up as the upcall
 * and oinfo itself as the cookie. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}
457 
/* Synchronously create an object on the OST (OST_CREATE).
 *
 * @ea holds the stripe md; if *ea is NULL one is allocated here and handed
 * to the caller on success (freed again on failure).  On success @oa is
 * updated from the reply, the object id is stored in the lsm, and any llog
 * cookie is saved into @oti.  Returns 0 or negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct lov_stripe_md  *lsm;
	int		    rc;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		/* No md supplied: allocate one; ownership passes to the
		 * caller via *ea only on success. */
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			return rc;
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		goto out;
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out_req;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out_req;
	}

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti != NULL) {
		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

		if (oa->o_valid & OBD_MD_FLCOOKIE) {
			if (!oti->oti_logcookies)
				oti_alloc_cookies(oti, 1);
			*oti->oti_logcookies = oa->o_lcookie;
		}
	}

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	/* Free the locally-allocated lsm on failure (*ea still NULL then). */
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	return rc;
}
545 
/* Send an asynchronous OST_PUNCH (truncate/hole-punch).
 *
 * The punch range travels inside oinfo->oi_oa (packed by the caller);
 * completion is relayed through osc_setattr_interpret() to
 * @upcall(@cookie, rc).  Returns 0 on successful queuing or negative errno. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	struct ost_body	 *body;
	int		      rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa     = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}
589 
osc_sync_interpret(const struct lu_env * env,struct ptlrpc_request * req,void * arg,int rc)590 static int osc_sync_interpret(const struct lu_env *env,
591 			      struct ptlrpc_request *req,
592 			      void *arg, int rc)
593 {
594 	struct osc_fsync_args *fa = arg;
595 	struct ost_body *body;
596 
597 	if (rc)
598 		goto out;
599 
600 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
601 	if (body == NULL) {
602 		CERROR ("can't unpack ost_body\n");
603 		rc = -EPROTO;
604 		goto out;
605 	}
606 
607 	*fa->fa_oi->oi_oa = body->oa;
608 out:
609 	rc = fa->fa_upcall(fa->fa_cookie, rc);
610 	return rc;
611 }
612 
/* Send an asynchronous OST_SYNC for the range encoded in oinfo->oi_oa
 * (the size/blocks fields are overloaded as start/end by the caller).
 * Completion is relayed through osc_sync_interpret() to @upcall(@cookie).
 * Returns 0 on successful queuing or a negative errno. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct osc_fsync_args *fa;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}
656 
657 /* Find and cancel locally locks matched by @mode in the resource found by
658  * @objid. Found locks are added into @cancel list. Returns the amount of
659  * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   ldlm_mode_t mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		return 0;

	/* Derive the resource name from the object id and look it up;
	 * no resource means no locks to cancel. */
	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		return 0;

	/* Hold a debug ref across the local cancel scan. */
	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return count;
}
690 
osc_destroy_interpret(const struct lu_env * env,struct ptlrpc_request * req,void * data,int rc)691 static int osc_destroy_interpret(const struct lu_env *env,
692 				 struct ptlrpc_request *req, void *data,
693 				 int rc)
694 {
695 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
696 
697 	atomic_dec(&cli->cl_destroy_in_flight);
698 	wake_up(&cli->cl_destroy_waitq);
699 	return 0;
700 }
701 
/* Try to reserve a destroy-RPC slot.  Returns 1 if the caller now holds a
 * slot (cl_destroy_in_flight was incremented and is within the limit),
 * 0 otherwise (the increment is rolled back).  Lock-free: the inc/dec pair
 * may interleave with other callers, hence the wakeup on the back-out path. */
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
719 
osc_create(const struct lu_env * env,struct obd_export * exp,struct obdo * oa,struct lov_stripe_md ** ea,struct obd_trans_info * oti)720 int osc_create(const struct lu_env *env, struct obd_export *exp,
721 	       struct obdo *oa, struct lov_stripe_md **ea,
722 	       struct obd_trans_info *oti)
723 {
724 	int rc = 0;
725 
726 	LASSERT(oa);
727 	LASSERT(ea);
728 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
729 
730 	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
731 	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
732 		return osc_real_create(exp, oa, ea, oti);
733 	}
734 
735 	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
736 		return osc_real_create(exp, oa, ea, oti);
737 
738 	/* we should not get here anymore */
739 	LBUG();
740 
741 	return rc;
742 }
743 
744 /* Destroy requests can be async always on the client, and we don't even really
745  * care about the return code since the client cannot do anything at all about
746  * a destroy failure.
747  * When the MDS is unlinking a filename, it saves the file objects into a
748  * recovery llog, and these object records are cancelled when the OST reports
749  * they were destroyed and sync'd to disk (i.e. transaction committed).
750  * If the client dies, or the OST is down when the object should be destroyed,
751  * the records are not cancelled, and when the OST reconnects to the MDS next,
752  * it will retrieve the llog unlink logs and then sends the log cancellation
753  * cookies to the MDS after committing destroy transactions. */
osc_destroy(const struct lu_env * env,struct obd_export * exp,struct obdo * oa,struct lov_stripe_md * ea,struct obd_trans_info * oti,struct obd_export * md_export,void * capa)754 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
755 		       struct obdo *oa, struct lov_stripe_md *ea,
756 		       struct obd_trans_info *oti, struct obd_export *md_export,
757 		       void *capa)
758 {
759 	struct client_obd     *cli = &exp->exp_obd->u.cli;
760 	struct ptlrpc_request *req;
761 	struct ost_body       *body;
762 	LIST_HEAD(cancels);
763 	int rc, count;
764 
765 	if (!oa) {
766 		CDEBUG(D_INFO, "oa NULL\n");
767 		return -EINVAL;
768 	}
769 
770 	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
771 					LDLM_FL_DISCARD_DATA);
772 
773 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
774 	if (req == NULL) {
775 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
776 		return -ENOMEM;
777 	}
778 
779 	osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
780 	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
781 			       0, &cancels, count);
782 	if (rc) {
783 		ptlrpc_request_free(req);
784 		return rc;
785 	}
786 
787 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
788 	ptlrpc_at_set_req_timeout(req);
789 
790 	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
791 		oa->o_lcookie = *oti->oti_logcookies;
792 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
793 	LASSERT(body);
794 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
795 
796 	osc_pack_capa(req, body, (struct obd_capa *)capa);
797 	ptlrpc_request_set_replen(req);
798 
799 	/* If osc_destroy is for destroying the unlink orphan,
800 	 * sent from MDT to OST, which should not be blocked here,
801 	 * because the process might be triggered by ptlrpcd, and
802 	 * it is not good to block ptlrpcd thread (b=16006)*/
803 	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
804 		req->rq_interpret_reply = osc_destroy_interpret;
805 		if (!osc_can_send_destroy(cli)) {
806 			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
807 							  NULL);
808 
809 			/*
810 			 * Wait until the number of on-going destroy RPCs drops
811 			 * under max_rpc_in_flight
812 			 */
813 			l_wait_event_exclusive(cli->cl_destroy_waitq,
814 					       osc_can_send_destroy(cli), &lwi);
815 		}
816 	}
817 
818 	/* Do not wait for response */
819 	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
820 	return 0;
821 }
822 
/* Report the client's cache/grant state to the server inside @oa:
 * o_dirty (bytes dirtied), o_undirty (additional grant wanted), o_grant
 * (grant we still hold) and o_dropped (grant lost).  All accounting is done
 * under cl_loi_list_lock; the three CERROR branches detect accounting bugs
 * and zero o_undirty so we stop asking for more grant. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		/* Ask for enough grant to keep a full pipeline of RPCs busy. */
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT)*
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
867 
/* Push the next grant-shrink deadline one interval into the future. */
void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
	       cli->cl_next_shrink_grant);
}
875 
/* Add @grant bytes back to the available-grant pool (lock held inside). */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
}
882 
osc_update_grant(struct client_obd * cli,struct ost_body * body)883 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
884 {
885 	if (body->oa.o_valid & OBD_MD_FLGRANT) {
886 		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
887 		__osc_update_grant(cli, body->oa.o_grant);
888 	}
889 }
890 
891 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
892 			      u32 keylen, void *key, u32 vallen,
893 			      void *val, struct ptlrpc_request_set *set);
894 
/* Reply handler for a grant-shrink set_info RPC.  On failure the grant we
 * tried to give back is restored locally; on success any replacement grant
 * from the reply is absorbed.  The obdo allocated by the sender is freed
 * here in both cases. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		goto out;
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}
915 
/* Give back a quarter of the available grant: move it from cl_avail_grant
 * into oa->o_grant and mark the obdo with OBD_FL_SHRINK_GRANT so the server
 * reclaims it.  Also reschedules the next shrink. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	/* o_flags is only meaningful when OBD_MD_FLFLAGS is set. */
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}
929 
930 /* Shrink the current grant, either from some large amount to enough for a
931  * full set of in-flight RPCs, or if we have already shrunk to that limit
932  * then to enough for a single RPC.  This avoids keeping more grant than
933  * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	/* Target: enough grant for a full pipeline of max-size RPCs. */
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Already at or below the pipeline target: shrink to one RPC's worth. */
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}
946 
/* Return grant to the OST until the client holds no more than @target_bytes,
 * by sending a KEY_GRANT_SHRINK set_info RPC that carries the amount being
 * given back in o_grant.  Returns 0 on success (including the nothing-to-do
 * case) or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int			rc = 0;
	struct ost_body	*body;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		return 0;
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		return -ENOMEM;

	osc_announce_cached(cli, &body->oa, 0);

	/* re-take the lock: cl_avail_grant may have changed since the check
	 * above; everything over the target is offered back to the server */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		/* RPC was not sent/failed: take the offered grant back */
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	return rc;
}
990 
osc_should_shrink_grant(struct client_obd * client)991 static int osc_should_shrink_grant(struct client_obd *client)
992 {
993 	unsigned long time = cfs_time_current();
994 	unsigned long next_shrink = client->cl_next_shrink_grant;
995 
996 	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
997 	     OBD_CONNECT_GRANT_SHRINK) == 0)
998 		return 0;
999 
1000 	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1001 		/* Get the current RPC size directly, instead of going via:
1002 		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1003 		 * Keep comment here so that it can be found by searching. */
1004 		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1005 
1006 		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1007 		    client->cl_avail_grant > brw_size)
1008 			return 1;
1009 		else
1010 			osc_update_next_shrink(client);
1011 	}
1012 	return 0;
1013 }
1014 
osc_grant_shrink_grant_cb(struct timeout_item * item,void * data)1015 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1016 {
1017 	struct client_obd *client;
1018 
1019 	list_for_each_entry(client, &item->ti_obd_list,
1020 				cl_grant_shrink_list) {
1021 		if (osc_should_shrink_grant(client))
1022 			osc_shrink_grant(client);
1023 	}
1024 	return 0;
1025 }
1026 
osc_add_shrink_grant(struct client_obd * client)1027 static int osc_add_shrink_grant(struct client_obd *client)
1028 {
1029 	int rc;
1030 
1031 	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1032 				       TIMEOUT_GRANT,
1033 				       osc_grant_shrink_grant_cb, NULL,
1034 				       &client->cl_grant_shrink_list);
1035 	if (rc) {
1036 		CERROR("add grant client %s error %d\n",
1037 			client->cl_import->imp_obd->obd_name, rc);
1038 		return rc;
1039 	}
1040 	CDEBUG(D_CACHE, "add grant client %s \n",
1041 	       client->cl_import->imp_obd->obd_name);
1042 	osc_update_next_shrink(client);
1043 	return 0;
1044 }
1045 
osc_del_shrink_grant(struct client_obd * client)1046 static int osc_del_shrink_grant(struct client_obd *client)
1047 {
1048 	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1049 					 TIMEOUT_GRANT);
1050 }
1051 
/* Initialize client-side grant accounting from the server's connect
 * reply @ocd: set cl_avail_grant, pick the extent chunk size, and
 * register for periodic grant shrinking if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expect to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty will drop
	 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty must be 0 already.
	 */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
	       cli->cl_import->imp_obd->obd_name,
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	/* only register once: list_empty() guards against re-registration
	 * on reconnect */
	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}
1089 
1090 /* We assume that the reason this OSC got a short read is because it read
1091  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1092  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1093  * this stripe never got written at or beyond this stripe offset yet. */
handle_short_read(int nob_read,u32 page_count,struct brw_page ** pga)1094 static void handle_short_read(int nob_read, u32 page_count,
1095 			      struct brw_page **pga)
1096 {
1097 	char *ptr;
1098 	int i = 0;
1099 
1100 	/* skip bytes read OK */
1101 	while (nob_read > 0) {
1102 		LASSERT (page_count > 0);
1103 
1104 		if (pga[i]->count > nob_read) {
1105 			/* EOF inside this page */
1106 			ptr = kmap(pga[i]->pg) +
1107 				(pga[i]->off & ~CFS_PAGE_MASK);
1108 			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1109 			kunmap(pga[i]->pg);
1110 			page_count--;
1111 			i++;
1112 			break;
1113 		}
1114 
1115 		nob_read -= pga[i]->count;
1116 		page_count--;
1117 		i++;
1118 	}
1119 
1120 	/* zero remaining pages */
1121 	while (page_count-- > 0) {
1122 		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1123 		memset(ptr, 0, pga[i]->count);
1124 		kunmap(pga[i]->pg);
1125 		i++;
1126 	}
1127 }
1128 
/* Validate the per-niobuf return-code vector of a BRW_WRITE reply and the
 * number of bytes the bulk actually moved.  Returns 0 if everything was
 * written, the first negative niobuf rc on server-side error, or -EPROTO
 * on a malformed reply. */
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   u32 page_count, struct brw_page **pga)
{
	__u32 *remote_rcs;
	int i;

	/* the server returns exactly one rc per remote niobuf */
	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	for (i = 0; i < niocount; i++) {
		/* negative means the server failed that niobuf */
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		/* a positive rc is not a valid write reply */
		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
				i, remote_rcs[i], req);
			return -EPROTO;
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}
1164 
can_merge_pages(struct brw_page * p1,struct brw_page * p2)1165 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1166 {
1167 	if (p1->flag != p2->flag) {
1168 		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1169 				  OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1170 
1171 		/* warn if we try to combine flags that we don't know to be
1172 		 * safe to combine */
1173 		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1174 			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1175 			      p1->flag, p2->flag);
1176 		}
1177 		return 0;
1178 	}
1179 
1180 	return (p1->off + p1->count == p2->off);
1181 }
1182 
/* Compute the checksum of the first @nob bytes spread across @pg_count
 * pages of @pga, using the hash algorithm mapped from @cksum_type.
 * @opc (OST_READ/OST_WRITE) is consulted only by the fault-injection
 * hooks that deliberately corrupt data/checksum for testing.
 * Returns the checksum (0 if the final digest step fails), or PTR_ERR()
 * of the descriptor cast to u32 when the hash cannot be initialized. */
static u32 osc_checksum_bulk(int nob, u32 pg_count,
				   struct brw_page **pga, int opc,
				   cksum_type_t cksum_type)
{
	/* initialized: otherwise a cfs_crypto_hash_final() failure below
	 * would return an uninitialized stack value as the checksum */
	__u32				cksum = 0;
	int				i = 0;
	struct cfs_crypto_hash_desc	*hdesc;
	unsigned int			bufsize;
	int				err;
	unsigned char			cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;

			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
				  pga[i]->off & ~CFS_PAGE_MASK,
				  count);
		CDEBUG(D_PAGE,
		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
		       page_private(pga[i]->pg),
		       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		/* advance by the page's full count even when only part of
		 * the last page was hashed; nob drops <= 0 and ends loop */
		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	if (err)
		/* digest failed: second call just releases the descriptor;
		 * cksum keeps its 0 initialization */
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}
1243 
/* Build (but do not send) a bulk BRW request for @page_count pages in @pga.
 * @cmd selects OBD_BRW_WRITE/OBD_BRW_READ; @oa carries the object attributes
 * packed into the request body; @resend marks a recovery resend so the
 * server can tell.  On success *reqp holds the prepared request and the
 * osc_brw_async_args embedded in it take over @oa and @pga.
 * Returns 0 or a negative errno (the request is freed on failure). */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
				struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				struct obd_capa *ocapa, int reserve,
				int resend)
{
	struct ptlrpc_request   *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body	 *body;
	struct obd_ioobj	*ioobj;
	struct niobuf_remote    *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule      *pill;
	struct brw_page *pg_prev;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /* Fatal */

	/* writes allocate from the import's request pool; reads use a
	 * plain allocation */
	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		return -ENOMEM;

	/* adjacent mergeable pages share one remote niobuf; count how many
	 * niobufs we really need */
	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be send for this request.  The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so the
	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	/* attach each page to the bulk descriptor and build the (merged)
	 * remote niobuf array; pages must be sorted, gap-free in the middle,
	 * and uniform in their SRVLOCK flag */
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			/* extend the previous niobuf instead of starting a
			 * new one */
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len    = pg->count;
			niobuf->flags  = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	/* piggyback a grant shrink on this RPC if one is due */
	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				/* NOTE(review): this masks oa->o_flags even
				 * though the test was on body->oa — looks
				 * intentional (keep only local flags) but
				 * worth confirming */
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	/* stash everything the interpret/redo callbacks will need; aa takes
	 * ownership of oa and pga from here on */
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	return 0;

 out:
	ptlrpc_req_finished(req);
	return rc;
}
1447 
/* Diagnose a write-checksum mismatch: recompute the checksum over the
 * pages we still hold and compare against both the value we originally
 * sent (@client_cksum) and the value the server computed (@server_cksum)
 * to guess where the data changed.  Returns 0 if the checksums actually
 * match, 1 (mismatch confirmed, caller resends) otherwise. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				u32 page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	/* recompute with the algorithm the server reply says it used */
	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
			;
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
			;
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
			;

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent [%llu-%llu]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
	       client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}
1492 
/* Note rc enters this function as number of bytes transferred */
/* Finish a BRW RPC: unpack the reply body, update quota/grant state,
 * verify checksums and (for writes) the per-niobuf return codes, and
 * zero-fill short reads.  Returns 0, a negative errno, or -EAGAIN to
 * ask the caller to resend (bulk unwrap or checksum failure). */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;

	/* -EDQUOT replies still carry a body we must process below */
	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		return rc;
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		return -EPROTO;
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	/* now that quota/grant are processed, propagate the -EDQUOT */
	if (rc < 0)
		return rc;

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			return -EPROTO;
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			return -EAGAIN;

		/* mismatching write checksum: log and ask for a resend */
		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			return -EAGAIN;

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		goto out;
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0) {
		rc = -EAGAIN;
		goto out;
	}

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		return -EPROTO;
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR ("Unexpected rc %d (%d transferred)\n",
			rc, req->rq_bulk->bd_nob_transferred);
		return -EPROTO;
	}

	/* short read: zero-fill the unread tail of the page array */
	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32      server_cksum = body->oa.o_cksum;
		char      *via;
		char      *router;
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		/* note whether the bulk came through an LNet router */
		if (peer->nid == req->rq_bulk->bd_sender) {
			via = router = "";
		} else {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count-1]->off +
					   aa->aa_ppga[aa->aa_page_count-1]->count -
					   1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		/* rate-limit: log only when cksum_missed is a power of two */
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	return rc;
}
1644 
/* Rebuild and requeue a BRW RPC after a recoverable error @rc.  A fresh
 * request is prepared from the async args of the failed @request; the new
 * request takes over the page array, async pages, extents and capability
 * from the old one, and is delayed by the (capped) resend count before
 * being handed to ptlrpcd.  Returns 0 or a negative errno. */
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
					OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa,
				  NULL /* lsm unused by osc currently */,
				  aa->aa_page_count, aa->aa_ppga,
				  &new_req, aa->aa_ocapa, 0, 1);
	if (rc)
		return rc;

	/* abandon the resend if any of the pages was interrupted */
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				return -EINTR;
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = get_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = get_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	/* splice (not copy) the list heads into the new async args */
	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	/* re-point every async page at the new request */
	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	new_aa->aa_ocapa = aa->aa_ocapa;
	aa->aa_ocapa = NULL;

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

	DEBUG_REQ(D_INFO, new_req, "new request");
	return 0;
}
1716 
1717 /*
1718  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1719  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1720  * fine for our small page arrays and doesn't require allocation.  its an
1721  * insertion sort that swaps elements that are strides apart, shrinking the
1722  * stride down until its '1' and the array is sorted.
1723  */
sort_brw_pages(struct brw_page ** array,int num)1724 static void sort_brw_pages(struct brw_page **array, int num)
1725 {
1726 	int stride, i, j;
1727 	struct brw_page *tmp;
1728 
1729 	if (num == 1)
1730 		return;
1731 	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1732 		;
1733 
1734 	do {
1735 		stride /= 3;
1736 		for (i = stride ; i < num ; i++) {
1737 			tmp = array[i];
1738 			j = i;
1739 			while (j >= stride && array[j - stride]->off > tmp->off) {
1740 				array[j] = array[j - stride];
1741 				j -= stride;
1742 			}
1743 			array[j] = tmp;
1744 		}
1745 	} while (stride > 1);
1746 }
1747 
/* Free a brw_page pointer array of @count elements.  Only the array
 * itself is released; the pages it points at are not touched. */
static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
	LASSERT(ppga != NULL);
	OBD_FREE(ppga, sizeof(*ppga) * count);
}
1753 
/* Interpret callback for an async BRW RPC: finish the request, retry
 * recoverable errors, complete all extents attached to it, propagate the
 * returned object attributes to the cl_object, release resources and
 * adjust the in-flight RPC counters before re-plugging the IO queue. */
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct cl_object  *obj = NULL;
	struct client_obd *cli = aa->aa_cli;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When server return -EINPROGRESS, client should always retry
	 * regardless of the number of times the bulk was resent already. */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			/* import was evicted/reconnected: don't resend */
			CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		/* rc == 0 here means the redo was queued; the new request
		 * owns our state now, nothing more to do */
		if (rc == 0)
			return 0;
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (aa->aa_ocapa) {
		capa_put(aa->aa_ocapa);
		aa->aa_ocapa = NULL;
	}

	/* finish every extent carried by this RPC; grab a reference on the
	 * cl_object from the first one so we can update attributes below */
	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		if (obj == NULL && rc == 0) {
			obj = osc2cl(ext->oe_obj);
			cl_object_get(obj);
		}

		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	/* push the attributes returned in the reply obdo into the object */
	if (obj != NULL) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;

		LASSERT(rc == 0);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}
		if (valid != 0) {
			cl_object_attr_lock(obj);
			cl_object_attr_set(env, obj, attr, valid);
			cl_object_attr_unlock(obj);
		}
		cl_object_put(env, obj);
	}
	OBDO_FREE(aa->aa_oa);

	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
			  req->rq_bulk->bd_nob_transferred);
	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
	return rc;
}
1855 
1856 /**
1857  * Build an RPC by the list of extent @ext_list. The caller must ensure
1858  * that the total pages in this list are NOT over max pages per RPC.
1859  * Extents in the list must be in OES_RPC state.
1860  */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
	struct ptlrpc_request		*req = NULL;
	struct osc_extent		*ext;
	struct brw_page			**pga = NULL;
	struct osc_brw_async_args	*aa = NULL;
	struct obdo			*oa = NULL;
	struct osc_async_page		*oap;
	struct osc_async_page		*tmp;
	struct cl_req			*clerq = NULL;
	enum cl_req_type		crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
								      CRT_READ;
	struct ldlm_lock		*lock = NULL;
	struct cl_req_attr		*crattr = NULL;
	u64				starting_offset = OBD_OBJECT_EOF;
	u64				ending_offset = 0;
	int				mpflag = 0;
	int				mem_tight = 0;
	int				page_count = 0;
	int				i;
	int				rc;
	struct ost_body			*body;
	LIST_HEAD(rpc_list);

	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			++page_count;
			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			/* track the byte range [starting_offset, ending_offset)
			 * covered by the RPC; interior pages must be full */
			if (starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_CACHE_SIZE);
		}
	}

	/* under memory pressure, allow the RPC machinery to dip into
	 * memory reserves; restored in the out: path */
	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	OBD_ALLOC(crattr, sizeof(*crattr));
	if (crattr == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	OBD_ALLOC(pga, sizeof(*pga) * page_count);
	if (pga == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	OBDO_ALLOC(oa);
	if (oa == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	/* fill the pga array and add every page to the cl_req; the cl_req
	 * is allocated lazily from the first page */
	i = 0;
	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
		struct cl_page *page = oap2cl_page(oap);
		if (clerq == NULL) {
			clerq = cl_req_alloc(env, page, crt,
					     1 /* only 1-object rpcs for now */);
			if (IS_ERR(clerq)) {
				rc = PTR_ERR(clerq);
				goto out;
			}
			lock = oap->oap_ldlm_lock;
		}
		if (mem_tight)
			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
		pga[i] = &oap->oap_brw_page;
		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
		       pga[i]->pg, page_index(oap->oap_page), oap,
		       pga[i]->flag);
		i++;
		cl_req_page_add(env, clerq, page);
	}

	/* always get the data for the obdo for the rpc */
	LASSERT(clerq != NULL);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, clerq, crattr, ~0ULL);
	if (lock) {
		oa->o_handle = lock->l_remote_handle;
		oa->o_valid |= OBD_MD_FLHANDLE;
	}

	rc = cl_req_prep(env, clerq);
	if (rc != 0) {
		CERROR("cl_req_prep failed: %d\n", rc);
		goto out;
	}

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
			pga, &req, crattr->cra_capa, 1, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		goto out;
	}

	req->rq_interpret_reply = brw_interpret;

	if (mem_tight != 0)
		req->rq_memalloc = 1;

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST).  If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps.  Sadly, there is no obvious
	 * way to do this in a single call.  bug 10150 */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	crattr->cra_oa = &body->oa;
	cl_req_attr_set(env, clerq, crattr,
			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	/* stash the async args in the request; ownership of the oaps,
	 * extents and clerq moves to brw_interpret() from here on */
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);
	aa->aa_clerq = clerq;

	/* queued sync pages can be torn down while the pages
	 * were between the pending list and the rpc */
	tmp = NULL;
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		/* only one oap gets a request reference */
		if (tmp == NULL)
			tmp = oap;
		if (oap->oap_interrupted && !req->rq_intr) {
			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
					oap, req);
			ptlrpc_mark_interrupted(req);
		}
	}
	if (tmp != NULL)
		tmp->oap_request = ptlrpc_request_addref(req);

	/* account the in-flight RPC and feed the lprocfs histograms */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_CACHE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);

	/* XXX: Maybe the caller can check the RPC bulk descriptor to
	 * see which CPU/NUMA node the majority of pages were allocated
	 * on, and try to assign the async RPC to the CPU core
	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
	 *
	 * But on the other hand, we expect that multiple ptlrpcd
	 * threads and the initial write sponsor can run in parallel,
	 * especially when data checksum is enabled, which is CPU-bound
	 * operation and single ptlrpcd thread cannot process in time.
	 * So more ptlrpcd threads sharing BRW load
	 * (with PDL_POLICY_ROUND) seems better.
	 */
	ptlrpcd_add_req(req, pol, -1);
	rc = 0;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	if (crattr != NULL) {
		capa_put(crattr->cra_capa);
		OBD_FREE(crattr, sizeof(*crattr));
	}

	if (rc != 0) {
		/* the request takes over cleanup once it is built, so any
		 * failure here must have happened before that point */
		LASSERT(req == NULL);

		if (oa)
			OBDO_FREE(oa);
		if (pga)
			OBD_FREE(pga, sizeof(*pga) * page_count);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					     oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
		if (clerq && !IS_ERR(clerq))
			cl_req_completion(env, clerq, rc);
	}
	return rc;
}
2081 
osc_set_lock_data_with_check(struct ldlm_lock * lock,struct ldlm_enqueue_info * einfo)2082 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2083 					struct ldlm_enqueue_info *einfo)
2084 {
2085 	void *data = einfo->ei_cbdata;
2086 	int set = 0;
2087 
2088 	LASSERT(lock != NULL);
2089 	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2090 	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2091 	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2092 	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2093 
2094 	lock_res_and_lock(lock);
2095 	spin_lock(&osc_ast_guard);
2096 
2097 	if (lock->l_ast_data == NULL)
2098 		lock->l_ast_data = data;
2099 	if (lock->l_ast_data == data)
2100 		set = 1;
2101 
2102 	spin_unlock(&osc_ast_guard);
2103 	unlock_res_and_lock(lock);
2104 
2105 	return set;
2106 }
2107 
osc_set_data_with_check(struct lustre_handle * lockh,struct ldlm_enqueue_info * einfo)2108 static int osc_set_data_with_check(struct lustre_handle *lockh,
2109 				   struct ldlm_enqueue_info *einfo)
2110 {
2111 	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2112 	int set = 0;
2113 
2114 	if (lock != NULL) {
2115 		set = osc_set_lock_data_with_check(lock, einfo);
2116 		LDLM_LOCK_PUT(lock);
2117 	} else
2118 		CERROR("lockh %p, data %p - client evicted?\n",
2119 		       lockh, einfo->ei_cbdata);
2120 	return set;
2121 }
2122 
/* Find any cached ldlm lock of the inode in the osc namespace.
 * Return: 0     no lock found
 *	   1     a lock was found
 *	 < 0     error */
static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
			   ldlm_iterator_t replace, void *data)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ldlm_res_id res_id;
	int rc;

	ostid_build_res_name(&lsm->lsm_oi, &res_id);
	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);

	/* map the iterator verdict onto the documented return values */
	switch (rc) {
	case LDLM_ITER_STOP:
		return 1;
	case LDLM_ITER_CONTINUE:
		return 0;
	default:
		return rc;
	}
}
2142 
/*
 * Common completion path for an OSC enqueue: decode an intent verdict
 * from the DLM reply if needed, publish LVB readiness on success, and
 * invoke the caller's update callback with the final status.
 */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
			    obd_enqueue_update_f upcall, void *cookie,
			    __u64 *flags, int agl, int rc)
{
	int intent = *flags & LDLM_FL_HAS_INTENT;

	/* For an intent enqueue the request existed before
	 * ldlm_cli_enqueue(); LOCK_ABORTED carries the server's real
	 * verdict in the DLM reply. */
	if (intent && rc == ELDLM_LOCK_ABORTED) {
		struct ldlm_reply *rep =
			req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);

		LASSERT(rep != NULL);
		rep->lock_policy_res1 =
			ptlrpc_status_ntoh(rep->lock_policy_res1);
		if (rep->lock_policy_res1)
			rc = rep->lock_policy_res1;
	}

	if (rc == 0 || (intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0)) {
		*flags |= LDLM_FL_LVB_READY;
		CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
	}

	/* Call the update callback. */
	return (*upcall)(cookie, rc);
}
2175 
/*
 * Interpret callback for an asynchronous OSC enqueue: finish the ldlm
 * enqueue, run the osc completion (upcall), and balance the lock
 * references taken during the enqueue.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle handle;
	__u32 mode;
	struct ost_lvb *lvb;
	__u32 lvb_len;
	__u64 *flags = aa->oa_flags;

	/* Make a local copy of a lock handle and a mode, because aa->oa_*
	 * might be freed anytime after lock upcall has been called. */
	lustre_handle_copy(&handle, aa->oa_lockh);
	mode = aa->oa_ei->ei_mode;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(&handle);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(&handle, mode);

	/* Let CP AST to grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	/* An aborted AGL (async glimpse) enqueue carries no LVB. */
	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
		lvb = NULL;
		lvb_len = 0;
	} else {
		lvb = aa->oa_lvb;
		lvb_len = sizeof(*aa->oa_lvb);
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
				   mode, flags, lvb, lvb_len, &handle, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
			      flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* Release the lock for async request. */
	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
		/*
		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
		 * not already released by
		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
		 */
		ldlm_lock_decref(&handle, mode);

	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
		 aa->oa_lockh, req, aa);
	/* drop the extra reference (and the ldlm_handle2lock ref) taken
	 * above now that the upcall has run */
	ldlm_lock_decref(&handle, mode);
	LDLM_LOCK_PUT(lock);
	return rc;
}
2237 
/* Magic sentinel: passing PTLRPCD_SET as the request set to
 * osc_enqueue_base() means "queue the request on a ptlrpcd daemon"
 * rather than on a real caller-owned request set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2239 
2240 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2241  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2242  * other synchronous requests, however keeping some locks and trying to obtain
2243  * others may take a considerable amount of time in a case of ost failure; and
2244  * when other sync requests do not get released lock from a client, the client
2245  * is excluded from the cluster -- such scenarious make the life difficult, so
2246  * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on the OST: first try to match an already
 * cached lock; otherwise send an (optionally intent, optionally async)
 * LDLM enqueue RPC.  @upcall is invoked with the final status, either
 * here (sync / matched) or from osc_enqueue_interpret() (async).
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct lustre_handle *lockh,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	/* AGL may match a lock whose LVB is not ready yet */
	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
	ldlm_mode_t mode;
	int rc;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother.  */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock.  The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, lockh, 0);
	if (mode) {
		struct ldlm_lock *matched = ldlm_handle2lock(lockh);

		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
			/* For AGL, if enqueue RPC is sent but the lock is not
			 * granted, then skip to process this stripe.
			 * Return -ECANCELED to tell the caller. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
			return -ECANCELED;
		} else if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;
			/* addref the lock only if not async requests and PW
			 * lock is matched whereas we asked for PR. */
			if (!rqset && einfo->ei_mode != mode)
				ldlm_lock_addref(lockh, LCK_PR);
			if (intent) {
				/* I would like to be able to ASSERT here that
				 * rss <= kms, but I can't, for reasons which
				 * are explained in lov_enqueue() */
			}

			/* We already have a lock, and it's referenced.
			 *
			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
			 * AGL upcall may change it to CLS_HELD directly. */
			(*upcall)(cookie, ELDLM_OK);

			if (einfo->ei_mode != mode)
				ldlm_lock_decref(lockh, LCK_PW);
			else if (rqset)
				/* For async requests, decref the lock. */
				ldlm_lock_decref(lockh, einfo->ei_mode);
			LDLM_LOCK_PUT(matched);
			return ELDLM_OK;
		} else {
			/* lock belongs to another object - cannot reuse it */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

 no_match:
	if (intent) {
		/* an intent enqueue needs its request built up front so the
		 * LVB buffer can be sized in the reply */
		LIST_HEAD(cancels);
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			return -ENOMEM;

		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, lockh, async);
	if (rqset) {
		if (!rc) {
			/* async path: completion is handled by
			 * osc_enqueue_interpret() */
			struct osc_enqueue_args *aa;
			CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_ei = einfo;
			aa->oa_exp = exp;
			aa->oa_flags  = flags;
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_lvb    = lvb;
			aa->oa_lockh  = lockh;
			aa->oa_agl    = !!agl;

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		return rc;
	}

	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	return rc;
}
2390 
/*
 * Match an already cached extent lock without enqueueing a new one.
 * On a match, optionally verify/claim its ast data against @data and
 * normalize the held reference to the requested @mode.  Returns the
 * matched lock mode (non-zero) on success, 0 if no usable lock.
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
		   __u64 *flags, void *data, struct lustre_handle *lockh,
		   int unref)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	ldlm_mode_t rc;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		return -EIO;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock.  The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock. */
	rc = mode;
	if (mode == LCK_PR)
		rc |= LCK_PW;
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	if (rc) {
		if (data != NULL) {
			if (!osc_set_data_with_check(lockh, data)) {
				/* lock belongs to another object */
				if (!(lflags & LDLM_FL_TEST_LOCK))
					ldlm_lock_decref(lockh, rc);
				return 0;
			}
		}
		/* matched a PW lock while PR was requested: swap the
		 * reference to the mode the caller will decref later */
		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
			ldlm_lock_addref(lockh, LCK_PR);
			ldlm_lock_decref(lockh, LCK_PW);
		}
		return rc;
	}
	return rc;
}
2433 
/* Drop a lock reference; group locks are cancelled immediately since
 * they are never kept cached on the client. */
int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
	if (likely(mode != LCK_GROUP))
		ldlm_lock_decref(lockh, mode);
	else
		ldlm_lock_decref_and_cancel(lockh, mode);

	return 0;
}
2443 
osc_statfs_interpret(const struct lu_env * env,struct ptlrpc_request * req,struct osc_async_args * aa,int rc)2444 static int osc_statfs_interpret(const struct lu_env *env,
2445 				struct ptlrpc_request *req,
2446 				struct osc_async_args *aa, int rc)
2447 {
2448 	struct obd_statfs *msfs;
2449 
2450 	if (rc == -EBADR)
2451 		/* The request has in fact never been sent
2452 		 * due to issues at a higher level (LOV).
2453 		 * Exit immediately since the caller is
2454 		 * aware of the problem and takes care
2455 		 * of the clean up */
2456 		 return rc;
2457 
2458 	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2459 	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2460 		rc = 0;
2461 		goto out;
2462 	}
2463 
2464 	if (rc != 0)
2465 		goto out;
2466 
2467 	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2468 	if (msfs == NULL) {
2469 		rc = -EPROTO;
2470 		goto out;
2471 	}
2472 
2473 	*aa->aa_oi->oi_osfs = *msfs;
2474 out:
2475 	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2476 	return rc;
2477 }
2478 
/*
 * Send an asynchronous OST_STATFS request; the reply is delivered to
 * oinfo->oi_cb_up via osc_statfs_interpret() once @rqset is processed.
 * Returns 0 on successful queueing, negative errno otherwise.
 */
static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, __u64 max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device     *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int		    rc;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit).  Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs-originated requests must not wait (or resend),
		 * to avoid deadlocking on an unreachable target */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	return 0;
}
2521 
/*
 * Synchronous OST_STATFS: query the target's filesystem statistics into
 * @osfs.  Returns 0 on success, negative errno on failure (-ENODEV if
 * the import is already gone).
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device     *obd = class_exp2obd(exp);
	struct obd_statfs     *msfs;
	struct ptlrpc_request *req;
	struct obd_import     *imp = NULL;
	int rc;

	/* Since the request might also come from lprocfs, we need to
	 * synchronize with client_disconnect_export() by taking our own
	 * reference on the import under cl_sem (Bug 15684). */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		return -ENODEV;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit).  Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	class_import_put(imp);

	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs-originated requests must not wait (or resend),
		 * to avoid deadlocking on an unreachable target */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*osfs = *msfs;

 out:
	ptlrpc_req_finished(req);
	return rc;
}
2584 
2585 /* Retrieve object striping information.
2586  *
2587  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2588  * the maximum number of OST indices which will fit in the user buffer.
2589  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2590  */
/*
 * Copy the object's striping information (always a single stripe at the
 * OSC level) out to the user-space lov_user_md buffer @lump.
 * Returns 0 on success, -EFAULT/-EINVAL/-ENODATA/-ENOMEM on failure.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
	struct lov_user_md_v3 lum, *lumk;
	struct lov_user_ost_data_v1 *lmm_objects;
	int rc = 0, lum_size;

	if (!lsm)
		return -ENODATA;

	/* we only need the header part from user space to get lmm_magic and
	 * lmm_stripe_count, (the header part is common to v1 and v3) */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(&lum, lump, lum_size))
		return -EFAULT;

	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
		return -EINVAL;

	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

	/* we can use lov_mds_md_size() to compute lum_size
	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
	if (lum.lmm_stripe_count > 0) {
		/* user buffer has room for at least one lmm_objects entry:
		 * allocate a kernel copy and fill in the single stripe */
		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
		OBD_ALLOC(lumk, lum_size);
		if (!lumk)
			return -ENOMEM;

		/* lmm_objects starts at a different offset in v1 vs v3 */
		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
			lmm_objects =
			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
		else
			lmm_objects = &(lumk->lmm_objects[0]);
		lmm_objects->l_ost_oi = lsm->lsm_oi;
	} else {
		/* header only: reuse the on-stack copy */
		lum_size = lov_mds_md_size(0, lum.lmm_magic);
		lumk = &lum;
	}

	lumk->lmm_oi = lsm->lsm_oi;
	lumk->lmm_stripe_count = 1;

	if (copy_to_user(lump, lumk, lum_size))
		rc = -EFAULT;

	if (lumk != &lum)
		OBD_FREE(lumk, lum_size);

	return rc;
}
2646 
2647 
osc_iocontrol(unsigned int cmd,struct obd_export * exp,int len,void * karg,void * uarg)2648 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2649 			 void *karg, void *uarg)
2650 {
2651 	struct obd_device *obd = exp->exp_obd;
2652 	struct obd_ioctl_data *data = karg;
2653 	int err = 0;
2654 
2655 	if (!try_module_get(THIS_MODULE)) {
2656 		CERROR("Can't get module. Is it alive?");
2657 		return -EINVAL;
2658 	}
2659 	switch (cmd) {
2660 	case OBD_IOC_LOV_GET_CONFIG: {
2661 		char *buf;
2662 		struct lov_desc *desc;
2663 		struct obd_uuid uuid;
2664 
2665 		buf = NULL;
2666 		len = 0;
2667 		if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
2668 			err = -EINVAL;
2669 			goto out;
2670 		}
2671 
2672 		data = (struct obd_ioctl_data *)buf;
2673 
2674 		if (sizeof(*desc) > data->ioc_inllen1) {
2675 			obd_ioctl_freedata(buf, len);
2676 			err = -EINVAL;
2677 			goto out;
2678 		}
2679 
2680 		if (data->ioc_inllen2 < sizeof(uuid)) {
2681 			obd_ioctl_freedata(buf, len);
2682 			err = -EINVAL;
2683 			goto out;
2684 		}
2685 
2686 		desc = (struct lov_desc *)data->ioc_inlbuf1;
2687 		desc->ld_tgt_count = 1;
2688 		desc->ld_active_tgt_count = 1;
2689 		desc->ld_default_stripe_count = 1;
2690 		desc->ld_default_stripe_size = 0;
2691 		desc->ld_default_stripe_offset = 0;
2692 		desc->ld_pattern = 0;
2693 		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2694 
2695 		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2696 
2697 		err = copy_to_user((void *)uarg, buf, len);
2698 		if (err)
2699 			err = -EFAULT;
2700 		obd_ioctl_freedata(buf, len);
2701 		goto out;
2702 	}
2703 	case LL_IOC_LOV_SETSTRIPE:
2704 		err = obd_alloc_memmd(exp, karg);
2705 		if (err > 0)
2706 			err = 0;
2707 		goto out;
2708 	case LL_IOC_LOV_GETSTRIPE:
2709 		err = osc_getstripe(karg, uarg);
2710 		goto out;
2711 	case OBD_IOC_CLIENT_RECOVER:
2712 		err = ptlrpc_recover_import(obd->u.cli.cl_import,
2713 					    data->ioc_inlbuf1, 0);
2714 		if (err > 0)
2715 			err = 0;
2716 		goto out;
2717 	case IOC_OSC_SET_ACTIVE:
2718 		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2719 					       data->ioc_offset);
2720 		goto out;
2721 	case OBD_IOC_POLL_QUOTACHECK:
2722 		err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2723 		goto out;
2724 	case OBD_IOC_PING_TARGET:
2725 		err = ptlrpc_obd_ping(obd);
2726 		goto out;
2727 	default:
2728 		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2729 		       cmd, current_comm());
2730 		err = -ENOTTY;
2731 		goto out;
2732 	}
2733 out:
2734 	module_put(THIS_MODULE);
2735 	return err;
2736 }
2737 
/*
 * Retrieve a named piece of information from the OSC/OST.
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE - answered locally; an OSC object has a single
 *			  stripe, so the mapping is always stripe 0.
 *   KEY_LAST_ID	- query the server for the last allocated object id
 *			  via an OST_GET_INFO RPC.
 *   KEY_FIEMAP		- forward a fiemap request to the OST.  When
 *			  FIEMAP_FLAG_SYNC is set, a PR extent lock is
 *			  matched locally first (or the server is asked to
 *			  take one) so dirty data is flushed.
 *
 * \retval 0 on success with the result copied into \a val, negative
 *	   errno on failure; -EINVAL for an unrecognized key.
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			u32 keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (!vallen || !val)
		return -EFAULT;

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		/* an OSC object always maps to its one and only stripe */
		*stripe = 0;
		return 0;
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		u64		*reply;
		char		  *tmp;
		int		    rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (req == NULL)
			return -ENOMEM;

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		/* simple query: fail fast rather than wait or resend */
		req->rq_no_delay = req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto out;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (reply == NULL) {
			rc = -EPROTO;
			goto out;
		}

		*((u64 *)val) = *reply;
	out:
		ptlrpc_req_finished(req);
		return rc;
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key =
				(struct ll_fiemap_info_key *)key;
		struct ldlm_res_id	 res_id;
		ldlm_policy_data_t	 policy;
		struct lustre_handle	 lockh;
		ldlm_mode_t		 mode = 0;
		struct ptlrpc_request	*req;
		struct ll_user_fiemap	*reply;
		char			*tmp;
		int			 rc;

		/* without FIEMAP_FLAG_SYNC there is nothing to flush, so
		 * no lock is needed */
		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		/* build the extent to lock, rounded out to page bounds */
		policy.l_extent.start = fm_key->fiemap.fm_start &
						CFS_PAGE_MASK;

		/* guard against wrap past OBD_OBJECT_EOF when rounding up */
		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
				fm_key->fiemap.fm_length +
				PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) { /* lock is cached on client */
			if (mode != LCK_PR) {
				/* only PR is needed here: take a PR ref
				 * before dropping the matched PW one */
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else { /* no cached lock, needs acquire lock on server side */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (req == NULL) {
			rc = -ENOMEM;
			goto drop_lock;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			goto drop_lock;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);
		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		memcpy(tmp, val, *vallen);

		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto fini_req;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		if (reply == NULL) {
			rc = -EPROTO;
			goto fini_req;
		}

		memcpy(val, reply, *vallen);
fini_req:
		ptlrpc_req_finished(req);
drop_lock:
		if (mode)
			ldlm_lock_decref(&lockh, LCK_PR);
		return rc;
	}

	return -EINVAL;
}
2878 
/*
 * Set a named piece of information, either locally or by forwarding it
 * to the OST.
 *
 * Keys handled entirely on the client: KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET and KEY_CACHE_LRU_SHRINK.  Every other
 * key is packed into an OST_SET_INFO RPC; KEY_GRANT_SHRINK requests are
 * queued to ptlrpcd, all others are added to \a set (which must then be
 * non-NULL).
 *
 * \retval 0 on success or negative errno.
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device     *obd = exp->exp_obd;
	struct obd_import     *imp = class_exp2cliimp(exp);
	char		  *tmp;
	int		    rc;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			return -EINVAL;
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		return 0;
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		return 0;
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		return 0;
	}

	if (KEY_IS(KEY_CACHE_SET)) {
		struct client_obd *cli = &obd->u.cli;

		LASSERT(cli->cl_cache == NULL); /* only once */
		cli->cl_cache = (struct cl_client_cache *)val;
		atomic_inc(&cli->cl_cache->ccc_users);
		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

		/* add this osc into entity list */
		LASSERT(list_empty(&cli->cl_lru_osc));
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);

		return 0;
	}

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		/* shrink at most half of what this OSC holds on the LRU */
		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
		int target = *(int *)val;

		nr = osc_lru_shrink(cli, min(nr, target));
		/* report back how many pages are still wanted */
		*(int *)val -= nr;
		return 0;
	}

	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		return -EINVAL;

	/* We pass all other commands directly to OST. Since nobody calls osc
	   methods directly and everybody is supposed to go through LOV, we
	   assume lov checked invalid values for us.
	   The only recognised values so far are evict_by_nid and mds_conn.
	   Even if something bad goes through, we'd get a -EINVAL from OST
	   anyway. */

	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
						&RQF_OST_SET_GRANT_INFO :
						&RQF_OBD_SET_INFO);
	if (req == NULL)
		return -ENOMEM;

	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	/* grant shrink carries an ost_body instead of a generic value */
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
							&RMF_OST_BODY :
							&RMF_SETINFO_VAL);
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_brw_async_args *aa;
		struct obdo *oa;

		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		/* the obdo is freed by osc_shrink_grant_interpret */
		OBDO_ALLOC(oa);
		if (!oa) {
			ptlrpc_req_finished(req);
			return -ENOMEM;
		}
		*oa = ((struct ost_body *)val)->oa;
		aa->aa_oa = oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;
	}

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
	} else
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

	return 0;
}
2995 
osc_reconnect(const struct lu_env * env,struct obd_export * exp,struct obd_device * obd,struct obd_uuid * cluuid,struct obd_connect_data * data,void * localdata)2996 static int osc_reconnect(const struct lu_env *env,
2997 			 struct obd_export *exp, struct obd_device *obd,
2998 			 struct obd_uuid *cluuid,
2999 			 struct obd_connect_data *data,
3000 			 void *localdata)
3001 {
3002 	struct client_obd *cli = &obd->u.cli;
3003 
3004 	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3005 		long lost_grant;
3006 
3007 		client_obd_list_lock(&cli->cl_loi_list_lock);
3008 		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3009 				2 * cli_brw_size(obd);
3010 		lost_grant = cli->cl_lost_grant;
3011 		cli->cl_lost_grant = 0;
3012 		client_obd_list_unlock(&cli->cl_loi_list_lock);
3013 
3014 		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3015 		       data->ocd_connect_flags,
3016 		       data->ocd_version, data->ocd_grant, lost_grant);
3017 	}
3018 
3019 	return 0;
3020 }
3021 
osc_disconnect(struct obd_export * exp)3022 static int osc_disconnect(struct obd_export *exp)
3023 {
3024 	struct obd_device *obd = class_exp2obd(exp);
3025 	int rc;
3026 
3027 	rc = client_disconnect_export(exp);
3028 	/**
3029 	 * Initially we put del_shrink_grant before disconnect_export, but it
3030 	 * causes the following problem if setup (connect) and cleanup
3031 	 * (disconnect) are tangled together.
3032 	 *      connect p1		     disconnect p2
3033 	 *   ptlrpc_connect_import
3034 	 *     ...............	       class_manual_cleanup
3035 	 *				     osc_disconnect
3036 	 *				     del_shrink_grant
3037 	 *   ptlrpc_connect_interrupt
3038 	 *     init_grant_shrink
3039 	 *   add this client to shrink list
3040 	 *				      cleanup_osc
3041 	 * Bang! pinger trigger the shrink.
3042 	 * So the osc should be disconnected from the shrink list, after we
3043 	 * are sure the import has been destroyed. BUG18662
3044 	 */
3045 	if (obd->u.cli.cl_import == NULL)
3046 		osc_del_shrink_grant(&obd->u.cli);
3047 	return rc;
3048 }
3049 
osc_import_event(struct obd_device * obd,struct obd_import * imp,enum obd_import_event event)3050 static int osc_import_event(struct obd_device *obd,
3051 			    struct obd_import *imp,
3052 			    enum obd_import_event event)
3053 {
3054 	struct client_obd *cli;
3055 	int rc = 0;
3056 
3057 	LASSERT(imp->imp_obd == obd);
3058 
3059 	switch (event) {
3060 	case IMP_EVENT_DISCON: {
3061 		cli = &obd->u.cli;
3062 		client_obd_list_lock(&cli->cl_loi_list_lock);
3063 		cli->cl_avail_grant = 0;
3064 		cli->cl_lost_grant = 0;
3065 		client_obd_list_unlock(&cli->cl_loi_list_lock);
3066 		break;
3067 	}
3068 	case IMP_EVENT_INACTIVE: {
3069 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3070 		break;
3071 	}
3072 	case IMP_EVENT_INVALIDATE: {
3073 		struct ldlm_namespace *ns = obd->obd_namespace;
3074 		struct lu_env	 *env;
3075 		int		    refcheck;
3076 
3077 		env = cl_env_get(&refcheck);
3078 		if (!IS_ERR(env)) {
3079 			/* Reset grants */
3080 			cli = &obd->u.cli;
3081 			/* all pages go to failing rpcs due to the invalid
3082 			 * import */
3083 			osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3084 
3085 			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3086 			cl_env_put(env, &refcheck);
3087 		} else
3088 			rc = PTR_ERR(env);
3089 		break;
3090 	}
3091 	case IMP_EVENT_ACTIVE: {
3092 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3093 		break;
3094 	}
3095 	case IMP_EVENT_OCD: {
3096 		struct obd_connect_data *ocd = &imp->imp_connect_data;
3097 
3098 		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3099 			osc_init_grant(&obd->u.cli, ocd);
3100 
3101 		/* See bug 7198 */
3102 		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3103 			imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3104 
3105 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3106 		break;
3107 	}
3108 	case IMP_EVENT_DEACTIVATE: {
3109 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3110 		break;
3111 	}
3112 	case IMP_EVENT_ACTIVATE: {
3113 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3114 		break;
3115 	}
3116 	default:
3117 		CERROR("Unknown import event %d\n", event);
3118 		LBUG();
3119 	}
3120 	return rc;
3121 }
3122 
3123 /**
3124  * Determine whether the lock can be canceled before replaying the lock
3125  * during recovery, see bug16774 for detailed information.
3126  *
3127  * \retval zero the lock can't be canceled
3128  * \retval other ok to cancel
3129  */
osc_cancel_for_recovery(struct ldlm_lock * lock)3130 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3131 {
3132 	check_res_locked(lock->l_resource);
3133 
3134 	/*
3135 	 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3136 	 *
3137 	 * XXX as a future improvement, we can also cancel unused write lock
3138 	 * if it doesn't have dirty data and active mmaps.
3139 	 */
3140 	if (lock->l_resource->lr_type == LDLM_EXTENT &&
3141 	    (lock->l_granted_mode == LCK_PR ||
3142 	     lock->l_granted_mode == LCK_CR) &&
3143 	    (osc_dlm_lock_pageref(lock) == 0))
3144 		return 1;
3145 
3146 	return 0;
3147 }
3148 
brw_queue_work(const struct lu_env * env,void * data)3149 static int brw_queue_work(const struct lu_env *env, void *data)
3150 {
3151 	struct client_obd *cli = data;
3152 
3153 	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3154 
3155 	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3156 	return 0;
3157 }
3158 
/*
 * obd_ops ->o_setup: initialize an OSC device instance.
 *
 * Brings up, in order: a ptlrpcd reference, the generic client import,
 * the writeback work item, quota state, procfs entries, a pre-allocated
 * request pool, and the lock-cancel-for-recovery callback.  On failure
 * the already-initialized pieces are unwound via the out_* labels.
 *
 * \retval 0 on success, negative errno on failure.
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	struct client_obd	  *cli = &obd->u.cli;
	void		       *handler;
	int			rc;

	rc = ptlrpcd_addref();
	if (rc)
		return rc;

	rc = client_obd_setup(obd, lcfg);
	if (rc)
		goto out_ptlrpcd;

	/* writeback work is executed asynchronously by ptlrpcd */
	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler)) {
		rc = PTR_ERR(handler);
		goto out_client_setup;
	}
	cli->cl_writeback_work = handler;

	rc = osc_quota_setup(obd);
	if (rc)
		goto out_ptlrpcd_work;

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
	/* procfs registration failures are non-fatal */
	lprocfs_osc_init_vars(&lvars);
	if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
		lproc_osc_attach_seqstat(obd);
		sptlrpc_lprocfs_cliobd_attach(obd);
		ptlrpc_lprocfs_register_obd(obd);
	}

	/* We need to allocate a few requests more, because
	 * brw_interpret tries to create new requests before freeing
	 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
	 * reserved, but I'm afraid that might be too much wasted RAM
	 * in fact, so 2 is just my guess and still should work. */
	cli->cl_import->imp_rq_pool =
		ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
				    OST_MAXREQSIZE,
				    ptlrpc_add_rqs_to_pool);

	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
	return rc;

out_ptlrpcd_work:
	ptlrpcd_destroy_work(handler);
out_client_setup:
	client_obd_cleanup(obd);
out_ptlrpcd:
	ptlrpcd_decref();
	return rc;
}
3215 
osc_precleanup(struct obd_device * obd,enum obd_cleanup_stage stage)3216 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3217 {
3218 	switch (stage) {
3219 	case OBD_CLEANUP_EARLY: {
3220 		struct obd_import *imp;
3221 		imp = obd->u.cli.cl_import;
3222 		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3223 		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3224 		ptlrpc_deactivate_import(imp);
3225 		spin_lock(&imp->imp_lock);
3226 		imp->imp_pingable = 0;
3227 		spin_unlock(&imp->imp_lock);
3228 		break;
3229 	}
3230 	case OBD_CLEANUP_EXPORTS: {
3231 		struct client_obd *cli = &obd->u.cli;
3232 		/* LU-464
3233 		 * for echo client, export may be on zombie list, wait for
3234 		 * zombie thread to cull it, because cli.cl_import will be
3235 		 * cleared in client_disconnect_export():
3236 		 *   class_export_destroy() -> obd_cleanup() ->
3237 		 *   echo_device_free() -> echo_client_cleanup() ->
3238 		 *   obd_disconnect() -> osc_disconnect() ->
3239 		 *   client_disconnect_export()
3240 		 */
3241 		obd_zombie_barrier();
3242 		if (cli->cl_writeback_work) {
3243 			ptlrpcd_destroy_work(cli->cl_writeback_work);
3244 			cli->cl_writeback_work = NULL;
3245 		}
3246 		obd_cleanup_client_import(obd);
3247 		ptlrpc_lprocfs_unregister_obd(obd);
3248 		lprocfs_obd_cleanup(obd);
3249 		break;
3250 		}
3251 	}
3252 	return 0;
3253 }
3254 
/*
 * obd_ops ->o_cleanup: final teardown of an OSC device.
 *
 * Detaches from the shared client page cache (LRU), releases quota
 * state, cleans up the generic client obd and drops the ptlrpcd
 * reference taken in osc_setup().
 *
 * \retval result of client_obd_cleanup().
 */
int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		/* drop our reference on the shared cache */
		atomic_dec(&cli->cl_cache->ccc_users);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	ptlrpcd_decref();
	return rc;
}
3279 
osc_process_config_base(struct obd_device * obd,struct lustre_cfg * lcfg)3280 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3281 {
3282 	struct lprocfs_static_vars lvars = { NULL };
3283 	int rc = 0;
3284 
3285 	lprocfs_osc_init_vars(&lvars);
3286 
3287 	switch (lcfg->lcfg_command) {
3288 	default:
3289 		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3290 					      lcfg, obd);
3291 		if (rc > 0)
3292 			rc = 0;
3293 		break;
3294 	}
3295 
3296 	return rc;
3297 }
3298 
/* obd_ops ->o_process_config: thin wrapper around
 * osc_process_config_base(); \a len is unused. */
static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
{
	return osc_process_config_base(obd, buf);
}
3303 
3304 struct obd_ops osc_obd_ops = {
3305 	.o_owner		= THIS_MODULE,
3306 	.o_setup		= osc_setup,
3307 	.o_precleanup	   = osc_precleanup,
3308 	.o_cleanup	      = osc_cleanup,
3309 	.o_add_conn	     = client_import_add_conn,
3310 	.o_del_conn	     = client_import_del_conn,
3311 	.o_connect	      = client_connect_import,
3312 	.o_reconnect	    = osc_reconnect,
3313 	.o_disconnect	   = osc_disconnect,
3314 	.o_statfs	       = osc_statfs,
3315 	.o_statfs_async	 = osc_statfs_async,
3316 	.o_packmd	       = osc_packmd,
3317 	.o_unpackmd	     = osc_unpackmd,
3318 	.o_create	       = osc_create,
3319 	.o_destroy	      = osc_destroy,
3320 	.o_getattr	      = osc_getattr,
3321 	.o_getattr_async	= osc_getattr_async,
3322 	.o_setattr	      = osc_setattr,
3323 	.o_setattr_async	= osc_setattr_async,
3324 	.o_find_cbdata	  = osc_find_cbdata,
3325 	.o_iocontrol	    = osc_iocontrol,
3326 	.o_get_info	     = osc_get_info,
3327 	.o_set_info_async       = osc_set_info_async,
3328 	.o_import_event	 = osc_import_event,
3329 	.o_process_config       = osc_process_config,
3330 	.o_quotactl	     = osc_quotactl,
3331 	.o_quotacheck	   = osc_quotacheck,
3332 };
3333 
/* Shared module-wide state defined elsewhere in the OSC module. */
extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;
3337 
osc_init(void)3338 static int __init osc_init(void)
3339 {
3340 	struct lprocfs_static_vars lvars = { NULL };
3341 	int rc;
3342 
3343 	/* print an address of _any_ initialized kernel symbol from this
3344 	 * module, to allow debugging with gdb that doesn't support data
3345 	 * symbols from modules.*/
3346 	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3347 
3348 	rc = lu_kmem_init(osc_caches);
3349 	if (rc)
3350 		return rc;
3351 
3352 	lprocfs_osc_init_vars(&lvars);
3353 
3354 	rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3355 				 LUSTRE_OSC_NAME, &osc_device_type);
3356 	if (rc) {
3357 		lu_kmem_fini(osc_caches);
3358 		return rc;
3359 	}
3360 
3361 	spin_lock_init(&osc_ast_guard);
3362 	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3363 
3364 	return rc;
3365 }
3366 
/* Module unload: unregister the OSC obd type and release the caches. */
static void /*__exit*/ osc_exit(void)
{
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
}
3372 
/* Standard kernel module metadata and entry/exit registration. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

module_init(osc_init);
module_exit(osc_exit);
3380