1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_LOV
38 
39 #include "../../include/linux/libcfs/libcfs.h"
40 
41 #include "../include/obd_class.h"
42 #include "../include/lustre/lustre_idl.h"
43 #include "lov_internal.h"
44 
lov_init_set(struct lov_request_set * set)45 static void lov_init_set(struct lov_request_set *set)
46 {
47 	set->set_count = 0;
48 	atomic_set(&set->set_completes, 0);
49 	atomic_set(&set->set_success, 0);
50 	atomic_set(&set->set_finish_checked, 0);
51 	set->set_cookies = NULL;
52 	INIT_LIST_HEAD(&set->set_list);
53 	atomic_set(&set->set_refcount, 1);
54 	init_waitqueue_head(&set->set_waitq);
55 	spin_lock_init(&set->set_lock);
56 }
57 
lov_finish_set(struct lov_request_set * set)58 void lov_finish_set(struct lov_request_set *set)
59 {
60 	struct list_head *pos, *n;
61 
62 	LASSERT(set);
63 	list_for_each_safe(pos, n, &set->set_list) {
64 		struct lov_request *req = list_entry(pos,
65 							 struct lov_request,
66 							 rq_link);
67 		list_del_init(&req->rq_link);
68 
69 		if (req->rq_oi.oi_oa)
70 			OBDO_FREE(req->rq_oi.oi_oa);
71 		if (req->rq_oi.oi_md)
72 			OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
73 		if (req->rq_oi.oi_osfs)
74 			OBD_FREE(req->rq_oi.oi_osfs,
75 				 sizeof(*req->rq_oi.oi_osfs));
76 		OBD_FREE(req, sizeof(*req));
77 	}
78 
79 	if (set->set_pga) {
80 		int len = set->set_oabufs * sizeof(*set->set_pga);
81 		OBD_FREE_LARGE(set->set_pga, len);
82 	}
83 	if (set->set_lockh)
84 		lov_llh_put(set->set_lockh);
85 
86 	OBD_FREE(set, sizeof(*set));
87 }
88 
lov_set_finished(struct lov_request_set * set,int idempotent)89 int lov_set_finished(struct lov_request_set *set, int idempotent)
90 {
91 	int completes = atomic_read(&set->set_completes);
92 
93 	CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
94 
95 	if (completes == set->set_count) {
96 		if (idempotent)
97 			return 1;
98 		if (atomic_inc_return(&set->set_finish_checked) == 1)
99 			return 1;
100 	}
101 	return 0;
102 }
103 
lov_update_set(struct lov_request_set * set,struct lov_request * req,int rc)104 void lov_update_set(struct lov_request_set *set,
105 		    struct lov_request *req, int rc)
106 {
107 	req->rq_complete = 1;
108 	req->rq_rc = rc;
109 
110 	atomic_inc(&set->set_completes);
111 	if (rc == 0)
112 		atomic_inc(&set->set_success);
113 
114 	wake_up(&set->set_waitq);
115 }
116 
lov_update_common_set(struct lov_request_set * set,struct lov_request * req,int rc)117 int lov_update_common_set(struct lov_request_set *set,
118 			  struct lov_request *req, int rc)
119 {
120 	struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
121 
122 	lov_update_set(set, req, rc);
123 
124 	/* grace error on inactive ost */
125 	if (rc && !(lov->lov_tgts[req->rq_idx] &&
126 		    lov->lov_tgts[req->rq_idx]->ltd_active))
127 		rc = 0;
128 
129 	/* FIXME in raid1 regime, should return 0 */
130 	return rc;
131 }
132 
lov_set_add_req(struct lov_request * req,struct lov_request_set * set)133 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
134 {
135 	list_add_tail(&req->rq_link, &set->set_list);
136 	set->set_count++;
137 	req->rq_rqset = set;
138 }
139 
lov_check_set(struct lov_obd * lov,int idx)140 static int lov_check_set(struct lov_obd *lov, int idx)
141 {
142 	int rc;
143 	struct lov_tgt_desc *tgt;
144 
145 	mutex_lock(&lov->lov_lock);
146 	tgt = lov->lov_tgts[idx];
147 	rc = !tgt || tgt->ltd_active ||
148 		(tgt->ltd_exp &&
149 		 class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried);
150 	mutex_unlock(&lov->lov_lock);
151 
152 	return rc;
153 }
154 
155 /* Check if the OSC connection exists and is active.
156  * If the OSC has not yet had a chance to connect to the OST the first time,
157  * wait once for it to connect instead of returning an error.
158  */
lov_check_and_wait_active(struct lov_obd * lov,int ost_idx)159 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
160 {
161 	wait_queue_head_t waitq;
162 	struct l_wait_info lwi;
163 	struct lov_tgt_desc *tgt;
164 	int rc = 0;
165 
166 	mutex_lock(&lov->lov_lock);
167 
168 	tgt = lov->lov_tgts[ost_idx];
169 
170 	if (unlikely(tgt == NULL)) {
171 		rc = 0;
172 		goto out;
173 	}
174 
175 	if (likely(tgt->ltd_active)) {
176 		rc = 1;
177 		goto out;
178 	}
179 
180 	if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) {
181 		rc = 0;
182 		goto out;
183 	}
184 
185 	mutex_unlock(&lov->lov_lock);
186 
187 	init_waitqueue_head(&waitq);
188 	lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
189 				   cfs_time_seconds(1), NULL, NULL);
190 
191 	rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
192 	if (tgt != NULL && tgt->ltd_active)
193 		return 1;
194 
195 	return 0;
196 
197 out:
198 	mutex_unlock(&lov->lov_lock);
199 	return rc;
200 }
201 
common_attr_done(struct lov_request_set * set)202 static int common_attr_done(struct lov_request_set *set)
203 {
204 	struct list_head *pos;
205 	struct lov_request *req;
206 	struct obdo *tmp_oa;
207 	int rc = 0, attrset = 0;
208 
209 	LASSERT(set->set_oi != NULL);
210 
211 	if (set->set_oi->oi_oa == NULL)
212 		return 0;
213 
214 	if (!atomic_read(&set->set_success))
215 		return -EIO;
216 
217 	OBDO_ALLOC(tmp_oa);
218 	if (tmp_oa == NULL) {
219 		rc = -ENOMEM;
220 		goto out;
221 	}
222 
223 	list_for_each(pos, &set->set_list) {
224 		req = list_entry(pos, struct lov_request, rq_link);
225 
226 		if (!req->rq_complete || req->rq_rc)
227 			continue;
228 		if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
229 			continue;
230 		lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
231 				req->rq_oi.oi_oa->o_valid,
232 				set->set_oi->oi_md, req->rq_stripe, &attrset);
233 	}
234 	if (!attrset) {
235 		CERROR("No stripes had valid attrs\n");
236 		rc = -EIO;
237 	}
238 	if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
239 	    (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
240 		/* When we take attributes of some epoch, we require all the
241 		 * ost to be active. */
242 		CERROR("Not all the stripes had valid attrs\n");
243 		rc = -EIO;
244 		goto out;
245 	}
246 
247 	tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
248 	memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
249 out:
250 	if (tmp_oa)
251 		OBDO_FREE(tmp_oa);
252 	return rc;
253 
254 }
255 
lov_fini_getattr_set(struct lov_request_set * set)256 int lov_fini_getattr_set(struct lov_request_set *set)
257 {
258 	int rc = 0;
259 
260 	if (set == NULL)
261 		return 0;
262 	LASSERT(set->set_exp);
263 	if (atomic_read(&set->set_completes))
264 		rc = common_attr_done(set);
265 
266 	lov_put_reqset(set);
267 
268 	return rc;
269 }
270 
271 /* The callback for osc_getattr_async that finalizes a request info when a
272  * response is received. */
cb_getattr_update(void * cookie,int rc)273 static int cb_getattr_update(void *cookie, int rc)
274 {
275 	struct obd_info *oinfo = cookie;
276 	struct lov_request *lovreq;
277 
278 	lovreq = container_of(oinfo, struct lov_request, rq_oi);
279 	return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
280 }
281 
lov_prep_getattr_set(struct obd_export * exp,struct obd_info * oinfo,struct lov_request_set ** reqset)282 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
283 			 struct lov_request_set **reqset)
284 {
285 	struct lov_request_set *set;
286 	struct lov_obd *lov = &exp->exp_obd->u.lov;
287 	int rc = 0, i;
288 
289 	OBD_ALLOC(set, sizeof(*set));
290 	if (set == NULL)
291 		return -ENOMEM;
292 	lov_init_set(set);
293 
294 	set->set_exp = exp;
295 	set->set_oi = oinfo;
296 
297 	for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
298 		struct lov_oinfo *loi;
299 		struct lov_request *req;
300 
301 		loi = oinfo->oi_md->lsm_oinfo[i];
302 		if (lov_oinfo_is_dummy(loi))
303 			continue;
304 
305 		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
306 			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
307 			if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) {
308 				/* SOM requires all the OSTs to be active. */
309 				rc = -EIO;
310 				goto out_set;
311 			}
312 			continue;
313 		}
314 
315 		OBD_ALLOC(req, sizeof(*req));
316 		if (req == NULL) {
317 			rc = -ENOMEM;
318 			goto out_set;
319 		}
320 
321 		req->rq_stripe = i;
322 		req->rq_idx = loi->loi_ost_idx;
323 
324 		OBDO_ALLOC(req->rq_oi.oi_oa);
325 		if (req->rq_oi.oi_oa == NULL) {
326 			OBD_FREE(req, sizeof(*req));
327 			rc = -ENOMEM;
328 			goto out_set;
329 		}
330 		memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
331 		       sizeof(*req->rq_oi.oi_oa));
332 		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
333 		req->rq_oi.oi_cb_up = cb_getattr_update;
334 		req->rq_oi.oi_capa = oinfo->oi_capa;
335 
336 		lov_set_add_req(req, set);
337 	}
338 	if (!set->set_count) {
339 		rc = -EIO;
340 		goto out_set;
341 	}
342 	*reqset = set;
343 	return rc;
344 out_set:
345 	lov_fini_getattr_set(set);
346 	return rc;
347 }
348 
lov_fini_destroy_set(struct lov_request_set * set)349 int lov_fini_destroy_set(struct lov_request_set *set)
350 {
351 	if (set == NULL)
352 		return 0;
353 	LASSERT(set->set_exp);
354 	if (atomic_read(&set->set_completes)) {
355 		/* FIXME update qos data here */
356 	}
357 
358 	lov_put_reqset(set);
359 
360 	return 0;
361 }
362 
lov_prep_destroy_set(struct obd_export * exp,struct obd_info * oinfo,struct obdo * src_oa,struct lov_stripe_md * lsm,struct obd_trans_info * oti,struct lov_request_set ** reqset)363 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
364 			 struct obdo *src_oa, struct lov_stripe_md *lsm,
365 			 struct obd_trans_info *oti,
366 			 struct lov_request_set **reqset)
367 {
368 	struct lov_request_set *set;
369 	struct lov_obd *lov = &exp->exp_obd->u.lov;
370 	int rc = 0, i;
371 
372 	OBD_ALLOC(set, sizeof(*set));
373 	if (set == NULL)
374 		return -ENOMEM;
375 	lov_init_set(set);
376 
377 	set->set_exp = exp;
378 	set->set_oi = oinfo;
379 	set->set_oi->oi_md = lsm;
380 	set->set_oi->oi_oa = src_oa;
381 	set->set_oti = oti;
382 	if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
383 		set->set_cookies = oti->oti_logcookies;
384 
385 	for (i = 0; i < lsm->lsm_stripe_count; i++) {
386 		struct lov_oinfo *loi;
387 		struct lov_request *req;
388 
389 		loi = lsm->lsm_oinfo[i];
390 		if (lov_oinfo_is_dummy(loi))
391 			continue;
392 
393 		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
394 			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
395 			continue;
396 		}
397 
398 		OBD_ALLOC(req, sizeof(*req));
399 		if (req == NULL) {
400 			rc = -ENOMEM;
401 			goto out_set;
402 		}
403 
404 		req->rq_stripe = i;
405 		req->rq_idx = loi->loi_ost_idx;
406 
407 		OBDO_ALLOC(req->rq_oi.oi_oa);
408 		if (req->rq_oi.oi_oa == NULL) {
409 			OBD_FREE(req, sizeof(*req));
410 			rc = -ENOMEM;
411 			goto out_set;
412 		}
413 		memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
414 		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
415 		lov_set_add_req(req, set);
416 	}
417 	if (!set->set_count) {
418 		rc = -EIO;
419 		goto out_set;
420 	}
421 	*reqset = set;
422 	return rc;
423 out_set:
424 	lov_fini_destroy_set(set);
425 	return rc;
426 }
427 
lov_fini_setattr_set(struct lov_request_set * set)428 int lov_fini_setattr_set(struct lov_request_set *set)
429 {
430 	int rc = 0;
431 
432 	if (set == NULL)
433 		return 0;
434 	LASSERT(set->set_exp);
435 	if (atomic_read(&set->set_completes)) {
436 		rc = common_attr_done(set);
437 		/* FIXME update qos data here */
438 	}
439 
440 	lov_put_reqset(set);
441 	return rc;
442 }
443 
lov_update_setattr_set(struct lov_request_set * set,struct lov_request * req,int rc)444 int lov_update_setattr_set(struct lov_request_set *set,
445 			   struct lov_request *req, int rc)
446 {
447 	struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
448 	struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
449 
450 	lov_update_set(set, req, rc);
451 
452 	/* grace error on inactive ost */
453 	if (rc && !(lov->lov_tgts[req->rq_idx] &&
454 		    lov->lov_tgts[req->rq_idx]->ltd_active))
455 		rc = 0;
456 
457 	if (rc == 0) {
458 		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
459 			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
460 				req->rq_oi.oi_oa->o_ctime;
461 		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
462 			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
463 				req->rq_oi.oi_oa->o_mtime;
464 		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
465 			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
466 				req->rq_oi.oi_oa->o_atime;
467 	}
468 
469 	return rc;
470 }
471 
472 /* The callback for osc_setattr_async that finalizes a request info when a
473  * response is received. */
cb_setattr_update(void * cookie,int rc)474 static int cb_setattr_update(void *cookie, int rc)
475 {
476 	struct obd_info *oinfo = cookie;
477 	struct lov_request *lovreq;
478 
479 	lovreq = container_of(oinfo, struct lov_request, rq_oi);
480 	return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
481 }
482 
lov_prep_setattr_set(struct obd_export * exp,struct obd_info * oinfo,struct obd_trans_info * oti,struct lov_request_set ** reqset)483 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
484 			 struct obd_trans_info *oti,
485 			 struct lov_request_set **reqset)
486 {
487 	struct lov_request_set *set;
488 	struct lov_obd *lov = &exp->exp_obd->u.lov;
489 	int rc = 0, i;
490 
491 	OBD_ALLOC(set, sizeof(*set));
492 	if (set == NULL)
493 		return -ENOMEM;
494 	lov_init_set(set);
495 
496 	set->set_exp = exp;
497 	set->set_oti = oti;
498 	set->set_oi = oinfo;
499 	if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
500 		set->set_cookies = oti->oti_logcookies;
501 
502 	for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
503 		struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
504 		struct lov_request *req;
505 
506 		if (lov_oinfo_is_dummy(loi))
507 			continue;
508 
509 		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
510 			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
511 			continue;
512 		}
513 
514 		OBD_ALLOC(req, sizeof(*req));
515 		if (req == NULL) {
516 			rc = -ENOMEM;
517 			goto out_set;
518 		}
519 		req->rq_stripe = i;
520 		req->rq_idx = loi->loi_ost_idx;
521 
522 		OBDO_ALLOC(req->rq_oi.oi_oa);
523 		if (req->rq_oi.oi_oa == NULL) {
524 			OBD_FREE(req, sizeof(*req));
525 			rc = -ENOMEM;
526 			goto out_set;
527 		}
528 		memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
529 		       sizeof(*req->rq_oi.oi_oa));
530 		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
531 		req->rq_oi.oi_oa->o_stripe_idx = i;
532 		req->rq_oi.oi_cb_up = cb_setattr_update;
533 		req->rq_oi.oi_capa = oinfo->oi_capa;
534 
535 		if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
536 			int off = lov_stripe_offset(oinfo->oi_md,
537 						    oinfo->oi_oa->o_size, i,
538 						    &req->rq_oi.oi_oa->o_size);
539 
540 			if (off < 0 && req->rq_oi.oi_oa->o_size)
541 				req->rq_oi.oi_oa->o_size--;
542 
543 			CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n",
544 			       i, req->rq_oi.oi_oa->o_size,
545 			       oinfo->oi_oa->o_size);
546 		}
547 		lov_set_add_req(req, set);
548 	}
549 	if (!set->set_count) {
550 		rc = -EIO;
551 		goto out_set;
552 	}
553 	*reqset = set;
554 	return rc;
555 out_set:
556 	lov_fini_setattr_set(set);
557 	return rc;
558 }
559 
560 #define LOV_U64_MAX ((__u64)~0ULL)
561 #define LOV_SUM_MAX(tot, add)					   \
562 	do {							    \
563 		if ((tot) + (add) < (tot))			      \
564 			(tot) = LOV_U64_MAX;			    \
565 		else						    \
566 			(tot) += (add);				 \
567 	} while (0)
568 
lov_fini_statfs(struct obd_device * obd,struct obd_statfs * osfs,int success)569 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
570 		    int success)
571 {
572 	if (success) {
573 		__u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
574 							   LOV_MAGIC, 0);
575 		if (osfs->os_files != LOV_U64_MAX)
576 			lov_do_div64(osfs->os_files, expected_stripes);
577 		if (osfs->os_ffree != LOV_U64_MAX)
578 			lov_do_div64(osfs->os_ffree, expected_stripes);
579 
580 		spin_lock(&obd->obd_osfs_lock);
581 		memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
582 		obd->obd_osfs_age = cfs_time_current_64();
583 		spin_unlock(&obd->obd_osfs_lock);
584 		return 0;
585 	}
586 
587 	return -EIO;
588 }
589 
lov_fini_statfs_set(struct lov_request_set * set)590 int lov_fini_statfs_set(struct lov_request_set *set)
591 {
592 	int rc = 0;
593 
594 	if (set == NULL)
595 		return 0;
596 
597 	if (atomic_read(&set->set_completes)) {
598 		rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
599 				     atomic_read(&set->set_success));
600 	}
601 	lov_put_reqset(set);
602 	return rc;
603 }
604 
lov_update_statfs(struct obd_statfs * osfs,struct obd_statfs * lov_sfs,int success)605 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
606 		       int success)
607 {
608 	int shift = 0, quit = 0;
609 	__u64 tmp;
610 
611 	if (success == 0) {
612 		memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
613 	} else {
614 		if (osfs->os_bsize != lov_sfs->os_bsize) {
615 			/* assume all block sizes are always powers of 2 */
616 			/* get the bits difference */
617 			tmp = osfs->os_bsize | lov_sfs->os_bsize;
618 			for (shift = 0; shift <= 64; ++shift) {
619 				if (tmp & 1) {
620 					if (quit)
621 						break;
622 					else
623 						quit = 1;
624 					shift = 0;
625 				}
626 				tmp >>= 1;
627 			}
628 		}
629 
630 		if (osfs->os_bsize < lov_sfs->os_bsize) {
631 			osfs->os_bsize = lov_sfs->os_bsize;
632 
633 			osfs->os_bfree  >>= shift;
634 			osfs->os_bavail >>= shift;
635 			osfs->os_blocks >>= shift;
636 		} else if (shift != 0) {
637 			lov_sfs->os_bfree  >>= shift;
638 			lov_sfs->os_bavail >>= shift;
639 			lov_sfs->os_blocks >>= shift;
640 		}
641 		osfs->os_bfree += lov_sfs->os_bfree;
642 		osfs->os_bavail += lov_sfs->os_bavail;
643 		osfs->os_blocks += lov_sfs->os_blocks;
644 		/* XXX not sure about this one - depends on policy.
645 		 *   - could be minimum if we always stripe on all OBDs
646 		 *     (but that would be wrong for any other policy,
647 		 *     if one of the OBDs has no more objects left)
648 		 *   - could be sum if we stripe whole objects
649 		 *   - could be average, just to give a nice number
650 		 *
651 		 * To give a "reasonable" (if not wholly accurate)
652 		 * number, we divide the total number of free objects
653 		 * by expected stripe count (watch out for overflow).
654 		 */
655 		LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
656 		LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
657 	}
658 }
659 
660 /* The callback for osc_statfs_async that finalizes a request info when a
661  * response is received. */
cb_statfs_update(void * cookie,int rc)662 static int cb_statfs_update(void *cookie, int rc)
663 {
664 	struct obd_info *oinfo = cookie;
665 	struct lov_request *lovreq;
666 	struct lov_request_set *set;
667 	struct obd_statfs *osfs, *lov_sfs;
668 	struct lov_obd *lov;
669 	struct lov_tgt_desc *tgt;
670 	struct obd_device *lovobd, *tgtobd;
671 	int success;
672 
673 	lovreq = container_of(oinfo, struct lov_request, rq_oi);
674 	set = lovreq->rq_rqset;
675 	lovobd = set->set_obd;
676 	lov = &lovobd->u.lov;
677 	osfs = set->set_oi->oi_osfs;
678 	lov_sfs = oinfo->oi_osfs;
679 	success = atomic_read(&set->set_success);
680 	/* XXX: the same is done in lov_update_common_set, however
681 	   lovset->set_exp is not initialized. */
682 	lov_update_set(set, lovreq, rc);
683 	if (rc)
684 		goto out;
685 
686 	obd_getref(lovobd);
687 	tgt = lov->lov_tgts[lovreq->rq_idx];
688 	if (!tgt || !tgt->ltd_active)
689 		goto out_update;
690 
691 	tgtobd = class_exp2obd(tgt->ltd_exp);
692 	spin_lock(&tgtobd->obd_osfs_lock);
693 	memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
694 	if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
695 		tgtobd->obd_osfs_age = cfs_time_current_64();
696 	spin_unlock(&tgtobd->obd_osfs_lock);
697 
698 out_update:
699 	lov_update_statfs(osfs, lov_sfs, success);
700 	obd_putref(lovobd);
701 
702 out:
703 	if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
704 	    lov_set_finished(set, 0)) {
705 		lov_statfs_interpret(NULL, set, set->set_count !=
706 				     atomic_read(&set->set_success));
707 	}
708 
709 	return 0;
710 }
711 
lov_prep_statfs_set(struct obd_device * obd,struct obd_info * oinfo,struct lov_request_set ** reqset)712 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
713 			struct lov_request_set **reqset)
714 {
715 	struct lov_request_set *set;
716 	struct lov_obd *lov = &obd->u.lov;
717 	int rc = 0, i;
718 
719 	OBD_ALLOC(set, sizeof(*set));
720 	if (set == NULL)
721 		return -ENOMEM;
722 	lov_init_set(set);
723 
724 	set->set_obd = obd;
725 	set->set_oi = oinfo;
726 
727 	/* We only get block data from the OBD */
728 	for (i = 0; i < lov->desc.ld_tgt_count; i++) {
729 		struct lov_request *req;
730 
731 		if (lov->lov_tgts[i] == NULL ||
732 		    (!lov_check_and_wait_active(lov, i) &&
733 		     (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
734 			CDEBUG(D_HA, "lov idx %d inactive\n", i);
735 			continue;
736 		}
737 
738 		/* skip targets that have been explicitly disabled by the
739 		 * administrator */
740 		if (!lov->lov_tgts[i]->ltd_exp) {
741 			CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
742 			continue;
743 		}
744 
745 		OBD_ALLOC(req, sizeof(*req));
746 		if (req == NULL) {
747 			rc = -ENOMEM;
748 			goto out_set;
749 		}
750 
751 		OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
752 		if (req->rq_oi.oi_osfs == NULL) {
753 			OBD_FREE(req, sizeof(*req));
754 			rc = -ENOMEM;
755 			goto out_set;
756 		}
757 
758 		req->rq_idx = i;
759 		req->rq_oi.oi_cb_up = cb_statfs_update;
760 		req->rq_oi.oi_flags = oinfo->oi_flags;
761 
762 		lov_set_add_req(req, set);
763 	}
764 	if (!set->set_count) {
765 		rc = -EIO;
766 		goto out_set;
767 	}
768 	*reqset = set;
769 	return rc;
770 out_set:
771 	lov_fini_statfs_set(set);
772 	return rc;
773 }
774