1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_OSC
38 
39 #include "../../include/linux/libcfs/libcfs.h"
40 
41 #include "../include/lustre_dlm.h"
42 #include "../include/lustre_net.h"
43 #include "../include/lustre/lustre_user.h"
44 #include "../include/obd_cksum.h"
45 
46 #include "../include/lustre_ha.h"
47 #include "../include/lprocfs_status.h"
48 #include "../include/lustre_debug.h"
49 #include "../include/lustre_param.h"
50 #include "../include/lustre_fid.h"
51 #include "../include/obd_class.h"
52 #include "../include/obd.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55 
56 atomic_t osc_pool_req_count;
57 unsigned int osc_reqpool_maxreqcount;
58 struct ptlrpc_request_pool *osc_rq_pool;
59 
60 /* max memory used for request pool, unit is MB */
61 static unsigned int osc_reqpool_mem_max = 5;
62 module_param(osc_reqpool_mem_max, uint, 0444);
63 
64 struct osc_brw_async_args {
65 	struct obdo       *aa_oa;
66 	int		aa_requested_nob;
67 	int		aa_nio_count;
68 	u32		aa_page_count;
69 	int		aa_resends;
70 	struct brw_page  **aa_ppga;
71 	struct client_obd *aa_cli;
72 	struct list_head	 aa_oaps;
73 	struct list_head	 aa_exts;
74 	struct cl_req     *aa_clerq;
75 };
76 
77 struct osc_async_args {
78 	struct obd_info   *aa_oi;
79 };
80 
81 struct osc_setattr_args {
82 	struct obdo	 *sa_oa;
83 	obd_enqueue_update_f sa_upcall;
84 	void		*sa_cookie;
85 };
86 
87 struct osc_fsync_args {
88 	struct obd_info     *fa_oi;
89 	obd_enqueue_update_f fa_upcall;
90 	void		*fa_cookie;
91 };
92 
93 struct osc_enqueue_args {
94 	struct obd_export	*oa_exp;
95 	__u64		    *oa_flags;
96 	obd_enqueue_update_f      oa_upcall;
97 	void		     *oa_cookie;
98 	struct ost_lvb	   *oa_lvb;
99 	struct lustre_handle     *oa_lockh;
100 	struct ldlm_enqueue_info *oa_ei;
101 	unsigned int	      oa_agl:1;
102 };
103 
104 static void osc_release_ppga(struct brw_page **ppga, u32 count);
105 static int brw_interpret(const struct lu_env *env,
106 			 struct ptlrpc_request *req, void *data, int rc);
107 int osc_cleanup(struct obd_device *obd);
108 
109 /* Pack OSC object metadata for disk storage (LE byte order). */
osc_packmd(struct obd_export * exp,struct lov_mds_md ** lmmp,struct lov_stripe_md * lsm)110 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
111 		      struct lov_stripe_md *lsm)
112 {
113 	int lmm_size;
114 
115 	lmm_size = sizeof(**lmmp);
116 	if (lmmp == NULL)
117 		return lmm_size;
118 
119 	if (*lmmp != NULL && lsm == NULL) {
120 		kfree(*lmmp);
121 		*lmmp = NULL;
122 		return 0;
123 	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
124 		return -EBADF;
125 	}
126 
127 	if (*lmmp == NULL) {
128 		*lmmp = kzalloc(lmm_size, GFP_NOFS);
129 		if (!*lmmp)
130 			return -ENOMEM;
131 	}
132 
133 	if (lsm)
134 		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
135 
136 	return lmm_size;
137 }
138 
139 /* Unpack OSC object metadata from disk storage (LE byte order). */
osc_unpackmd(struct obd_export * exp,struct lov_stripe_md ** lsmp,struct lov_mds_md * lmm,int lmm_bytes)140 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
141 			struct lov_mds_md *lmm, int lmm_bytes)
142 {
143 	int lsm_size;
144 	struct obd_import *imp = class_exp2cliimp(exp);
145 
146 	if (lmm != NULL) {
147 		if (lmm_bytes < sizeof(*lmm)) {
148 			CERROR("%s: lov_mds_md too small: %d, need %d\n",
149 			       exp->exp_obd->obd_name, lmm_bytes,
150 			       (int)sizeof(*lmm));
151 			return -EINVAL;
152 		}
153 		/* XXX LOV_MAGIC etc check? */
154 
155 		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
156 			CERROR("%s: zero lmm_object_id: rc = %d\n",
157 			       exp->exp_obd->obd_name, -EINVAL);
158 			return -EINVAL;
159 		}
160 	}
161 
162 	lsm_size = lov_stripe_md_size(1);
163 	if (lsmp == NULL)
164 		return lsm_size;
165 
166 	if (*lsmp != NULL && lmm == NULL) {
167 		kfree((*lsmp)->lsm_oinfo[0]);
168 		kfree(*lsmp);
169 		*lsmp = NULL;
170 		return 0;
171 	}
172 
173 	if (*lsmp == NULL) {
174 		*lsmp = kzalloc(lsm_size, GFP_NOFS);
175 		if (unlikely(*lsmp == NULL))
176 			return -ENOMEM;
177 		(*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
178 						GFP_NOFS);
179 		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
180 			kfree(*lsmp);
181 			return -ENOMEM;
182 		}
183 		loi_init((*lsmp)->lsm_oinfo[0]);
184 	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
185 		return -EBADF;
186 	}
187 
188 	if (lmm != NULL)
189 		/* XXX zero *lsmp? */
190 		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
191 
192 	if (imp != NULL &&
193 	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
194 		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
195 	else
196 		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
197 
198 	return lsm_size;
199 }
200 
osc_pack_req_body(struct ptlrpc_request * req,struct obd_info * oinfo)201 static inline void osc_pack_req_body(struct ptlrpc_request *req,
202 				     struct obd_info *oinfo)
203 {
204 	struct ost_body *body;
205 
206 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
207 	LASSERT(body);
208 
209 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
210 			     oinfo->oi_oa);
211 }
212 
osc_getattr_interpret(const struct lu_env * env,struct ptlrpc_request * req,struct osc_async_args * aa,int rc)213 static int osc_getattr_interpret(const struct lu_env *env,
214 				 struct ptlrpc_request *req,
215 				 struct osc_async_args *aa, int rc)
216 {
217 	struct ost_body *body;
218 
219 	if (rc != 0)
220 		goto out;
221 
222 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
223 	if (body) {
224 		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
225 		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
226 				     aa->aa_oi->oi_oa, &body->oa);
227 
228 		/* This should really be sent by the OST */
229 		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
230 		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
231 	} else {
232 		CDEBUG(D_INFO, "can't unpack ost_body\n");
233 		rc = -EPROTO;
234 		aa->aa_oi->oi_oa->o_valid = 0;
235 	}
236 out:
237 	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
238 	return rc;
239 }
240 
osc_getattr_async(struct obd_export * exp,struct obd_info * oinfo,struct ptlrpc_request_set * set)241 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
242 			     struct ptlrpc_request_set *set)
243 {
244 	struct ptlrpc_request *req;
245 	struct osc_async_args *aa;
246 	int rc;
247 
248 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
249 	if (req == NULL)
250 		return -ENOMEM;
251 
252 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
253 	if (rc) {
254 		ptlrpc_request_free(req);
255 		return rc;
256 	}
257 
258 	osc_pack_req_body(req, oinfo);
259 
260 	ptlrpc_request_set_replen(req);
261 	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
262 
263 	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
264 	aa = ptlrpc_req_async_args(req);
265 	aa->aa_oi = oinfo;
266 
267 	ptlrpc_set_add_req(set, req);
268 	return 0;
269 }
270 
osc_getattr(const struct lu_env * env,struct obd_export * exp,struct obd_info * oinfo)271 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
272 		       struct obd_info *oinfo)
273 {
274 	struct ptlrpc_request *req;
275 	struct ost_body *body;
276 	int rc;
277 
278 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
279 	if (req == NULL)
280 		return -ENOMEM;
281 
282 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
283 	if (rc) {
284 		ptlrpc_request_free(req);
285 		return rc;
286 	}
287 
288 	osc_pack_req_body(req, oinfo);
289 
290 	ptlrpc_request_set_replen(req);
291 
292 	rc = ptlrpc_queue_wait(req);
293 	if (rc)
294 		goto out;
295 
296 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
297 	if (body == NULL) {
298 		rc = -EPROTO;
299 		goto out;
300 	}
301 
302 	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
303 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
304 			     &body->oa);
305 
306 	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
307 	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
308 
309  out:
310 	ptlrpc_req_finished(req);
311 	return rc;
312 }
313 
osc_setattr(const struct lu_env * env,struct obd_export * exp,struct obd_info * oinfo,struct obd_trans_info * oti)314 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
315 		       struct obd_info *oinfo, struct obd_trans_info *oti)
316 {
317 	struct ptlrpc_request *req;
318 	struct ost_body *body;
319 	int rc;
320 
321 	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
322 
323 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324 	if (req == NULL)
325 		return -ENOMEM;
326 
327 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
328 	if (rc) {
329 		ptlrpc_request_free(req);
330 		return rc;
331 	}
332 
333 	osc_pack_req_body(req, oinfo);
334 
335 	ptlrpc_request_set_replen(req);
336 
337 	rc = ptlrpc_queue_wait(req);
338 	if (rc)
339 		goto out;
340 
341 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
342 	if (body == NULL) {
343 		rc = -EPROTO;
344 		goto out;
345 	}
346 
347 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
348 			     &body->oa);
349 
350 out:
351 	ptlrpc_req_finished(req);
352 	return rc;
353 }
354 
osc_setattr_interpret(const struct lu_env * env,struct ptlrpc_request * req,struct osc_setattr_args * sa,int rc)355 static int osc_setattr_interpret(const struct lu_env *env,
356 				 struct ptlrpc_request *req,
357 				 struct osc_setattr_args *sa, int rc)
358 {
359 	struct ost_body *body;
360 
361 	if (rc != 0)
362 		goto out;
363 
364 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
365 	if (body == NULL) {
366 		rc = -EPROTO;
367 		goto out;
368 	}
369 
370 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
371 			     &body->oa);
372 out:
373 	rc = sa->sa_upcall(sa->sa_cookie, rc);
374 	return rc;
375 }
376 
osc_setattr_async_base(struct obd_export * exp,struct obd_info * oinfo,struct obd_trans_info * oti,obd_enqueue_update_f upcall,void * cookie,struct ptlrpc_request_set * rqset)377 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
378 			   struct obd_trans_info *oti,
379 			   obd_enqueue_update_f upcall, void *cookie,
380 			   struct ptlrpc_request_set *rqset)
381 {
382 	struct ptlrpc_request *req;
383 	struct osc_setattr_args *sa;
384 	int rc;
385 
386 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
387 	if (req == NULL)
388 		return -ENOMEM;
389 
390 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
391 	if (rc) {
392 		ptlrpc_request_free(req);
393 		return rc;
394 	}
395 
396 	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
397 		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398 
399 	osc_pack_req_body(req, oinfo);
400 
401 	ptlrpc_request_set_replen(req);
402 
403 	/* do mds to ost setattr asynchronously */
404 	if (!rqset) {
405 		/* Do not wait for response. */
406 		ptlrpcd_add_req(req);
407 	} else {
408 		req->rq_interpret_reply =
409 			(ptlrpc_interpterer_t)osc_setattr_interpret;
410 
411 		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
412 		sa = ptlrpc_req_async_args(req);
413 		sa->sa_oa = oinfo->oi_oa;
414 		sa->sa_upcall = upcall;
415 		sa->sa_cookie = cookie;
416 
417 		if (rqset == PTLRPCD_SET)
418 			ptlrpcd_add_req(req);
419 		else
420 			ptlrpc_set_add_req(rqset, req);
421 	}
422 
423 	return 0;
424 }
425 
osc_setattr_async(struct obd_export * exp,struct obd_info * oinfo,struct obd_trans_info * oti,struct ptlrpc_request_set * rqset)426 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
427 			     struct obd_trans_info *oti,
428 			     struct ptlrpc_request_set *rqset)
429 {
430 	return osc_setattr_async_base(exp, oinfo, oti,
431 				      oinfo->oi_cb_up, oinfo, rqset);
432 }
433 
osc_real_create(struct obd_export * exp,struct obdo * oa,struct lov_stripe_md ** ea,struct obd_trans_info * oti)434 int osc_real_create(struct obd_export *exp, struct obdo *oa,
435 		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
436 {
437 	struct ptlrpc_request *req;
438 	struct ost_body *body;
439 	struct lov_stripe_md *lsm;
440 	int rc;
441 
442 	LASSERT(oa);
443 	LASSERT(ea);
444 
445 	lsm = *ea;
446 	if (!lsm) {
447 		rc = obd_alloc_memmd(exp, &lsm);
448 		if (rc < 0)
449 			return rc;
450 	}
451 
452 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
453 	if (req == NULL) {
454 		rc = -ENOMEM;
455 		goto out;
456 	}
457 
458 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
459 	if (rc) {
460 		ptlrpc_request_free(req);
461 		goto out;
462 	}
463 
464 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
465 	LASSERT(body);
466 
467 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
468 
469 	ptlrpc_request_set_replen(req);
470 
471 	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
472 	    oa->o_flags == OBD_FL_DELORPHAN) {
473 		DEBUG_REQ(D_HA, req,
474 			  "delorphan from OST integration");
475 		/* Don't resend the delorphan req */
476 		req->rq_no_resend = req->rq_no_delay = 1;
477 	}
478 
479 	rc = ptlrpc_queue_wait(req);
480 	if (rc)
481 		goto out_req;
482 
483 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
484 	if (body == NULL) {
485 		rc = -EPROTO;
486 		goto out_req;
487 	}
488 
489 	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
490 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
491 
492 	oa->o_blksize = cli_brw_size(exp->exp_obd);
493 	oa->o_valid |= OBD_MD_FLBLKSZ;
494 
495 	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
496 	 * have valid lsm_oinfo data structs, so don't go touching that.
497 	 * This needs to be fixed in a big way.
498 	 */
499 	lsm->lsm_oi = oa->o_oi;
500 	*ea = lsm;
501 
502 	if (oti != NULL) {
503 		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
504 
505 		if (oa->o_valid & OBD_MD_FLCOOKIE) {
506 			if (!oti->oti_logcookies)
507 				oti_alloc_cookies(oti, 1);
508 			*oti->oti_logcookies = oa->o_lcookie;
509 		}
510 	}
511 
512 	CDEBUG(D_HA, "transno: %lld\n",
513 	       lustre_msg_get_transno(req->rq_repmsg));
514 out_req:
515 	ptlrpc_req_finished(req);
516 out:
517 	if (rc && !*ea)
518 		obd_free_memmd(exp, &lsm);
519 	return rc;
520 }
521 
osc_punch_base(struct obd_export * exp,struct obd_info * oinfo,obd_enqueue_update_f upcall,void * cookie,struct ptlrpc_request_set * rqset)522 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
523 		   obd_enqueue_update_f upcall, void *cookie,
524 		   struct ptlrpc_request_set *rqset)
525 {
526 	struct ptlrpc_request *req;
527 	struct osc_setattr_args *sa;
528 	struct ost_body *body;
529 	int rc;
530 
531 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532 	if (req == NULL)
533 		return -ENOMEM;
534 
535 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
536 	if (rc) {
537 		ptlrpc_request_free(req);
538 		return rc;
539 	}
540 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
541 	ptlrpc_at_set_req_timeout(req);
542 
543 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
544 	LASSERT(body);
545 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
546 			     oinfo->oi_oa);
547 
548 	ptlrpc_request_set_replen(req);
549 
550 	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
551 	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
552 	sa = ptlrpc_req_async_args(req);
553 	sa->sa_oa = oinfo->oi_oa;
554 	sa->sa_upcall = upcall;
555 	sa->sa_cookie = cookie;
556 	if (rqset == PTLRPCD_SET)
557 		ptlrpcd_add_req(req);
558 	else
559 		ptlrpc_set_add_req(rqset, req);
560 
561 	return 0;
562 }
563 
osc_sync_interpret(const struct lu_env * env,struct ptlrpc_request * req,void * arg,int rc)564 static int osc_sync_interpret(const struct lu_env *env,
565 			      struct ptlrpc_request *req,
566 			      void *arg, int rc)
567 {
568 	struct osc_fsync_args *fa = arg;
569 	struct ost_body *body;
570 
571 	if (rc)
572 		goto out;
573 
574 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
575 	if (body == NULL) {
576 		CERROR("can't unpack ost_body\n");
577 		rc = -EPROTO;
578 		goto out;
579 	}
580 
581 	*fa->fa_oi->oi_oa = body->oa;
582 out:
583 	rc = fa->fa_upcall(fa->fa_cookie, rc);
584 	return rc;
585 }
586 
osc_sync_base(struct obd_export * exp,struct obd_info * oinfo,obd_enqueue_update_f upcall,void * cookie,struct ptlrpc_request_set * rqset)587 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
588 		  obd_enqueue_update_f upcall, void *cookie,
589 		  struct ptlrpc_request_set *rqset)
590 {
591 	struct ptlrpc_request *req;
592 	struct ost_body *body;
593 	struct osc_fsync_args *fa;
594 	int rc;
595 
596 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
597 	if (req == NULL)
598 		return -ENOMEM;
599 
600 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601 	if (rc) {
602 		ptlrpc_request_free(req);
603 		return rc;
604 	}
605 
606 	/* overload the size and blocks fields in the oa with start/end */
607 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
608 	LASSERT(body);
609 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
610 			     oinfo->oi_oa);
611 
612 	ptlrpc_request_set_replen(req);
613 	req->rq_interpret_reply = osc_sync_interpret;
614 
615 	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
616 	fa = ptlrpc_req_async_args(req);
617 	fa->fa_oi = oinfo;
618 	fa->fa_upcall = upcall;
619 	fa->fa_cookie = cookie;
620 
621 	if (rqset == PTLRPCD_SET)
622 		ptlrpcd_add_req(req);
623 	else
624 		ptlrpc_set_add_req(rqset, req);
625 
626 	return 0;
627 }
628 
629 /* Find and cancel locally locks matched by @mode in the resource found by
630  * @objid. Found locks are added into @cancel list. Returns the amount of
631  * locks added to @cancels list. */
osc_resource_get_unused(struct obd_export * exp,struct obdo * oa,struct list_head * cancels,ldlm_mode_t mode,__u64 lock_flags)632 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
633 				   struct list_head *cancels,
634 				   ldlm_mode_t mode, __u64 lock_flags)
635 {
636 	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
637 	struct ldlm_res_id res_id;
638 	struct ldlm_resource *res;
639 	int count;
640 
641 	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
642 	 * export) but disabled through procfs (flag in NS).
643 	 *
644 	 * This distinguishes from a case when ELC is not supported originally,
645 	 * when we still want to cancel locks in advance and just cancel them
646 	 * locally, without sending any RPC. */
647 	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
648 		return 0;
649 
650 	ostid_build_res_name(&oa->o_oi, &res_id);
651 	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
652 	if (res == NULL)
653 		return 0;
654 
655 	LDLM_RESOURCE_ADDREF(res);
656 	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
657 					   lock_flags, 0, NULL);
658 	LDLM_RESOURCE_DELREF(res);
659 	ldlm_resource_putref(res);
660 	return count;
661 }
662 
osc_destroy_interpret(const struct lu_env * env,struct ptlrpc_request * req,void * data,int rc)663 static int osc_destroy_interpret(const struct lu_env *env,
664 				 struct ptlrpc_request *req, void *data,
665 				 int rc)
666 {
667 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
668 
669 	atomic_dec(&cli->cl_destroy_in_flight);
670 	wake_up(&cli->cl_destroy_waitq);
671 	return 0;
672 }
673 
osc_can_send_destroy(struct client_obd * cli)674 static int osc_can_send_destroy(struct client_obd *cli)
675 {
676 	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
677 	    cli->cl_max_rpcs_in_flight) {
678 		/* The destroy request can be sent */
679 		return 1;
680 	}
681 	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
682 	    cli->cl_max_rpcs_in_flight) {
683 		/*
684 		 * The counter has been modified between the two atomic
685 		 * operations.
686 		 */
687 		wake_up(&cli->cl_destroy_waitq);
688 	}
689 	return 0;
690 }
691 
osc_create(const struct lu_env * env,struct obd_export * exp,struct obdo * oa,struct lov_stripe_md ** ea,struct obd_trans_info * oti)692 int osc_create(const struct lu_env *env, struct obd_export *exp,
693 	       struct obdo *oa, struct lov_stripe_md **ea,
694 	       struct obd_trans_info *oti)
695 {
696 	int rc = 0;
697 
698 	LASSERT(oa);
699 	LASSERT(ea);
700 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
701 
702 	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
703 	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
704 		return osc_real_create(exp, oa, ea, oti);
705 	}
706 
707 	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
708 		return osc_real_create(exp, oa, ea, oti);
709 
710 	/* we should not get here anymore */
711 	LBUG();
712 
713 	return rc;
714 }
715 
716 /* Destroy requests can be async always on the client, and we don't even really
717  * care about the return code since the client cannot do anything at all about
718  * a destroy failure.
719  * When the MDS is unlinking a filename, it saves the file objects into a
720  * recovery llog, and these object records are cancelled when the OST reports
721  * they were destroyed and sync'd to disk (i.e. transaction committed).
722  * If the client dies, or the OST is down when the object should be destroyed,
723  * the records are not cancelled, and when the OST reconnects to the MDS next,
724  * it will retrieve the llog unlink logs and then sends the log cancellation
725  * cookies to the MDS after committing destroy transactions. */
osc_destroy(const struct lu_env * env,struct obd_export * exp,struct obdo * oa,struct lov_stripe_md * ea,struct obd_trans_info * oti,struct obd_export * md_export)726 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
727 		       struct obdo *oa, struct lov_stripe_md *ea,
728 		       struct obd_trans_info *oti, struct obd_export *md_export)
729 {
730 	struct client_obd *cli = &exp->exp_obd->u.cli;
731 	struct ptlrpc_request *req;
732 	struct ost_body *body;
733 	LIST_HEAD(cancels);
734 	int rc, count;
735 
736 	if (!oa) {
737 		CDEBUG(D_INFO, "oa NULL\n");
738 		return -EINVAL;
739 	}
740 
741 	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
742 					LDLM_FL_DISCARD_DATA);
743 
744 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
745 	if (req == NULL) {
746 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
747 		return -ENOMEM;
748 	}
749 
750 	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
751 			       0, &cancels, count);
752 	if (rc) {
753 		ptlrpc_request_free(req);
754 		return rc;
755 	}
756 
757 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
758 	ptlrpc_at_set_req_timeout(req);
759 
760 	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
761 		oa->o_lcookie = *oti->oti_logcookies;
762 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
763 	LASSERT(body);
764 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
765 
766 	ptlrpc_request_set_replen(req);
767 
768 	/* If osc_destroy is for destroying the unlink orphan,
769 	 * sent from MDT to OST, which should not be blocked here,
770 	 * because the process might be triggered by ptlrpcd, and
771 	 * it is not good to block ptlrpcd thread (b=16006)*/
772 	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
773 		req->rq_interpret_reply = osc_destroy_interpret;
774 		if (!osc_can_send_destroy(cli)) {
775 			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
776 							  NULL);
777 
778 			/*
779 			 * Wait until the number of on-going destroy RPCs drops
780 			 * under max_rpc_in_flight
781 			 */
782 			l_wait_event_exclusive(cli->cl_destroy_waitq,
783 					       osc_can_send_destroy(cli), &lwi);
784 		}
785 	}
786 
787 	/* Do not wait for response */
788 	ptlrpcd_add_req(req);
789 	return 0;
790 }
791 
osc_announce_cached(struct client_obd * cli,struct obdo * oa,long writing_bytes)792 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
793 				long writing_bytes)
794 {
795 	u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
796 
797 	LASSERT(!(oa->o_valid & bits));
798 
799 	oa->o_valid |= bits;
800 	client_obd_list_lock(&cli->cl_loi_list_lock);
801 	oa->o_dirty = cli->cl_dirty;
802 	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
803 		     cli->cl_dirty_max)) {
804 		CERROR("dirty %lu - %lu > dirty_max %lu\n",
805 		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
806 		oa->o_undirty = 0;
807 	} else if (unlikely(atomic_read(&obd_dirty_pages) -
808 			    atomic_read(&obd_dirty_transit_pages) >
809 			    (long)(obd_max_dirty_pages + 1))) {
810 		/* The atomic_read() allowing the atomic_inc() are
811 		 * not covered by a lock thus they may safely race and trip
812 		 * this CERROR() unless we add in a small fudge factor (+1). */
813 		CERROR("dirty %d - %d > system dirty_max %d\n",
814 		       atomic_read(&obd_dirty_pages),
815 		       atomic_read(&obd_dirty_transit_pages),
816 		       obd_max_dirty_pages);
817 		oa->o_undirty = 0;
818 	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
819 		CERROR("dirty %lu - dirty_max %lu too big???\n",
820 		       cli->cl_dirty, cli->cl_dirty_max);
821 		oa->o_undirty = 0;
822 	} else {
823 		long max_in_flight = (cli->cl_max_pages_per_rpc <<
824 				      PAGE_CACHE_SHIFT)*
825 				     (cli->cl_max_rpcs_in_flight + 1);
826 		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
827 	}
828 	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
829 	oa->o_dropped = cli->cl_lost_grant;
830 	cli->cl_lost_grant = 0;
831 	client_obd_list_unlock(&cli->cl_loi_list_lock);
832 	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
833 	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
834 
835 }
836 
osc_update_next_shrink(struct client_obd * cli)837 void osc_update_next_shrink(struct client_obd *cli)
838 {
839 	cli->cl_next_shrink_grant =
840 		cfs_time_shift(cli->cl_grant_shrink_interval);
841 	CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
842 	       cli->cl_next_shrink_grant);
843 }
844 
__osc_update_grant(struct client_obd * cli,u64 grant)845 static void __osc_update_grant(struct client_obd *cli, u64 grant)
846 {
847 	client_obd_list_lock(&cli->cl_loi_list_lock);
848 	cli->cl_avail_grant += grant;
849 	client_obd_list_unlock(&cli->cl_loi_list_lock);
850 }
851 
osc_update_grant(struct client_obd * cli,struct ost_body * body)852 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
853 {
854 	if (body->oa.o_valid & OBD_MD_FLGRANT) {
855 		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
856 		__osc_update_grant(cli, body->oa.o_grant);
857 	}
858 }
859 
860 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
861 			      u32 keylen, void *key, u32 vallen,
862 			      void *val, struct ptlrpc_request_set *set);
863 
osc_shrink_grant_interpret(const struct lu_env * env,struct ptlrpc_request * req,void * aa,int rc)864 static int osc_shrink_grant_interpret(const struct lu_env *env,
865 				      struct ptlrpc_request *req,
866 				      void *aa, int rc)
867 {
868 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
869 	struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
870 	struct ost_body *body;
871 
872 	if (rc != 0) {
873 		__osc_update_grant(cli, oa->o_grant);
874 		goto out;
875 	}
876 
877 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
878 	LASSERT(body);
879 	osc_update_grant(cli, body);
880 out:
881 	kmem_cache_free(obdo_cachep, oa);
882 	return rc;
883 }
884 
osc_shrink_grant_local(struct client_obd * cli,struct obdo * oa)885 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
886 {
887 	client_obd_list_lock(&cli->cl_loi_list_lock);
888 	oa->o_grant = cli->cl_avail_grant / 4;
889 	cli->cl_avail_grant -= oa->o_grant;
890 	client_obd_list_unlock(&cli->cl_loi_list_lock);
891 	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
892 		oa->o_valid |= OBD_MD_FLFLAGS;
893 		oa->o_flags = 0;
894 	}
895 	oa->o_flags |= OBD_FL_SHRINK_GRANT;
896 	osc_update_next_shrink(cli);
897 }
898 
899 /* Shrink the current grant, either from some large amount to enough for a
900  * full set of in-flight RPCs, or if we have already shrunk to that limit
901  * then to enough for a single RPC.  This avoids keeping more grant than
902  * needed, and avoids shrinking the grant piecemeal. */
osc_shrink_grant(struct client_obd * cli)903 static int osc_shrink_grant(struct client_obd *cli)
904 {
905 	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
906 			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
907 
908 	client_obd_list_lock(&cli->cl_loi_list_lock);
909 	if (cli->cl_avail_grant <= target_bytes)
910 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
911 	client_obd_list_unlock(&cli->cl_loi_list_lock);
912 
913 	return osc_shrink_grant_to_target(cli, target_bytes);
914 }
915 
osc_shrink_grant_to_target(struct client_obd * cli,__u64 target_bytes)916 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
917 {
918 	int rc = 0;
919 	struct ost_body	*body;
920 
921 	client_obd_list_lock(&cli->cl_loi_list_lock);
922 	/* Don't shrink if we are already above or below the desired limit
923 	 * We don't want to shrink below a single RPC, as that will negatively
924 	 * impact block allocation and long-term performance. */
925 	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
926 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
927 
928 	if (target_bytes >= cli->cl_avail_grant) {
929 		client_obd_list_unlock(&cli->cl_loi_list_lock);
930 		return 0;
931 	}
932 	client_obd_list_unlock(&cli->cl_loi_list_lock);
933 
934 	body = kzalloc(sizeof(*body), GFP_NOFS);
935 	if (!body)
936 		return -ENOMEM;
937 
938 	osc_announce_cached(cli, &body->oa, 0);
939 
940 	client_obd_list_lock(&cli->cl_loi_list_lock);
941 	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
942 	cli->cl_avail_grant = target_bytes;
943 	client_obd_list_unlock(&cli->cl_loi_list_lock);
944 	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
945 		body->oa.o_valid |= OBD_MD_FLFLAGS;
946 		body->oa.o_flags = 0;
947 	}
948 	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
949 	osc_update_next_shrink(cli);
950 
951 	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
952 				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
953 				sizeof(*body), body, NULL);
954 	if (rc != 0)
955 		__osc_update_grant(cli, body->oa.o_grant);
956 	kfree(body);
957 	return rc;
958 }
959 
osc_should_shrink_grant(struct client_obd * client)960 static int osc_should_shrink_grant(struct client_obd *client)
961 {
962 	unsigned long time = cfs_time_current();
963 	unsigned long next_shrink = client->cl_next_shrink_grant;
964 
965 	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
966 	     OBD_CONNECT_GRANT_SHRINK) == 0)
967 		return 0;
968 
969 	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
970 		/* Get the current RPC size directly, instead of going via:
971 		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
972 		 * Keep comment here so that it can be found by searching. */
973 		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
974 
975 		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
976 		    client->cl_avail_grant > brw_size)
977 			return 1;
978 
979 		osc_update_next_shrink(client);
980 	}
981 	return 0;
982 }
983 
osc_grant_shrink_grant_cb(struct timeout_item * item,void * data)984 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
985 {
986 	struct client_obd *client;
987 
988 	list_for_each_entry(client, &item->ti_obd_list,
989 				cl_grant_shrink_list) {
990 		if (osc_should_shrink_grant(client))
991 			osc_shrink_grant(client);
992 	}
993 	return 0;
994 }
995 
osc_add_shrink_grant(struct client_obd * client)996 static int osc_add_shrink_grant(struct client_obd *client)
997 {
998 	int rc;
999 
1000 	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1001 				       TIMEOUT_GRANT,
1002 				       osc_grant_shrink_grant_cb, NULL,
1003 				       &client->cl_grant_shrink_list);
1004 	if (rc) {
1005 		CERROR("add grant client %s error %d\n",
1006 			client->cl_import->imp_obd->obd_name, rc);
1007 		return rc;
1008 	}
1009 	CDEBUG(D_CACHE, "add grant client %s \n",
1010 	       client->cl_import->imp_obd->obd_name);
1011 	osc_update_next_shrink(client);
1012 	return 0;
1013 }
1014 
osc_del_shrink_grant(struct client_obd * client)1015 static int osc_del_shrink_grant(struct client_obd *client)
1016 {
1017 	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1018 					 TIMEOUT_GRANT);
1019 }
1020 
osc_init_grant(struct client_obd * cli,struct obd_connect_data * ocd)1021 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1022 {
1023 	/*
1024 	 * ocd_grant is the total grant amount we're expect to hold: if we've
1025 	 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1026 	 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1027 	 *
1028 	 * race is tolerable here: if we're evicted, but imp_state already
1029 	 * left EVICTED state, then cl_dirty must be 0 already.
1030 	 */
1031 	client_obd_list_lock(&cli->cl_loi_list_lock);
1032 	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1033 		cli->cl_avail_grant = ocd->ocd_grant;
1034 	else
1035 		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1036 
1037 	if (cli->cl_avail_grant < 0) {
1038 		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1039 		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1040 		      ocd->ocd_grant, cli->cl_dirty);
1041 		/* workaround for servers which do not have the patch from
1042 		 * LU-2679 */
1043 		cli->cl_avail_grant = ocd->ocd_grant;
1044 	}
1045 
1046 	/* determine the appropriate chunk size used by osc_extent. */
1047 	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1048 	client_obd_list_unlock(&cli->cl_loi_list_lock);
1049 
1050 	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1051 	       cli->cl_import->imp_obd->obd_name,
1052 	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1053 
1054 	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1055 	    list_empty(&cli->cl_grant_shrink_list))
1056 		osc_add_shrink_grant(cli);
1057 }
1058 
1059 /* We assume that the reason this OSC got a short read is because it read
1060  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1061  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1062  * this stripe never got written at or beyond this stripe offset yet. */
handle_short_read(int nob_read,u32 page_count,struct brw_page ** pga)1063 static void handle_short_read(int nob_read, u32 page_count,
1064 			      struct brw_page **pga)
1065 {
1066 	char *ptr;
1067 	int i = 0;
1068 
1069 	/* skip bytes read OK */
1070 	while (nob_read > 0) {
1071 		LASSERT(page_count > 0);
1072 
1073 		if (pga[i]->count > nob_read) {
1074 			/* EOF inside this page */
1075 			ptr = kmap(pga[i]->pg) +
1076 				(pga[i]->off & ~CFS_PAGE_MASK);
1077 			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1078 			kunmap(pga[i]->pg);
1079 			page_count--;
1080 			i++;
1081 			break;
1082 		}
1083 
1084 		nob_read -= pga[i]->count;
1085 		page_count--;
1086 		i++;
1087 	}
1088 
1089 	/* zero remaining pages */
1090 	while (page_count-- > 0) {
1091 		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1092 		memset(ptr, 0, pga[i]->count);
1093 		kunmap(pga[i]->pg);
1094 		i++;
1095 	}
1096 }
1097 
check_write_rcs(struct ptlrpc_request * req,int requested_nob,int niocount,u32 page_count,struct brw_page ** pga)1098 static int check_write_rcs(struct ptlrpc_request *req,
1099 			   int requested_nob, int niocount,
1100 			   u32 page_count, struct brw_page **pga)
1101 {
1102 	int i;
1103 	__u32 *remote_rcs;
1104 
1105 	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1106 						  sizeof(*remote_rcs) *
1107 						  niocount);
1108 	if (remote_rcs == NULL) {
1109 		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1110 		return -EPROTO;
1111 	}
1112 
1113 	/* return error if any niobuf was in error */
1114 	for (i = 0; i < niocount; i++) {
1115 		if ((int)remote_rcs[i] < 0)
1116 			return remote_rcs[i];
1117 
1118 		if (remote_rcs[i] != 0) {
1119 			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1120 				i, remote_rcs[i], req);
1121 			return -EPROTO;
1122 		}
1123 	}
1124 
1125 	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1126 		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1127 		       req->rq_bulk->bd_nob_transferred, requested_nob);
1128 		return -EPROTO;
1129 	}
1130 
1131 	return 0;
1132 }
1133 
can_merge_pages(struct brw_page * p1,struct brw_page * p2)1134 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1135 {
1136 	if (p1->flag != p2->flag) {
1137 		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1138 				  OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1139 
1140 		/* warn if we try to combine flags that we don't know to be
1141 		 * safe to combine */
1142 		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1143 			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1144 			      p1->flag, p2->flag);
1145 		}
1146 		return 0;
1147 	}
1148 
1149 	return (p1->off + p1->count == p2->off);
1150 }
1151 
osc_checksum_bulk(int nob,u32 pg_count,struct brw_page ** pga,int opc,cksum_type_t cksum_type)1152 static u32 osc_checksum_bulk(int nob, u32 pg_count,
1153 			     struct brw_page **pga, int opc,
1154 			     cksum_type_t cksum_type)
1155 {
1156 	__u32 cksum;
1157 	int i = 0;
1158 	struct cfs_crypto_hash_desc *hdesc;
1159 	unsigned int bufsize;
1160 	int err;
1161 	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1162 
1163 	LASSERT(pg_count > 0);
1164 
1165 	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1166 	if (IS_ERR(hdesc)) {
1167 		CERROR("Unable to initialize checksum hash %s\n",
1168 		       cfs_crypto_hash_name(cfs_alg));
1169 		return PTR_ERR(hdesc);
1170 	}
1171 
1172 	while (nob > 0 && pg_count > 0) {
1173 		int count = pga[i]->count > nob ? nob : pga[i]->count;
1174 
1175 		/* corrupt the data before we compute the checksum, to
1176 		 * simulate an OST->client data error */
1177 		if (i == 0 && opc == OST_READ &&
1178 		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1179 			unsigned char *ptr = kmap(pga[i]->pg);
1180 			int off = pga[i]->off & ~CFS_PAGE_MASK;
1181 
1182 			memcpy(ptr + off, "bad1", min(4, nob));
1183 			kunmap(pga[i]->pg);
1184 		}
1185 		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1186 				  pga[i]->off & ~CFS_PAGE_MASK,
1187 				  count);
1188 		CDEBUG(D_PAGE,
1189 		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1190 		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1191 		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1192 		       page_private(pga[i]->pg),
1193 		       (int)(pga[i]->off & ~CFS_PAGE_MASK));
1194 
1195 		nob -= pga[i]->count;
1196 		pg_count--;
1197 		i++;
1198 	}
1199 
1200 	bufsize = 4;
1201 	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1202 
1203 	if (err)
1204 		cfs_crypto_hash_final(hdesc, NULL, NULL);
1205 
1206 	/* For sending we only compute the wrong checksum instead
1207 	 * of corrupting the data so it is still correct on a redo */
1208 	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1209 		cksum++;
1210 
1211 	return cksum;
1212 }
1213 
osc_brw_prep_request(int cmd,struct client_obd * cli,struct obdo * oa,struct lov_stripe_md * lsm,u32 page_count,struct brw_page ** pga,struct ptlrpc_request ** reqp,int reserve,int resend)1214 static int osc_brw_prep_request(int cmd, struct client_obd *cli,
1215 				struct obdo *oa,
1216 				struct lov_stripe_md *lsm, u32 page_count,
1217 				struct brw_page **pga,
1218 				struct ptlrpc_request **reqp,
1219 				int reserve,
1220 				int resend)
1221 {
1222 	struct ptlrpc_request *req;
1223 	struct ptlrpc_bulk_desc *desc;
1224 	struct ost_body	*body;
1225 	struct obd_ioobj *ioobj;
1226 	struct niobuf_remote *niobuf;
1227 	int niocount, i, requested_nob, opc, rc;
1228 	struct osc_brw_async_args *aa;
1229 	struct req_capsule *pill;
1230 	struct brw_page *pg_prev;
1231 
1232 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1233 		return -ENOMEM; /* Recoverable */
1234 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1235 		return -EINVAL; /* Fatal */
1236 
1237 	if ((cmd & OBD_BRW_WRITE) != 0) {
1238 		opc = OST_WRITE;
1239 		req = ptlrpc_request_alloc_pool(cli->cl_import,
1240 						osc_rq_pool,
1241 						&RQF_OST_BRW_WRITE);
1242 	} else {
1243 		opc = OST_READ;
1244 		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1245 	}
1246 	if (req == NULL)
1247 		return -ENOMEM;
1248 
1249 	for (niocount = i = 1; i < page_count; i++) {
1250 		if (!can_merge_pages(pga[i - 1], pga[i]))
1251 			niocount++;
1252 	}
1253 
1254 	pill = &req->rq_pill;
1255 	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1256 			     sizeof(*ioobj));
1257 	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1258 			     niocount * sizeof(*niobuf));
1259 
1260 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1261 	if (rc) {
1262 		ptlrpc_request_free(req);
1263 		return rc;
1264 	}
1265 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1266 	ptlrpc_at_set_req_timeout(req);
1267 	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1268 	 * retry logic */
1269 	req->rq_no_retry_einprogress = 1;
1270 
1271 	desc = ptlrpc_prep_bulk_imp(req, page_count,
1272 		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1273 		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1274 		OST_BULK_PORTAL);
1275 
1276 	if (desc == NULL) {
1277 		rc = -ENOMEM;
1278 		goto out;
1279 	}
1280 	/* NB request now owns desc and will free it when it gets freed */
1281 
1282 	body = req_capsule_client_get(pill, &RMF_OST_BODY);
1283 	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1284 	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1285 	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1286 
1287 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1288 
1289 	obdo_to_ioobj(oa, ioobj);
1290 	ioobj->ioo_bufcnt = niocount;
1291 	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1292 	 * that might be send for this request.  The actual number is decided
1293 	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1294 	 * "max - 1" for old client compatibility sending "0", and also so the
1295 	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1296 	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1297 	LASSERT(page_count > 0);
1298 	pg_prev = pga[0];
1299 	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1300 		struct brw_page *pg = pga[i];
1301 		int poff = pg->off & ~CFS_PAGE_MASK;
1302 
1303 		LASSERT(pg->count > 0);
1304 		/* make sure there is no gap in the middle of page array */
1305 		LASSERTF(page_count == 1 ||
1306 			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1307 			  ergo(i > 0 && i < page_count - 1,
1308 			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1309 			  ergo(i == page_count - 1, poff == 0)),
1310 			 "i: %d/%d pg: %p off: %llu, count: %u\n",
1311 			 i, page_count, pg, pg->off, pg->count);
1312 		LASSERTF(i == 0 || pg->off > pg_prev->off,
1313 			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1314 			 i, page_count,
1315 			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1316 			 pg_prev->pg, page_private(pg_prev->pg),
1317 			 pg_prev->pg->index, pg_prev->off);
1318 		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1319 			(pg->flag & OBD_BRW_SRVLOCK));
1320 
1321 		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1322 		requested_nob += pg->count;
1323 
1324 		if (i > 0 && can_merge_pages(pg_prev, pg)) {
1325 			niobuf--;
1326 			niobuf->len += pg->count;
1327 		} else {
1328 			niobuf->offset = pg->off;
1329 			niobuf->len = pg->count;
1330 			niobuf->flags = pg->flag;
1331 		}
1332 		pg_prev = pg;
1333 	}
1334 
1335 	LASSERTF((void *)(niobuf - niocount) ==
1336 		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1337 		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1338 		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1339 
1340 	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1341 	if (resend) {
1342 		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1343 			body->oa.o_valid |= OBD_MD_FLFLAGS;
1344 			body->oa.o_flags = 0;
1345 		}
1346 		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1347 	}
1348 
1349 	if (osc_should_shrink_grant(cli))
1350 		osc_shrink_grant_local(cli, &body->oa);
1351 
1352 	/* size[REQ_REC_OFF] still sizeof (*body) */
1353 	if (opc == OST_WRITE) {
1354 		if (cli->cl_checksum &&
1355 		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1356 			/* store cl_cksum_type in a local variable since
1357 			 * it can be changed via lprocfs */
1358 			cksum_type_t cksum_type = cli->cl_cksum_type;
1359 
1360 			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1361 				oa->o_flags &= OBD_FL_LOCAL_MASK;
1362 				body->oa.o_flags = 0;
1363 			}
1364 			body->oa.o_flags |= cksum_type_pack(cksum_type);
1365 			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1366 			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1367 							     page_count, pga,
1368 							     OST_WRITE,
1369 							     cksum_type);
1370 			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1371 			       body->oa.o_cksum);
1372 			/* save this in 'oa', too, for later checking */
1373 			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1374 			oa->o_flags |= cksum_type_pack(cksum_type);
1375 		} else {
1376 			/* clear out the checksum flag, in case this is a
1377 			 * resend but cl_checksum is no longer set. b=11238 */
1378 			oa->o_valid &= ~OBD_MD_FLCKSUM;
1379 		}
1380 		oa->o_cksum = body->oa.o_cksum;
1381 		/* 1 RC per niobuf */
1382 		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1383 				     sizeof(__u32) * niocount);
1384 	} else {
1385 		if (cli->cl_checksum &&
1386 		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1387 			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1388 				body->oa.o_flags = 0;
1389 			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1390 			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1391 		}
1392 	}
1393 	ptlrpc_request_set_replen(req);
1394 
1395 	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1396 	aa = ptlrpc_req_async_args(req);
1397 	aa->aa_oa = oa;
1398 	aa->aa_requested_nob = requested_nob;
1399 	aa->aa_nio_count = niocount;
1400 	aa->aa_page_count = page_count;
1401 	aa->aa_resends = 0;
1402 	aa->aa_ppga = pga;
1403 	aa->aa_cli = cli;
1404 	INIT_LIST_HEAD(&aa->aa_oaps);
1405 
1406 	*reqp = req;
1407 	return 0;
1408 
1409  out:
1410 	ptlrpc_req_finished(req);
1411 	return rc;
1412 }
1413 
check_write_checksum(struct obdo * oa,const lnet_process_id_t * peer,__u32 client_cksum,__u32 server_cksum,int nob,u32 page_count,struct brw_page ** pga,cksum_type_t client_cksum_type)1414 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1415 				__u32 client_cksum, __u32 server_cksum, int nob,
1416 				u32 page_count, struct brw_page **pga,
1417 				cksum_type_t client_cksum_type)
1418 {
1419 	__u32 new_cksum;
1420 	char *msg;
1421 	cksum_type_t cksum_type;
1422 
1423 	if (server_cksum == client_cksum) {
1424 		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1425 		return 0;
1426 	}
1427 
1428 	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1429 				       oa->o_flags : 0);
1430 	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1431 				      cksum_type);
1432 
1433 	if (cksum_type != client_cksum_type)
1434 		msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
1435 			;
1436 	else if (new_cksum == server_cksum)
1437 		msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1438 			;
1439 	else if (new_cksum == client_cksum)
1440 		msg = "changed in transit before arrival at OST";
1441 	else
1442 		msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1443 			;
1444 
1445 	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1446 			   " object "DOSTID" extent [%llu-%llu]\n",
1447 			   msg, libcfs_nid2str(peer->nid),
1448 			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1449 			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1450 			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1451 			   POSTID(&oa->o_oi), pga[0]->off,
1452 			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
1453 	CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1454 	       client_cksum, client_cksum_type,
1455 	       server_cksum, cksum_type, new_cksum);
1456 	return 1;
1457 }
1458 
1459 /* Note rc enters this function as number of bytes transferred */
osc_brw_fini_request(struct ptlrpc_request * req,int rc)1460 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1461 {
1462 	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1463 	const lnet_process_id_t *peer =
1464 			&req->rq_import->imp_connection->c_peer;
1465 	struct client_obd *cli = aa->aa_cli;
1466 	struct ost_body *body;
1467 	__u32 client_cksum = 0;
1468 
1469 	if (rc < 0 && rc != -EDQUOT) {
1470 		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1471 		return rc;
1472 	}
1473 
1474 	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1475 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1476 	if (body == NULL) {
1477 		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1478 		return -EPROTO;
1479 	}
1480 
1481 	/* set/clear over quota flag for a uid/gid */
1482 	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1483 	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1484 		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1485 
1486 		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1487 		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1488 		       body->oa.o_flags);
1489 		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1490 	}
1491 
1492 	osc_update_grant(cli, body);
1493 
1494 	if (rc < 0)
1495 		return rc;
1496 
1497 	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1498 		client_cksum = aa->aa_oa->o_cksum; /* save for later */
1499 
1500 	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1501 		if (rc > 0) {
1502 			CERROR("Unexpected +ve rc %d\n", rc);
1503 			return -EPROTO;
1504 		}
1505 		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1506 
1507 		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1508 			return -EAGAIN;
1509 
1510 		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1511 		    check_write_checksum(&body->oa, peer, client_cksum,
1512 					 body->oa.o_cksum, aa->aa_requested_nob,
1513 					 aa->aa_page_count, aa->aa_ppga,
1514 					 cksum_type_unpack(aa->aa_oa->o_flags)))
1515 			return -EAGAIN;
1516 
1517 		rc = check_write_rcs(req, aa->aa_requested_nob,
1518 				     aa->aa_nio_count,
1519 				     aa->aa_page_count, aa->aa_ppga);
1520 		goto out;
1521 	}
1522 
1523 	/* The rest of this function executes only for OST_READs */
1524 
1525 	/* if unwrap_bulk failed, return -EAGAIN to retry */
1526 	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1527 	if (rc < 0) {
1528 		rc = -EAGAIN;
1529 		goto out;
1530 	}
1531 
1532 	if (rc > aa->aa_requested_nob) {
1533 		CERROR("Unexpected rc %d (%d requested)\n", rc,
1534 		       aa->aa_requested_nob);
1535 		return -EPROTO;
1536 	}
1537 
1538 	if (rc != req->rq_bulk->bd_nob_transferred) {
1539 		CERROR("Unexpected rc %d (%d transferred)\n",
1540 			rc, req->rq_bulk->bd_nob_transferred);
1541 		return -EPROTO;
1542 	}
1543 
1544 	if (rc < aa->aa_requested_nob)
1545 		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1546 
1547 	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1548 		static int cksum_counter;
1549 		__u32 server_cksum = body->oa.o_cksum;
1550 		char *via = "";
1551 		char *router = "";
1552 		cksum_type_t cksum_type;
1553 
1554 		cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ?
1555 					       body->oa.o_flags : 0);
1556 		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1557 						 aa->aa_ppga, OST_READ,
1558 						 cksum_type);
1559 
1560 		if (peer->nid != req->rq_bulk->bd_sender) {
1561 			via = " via ";
1562 			router = libcfs_nid2str(req->rq_bulk->bd_sender);
1563 		}
1564 
1565 		if (server_cksum != client_cksum) {
1566 			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
1567 					   req->rq_import->imp_obd->obd_name,
1568 					   libcfs_nid2str(peer->nid),
1569 					   via, router,
1570 					   body->oa.o_valid & OBD_MD_FLFID ?
1571 					   body->oa.o_parent_seq : (__u64)0,
1572 					   body->oa.o_valid & OBD_MD_FLFID ?
1573 					   body->oa.o_parent_oid : 0,
1574 					   body->oa.o_valid & OBD_MD_FLFID ?
1575 					   body->oa.o_parent_ver : 0,
1576 					   POSTID(&body->oa.o_oi),
1577 					   aa->aa_ppga[0]->off,
1578 					   aa->aa_ppga[aa->aa_page_count-1]->off +
1579 					   aa->aa_ppga[aa->aa_page_count-1]->count -
1580 					   1);
1581 			CERROR("client %x, server %x, cksum_type %x\n",
1582 			       client_cksum, server_cksum, cksum_type);
1583 			cksum_counter = 0;
1584 			aa->aa_oa->o_cksum = client_cksum;
1585 			rc = -EAGAIN;
1586 		} else {
1587 			cksum_counter++;
1588 			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1589 			rc = 0;
1590 		}
1591 	} else if (unlikely(client_cksum)) {
1592 		static int cksum_missed;
1593 
1594 		cksum_missed++;
1595 		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1596 			CERROR("Checksum %u requested from %s but not sent\n",
1597 			       cksum_missed, libcfs_nid2str(peer->nid));
1598 	} else {
1599 		rc = 0;
1600 	}
1601 out:
1602 	if (rc >= 0)
1603 		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1604 				     aa->aa_oa, &body->oa);
1605 
1606 	return rc;
1607 }
1608 
osc_brw_redo_request(struct ptlrpc_request * request,struct osc_brw_async_args * aa,int rc)1609 static int osc_brw_redo_request(struct ptlrpc_request *request,
1610 				struct osc_brw_async_args *aa, int rc)
1611 {
1612 	struct ptlrpc_request *new_req;
1613 	struct osc_brw_async_args *new_aa;
1614 	struct osc_async_page *oap;
1615 
1616 	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1617 		  "redo for recoverable error %d", rc);
1618 
1619 	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1620 					OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1621 				  aa->aa_cli, aa->aa_oa,
1622 				  NULL /* lsm unused by osc currently */,
1623 				  aa->aa_page_count, aa->aa_ppga,
1624 				  &new_req, 0, 1);
1625 	if (rc)
1626 		return rc;
1627 
1628 	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1629 		if (oap->oap_request != NULL) {
1630 			LASSERTF(request == oap->oap_request,
1631 				 "request %p != oap_request %p\n",
1632 				 request, oap->oap_request);
1633 			if (oap->oap_interrupted) {
1634 				ptlrpc_req_finished(new_req);
1635 				return -EINTR;
1636 			}
1637 		}
1638 	}
1639 	/* New request takes over pga and oaps from old request.
1640 	 * Note that copying a list_head doesn't work, need to move it... */
1641 	aa->aa_resends++;
1642 	new_req->rq_interpret_reply = request->rq_interpret_reply;
1643 	new_req->rq_async_args = request->rq_async_args;
1644 	/* cap resend delay to the current request timeout, this is similar to
1645 	 * what ptlrpc does (see after_reply()) */
1646 	if (aa->aa_resends > new_req->rq_timeout)
1647 		new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1648 	else
1649 		new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1650 	new_req->rq_generation_set = 1;
1651 	new_req->rq_import_generation = request->rq_import_generation;
1652 
1653 	new_aa = ptlrpc_req_async_args(new_req);
1654 
1655 	INIT_LIST_HEAD(&new_aa->aa_oaps);
1656 	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1657 	INIT_LIST_HEAD(&new_aa->aa_exts);
1658 	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1659 	new_aa->aa_resends = aa->aa_resends;
1660 
1661 	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1662 		if (oap->oap_request) {
1663 			ptlrpc_req_finished(oap->oap_request);
1664 			oap->oap_request = ptlrpc_request_addref(new_req);
1665 		}
1666 	}
1667 
1668 	/* XXX: This code will run into problem if we're going to support
1669 	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1670 	 * and wait for all of them to be finished. We should inherit request
1671 	 * set from old request. */
1672 	ptlrpcd_add_req(new_req);
1673 
1674 	DEBUG_REQ(D_INFO, new_req, "new request");
1675 	return 0;
1676 }
1677 
1678 /*
1679  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1680  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1681  * fine for our small page arrays and doesn't require allocation.  its an
1682  * insertion sort that swaps elements that are strides apart, shrinking the
1683  * stride down until its '1' and the array is sorted.
1684  */
sort_brw_pages(struct brw_page ** array,int num)1685 static void sort_brw_pages(struct brw_page **array, int num)
1686 {
1687 	int stride, i, j;
1688 	struct brw_page *tmp;
1689 
1690 	if (num == 1)
1691 		return;
1692 	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1693 		;
1694 
1695 	do {
1696 		stride /= 3;
1697 		for (i = stride ; i < num ; i++) {
1698 			tmp = array[i];
1699 			j = i;
1700 			while (j >= stride && array[j - stride]->off > tmp->off) {
1701 				array[j] = array[j - stride];
1702 				j -= stride;
1703 			}
1704 			array[j] = tmp;
1705 		}
1706 	} while (stride > 1);
1707 }
1708 
osc_release_ppga(struct brw_page ** ppga,u32 count)1709 static void osc_release_ppga(struct brw_page **ppga, u32 count)
1710 {
1711 	LASSERT(ppga != NULL);
1712 	kfree(ppga);
1713 }
1714 
brw_interpret(const struct lu_env * env,struct ptlrpc_request * req,void * data,int rc)1715 static int brw_interpret(const struct lu_env *env,
1716 			 struct ptlrpc_request *req, void *data, int rc)
1717 {
1718 	struct osc_brw_async_args *aa = data;
1719 	struct osc_extent *ext;
1720 	struct osc_extent *tmp;
1721 	struct cl_object *obj = NULL;
1722 	struct client_obd *cli = aa->aa_cli;
1723 
1724 	rc = osc_brw_fini_request(req, rc);
1725 	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1726 	/* When server return -EINPROGRESS, client should always retry
1727 	 * regardless of the number of times the bulk was resent already. */
1728 	if (osc_recoverable_error(rc)) {
1729 		if (req->rq_import_generation !=
1730 		    req->rq_import->imp_generation) {
1731 			CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
1732 			       req->rq_import->imp_obd->obd_name,
1733 			       POSTID(&aa->aa_oa->o_oi), rc);
1734 		} else if (rc == -EINPROGRESS ||
1735 		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
1736 			rc = osc_brw_redo_request(req, aa, rc);
1737 		} else {
1738 			CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1739 			       req->rq_import->imp_obd->obd_name,
1740 			       POSTID(&aa->aa_oa->o_oi), rc);
1741 		}
1742 
1743 		if (rc == 0)
1744 			return 0;
1745 		else if (rc == -EAGAIN || rc == -EINPROGRESS)
1746 			rc = -EIO;
1747 	}
1748 
1749 	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1750 		if (obj == NULL && rc == 0) {
1751 			obj = osc2cl(ext->oe_obj);
1752 			cl_object_get(obj);
1753 		}
1754 
1755 		list_del_init(&ext->oe_link);
1756 		osc_extent_finish(env, ext, 1, rc);
1757 	}
1758 	LASSERT(list_empty(&aa->aa_exts));
1759 	LASSERT(list_empty(&aa->aa_oaps));
1760 
1761 	if (obj != NULL) {
1762 		struct obdo *oa = aa->aa_oa;
1763 		struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1764 		unsigned long valid = 0;
1765 
1766 		LASSERT(rc == 0);
1767 		if (oa->o_valid & OBD_MD_FLBLOCKS) {
1768 			attr->cat_blocks = oa->o_blocks;
1769 			valid |= CAT_BLOCKS;
1770 		}
1771 		if (oa->o_valid & OBD_MD_FLMTIME) {
1772 			attr->cat_mtime = oa->o_mtime;
1773 			valid |= CAT_MTIME;
1774 		}
1775 		if (oa->o_valid & OBD_MD_FLATIME) {
1776 			attr->cat_atime = oa->o_atime;
1777 			valid |= CAT_ATIME;
1778 		}
1779 		if (oa->o_valid & OBD_MD_FLCTIME) {
1780 			attr->cat_ctime = oa->o_ctime;
1781 			valid |= CAT_CTIME;
1782 		}
1783 		if (valid != 0) {
1784 			cl_object_attr_lock(obj);
1785 			cl_object_attr_set(env, obj, attr, valid);
1786 			cl_object_attr_unlock(obj);
1787 		}
1788 		cl_object_put(env, obj);
1789 	}
1790 	kmem_cache_free(obdo_cachep, aa->aa_oa);
1791 
1792 	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1793 			  req->rq_bulk->bd_nob_transferred);
1794 	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1795 	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1796 
1797 	client_obd_list_lock(&cli->cl_loi_list_lock);
1798 	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1799 	 * is called so we know whether to go to sync BRWs or wait for more
1800 	 * RPCs to complete */
1801 	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1802 		cli->cl_w_in_flight--;
1803 	else
1804 		cli->cl_r_in_flight--;
1805 	osc_wake_cache_waiters(cli);
1806 	client_obd_list_unlock(&cli->cl_loi_list_lock);
1807 
1808 	osc_io_unplug(env, cli, NULL);
1809 	return rc;
1810 }
1811 
1812 /**
1813  * Build an RPC by the list of extent @ext_list. The caller must ensure
1814  * that the total pages in this list are NOT over max pages per RPC.
1815  * Extents in the list must be in OES_RPC state.
1816  */
osc_build_rpc(const struct lu_env * env,struct client_obd * cli,struct list_head * ext_list,int cmd)1817 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1818 		  struct list_head *ext_list, int cmd)
1819 {
1820 	struct ptlrpc_request *req = NULL;
1821 	struct osc_extent *ext;
1822 	struct brw_page **pga = NULL;
1823 	struct osc_brw_async_args *aa = NULL;
1824 	struct obdo *oa = NULL;
1825 	struct osc_async_page *oap;
1826 	struct osc_async_page *tmp;
1827 	struct cl_req *clerq = NULL;
1828 	enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1829 	struct ldlm_lock *lock = NULL;
1830 	struct cl_req_attr *crattr = NULL;
1831 	u64 starting_offset = OBD_OBJECT_EOF;
1832 	u64 ending_offset = 0;
1833 	int mpflag = 0;
1834 	int mem_tight = 0;
1835 	int page_count = 0;
1836 	int i;
1837 	int rc;
1838 	struct ost_body *body;
1839 	LIST_HEAD(rpc_list);
1840 
1841 	LASSERT(!list_empty(ext_list));
1842 
1843 	/* add pages into rpc_list to build BRW rpc */
1844 	list_for_each_entry(ext, ext_list, oe_link) {
1845 		LASSERT(ext->oe_state == OES_RPC);
1846 		mem_tight |= ext->oe_memalloc;
1847 		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1848 			++page_count;
1849 			list_add_tail(&oap->oap_rpc_item, &rpc_list);
1850 			if (starting_offset > oap->oap_obj_off)
1851 				starting_offset = oap->oap_obj_off;
1852 			else
1853 				LASSERT(oap->oap_page_off == 0);
1854 			if (ending_offset < oap->oap_obj_off + oap->oap_count)
1855 				ending_offset = oap->oap_obj_off +
1856 						oap->oap_count;
1857 			else
1858 				LASSERT(oap->oap_page_off + oap->oap_count ==
1859 					PAGE_CACHE_SIZE);
1860 		}
1861 	}
1862 
1863 	if (mem_tight)
1864 		mpflag = cfs_memory_pressure_get_and_set();
1865 
1866 	crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
1867 	if (!crattr) {
1868 		rc = -ENOMEM;
1869 		goto out;
1870 	}
1871 
1872 	pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
1873 	if (pga == NULL) {
1874 		rc = -ENOMEM;
1875 		goto out;
1876 	}
1877 
1878 	oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
1879 	if (oa == NULL) {
1880 		rc = -ENOMEM;
1881 		goto out;
1882 	}
1883 
1884 	i = 0;
1885 	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1886 		struct cl_page *page = oap2cl_page(oap);
1887 
1888 		if (clerq == NULL) {
1889 			clerq = cl_req_alloc(env, page, crt,
1890 					     1 /* only 1-object rpcs for now */);
1891 			if (IS_ERR(clerq)) {
1892 				rc = PTR_ERR(clerq);
1893 				goto out;
1894 			}
1895 			lock = oap->oap_ldlm_lock;
1896 		}
1897 		if (mem_tight)
1898 			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1899 		pga[i] = &oap->oap_brw_page;
1900 		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1901 		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1902 		       pga[i]->pg, page_index(oap->oap_page), oap,
1903 		       pga[i]->flag);
1904 		i++;
1905 		cl_req_page_add(env, clerq, page);
1906 	}
1907 
1908 	/* always get the data for the obdo for the rpc */
1909 	LASSERT(clerq != NULL);
1910 	crattr->cra_oa = oa;
1911 	cl_req_attr_set(env, clerq, crattr, ~0ULL);
1912 	if (lock) {
1913 		oa->o_handle = lock->l_remote_handle;
1914 		oa->o_valid |= OBD_MD_FLHANDLE;
1915 	}
1916 
1917 	rc = cl_req_prep(env, clerq);
1918 	if (rc != 0) {
1919 		CERROR("cl_req_prep failed: %d\n", rc);
1920 		goto out;
1921 	}
1922 
1923 	sort_brw_pages(pga, page_count);
1924 	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1925 			pga, &req, 1, 0);
1926 	if (rc != 0) {
1927 		CERROR("prep_req failed: %d\n", rc);
1928 		goto out;
1929 	}
1930 
1931 	req->rq_interpret_reply = brw_interpret;
1932 
1933 	if (mem_tight != 0)
1934 		req->rq_memalloc = 1;
1935 
1936 	/* Need to update the timestamps after the request is built in case
1937 	 * we race with setattr (locally or in queue at OST).  If OST gets
1938 	 * later setattr before earlier BRW (as determined by the request xid),
1939 	 * the OST will not use BRW timestamps.  Sadly, there is no obvious
1940 	 * way to do this in a single call.  bug 10150 */
1941 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1942 	crattr->cra_oa = &body->oa;
1943 	cl_req_attr_set(env, clerq, crattr,
1944 			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1945 
1946 	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1947 
1948 	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1949 	aa = ptlrpc_req_async_args(req);
1950 	INIT_LIST_HEAD(&aa->aa_oaps);
1951 	list_splice_init(&rpc_list, &aa->aa_oaps);
1952 	INIT_LIST_HEAD(&aa->aa_exts);
1953 	list_splice_init(ext_list, &aa->aa_exts);
1954 	aa->aa_clerq = clerq;
1955 
1956 	/* queued sync pages can be torn down while the pages
1957 	 * were between the pending list and the rpc */
1958 	tmp = NULL;
1959 	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1960 		/* only one oap gets a request reference */
1961 		if (tmp == NULL)
1962 			tmp = oap;
1963 		if (oap->oap_interrupted && !req->rq_intr) {
1964 			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1965 					oap, req);
1966 			ptlrpc_mark_interrupted(req);
1967 		}
1968 	}
1969 	if (tmp != NULL)
1970 		tmp->oap_request = ptlrpc_request_addref(req);
1971 
1972 	client_obd_list_lock(&cli->cl_loi_list_lock);
1973 	starting_offset >>= PAGE_CACHE_SHIFT;
1974 	if (cmd == OBD_BRW_READ) {
1975 		cli->cl_r_in_flight++;
1976 		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1977 		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1978 		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1979 				      starting_offset + 1);
1980 	} else {
1981 		cli->cl_w_in_flight++;
1982 		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1983 		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1984 		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1985 				      starting_offset + 1);
1986 	}
1987 	client_obd_list_unlock(&cli->cl_loi_list_lock);
1988 
1989 	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
1990 		  page_count, aa, cli->cl_r_in_flight,
1991 		  cli->cl_w_in_flight);
1992 
1993 	ptlrpcd_add_req(req);
1994 	rc = 0;
1995 
1996 out:
1997 	if (mem_tight != 0)
1998 		cfs_memory_pressure_restore(mpflag);
1999 
2000 	kfree(crattr);
2001 
2002 	if (rc != 0) {
2003 		LASSERT(req == NULL);
2004 
2005 		if (oa)
2006 			kmem_cache_free(obdo_cachep, oa);
2007 		kfree(pga);
2008 		/* this should happen rarely and is pretty bad, it makes the
2009 		 * pending list not follow the dirty order */
2010 		while (!list_empty(ext_list)) {
2011 			ext = list_entry(ext_list->next, struct osc_extent,
2012 					     oe_link);
2013 			list_del_init(&ext->oe_link);
2014 			osc_extent_finish(env, ext, 0, rc);
2015 		}
2016 		if (clerq && !IS_ERR(clerq))
2017 			cl_req_completion(env, clerq, rc);
2018 	}
2019 	return rc;
2020 }
2021 
osc_set_lock_data_with_check(struct ldlm_lock * lock,struct ldlm_enqueue_info * einfo)2022 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2023 					struct ldlm_enqueue_info *einfo)
2024 {
2025 	void *data = einfo->ei_cbdata;
2026 	int set = 0;
2027 
2028 	LASSERT(lock != NULL);
2029 	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2030 	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2031 	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2032 	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2033 
2034 	lock_res_and_lock(lock);
2035 	spin_lock(&osc_ast_guard);
2036 
2037 	if (lock->l_ast_data == NULL)
2038 		lock->l_ast_data = data;
2039 	if (lock->l_ast_data == data)
2040 		set = 1;
2041 
2042 	spin_unlock(&osc_ast_guard);
2043 	unlock_res_and_lock(lock);
2044 
2045 	return set;
2046 }
2047 
osc_set_data_with_check(struct lustre_handle * lockh,struct ldlm_enqueue_info * einfo)2048 static int osc_set_data_with_check(struct lustre_handle *lockh,
2049 				   struct ldlm_enqueue_info *einfo)
2050 {
2051 	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2052 	int set = 0;
2053 
2054 	if (lock != NULL) {
2055 		set = osc_set_lock_data_with_check(lock, einfo);
2056 		LDLM_LOCK_PUT(lock);
2057 	} else
2058 		CERROR("lockh %p, data %p - client evicted?\n",
2059 		       lockh, einfo->ei_cbdata);
2060 	return set;
2061 }
2062 
2063 /* find any ldlm lock of the inode in osc
2064  * return 0    not find
2065  *	1    find one
2066  *      < 0    error */
osc_find_cbdata(struct obd_export * exp,struct lov_stripe_md * lsm,ldlm_iterator_t replace,void * data)2067 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2068 			   ldlm_iterator_t replace, void *data)
2069 {
2070 	struct ldlm_res_id res_id;
2071 	struct obd_device *obd = class_exp2obd(exp);
2072 	int rc = 0;
2073 
2074 	ostid_build_res_name(&lsm->lsm_oi, &res_id);
2075 	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2076 	if (rc == LDLM_ITER_STOP)
2077 		return 1;
2078 	if (rc == LDLM_ITER_CONTINUE)
2079 		return 0;
2080 	return rc;
2081 }
2082 
osc_enqueue_fini(struct ptlrpc_request * req,struct ost_lvb * lvb,obd_enqueue_update_f upcall,void * cookie,__u64 * flags,int agl,int rc)2083 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2084 			    obd_enqueue_update_f upcall, void *cookie,
2085 			    __u64 *flags, int agl, int rc)
2086 {
2087 	int intent = *flags & LDLM_FL_HAS_INTENT;
2088 
2089 	if (intent) {
2090 		/* The request was created before ldlm_cli_enqueue call. */
2091 		if (rc == ELDLM_LOCK_ABORTED) {
2092 			struct ldlm_reply *rep;
2093 
2094 			rep = req_capsule_server_get(&req->rq_pill,
2095 						     &RMF_DLM_REP);
2096 
2097 			LASSERT(rep != NULL);
2098 			rep->lock_policy_res1 =
2099 				ptlrpc_status_ntoh(rep->lock_policy_res1);
2100 			if (rep->lock_policy_res1)
2101 				rc = rep->lock_policy_res1;
2102 		}
2103 	}
2104 
2105 	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2106 	    (rc == 0)) {
2107 		*flags |= LDLM_FL_LVB_READY;
2108 		CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2109 		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2110 	}
2111 
2112 	/* Call the update callback. */
2113 	rc = (*upcall)(cookie, rc);
2114 	return rc;
2115 }
2116 
osc_enqueue_interpret(const struct lu_env * env,struct ptlrpc_request * req,struct osc_enqueue_args * aa,int rc)2117 static int osc_enqueue_interpret(const struct lu_env *env,
2118 				 struct ptlrpc_request *req,
2119 				 struct osc_enqueue_args *aa, int rc)
2120 {
2121 	struct ldlm_lock *lock;
2122 	struct lustre_handle handle;
2123 	__u32 mode;
2124 	struct ost_lvb *lvb;
2125 	__u32 lvb_len;
2126 	__u64 *flags = aa->oa_flags;
2127 
2128 	/* Make a local copy of a lock handle and a mode, because aa->oa_*
2129 	 * might be freed anytime after lock upcall has been called. */
2130 	lustre_handle_copy(&handle, aa->oa_lockh);
2131 	mode = aa->oa_ei->ei_mode;
2132 
2133 	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
2134 	 * be valid. */
2135 	lock = ldlm_handle2lock(&handle);
2136 
2137 	/* Take an additional reference so that a blocking AST that
2138 	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2139 	 * to arrive after an upcall has been executed by
2140 	 * osc_enqueue_fini(). */
2141 	ldlm_lock_addref(&handle, mode);
2142 
2143 	/* Let CP AST to grant the lock first. */
2144 	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2145 
2146 	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2147 		lvb = NULL;
2148 		lvb_len = 0;
2149 	} else {
2150 		lvb = aa->oa_lvb;
2151 		lvb_len = sizeof(*aa->oa_lvb);
2152 	}
2153 
2154 	/* Complete obtaining the lock procedure. */
2155 	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2156 				   mode, flags, lvb, lvb_len, &handle, rc);
2157 	/* Complete osc stuff. */
2158 	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2159 			      flags, aa->oa_agl, rc);
2160 
2161 	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2162 
2163 	/* Release the lock for async request. */
2164 	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2165 		/*
2166 		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2167 		 * not already released by
2168 		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2169 		 */
2170 		ldlm_lock_decref(&handle, mode);
2171 
2172 	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2173 		 aa->oa_lockh, req, aa);
2174 	ldlm_lock_decref(&handle, mode);
2175 	LDLM_LOCK_PUT(lock);
2176 	return rc;
2177 }
2178 
2179 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2180 
2181 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2182  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2183  * other synchronous requests, however keeping some locks and trying to obtain
2184  * others may take a considerable amount of time in a case of ost failure; and
2185  * when other sync requests do not get released lock from a client, the client
2186  * is excluded from the cluster -- such scenarious make the life difficult, so
2187  * release locks just after they are obtained. */
osc_enqueue_base(struct obd_export * exp,struct ldlm_res_id * res_id,__u64 * flags,ldlm_policy_data_t * policy,struct ost_lvb * lvb,int kms_valid,obd_enqueue_update_f upcall,void * cookie,struct ldlm_enqueue_info * einfo,struct lustre_handle * lockh,struct ptlrpc_request_set * rqset,int async,int agl)2188 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2189 		     __u64 *flags, ldlm_policy_data_t *policy,
2190 		     struct ost_lvb *lvb, int kms_valid,
2191 		     obd_enqueue_update_f upcall, void *cookie,
2192 		     struct ldlm_enqueue_info *einfo,
2193 		     struct lustre_handle *lockh,
2194 		     struct ptlrpc_request_set *rqset, int async, int agl)
2195 {
2196 	struct obd_device *obd = exp->exp_obd;
2197 	struct ptlrpc_request *req = NULL;
2198 	int intent = *flags & LDLM_FL_HAS_INTENT;
2199 	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2200 	ldlm_mode_t mode;
2201 	int rc;
2202 
2203 	/* Filesystem lock extents are extended to page boundaries so that
2204 	 * dealing with the page cache is a little smoother.  */
2205 	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2206 	policy->l_extent.end |= ~CFS_PAGE_MASK;
2207 
2208 	/*
2209 	 * kms is not valid when either object is completely fresh (so that no
2210 	 * locks are cached), or object was evicted. In the latter case cached
2211 	 * lock cannot be used, because it would prime inode state with
2212 	 * potentially stale LVB.
2213 	 */
2214 	if (!kms_valid)
2215 		goto no_match;
2216 
2217 	/* Next, search for already existing extent locks that will cover us */
2218 	/* If we're trying to read, we also search for an existing PW lock.  The
2219 	 * VFS and page cache already protect us locally, so lots of readers/
2220 	 * writers can share a single PW lock.
2221 	 *
2222 	 * There are problems with conversion deadlocks, so instead of
2223 	 * converting a read lock to a write lock, we'll just enqueue a new
2224 	 * one.
2225 	 *
2226 	 * At some point we should cancel the read lock instead of making them
2227 	 * send us a blocking callback, but there are problems with canceling
2228 	 * locks out from other users right now, too. */
2229 	mode = einfo->ei_mode;
2230 	if (einfo->ei_mode == LCK_PR)
2231 		mode |= LCK_PW;
2232 	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2233 			       einfo->ei_type, policy, mode, lockh, 0);
2234 	if (mode) {
2235 		struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2236 
2237 		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2238 			/* For AGL, if enqueue RPC is sent but the lock is not
2239 			 * granted, then skip to process this strpe.
2240 			 * Return -ECANCELED to tell the caller. */
2241 			ldlm_lock_decref(lockh, mode);
2242 			LDLM_LOCK_PUT(matched);
2243 			return -ECANCELED;
2244 		}
2245 
2246 		if (osc_set_lock_data_with_check(matched, einfo)) {
2247 			*flags |= LDLM_FL_LVB_READY;
2248 			/* addref the lock only if not async requests and PW
2249 			 * lock is matched whereas we asked for PR. */
2250 			if (!rqset && einfo->ei_mode != mode)
2251 				ldlm_lock_addref(lockh, LCK_PR);
2252 			if (intent) {
2253 				/* I would like to be able to ASSERT here that
2254 				 * rss <= kms, but I can't, for reasons which
2255 				 * are explained in lov_enqueue() */
2256 			}
2257 
2258 			/* We already have a lock, and it's referenced.
2259 			 *
2260 			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2261 			 * AGL upcall may change it to CLS_HELD directly. */
2262 			(*upcall)(cookie, ELDLM_OK);
2263 
2264 			if (einfo->ei_mode != mode)
2265 				ldlm_lock_decref(lockh, LCK_PW);
2266 			else if (rqset)
2267 				/* For async requests, decref the lock. */
2268 				ldlm_lock_decref(lockh, einfo->ei_mode);
2269 			LDLM_LOCK_PUT(matched);
2270 			return ELDLM_OK;
2271 		}
2272 
2273 		ldlm_lock_decref(lockh, mode);
2274 		LDLM_LOCK_PUT(matched);
2275 	}
2276 
2277  no_match:
2278 	if (intent) {
2279 		LIST_HEAD(cancels);
2280 
2281 		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2282 					   &RQF_LDLM_ENQUEUE_LVB);
2283 		if (req == NULL)
2284 			return -ENOMEM;
2285 
2286 		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2287 		if (rc) {
2288 			ptlrpc_request_free(req);
2289 			return rc;
2290 		}
2291 
2292 		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2293 				     sizeof(*lvb));
2294 		ptlrpc_request_set_replen(req);
2295 	}
2296 
2297 	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2298 	*flags &= ~LDLM_FL_BLOCK_GRANTED;
2299 
2300 	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2301 			      sizeof(*lvb), LVB_T_OST, lockh, async);
2302 	if (rqset) {
2303 		if (!rc) {
2304 			struct osc_enqueue_args *aa;
2305 
2306 			CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2307 			aa = ptlrpc_req_async_args(req);
2308 			aa->oa_ei = einfo;
2309 			aa->oa_exp = exp;
2310 			aa->oa_flags  = flags;
2311 			aa->oa_upcall = upcall;
2312 			aa->oa_cookie = cookie;
2313 			aa->oa_lvb    = lvb;
2314 			aa->oa_lockh  = lockh;
2315 			aa->oa_agl    = !!agl;
2316 
2317 			req->rq_interpret_reply =
2318 				(ptlrpc_interpterer_t)osc_enqueue_interpret;
2319 			if (rqset == PTLRPCD_SET)
2320 				ptlrpcd_add_req(req);
2321 			else
2322 				ptlrpc_set_add_req(rqset, req);
2323 		} else if (intent) {
2324 			ptlrpc_req_finished(req);
2325 		}
2326 		return rc;
2327 	}
2328 
2329 	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2330 	if (intent)
2331 		ptlrpc_req_finished(req);
2332 
2333 	return rc;
2334 }
2335 
osc_match_base(struct obd_export * exp,struct ldlm_res_id * res_id,__u32 type,ldlm_policy_data_t * policy,__u32 mode,__u64 * flags,void * data,struct lustre_handle * lockh,int unref)2336 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2337 		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2338 		   __u64 *flags, void *data, struct lustre_handle *lockh,
2339 		   int unref)
2340 {
2341 	struct obd_device *obd = exp->exp_obd;
2342 	__u64 lflags = *flags;
2343 	ldlm_mode_t rc;
2344 
2345 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2346 		return -EIO;
2347 
2348 	/* Filesystem lock extents are extended to page boundaries so that
2349 	 * dealing with the page cache is a little smoother */
2350 	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2351 	policy->l_extent.end |= ~CFS_PAGE_MASK;
2352 
2353 	/* Next, search for already existing extent locks that will cover us */
2354 	/* If we're trying to read, we also search for an existing PW lock.  The
2355 	 * VFS and page cache already protect us locally, so lots of readers/
2356 	 * writers can share a single PW lock. */
2357 	rc = mode;
2358 	if (mode == LCK_PR)
2359 		rc |= LCK_PW;
2360 	rc = ldlm_lock_match(obd->obd_namespace, lflags,
2361 			     res_id, type, policy, rc, lockh, unref);
2362 	if (rc) {
2363 		if (data != NULL) {
2364 			if (!osc_set_data_with_check(lockh, data)) {
2365 				if (!(lflags & LDLM_FL_TEST_LOCK))
2366 					ldlm_lock_decref(lockh, rc);
2367 				return 0;
2368 			}
2369 		}
2370 		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2371 			ldlm_lock_addref(lockh, LCK_PR);
2372 			ldlm_lock_decref(lockh, LCK_PW);
2373 		}
2374 		return rc;
2375 	}
2376 	return rc;
2377 }
2378 
osc_cancel_base(struct lustre_handle * lockh,__u32 mode)2379 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2380 {
2381 	if (unlikely(mode == LCK_GROUP))
2382 		ldlm_lock_decref_and_cancel(lockh, mode);
2383 	else
2384 		ldlm_lock_decref(lockh, mode);
2385 
2386 	return 0;
2387 }
2388 
osc_statfs_interpret(const struct lu_env * env,struct ptlrpc_request * req,struct osc_async_args * aa,int rc)2389 static int osc_statfs_interpret(const struct lu_env *env,
2390 				struct ptlrpc_request *req,
2391 				struct osc_async_args *aa, int rc)
2392 {
2393 	struct obd_statfs *msfs;
2394 
2395 	if (rc == -EBADR)
2396 		/* The request has in fact never been sent
2397 		 * due to issues at a higher level (LOV).
2398 		 * Exit immediately since the caller is
2399 		 * aware of the problem and takes care
2400 		 * of the clean up */
2401 		 return rc;
2402 
2403 	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2404 	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2405 		rc = 0;
2406 		goto out;
2407 	}
2408 
2409 	if (rc != 0)
2410 		goto out;
2411 
2412 	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2413 	if (msfs == NULL) {
2414 		rc = -EPROTO;
2415 		goto out;
2416 	}
2417 
2418 	*aa->aa_oi->oi_osfs = *msfs;
2419 out:
2420 	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2421 	return rc;
2422 }
2423 
osc_statfs_async(struct obd_export * exp,struct obd_info * oinfo,__u64 max_age,struct ptlrpc_request_set * rqset)2424 static int osc_statfs_async(struct obd_export *exp,
2425 			    struct obd_info *oinfo, __u64 max_age,
2426 			    struct ptlrpc_request_set *rqset)
2427 {
2428 	struct obd_device *obd = class_exp2obd(exp);
2429 	struct ptlrpc_request *req;
2430 	struct osc_async_args *aa;
2431 	int rc;
2432 
2433 	/* We could possibly pass max_age in the request (as an absolute
2434 	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2435 	 * extra calls into the filesystem if that isn't necessary (e.g.
2436 	 * during mount that would help a bit).  Having relative timestamps
2437 	 * is not so great if request processing is slow, while absolute
2438 	 * timestamps are not ideal because they need time synchronization. */
2439 	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2440 	if (req == NULL)
2441 		return -ENOMEM;
2442 
2443 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2444 	if (rc) {
2445 		ptlrpc_request_free(req);
2446 		return rc;
2447 	}
2448 	ptlrpc_request_set_replen(req);
2449 	req->rq_request_portal = OST_CREATE_PORTAL;
2450 	ptlrpc_at_set_req_timeout(req);
2451 
2452 	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2453 		/* procfs requests not want stat in wait for avoid deadlock */
2454 		req->rq_no_resend = 1;
2455 		req->rq_no_delay = 1;
2456 	}
2457 
2458 	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2459 	CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2460 	aa = ptlrpc_req_async_args(req);
2461 	aa->aa_oi = oinfo;
2462 
2463 	ptlrpc_set_add_req(rqset, req);
2464 	return 0;
2465 }
2466 
osc_statfs(const struct lu_env * env,struct obd_export * exp,struct obd_statfs * osfs,__u64 max_age,__u32 flags)2467 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2468 		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2469 {
2470 	struct obd_device *obd = class_exp2obd(exp);
2471 	struct obd_statfs *msfs;
2472 	struct ptlrpc_request *req;
2473 	struct obd_import *imp = NULL;
2474 	int rc;
2475 
2476 	/*Since the request might also come from lprocfs, so we need
2477 	 *sync this with client_disconnect_export Bug15684*/
2478 	down_read(&obd->u.cli.cl_sem);
2479 	if (obd->u.cli.cl_import)
2480 		imp = class_import_get(obd->u.cli.cl_import);
2481 	up_read(&obd->u.cli.cl_sem);
2482 	if (!imp)
2483 		return -ENODEV;
2484 
2485 	/* We could possibly pass max_age in the request (as an absolute
2486 	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2487 	 * extra calls into the filesystem if that isn't necessary (e.g.
2488 	 * during mount that would help a bit).  Having relative timestamps
2489 	 * is not so great if request processing is slow, while absolute
2490 	 * timestamps are not ideal because they need time synchronization. */
2491 	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2492 
2493 	class_import_put(imp);
2494 
2495 	if (req == NULL)
2496 		return -ENOMEM;
2497 
2498 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2499 	if (rc) {
2500 		ptlrpc_request_free(req);
2501 		return rc;
2502 	}
2503 	ptlrpc_request_set_replen(req);
2504 	req->rq_request_portal = OST_CREATE_PORTAL;
2505 	ptlrpc_at_set_req_timeout(req);
2506 
2507 	if (flags & OBD_STATFS_NODELAY) {
2508 		/* procfs requests not want stat in wait for avoid deadlock */
2509 		req->rq_no_resend = 1;
2510 		req->rq_no_delay = 1;
2511 	}
2512 
2513 	rc = ptlrpc_queue_wait(req);
2514 	if (rc)
2515 		goto out;
2516 
2517 	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2518 	if (msfs == NULL) {
2519 		rc = -EPROTO;
2520 		goto out;
2521 	}
2522 
2523 	*osfs = *msfs;
2524 
2525  out:
2526 	ptlrpc_req_finished(req);
2527 	return rc;
2528 }
2529 
2530 /* Retrieve object striping information.
2531  *
2532  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2533  * the maximum number of OST indices which will fit in the user buffer.
2534  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2535  */
osc_getstripe(struct lov_stripe_md * lsm,struct lov_user_md * lump)2536 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2537 {
2538 	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2539 	struct lov_user_md_v3 lum, *lumk;
2540 	struct lov_user_ost_data_v1 *lmm_objects;
2541 	int rc = 0, lum_size;
2542 
2543 	if (!lsm)
2544 		return -ENODATA;
2545 
2546 	/* we only need the header part from user space to get lmm_magic and
2547 	 * lmm_stripe_count, (the header part is common to v1 and v3) */
2548 	lum_size = sizeof(struct lov_user_md_v1);
2549 	if (copy_from_user(&lum, lump, lum_size))
2550 		return -EFAULT;
2551 
2552 	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2553 	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
2554 		return -EINVAL;
2555 
2556 	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
2557 	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2558 	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2559 	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2560 
2561 	/* we can use lov_mds_md_size() to compute lum_size
2562 	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2563 	if (lum.lmm_stripe_count > 0) {
2564 		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2565 		lumk = kzalloc(lum_size, GFP_NOFS);
2566 		if (!lumk)
2567 			return -ENOMEM;
2568 
2569 		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2570 			lmm_objects =
2571 			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2572 		else
2573 			lmm_objects = &(lumk->lmm_objects[0]);
2574 		lmm_objects->l_ost_oi = lsm->lsm_oi;
2575 	} else {
2576 		lum_size = lov_mds_md_size(0, lum.lmm_magic);
2577 		lumk = &lum;
2578 	}
2579 
2580 	lumk->lmm_oi = lsm->lsm_oi;
2581 	lumk->lmm_stripe_count = 1;
2582 
2583 	if (copy_to_user(lump, lumk, lum_size))
2584 		rc = -EFAULT;
2585 
2586 	if (lumk != &lum)
2587 		kfree(lumk);
2588 
2589 	return rc;
2590 }
2591 
osc_iocontrol(unsigned int cmd,struct obd_export * exp,int len,void * karg,void * uarg)2592 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2593 			 void *karg, void *uarg)
2594 {
2595 	struct obd_device *obd = exp->exp_obd;
2596 	struct obd_ioctl_data *data = karg;
2597 	int err = 0;
2598 
2599 	if (!try_module_get(THIS_MODULE)) {
2600 		CERROR("Can't get module. Is it alive?");
2601 		return -EINVAL;
2602 	}
2603 	switch (cmd) {
2604 	case OBD_IOC_LOV_GET_CONFIG: {
2605 		char *buf;
2606 		struct lov_desc *desc;
2607 		struct obd_uuid uuid;
2608 
2609 		buf = NULL;
2610 		len = 0;
2611 		if (obd_ioctl_getdata(&buf, &len, uarg)) {
2612 			err = -EINVAL;
2613 			goto out;
2614 		}
2615 
2616 		data = (struct obd_ioctl_data *)buf;
2617 
2618 		if (sizeof(*desc) > data->ioc_inllen1) {
2619 			obd_ioctl_freedata(buf, len);
2620 			err = -EINVAL;
2621 			goto out;
2622 		}
2623 
2624 		if (data->ioc_inllen2 < sizeof(uuid)) {
2625 			obd_ioctl_freedata(buf, len);
2626 			err = -EINVAL;
2627 			goto out;
2628 		}
2629 
2630 		desc = (struct lov_desc *)data->ioc_inlbuf1;
2631 		desc->ld_tgt_count = 1;
2632 		desc->ld_active_tgt_count = 1;
2633 		desc->ld_default_stripe_count = 1;
2634 		desc->ld_default_stripe_size = 0;
2635 		desc->ld_default_stripe_offset = 0;
2636 		desc->ld_pattern = 0;
2637 		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2638 
2639 		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2640 
2641 		err = copy_to_user(uarg, buf, len);
2642 		if (err)
2643 			err = -EFAULT;
2644 		obd_ioctl_freedata(buf, len);
2645 		goto out;
2646 	}
2647 	case LL_IOC_LOV_SETSTRIPE:
2648 		err = obd_alloc_memmd(exp, karg);
2649 		if (err > 0)
2650 			err = 0;
2651 		goto out;
2652 	case LL_IOC_LOV_GETSTRIPE:
2653 		err = osc_getstripe(karg, uarg);
2654 		goto out;
2655 	case OBD_IOC_CLIENT_RECOVER:
2656 		err = ptlrpc_recover_import(obd->u.cli.cl_import,
2657 					    data->ioc_inlbuf1, 0);
2658 		if (err > 0)
2659 			err = 0;
2660 		goto out;
2661 	case IOC_OSC_SET_ACTIVE:
2662 		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2663 					       data->ioc_offset);
2664 		goto out;
2665 	case OBD_IOC_POLL_QUOTACHECK:
2666 		err = osc_quota_poll_check(exp, karg);
2667 		goto out;
2668 	case OBD_IOC_PING_TARGET:
2669 		err = ptlrpc_obd_ping(obd);
2670 		goto out;
2671 	default:
2672 		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2673 		       cmd, current_comm());
2674 		err = -ENOTTY;
2675 		goto out;
2676 	}
2677 out:
2678 	module_put(THIS_MODULE);
2679 	return err;
2680 }
2681 
osc_get_info(const struct lu_env * env,struct obd_export * exp,u32 keylen,void * key,__u32 * vallen,void * val,struct lov_stripe_md * lsm)2682 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2683 			u32 keylen, void *key, __u32 *vallen, void *val,
2684 			struct lov_stripe_md *lsm)
2685 {
2686 	if (!vallen || !val)
2687 		return -EFAULT;
2688 
2689 	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2690 		__u32 *stripe = val;
2691 		*vallen = sizeof(*stripe);
2692 		*stripe = 0;
2693 		return 0;
2694 	} else if (KEY_IS(KEY_LAST_ID)) {
2695 		struct ptlrpc_request *req;
2696 		u64 *reply;
2697 		char *tmp;
2698 		int rc;
2699 
2700 		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2701 					   &RQF_OST_GET_INFO_LAST_ID);
2702 		if (req == NULL)
2703 			return -ENOMEM;
2704 
2705 		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2706 				     RCL_CLIENT, keylen);
2707 		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2708 		if (rc) {
2709 			ptlrpc_request_free(req);
2710 			return rc;
2711 		}
2712 
2713 		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2714 		memcpy(tmp, key, keylen);
2715 
2716 		req->rq_no_delay = req->rq_no_resend = 1;
2717 		ptlrpc_request_set_replen(req);
2718 		rc = ptlrpc_queue_wait(req);
2719 		if (rc)
2720 			goto out;
2721 
2722 		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2723 		if (reply == NULL) {
2724 			rc = -EPROTO;
2725 			goto out;
2726 		}
2727 
2728 		*((u64 *)val) = *reply;
2729 	out:
2730 		ptlrpc_req_finished(req);
2731 		return rc;
2732 	} else if (KEY_IS(KEY_FIEMAP)) {
2733 		struct ll_fiemap_info_key *fm_key = key;
2734 		struct ldlm_res_id res_id;
2735 		ldlm_policy_data_t policy;
2736 		struct lustre_handle lockh;
2737 		ldlm_mode_t mode = 0;
2738 		struct ptlrpc_request *req;
2739 		struct ll_user_fiemap *reply;
2740 		char *tmp;
2741 		int rc;
2742 
2743 		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2744 			goto skip_locking;
2745 
2746 		policy.l_extent.start = fm_key->fiemap.fm_start &
2747 						CFS_PAGE_MASK;
2748 
2749 		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2750 		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2751 			policy.l_extent.end = OBD_OBJECT_EOF;
2752 		else
2753 			policy.l_extent.end = (fm_key->fiemap.fm_start +
2754 				fm_key->fiemap.fm_length +
2755 				PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2756 
2757 		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2758 		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2759 				       LDLM_FL_BLOCK_GRANTED |
2760 				       LDLM_FL_LVB_READY,
2761 				       &res_id, LDLM_EXTENT, &policy,
2762 				       LCK_PR | LCK_PW, &lockh, 0);
2763 		if (mode) { /* lock is cached on client */
2764 			if (mode != LCK_PR) {
2765 				ldlm_lock_addref(&lockh, LCK_PR);
2766 				ldlm_lock_decref(&lockh, LCK_PW);
2767 			}
2768 		} else { /* no cached lock, needs acquire lock on server side */
2769 			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2770 			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2771 		}
2772 
2773 skip_locking:
2774 		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2775 					   &RQF_OST_GET_INFO_FIEMAP);
2776 		if (req == NULL) {
2777 			rc = -ENOMEM;
2778 			goto drop_lock;
2779 		}
2780 
2781 		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2782 				     RCL_CLIENT, keylen);
2783 		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2784 				     RCL_CLIENT, *vallen);
2785 		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2786 				     RCL_SERVER, *vallen);
2787 
2788 		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2789 		if (rc) {
2790 			ptlrpc_request_free(req);
2791 			goto drop_lock;
2792 		}
2793 
2794 		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2795 		memcpy(tmp, key, keylen);
2796 		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2797 		memcpy(tmp, val, *vallen);
2798 
2799 		ptlrpc_request_set_replen(req);
2800 		rc = ptlrpc_queue_wait(req);
2801 		if (rc)
2802 			goto fini_req;
2803 
2804 		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2805 		if (reply == NULL) {
2806 			rc = -EPROTO;
2807 			goto fini_req;
2808 		}
2809 
2810 		memcpy(val, reply, *vallen);
2811 fini_req:
2812 		ptlrpc_req_finished(req);
2813 drop_lock:
2814 		if (mode)
2815 			ldlm_lock_decref(&lockh, LCK_PR);
2816 		return rc;
2817 	}
2818 
2819 	return -EINVAL;
2820 }
2821 
osc_set_info_async(const struct lu_env * env,struct obd_export * exp,u32 keylen,void * key,u32 vallen,void * val,struct ptlrpc_request_set * set)2822 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2823 			      u32 keylen, void *key, u32 vallen,
2824 			      void *val, struct ptlrpc_request_set *set)
2825 {
2826 	struct ptlrpc_request *req;
2827 	struct obd_device *obd = exp->exp_obd;
2828 	struct obd_import *imp = class_exp2cliimp(exp);
2829 	char *tmp;
2830 	int rc;
2831 
2832 	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2833 
2834 	if (KEY_IS(KEY_CHECKSUM)) {
2835 		if (vallen != sizeof(int))
2836 			return -EINVAL;
2837 		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2838 		return 0;
2839 	}
2840 
2841 	if (KEY_IS(KEY_SPTLRPC_CONF)) {
2842 		sptlrpc_conf_client_adapt(obd);
2843 		return 0;
2844 	}
2845 
2846 	if (KEY_IS(KEY_FLUSH_CTX)) {
2847 		sptlrpc_import_flush_my_ctx(imp);
2848 		return 0;
2849 	}
2850 
2851 	if (KEY_IS(KEY_CACHE_SET)) {
2852 		struct client_obd *cli = &obd->u.cli;
2853 
2854 		LASSERT(cli->cl_cache == NULL); /* only once */
2855 		cli->cl_cache = val;
2856 		atomic_inc(&cli->cl_cache->ccc_users);
2857 		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2858 
2859 		/* add this osc into entity list */
2860 		LASSERT(list_empty(&cli->cl_lru_osc));
2861 		spin_lock(&cli->cl_cache->ccc_lru_lock);
2862 		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2863 		spin_unlock(&cli->cl_cache->ccc_lru_lock);
2864 
2865 		return 0;
2866 	}
2867 
2868 	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2869 		struct client_obd *cli = &obd->u.cli;
2870 		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2871 		int target = *(int *)val;
2872 
2873 		nr = osc_lru_shrink(cli, min(nr, target));
2874 		*(int *)val -= nr;
2875 		return 0;
2876 	}
2877 
2878 	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2879 		return -EINVAL;
2880 
2881 	/* We pass all other commands directly to OST. Since nobody calls osc
2882 	   methods directly and everybody is supposed to go through LOV, we
2883 	   assume lov checked invalid values for us.
2884 	   The only recognised values so far are evict_by_nid and mds_conn.
2885 	   Even if something bad goes through, we'd get a -EINVAL from OST
2886 	   anyway. */
2887 
2888 	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2889 						&RQF_OST_SET_GRANT_INFO :
2890 						&RQF_OBD_SET_INFO);
2891 	if (req == NULL)
2892 		return -ENOMEM;
2893 
2894 	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2895 			     RCL_CLIENT, keylen);
2896 	if (!KEY_IS(KEY_GRANT_SHRINK))
2897 		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2898 				     RCL_CLIENT, vallen);
2899 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2900 	if (rc) {
2901 		ptlrpc_request_free(req);
2902 		return rc;
2903 	}
2904 
2905 	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2906 	memcpy(tmp, key, keylen);
2907 	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2908 							&RMF_OST_BODY :
2909 							&RMF_SETINFO_VAL);
2910 	memcpy(tmp, val, vallen);
2911 
2912 	if (KEY_IS(KEY_GRANT_SHRINK)) {
2913 		struct osc_brw_async_args *aa;
2914 		struct obdo *oa;
2915 
2916 		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2917 		aa = ptlrpc_req_async_args(req);
2918 		oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
2919 		if (!oa) {
2920 			ptlrpc_req_finished(req);
2921 			return -ENOMEM;
2922 		}
2923 		*oa = ((struct ost_body *)val)->oa;
2924 		aa->aa_oa = oa;
2925 		req->rq_interpret_reply = osc_shrink_grant_interpret;
2926 	}
2927 
2928 	ptlrpc_request_set_replen(req);
2929 	if (!KEY_IS(KEY_GRANT_SHRINK)) {
2930 		LASSERT(set != NULL);
2931 		ptlrpc_set_add_req(set, req);
2932 		ptlrpc_check_set(NULL, set);
2933 	} else {
2934 		ptlrpcd_add_req(req);
2935 	}
2936 
2937 	return 0;
2938 }
2939 
osc_reconnect(const struct lu_env * env,struct obd_export * exp,struct obd_device * obd,struct obd_uuid * cluuid,struct obd_connect_data * data,void * localdata)2940 static int osc_reconnect(const struct lu_env *env,
2941 			 struct obd_export *exp, struct obd_device *obd,
2942 			 struct obd_uuid *cluuid,
2943 			 struct obd_connect_data *data,
2944 			 void *localdata)
2945 {
2946 	struct client_obd *cli = &obd->u.cli;
2947 
2948 	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2949 		long lost_grant;
2950 
2951 		client_obd_list_lock(&cli->cl_loi_list_lock);
2952 		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2953 				2 * cli_brw_size(obd);
2954 		lost_grant = cli->cl_lost_grant;
2955 		cli->cl_lost_grant = 0;
2956 		client_obd_list_unlock(&cli->cl_loi_list_lock);
2957 
2958 		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2959 		       data->ocd_connect_flags,
2960 		       data->ocd_version, data->ocd_grant, lost_grant);
2961 	}
2962 
2963 	return 0;
2964 }
2965 
osc_disconnect(struct obd_export * exp)2966 static int osc_disconnect(struct obd_export *exp)
2967 {
2968 	struct obd_device *obd = class_exp2obd(exp);
2969 	int rc;
2970 
2971 	rc = client_disconnect_export(exp);
2972 	/**
2973 	 * Initially we put del_shrink_grant before disconnect_export, but it
2974 	 * causes the following problem if setup (connect) and cleanup
2975 	 * (disconnect) are tangled together.
2976 	 *      connect p1		     disconnect p2
2977 	 *   ptlrpc_connect_import
2978 	 *     ...............	       class_manual_cleanup
2979 	 *				     osc_disconnect
2980 	 *				     del_shrink_grant
2981 	 *   ptlrpc_connect_interrupt
2982 	 *     init_grant_shrink
2983 	 *   add this client to shrink list
2984 	 *				      cleanup_osc
2985 	 * Bang! pinger trigger the shrink.
2986 	 * So the osc should be disconnected from the shrink list, after we
2987 	 * are sure the import has been destroyed. BUG18662
2988 	 */
2989 	if (obd->u.cli.cl_import == NULL)
2990 		osc_del_shrink_grant(&obd->u.cli);
2991 	return rc;
2992 }
2993 
osc_import_event(struct obd_device * obd,struct obd_import * imp,enum obd_import_event event)2994 static int osc_import_event(struct obd_device *obd,
2995 			    struct obd_import *imp,
2996 			    enum obd_import_event event)
2997 {
2998 	struct client_obd *cli;
2999 	int rc = 0;
3000 
3001 	LASSERT(imp->imp_obd == obd);
3002 
3003 	switch (event) {
3004 	case IMP_EVENT_DISCON: {
3005 		cli = &obd->u.cli;
3006 		client_obd_list_lock(&cli->cl_loi_list_lock);
3007 		cli->cl_avail_grant = 0;
3008 		cli->cl_lost_grant = 0;
3009 		client_obd_list_unlock(&cli->cl_loi_list_lock);
3010 		break;
3011 	}
3012 	case IMP_EVENT_INACTIVE: {
3013 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3014 		break;
3015 	}
3016 	case IMP_EVENT_INVALIDATE: {
3017 		struct ldlm_namespace *ns = obd->obd_namespace;
3018 		struct lu_env *env;
3019 		int refcheck;
3020 
3021 		env = cl_env_get(&refcheck);
3022 		if (!IS_ERR(env)) {
3023 			/* Reset grants */
3024 			cli = &obd->u.cli;
3025 			/* all pages go to failing rpcs due to the invalid
3026 			 * import */
3027 			osc_io_unplug(env, cli, NULL);
3028 
3029 			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3030 			cl_env_put(env, &refcheck);
3031 		} else
3032 			rc = PTR_ERR(env);
3033 		break;
3034 	}
3035 	case IMP_EVENT_ACTIVE: {
3036 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3037 		break;
3038 	}
3039 	case IMP_EVENT_OCD: {
3040 		struct obd_connect_data *ocd = &imp->imp_connect_data;
3041 
3042 		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3043 			osc_init_grant(&obd->u.cli, ocd);
3044 
3045 		/* See bug 7198 */
3046 		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3047 			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3048 
3049 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3050 		break;
3051 	}
3052 	case IMP_EVENT_DEACTIVATE: {
3053 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3054 		break;
3055 	}
3056 	case IMP_EVENT_ACTIVATE: {
3057 		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3058 		break;
3059 	}
3060 	default:
3061 		CERROR("Unknown import event %d\n", event);
3062 		LBUG();
3063 	}
3064 	return rc;
3065 }
3066 
3067 /**
3068  * Determine whether the lock can be canceled before replaying the lock
3069  * during recovery, see bug16774 for detailed information.
3070  *
3071  * \retval zero the lock can't be canceled
3072  * \retval other ok to cancel
3073  */
osc_cancel_for_recovery(struct ldlm_lock * lock)3074 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3075 {
3076 	check_res_locked(lock->l_resource);
3077 
3078 	/*
3079 	 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3080 	 *
3081 	 * XXX as a future improvement, we can also cancel unused write lock
3082 	 * if it doesn't have dirty data and active mmaps.
3083 	 */
3084 	if (lock->l_resource->lr_type == LDLM_EXTENT &&
3085 	    (lock->l_granted_mode == LCK_PR ||
3086 	     lock->l_granted_mode == LCK_CR) &&
3087 	    (osc_dlm_lock_pageref(lock) == 0))
3088 		return 1;
3089 
3090 	return 0;
3091 }
3092 
brw_queue_work(const struct lu_env * env,void * data)3093 static int brw_queue_work(const struct lu_env *env, void *data)
3094 {
3095 	struct client_obd *cli = data;
3096 
3097 	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3098 
3099 	osc_io_unplug(env, cli, NULL);
3100 	return 0;
3101 }
3102 
osc_setup(struct obd_device * obd,struct lustre_cfg * lcfg)3103 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3104 {
3105 	struct lprocfs_static_vars lvars = { NULL };
3106 	struct client_obd *cli = &obd->u.cli;
3107 	void *handler;
3108 	int rc;
3109 	int adding;
3110 	int added;
3111 	int req_count;
3112 
3113 	rc = ptlrpcd_addref();
3114 	if (rc)
3115 		return rc;
3116 
3117 	rc = client_obd_setup(obd, lcfg);
3118 	if (rc)
3119 		goto out_ptlrpcd;
3120 
3121 	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3122 	if (IS_ERR(handler)) {
3123 		rc = PTR_ERR(handler);
3124 		goto out_client_setup;
3125 	}
3126 	cli->cl_writeback_work = handler;
3127 
3128 	rc = osc_quota_setup(obd);
3129 	if (rc)
3130 		goto out_ptlrpcd_work;
3131 
3132 	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3133 	lprocfs_osc_init_vars(&lvars);
3134 	if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
3135 		lproc_osc_attach_seqstat(obd);
3136 		sptlrpc_lprocfs_cliobd_attach(obd);
3137 		ptlrpc_lprocfs_register_obd(obd);
3138 	}
3139 
3140 	/*
3141 	 * We try to control the total number of requests with a upper limit
3142 	 * osc_reqpool_maxreqcount. There might be some race which will cause
3143 	 * over-limit allocation, but it is fine.
3144 	 */
3145 	req_count = atomic_read(&osc_pool_req_count);
3146 	if (req_count < osc_reqpool_maxreqcount) {
3147 		adding = cli->cl_max_rpcs_in_flight + 2;
3148 		if (req_count + adding > osc_reqpool_maxreqcount)
3149 			adding = osc_reqpool_maxreqcount - req_count;
3150 
3151 		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3152 		atomic_add(added, &osc_pool_req_count);
3153 	}
3154 
3155 	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3156 	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3157 	return rc;
3158 
3159 out_ptlrpcd_work:
3160 	ptlrpcd_destroy_work(handler);
3161 out_client_setup:
3162 	client_obd_cleanup(obd);
3163 out_ptlrpcd:
3164 	ptlrpcd_decref();
3165 	return rc;
3166 }
3167 
osc_precleanup(struct obd_device * obd,enum obd_cleanup_stage stage)3168 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3169 {
3170 	switch (stage) {
3171 	case OBD_CLEANUP_EARLY: {
3172 		struct obd_import *imp;
3173 
3174 		imp = obd->u.cli.cl_import;
3175 		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3176 		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3177 		ptlrpc_deactivate_import(imp);
3178 		spin_lock(&imp->imp_lock);
3179 		imp->imp_pingable = 0;
3180 		spin_unlock(&imp->imp_lock);
3181 		break;
3182 	}
3183 	case OBD_CLEANUP_EXPORTS: {
3184 		struct client_obd *cli = &obd->u.cli;
3185 		/* LU-464
3186 		 * for echo client, export may be on zombie list, wait for
3187 		 * zombie thread to cull it, because cli.cl_import will be
3188 		 * cleared in client_disconnect_export():
3189 		 *   class_export_destroy() -> obd_cleanup() ->
3190 		 *   echo_device_free() -> echo_client_cleanup() ->
3191 		 *   obd_disconnect() -> osc_disconnect() ->
3192 		 *   client_disconnect_export()
3193 		 */
3194 		obd_zombie_barrier();
3195 		if (cli->cl_writeback_work) {
3196 			ptlrpcd_destroy_work(cli->cl_writeback_work);
3197 			cli->cl_writeback_work = NULL;
3198 		}
3199 		obd_cleanup_client_import(obd);
3200 		ptlrpc_lprocfs_unregister_obd(obd);
3201 		lprocfs_obd_cleanup(obd);
3202 		break;
3203 		}
3204 	}
3205 	return 0;
3206 }
3207 
osc_cleanup(struct obd_device * obd)3208 int osc_cleanup(struct obd_device *obd)
3209 {
3210 	struct client_obd *cli = &obd->u.cli;
3211 	int rc;
3212 
3213 	/* lru cleanup */
3214 	if (cli->cl_cache != NULL) {
3215 		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3216 		spin_lock(&cli->cl_cache->ccc_lru_lock);
3217 		list_del_init(&cli->cl_lru_osc);
3218 		spin_unlock(&cli->cl_cache->ccc_lru_lock);
3219 		cli->cl_lru_left = NULL;
3220 		atomic_dec(&cli->cl_cache->ccc_users);
3221 		cli->cl_cache = NULL;
3222 	}
3223 
3224 	/* free memory of osc quota cache */
3225 	osc_quota_cleanup(obd);
3226 
3227 	rc = client_obd_cleanup(obd);
3228 
3229 	ptlrpcd_decref();
3230 	return rc;
3231 }
3232 
osc_process_config_base(struct obd_device * obd,struct lustre_cfg * lcfg)3233 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3234 {
3235 	struct lprocfs_static_vars lvars = { NULL };
3236 	int rc = 0;
3237 
3238 	lprocfs_osc_init_vars(&lvars);
3239 
3240 	switch (lcfg->lcfg_command) {
3241 	default:
3242 		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3243 					      lcfg, obd);
3244 		if (rc > 0)
3245 			rc = 0;
3246 		break;
3247 	}
3248 
3249 	return rc;
3250 }
3251 
osc_process_config(struct obd_device * obd,u32 len,void * buf)3252 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3253 {
3254 	return osc_process_config_base(obd, buf);
3255 }
3256 
3257 struct obd_ops osc_obd_ops = {
3258 	.o_owner		= THIS_MODULE,
3259 	.o_setup		= osc_setup,
3260 	.o_precleanup	   = osc_precleanup,
3261 	.o_cleanup	      = osc_cleanup,
3262 	.o_add_conn	     = client_import_add_conn,
3263 	.o_del_conn	     = client_import_del_conn,
3264 	.o_connect	      = client_connect_import,
3265 	.o_reconnect	    = osc_reconnect,
3266 	.o_disconnect	   = osc_disconnect,
3267 	.o_statfs	       = osc_statfs,
3268 	.o_statfs_async	 = osc_statfs_async,
3269 	.o_packmd	       = osc_packmd,
3270 	.o_unpackmd	     = osc_unpackmd,
3271 	.o_create	       = osc_create,
3272 	.o_destroy	      = osc_destroy,
3273 	.o_getattr	      = osc_getattr,
3274 	.o_getattr_async	= osc_getattr_async,
3275 	.o_setattr	      = osc_setattr,
3276 	.o_setattr_async	= osc_setattr_async,
3277 	.o_find_cbdata	  = osc_find_cbdata,
3278 	.o_iocontrol	    = osc_iocontrol,
3279 	.o_get_info	     = osc_get_info,
3280 	.o_set_info_async       = osc_set_info_async,
3281 	.o_import_event	 = osc_import_event,
3282 	.o_process_config       = osc_process_config,
3283 	.o_quotactl	     = osc_quotactl,
3284 	.o_quotacheck	   = osc_quotacheck,
3285 };
3286 
3287 extern struct lu_kmem_descr osc_caches[];
3288 extern spinlock_t osc_ast_guard;
3289 extern struct lock_class_key osc_ast_guard_class;
3290 
osc_init(void)3291 static int __init osc_init(void)
3292 {
3293 	struct lprocfs_static_vars lvars = { NULL };
3294 	unsigned int reqpool_size;
3295 	unsigned int reqsize;
3296 	int rc;
3297 
3298 	/* print an address of _any_ initialized kernel symbol from this
3299 	 * module, to allow debugging with gdb that doesn't support data
3300 	 * symbols from modules.*/
3301 	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3302 
3303 	rc = lu_kmem_init(osc_caches);
3304 	if (rc)
3305 		return rc;
3306 
3307 	lprocfs_osc_init_vars(&lvars);
3308 
3309 	rc = class_register_type(&osc_obd_ops, NULL,
3310 				 LUSTRE_OSC_NAME, &osc_device_type);
3311 	if (rc)
3312 		goto out_kmem;
3313 
3314 	spin_lock_init(&osc_ast_guard);
3315 	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3316 
3317 	/* This is obviously too much memory, only prevent overflow here */
3318 	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
3319 		rc = -EINVAL;
3320 		goto out_type;
3321 	}
3322 
3323 	reqpool_size = osc_reqpool_mem_max << 20;
3324 
3325 	reqsize = 1;
3326 	while (reqsize < OST_MAXREQSIZE)
3327 		reqsize = reqsize << 1;
3328 
3329 	/*
3330 	 * We don't enlarge the request count in OSC pool according to
3331 	 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3332 	 * tried after normal allocation failed. So a small OSC pool won't
3333 	 * cause much performance degression in most of cases.
3334 	 */
3335 	osc_reqpool_maxreqcount = reqpool_size / reqsize;
3336 
3337 	atomic_set(&osc_pool_req_count, 0);
3338 	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
3339 					  ptlrpc_add_rqs_to_pool);
3340 
3341 	if (osc_rq_pool)
3342 		return 0;
3343 
3344 	rc = -ENOMEM;
3345 
3346 out_type:
3347 	class_unregister_type(LUSTRE_OSC_NAME);
3348 out_kmem:
3349 	lu_kmem_fini(osc_caches);
3350 	return rc;
3351 }
3352 
osc_exit(void)3353 static void /*__exit*/ osc_exit(void)
3354 {
3355 	class_unregister_type(LUSTRE_OSC_NAME);
3356 	lu_kmem_fini(osc_caches);
3357 	ptlrpc_free_rq_pool(osc_rq_pool);
3358 }
3359 
3360 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3361 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3362 MODULE_LICENSE("GPL");
3363 MODULE_VERSION(LUSTRE_VERSION_STRING);
3364 
3365 module_init(osc_init);
3366 module_exit(osc_exit);
3367