1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include "../../include/linux/libcfs/libcfs.h"
39 
40 #include "../include/obd.h"
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_debug.h"
44 #include "../include/lprocfs_status.h"
45 #include "../include/cl_object.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/lustre_acl.h"
48 #include "../include/lustre_net.h"
49 
50 #include "echo_internal.h"
51 
52 /** \defgroup echo_client Echo Client
53  * @{
54  */
55 
/* Echo client device: the top-level CLIO device of the echo client stack. */
struct echo_device {
	struct cl_device	ed_cl;		/* embedded cl_device */
	struct echo_client_obd *ed_ec;		/* obd-level echo client state */

	struct cl_site	  ed_site_myself;	/* backing storage for ed_site */
	struct cl_site	 *ed_site;		/* set by echo_site_init(), NULL before/after */
	struct lu_device       *ed_next;	/* next device in the stack (lov/osc), NULL for ost */
	int		     ed_next_islov;	/* non-zero when ed_next is a lov device */
};
65 
/* Echo client object: wraps a cl_object and owns its lov_stripe_md. */
struct echo_object {
	struct cl_object	eo_cl;		/* embedded cl_object */
	struct cl_object_header eo_hdr;		/* per-object CLIO header */

	struct echo_device     *eo_dev;		/* owning echo device */
	struct list_head	      eo_obj_chain;	/* linkage in ed_ec->ec_objects */
	struct lov_stripe_md   *eo_lsm;		/* striping md, taken over in echo_object_init() */
	atomic_t	    eo_npages;		/* live echo_page count (init/fini pairs) */
	int		     eo_deleted;	/* set at device teardown; object must not be reused */
};
76 
/* Configuration passed to cl_object_find() for echo objects. */
struct echo_object_conf {
	struct cl_object_conf  eoc_cl;		/* embedded generic conf */
	struct lov_stripe_md **eoc_md;		/* in/out lsm pointer; cleared on ownership transfer */
};
81 
/* Per-page echo client state: a cl_page slice plus the backing VM page. */
struct echo_page {
	struct cl_page_slice   ep_cl;		/* embedded page slice */
	struct mutex		ep_lock;	/* taken in cpo_own, dropped in cpo_disown */
	struct page	    *ep_vmpage;		/* referenced in echo_page_init(), released in fini */
};
87 
/* Per-lock echo client state: a cl_lock slice plus bookkeeping. */
struct echo_lock {
	struct cl_lock_slice   el_cl;		/* embedded lock slice */
	struct list_head	     el_chain;	/* linkage in ed_ec->ec_locks */
	struct echo_object    *el_object;	/* object the lock covers */
	__u64		  el_cookie;		/* unique id handed back to userspace */
	atomic_t	   el_refcount;		/* users of this lock; 0 at init */
};
95 
96 static int echo_client_setup(const struct lu_env *env,
97 			     struct obd_device *obddev,
98 			     struct lustre_cfg *lcfg);
99 static int echo_client_cleanup(struct obd_device *obddev);
100 
101 /** \defgroup echo_helpers Helper functions
102  * @{
103  */
cl2echo_dev(const struct cl_device * dev)104 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
105 {
106 	return container_of0(dev, struct echo_device, ed_cl);
107 }
108 
echo_dev2cl(struct echo_device * d)109 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
110 {
111 	return &d->ed_cl;
112 }
113 
obd2echo_dev(const struct obd_device * obd)114 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
115 {
116 	return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
117 }
118 
/* Return the cl_object embedded in an echo_object. */
static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
{
	return &eco->eo_cl;
}
123 
/* Map an embedded cl_object back to its enclosing echo_object. */
static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
{
	return container_of(o, struct echo_object, eo_cl);
}
128 
/* Map a cl_page slice back to its enclosing echo_page. */
static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
{
	return container_of(s, struct echo_page, ep_cl);
}
133 
/* Map a cl_lock slice back to its enclosing echo_lock. */
static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
{
	return container_of(s, struct echo_lock, el_cl);
}
138 
/* Return the cl_lock that an echo_lock slice belongs to. */
static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
{
	return ecl->el_cl.cls_lock;
}
143 
static struct lu_context_key echo_thread_key;

/* Fetch this thread's echo_thread_info from the lu_env context.
 * The key is registered below (echo_thread_key), so it must exist. */
static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
{
	struct echo_thread_info *info;

	info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
	LASSERT(info != NULL);
	return info;
}
153 
/* Map an embedded cl_object_conf back to its enclosing echo_object_conf. */
static inline
struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
{
	return container_of(c, struct echo_object_conf, eoc_cl);
}
159 
160 /** @} echo_helpers */
161 
162 static struct echo_object *cl_echo_object_find(struct echo_device *d,
163 					       struct lov_stripe_md **lsm);
164 static int cl_echo_object_put(struct echo_object *eco);
165 static int cl_echo_enqueue(struct echo_object *eco, u64 start,
166 			   u64 end, int mode, __u64 *cookie);
167 static int cl_echo_cancel(struct echo_device *d, __u64 cookie);
168 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
169 			      struct page **pages, int npages, int async);
170 
171 static struct echo_thread_info *echo_env_info(const struct lu_env *env);
172 
/* Per-thread scratch state, allocated via echo_thread_key (one per context). */
struct echo_thread_info {
	struct echo_object_conf eti_conf;	/* conf used by cl_echo_object_find() */
	struct lustre_md	eti_md;		/* md wrapper for the lov case */

	struct cl_2queue	eti_queue;	/* page queue for brw */
	struct cl_io	    eti_io;		/* scratch io descriptor */
	struct cl_lock_descr    eti_descr;	/* scratch lock descriptor */
	struct lu_fid	   eti_fid;		/* fid derived from the ostid */
	struct lu_fid		eti_fid2;	/* secondary scratch fid */
};
183 
184 /* No session used right now */
/* No session used right now */
struct echo_session_info {
	unsigned long dummy;	/* placeholder so the struct has non-zero size */
};
188 
/* Slab caches for echo client allocations; created from echo_caches[]. */
static struct kmem_cache *echo_lock_kmem;
static struct kmem_cache *echo_object_kmem;
static struct kmem_cache *echo_thread_kmem;
static struct kmem_cache *echo_session_kmem;
193 
/* Descriptors for the caches above; NULL ckd_cache terminates the array. */
static struct lu_kmem_descr echo_caches[] = {
	{
		.ckd_cache = &echo_lock_kmem,
		.ckd_name  = "echo_lock_kmem",
		.ckd_size  = sizeof(struct echo_lock)
	},
	{
		.ckd_cache = &echo_object_kmem,
		.ckd_name  = "echo_object_kmem",
		.ckd_size  = sizeof(struct echo_object)
	},
	{
		.ckd_cache = &echo_thread_kmem,
		.ckd_name  = "echo_thread_kmem",
		.ckd_size  = sizeof(struct echo_thread_info)
	},
	{
		.ckd_cache = &echo_session_kmem,
		.ckd_name  = "echo_session_kmem",
		.ckd_size  = sizeof(struct echo_session_info)
	},
	{
		.ckd_cache = NULL
	}
};
219 
220 /** \defgroup echo_page Page operations
221  *
222  * Echo page operations.
223  *
224  * @{
225  */
/* cpo_vmpage: return the VM page backing this page slice. */
static struct page *echo_page_vmpage(const struct lu_env *env,
				    const struct cl_page_slice *slice)
{
	return cl2echo_page(slice)->ep_vmpage;
}
231 
echo_page_own(const struct lu_env * env,const struct cl_page_slice * slice,struct cl_io * io,int nonblock)232 static int echo_page_own(const struct lu_env *env,
233 			 const struct cl_page_slice *slice,
234 			 struct cl_io *io, int nonblock)
235 {
236 	struct echo_page *ep = cl2echo_page(slice);
237 
238 	if (!nonblock)
239 		mutex_lock(&ep->ep_lock);
240 	else if (!mutex_trylock(&ep->ep_lock))
241 		return -EAGAIN;
242 	return 0;
243 }
244 
/* cpo_disown: release ownership; the caller must hold ep_lock. */
static void echo_page_disown(const struct lu_env *env,
			     const struct cl_page_slice *slice,
			     struct cl_io *io)
{
	struct echo_page *ep = cl2echo_page(slice);

	LASSERT(mutex_is_locked(&ep->ep_lock));
	mutex_unlock(&ep->ep_lock);
}
254 
/* cpo_discard: discarding an echo page simply deletes it from CLIO. */
static void echo_page_discard(const struct lu_env *env,
			      const struct cl_page_slice *slice,
			      struct cl_io *unused)
{
	cl_page_delete(env, slice->cpl_page);
}
261 
echo_page_is_vmlocked(const struct lu_env * env,const struct cl_page_slice * slice)262 static int echo_page_is_vmlocked(const struct lu_env *env,
263 				 const struct cl_page_slice *slice)
264 {
265 	if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
266 		return -EBUSY;
267 	return -ENODATA;
268 }
269 
/* cpo_completion: echo i/o is always synchronous, so a sync waiter must
 * be attached to the page; nothing else to do here. */
static void echo_page_completion(const struct lu_env *env,
				 const struct cl_page_slice *slice,
				 int ioret)
{
	LASSERT(slice->cpl_page->cp_sync_io != NULL);
}
276 
/* cpo_fini: undo echo_page_init() — drop the object's page count and the
 * VM page reference taken there. */
static void echo_page_fini(const struct lu_env *env,
			   struct cl_page_slice *slice)
{
	struct echo_page *ep    = cl2echo_page(slice);
	struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
	struct page *vmpage      = ep->ep_vmpage;

	atomic_dec(&eco->eo_npages);
	page_cache_release(vmpage);
}
287 
/* cpo_prep: no preparation needed for echo pages. */
static int echo_page_prep(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  struct cl_io *unused)
{
	return 0;
}
294 
/* cpo_print: dump the page slice, its lock state and the VM page. */
static int echo_page_print(const struct lu_env *env,
			   const struct cl_page_slice *slice,
			   void *cookie, lu_printer_t printer)
{
	struct echo_page *ep = cl2echo_page(slice);
	int locked = mutex_is_locked(&ep->ep_lock);

	(*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
		   ep, locked, ep->ep_vmpage);
	return 0;
}
305 
/* Page operation vector; read and write share the same prep/completion. */
static const struct cl_page_operations echo_page_ops = {
	.cpo_own	   = echo_page_own,
	.cpo_disown	= echo_page_disown,
	.cpo_discard       = echo_page_discard,
	.cpo_vmpage	= echo_page_vmpage,
	.cpo_fini	  = echo_page_fini,
	.cpo_print	 = echo_page_print,
	.cpo_is_vmlocked   = echo_page_is_vmlocked,
	.io = {
		[CRT_READ] = {
			.cpo_prep	= echo_page_prep,
			.cpo_completion  = echo_page_completion,
		},
		[CRT_WRITE] = {
			.cpo_prep	= echo_page_prep,
			.cpo_completion  = echo_page_completion,
		}
	}
};
325 
326 /** @} echo_page */
327 
328 /** \defgroup echo_lock Locking
329  *
330  * echo lock operations
331  *
332  * @{
333  */
/* clo_fini: free the lock slice; it must already be off ec_locks. */
static void echo_lock_fini(const struct lu_env *env,
			   struct cl_lock_slice *slice)
{
	struct echo_lock *ecl = cl2echo_lock(slice);

	LASSERT(list_empty(&ecl->el_chain));
	kmem_cache_free(echo_lock_kmem, ecl);
}
342 
/* clo_delete: nothing to undo; just assert the lock is unlinked. */
static void echo_lock_delete(const struct lu_env *env,
			     const struct cl_lock_slice *slice)
{
	struct echo_lock *ecl      = cl2echo_lock(slice);

	LASSERT(list_empty(&ecl->el_chain));
}
350 
/* clo_fits_into: any echo lock matches any request. */
static int echo_lock_fits_into(const struct lu_env *env,
			       const struct cl_lock_slice *slice,
			       const struct cl_lock_descr *need,
			       const struct cl_io *unused)
{
	return 1;
}
358 
/* Lock operation vector for echo locks. */
static struct cl_lock_operations echo_lock_ops = {
	.clo_fini      = echo_lock_fini,
	.clo_delete    = echo_lock_delete,
	.clo_fits_into = echo_lock_fits_into
};
364 
365 /** @} echo_lock */
366 
367 /** \defgroup echo_cl_ops cl_object operations
368  *
369  * operations for cl_object
370  *
371  * @{
372  */
/* coo_page_init: attach an echo_page slice to @page for @vmpage.
 * Takes a reference on the VM page and bumps the object's page count;
 * both are dropped in echo_page_fini(). */
static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
			struct cl_page *page, struct page *vmpage)
{
	struct echo_page *ep = cl_object_page_slice(obj, page);
	struct echo_object *eco = cl2echo_obj(obj);

	ep->ep_vmpage = vmpage;
	page_cache_get(vmpage);
	mutex_init(&ep->ep_lock);
	cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
	atomic_inc(&eco->eo_npages);
	return 0;
}
386 
/* coo_io_init: no per-io state for echo objects. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
			struct cl_io *io)
{
	return 0;
}
392 
echo_lock_init(const struct lu_env * env,struct cl_object * obj,struct cl_lock * lock,const struct cl_io * unused)393 static int echo_lock_init(const struct lu_env *env,
394 			  struct cl_object *obj, struct cl_lock *lock,
395 			  const struct cl_io *unused)
396 {
397 	struct echo_lock *el;
398 
399 	el = kmem_cache_alloc(echo_lock_kmem, GFP_NOFS | __GFP_ZERO);
400 	if (el != NULL) {
401 		cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
402 		el->el_object = cl2echo_obj(obj);
403 		INIT_LIST_HEAD(&el->el_chain);
404 		atomic_set(&el->el_refcount, 0);
405 	}
406 	return el == NULL ? -ENOMEM : 0;
407 }
408 
/* coo_conf_set: echo objects have no mutable configuration. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
			 const struct cl_object_conf *conf)
{
	return 0;
}
414 
/* cl_object operation vector for echo objects. */
static const struct cl_object_operations echo_cl_obj_ops = {
	.coo_page_init = echo_page_init,
	.coo_lock_init = echo_lock_init,
	.coo_io_init   = echo_io_init,
	.coo_conf_set  = echo_conf_set
};
421 
422 /** @} echo_cl_ops */
423 
424 /** \defgroup echo_lu_ops lu_object operations
425  *
426  * operations for echo lu object.
427  *
428  * @{
429  */
/*
 * loo_object_init: second-stage initialization of an echo object.
 * Allocates the lower-layer object when stacked on another CLIO device,
 * takes ownership of the lsm from the conf (the conf's pointer is cleared
 * so the caller won't free it), and links the object into ec_objects.
 */
static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
			    const struct lu_object_conf *conf)
{
	struct echo_device *ed	 = cl2echo_dev(lu2cl_dev(obj->lo_dev));
	struct echo_client_obd *ec     = ed->ed_ec;
	struct echo_object *eco	= cl2echo_obj(lu2cl(obj));
	const struct cl_object_conf *cconf;
	struct echo_object_conf *econf;

	if (ed->ed_next) {
		struct lu_object  *below;
		struct lu_device  *under;

		under = ed->ed_next;
		below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
							under);
		if (below == NULL)
			return -ENOMEM;
		lu_object_add(obj, below);
	}

	cconf = lu2cl_conf(conf);
	econf = cl2echo_conf(cconf);

	LASSERT(econf->eoc_md);
	eco->eo_lsm = *econf->eoc_md;
	/* clear the lsm pointer so that it won't get freed. */
	*econf->eoc_md = NULL;

	eco->eo_dev = ed;
	atomic_set(&eco->eo_npages, 0);
	cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));

	spin_lock(&ec->ec_lock);
	list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
	spin_unlock(&ec->ec_lock);

	return 0;
}
469 
470 /* taken from osc_unpackmd() */
echo_alloc_memmd(struct echo_device * ed,struct lov_stripe_md ** lsmp)471 static int echo_alloc_memmd(struct echo_device *ed,
472 			    struct lov_stripe_md **lsmp)
473 {
474 	int lsm_size;
475 
476 	/* If export is lov/osc then use their obd method */
477 	if (ed->ed_next != NULL)
478 		return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
479 	/* OFD has no unpackmd method, do everything here */
480 	lsm_size = lov_stripe_md_size(1);
481 
482 	LASSERT(*lsmp == NULL);
483 	*lsmp = kzalloc(lsm_size, GFP_NOFS);
484 	if (!*lsmp)
485 		return -ENOMEM;
486 
487 	(*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo), GFP_NOFS);
488 	if (!(*lsmp)->lsm_oinfo[0]) {
489 		kfree(*lsmp);
490 		return -ENOMEM;
491 	}
492 
493 	loi_init((*lsmp)->lsm_oinfo[0]);
494 	(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
495 	ostid_set_seq_echo(&(*lsmp)->lsm_oi);
496 
497 	return lsm_size;
498 }
499 
echo_free_memmd(struct echo_device * ed,struct lov_stripe_md ** lsmp)500 static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
501 {
502 	int lsm_size;
503 
504 	/* If export is lov/osc then use their obd method */
505 	if (ed->ed_next != NULL)
506 		return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
507 	/* OFD has no unpackmd method, do everything here */
508 	lsm_size = lov_stripe_md_size(1);
509 
510 	LASSERT(*lsmp != NULL);
511 	kfree((*lsmp)->lsm_oinfo[0]);
512 	kfree(*lsmp);
513 	*lsmp = NULL;
514 	return 0;
515 }
516 
/*
 * loo_object_free: release an echo object.  All pages must be gone
 * (eo_npages == 0); unlink from ec_objects, tear down the lu object,
 * free the owned lsm and return the slab memory.
 */
static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
{
	struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
	struct echo_client_obd *ec = eco->eo_dev->ed_ec;

	LASSERT(atomic_read(&eco->eo_npages) == 0);

	spin_lock(&ec->ec_lock);
	list_del_init(&eco->eo_obj_chain);
	spin_unlock(&ec->ec_lock);

	lu_object_fini(obj);
	lu_object_header_fini(obj->lo_header);

	if (eco->eo_lsm)
		echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
	kmem_cache_free(echo_object_kmem, eco);
}
535 
/* loo_object_print: one-line debug dump of the object. */
static int echo_object_print(const struct lu_env *env, void *cookie,
			    lu_printer_t p, const struct lu_object *o)
{
	struct echo_object *obj = cl2echo_obj(lu2cl(o));

	return (*p)(env, cookie, "echoclient-object@%p", obj);
}
543 
/* lu_object operation vector; only init/free/print are needed. */
static const struct lu_object_operations echo_lu_obj_ops = {
	.loo_object_init      = echo_object_init,
	.loo_object_delete    = NULL,
	.loo_object_release   = NULL,
	.loo_object_free      = echo_object_free,
	.loo_object_print     = echo_object_print,
	.loo_object_invariant = NULL
};
552 
553 /** @} echo_lu_ops */
554 
555 /** \defgroup echo_lu_dev_ops  lu_device operations
556  *
557  * Operations for echo lu device.
558  *
559  * @{
560  */
echo_object_alloc(const struct lu_env * env,const struct lu_object_header * hdr,struct lu_device * dev)561 static struct lu_object *echo_object_alloc(const struct lu_env *env,
562 					   const struct lu_object_header *hdr,
563 					   struct lu_device *dev)
564 {
565 	struct echo_object *eco;
566 	struct lu_object *obj = NULL;
567 
568 	/* we're the top dev. */
569 	LASSERT(hdr == NULL);
570 	eco = kmem_cache_alloc(echo_object_kmem, GFP_NOFS | __GFP_ZERO);
571 	if (eco != NULL) {
572 		struct cl_object_header *hdr = &eco->eo_hdr;
573 
574 		obj = &echo_obj2cl(eco)->co_lu;
575 		cl_object_header_init(hdr);
576 		lu_object_init(obj, &hdr->coh_lu, dev);
577 		lu_object_add_top(&hdr->coh_lu, obj);
578 
579 		eco->eo_cl.co_ops = &echo_cl_obj_ops;
580 		obj->lo_ops       = &echo_lu_obj_ops;
581 	}
582 	return obj;
583 }
584 
/* lu_device operation vector: echo only allocates objects. */
static struct lu_device_operations echo_device_lu_ops = {
	.ldo_object_alloc   = echo_object_alloc,
};
588 
589 /** @} echo_lu_dev_ops */
590 
/* cl_device operation vector: no cl-level hooks needed. */
static struct cl_device_operations echo_device_cl_ops = {
};
593 
594 /** \defgroup echo_init Setup and teardown
595  *
596  * Init and fini functions for echo client.
597  *
598  * @{
599  */
echo_site_init(const struct lu_env * env,struct echo_device * ed)600 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
601 {
602 	struct cl_site *site = &ed->ed_site_myself;
603 	int rc;
604 
605 	/* initialize site */
606 	rc = cl_site_init(site, &ed->ed_cl);
607 	if (rc) {
608 		CERROR("Cannot initialize site for echo client(%d)\n", rc);
609 		return rc;
610 	}
611 
612 	rc = lu_site_init_finish(&site->cs_lu);
613 	if (rc)
614 		return rc;
615 
616 	ed->ed_site = site;
617 	return 0;
618 }
619 
/* Tear down the site published by echo_site_init(); idempotent. */
static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
{
	if (ed->ed_site) {
		cl_site_fini(ed->ed_site);
		ed->ed_site = NULL;
	}
}
627 
echo_thread_key_init(const struct lu_context * ctx,struct lu_context_key * key)628 static void *echo_thread_key_init(const struct lu_context *ctx,
629 			  struct lu_context_key *key)
630 {
631 	struct echo_thread_info *info;
632 
633 	info = kmem_cache_alloc(echo_thread_kmem, GFP_NOFS | __GFP_ZERO);
634 	if (info == NULL)
635 		info = ERR_PTR(-ENOMEM);
636 	return info;
637 }
638 
echo_thread_key_fini(const struct lu_context * ctx,struct lu_context_key * key,void * data)639 static void echo_thread_key_fini(const struct lu_context *ctx,
640 			 struct lu_context_key *key, void *data)
641 {
642 	struct echo_thread_info *info = data;
643 
644 	kmem_cache_free(echo_thread_kmem, info);
645 }
646 
/* lct_exit for echo_thread_key: nothing to reset between uses. */
static void echo_thread_key_exit(const struct lu_context *ctx,
			 struct lu_context_key *key, void *data)
{
}
651 
/* Context key providing echo_thread_info to CL threads. */
static struct lu_context_key echo_thread_key = {
	.lct_tags = LCT_CL_THREAD,
	.lct_init = echo_thread_key_init,
	.lct_fini = echo_thread_key_fini,
	.lct_exit = echo_thread_key_exit
};
658 
echo_session_key_init(const struct lu_context * ctx,struct lu_context_key * key)659 static void *echo_session_key_init(const struct lu_context *ctx,
660 				  struct lu_context_key *key)
661 {
662 	struct echo_session_info *session;
663 
664 	session = kmem_cache_alloc(echo_session_kmem, GFP_NOFS | __GFP_ZERO);
665 	if (session == NULL)
666 		session = ERR_PTR(-ENOMEM);
667 	return session;
668 }
669 
echo_session_key_fini(const struct lu_context * ctx,struct lu_context_key * key,void * data)670 static void echo_session_key_fini(const struct lu_context *ctx,
671 				 struct lu_context_key *key, void *data)
672 {
673 	struct echo_session_info *session = data;
674 
675 	kmem_cache_free(echo_session_kmem, session);
676 }
677 
/* lct_exit for echo_session_key: nothing to reset between uses. */
static void echo_session_key_exit(const struct lu_context *ctx,
				 struct lu_context_key *key, void *data)
{
}
682 
/* Context key providing echo_session_info (placeholder, unused). */
static struct lu_context_key echo_session_key = {
	.lct_tags = LCT_SESSION,
	.lct_init = echo_session_key_init,
	.lct_fini = echo_session_key_fini,
	.lct_exit = echo_session_key_exit
};
689 
/* Generate echo_type_{init,fini,start,stop} registering the two keys above. */
LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
691 
echo_device_alloc(const struct lu_env * env,struct lu_device_type * t,struct lustre_cfg * cfg)692 static struct lu_device *echo_device_alloc(const struct lu_env *env,
693 					   struct lu_device_type *t,
694 					   struct lustre_cfg *cfg)
695 {
696 	struct lu_device   *next;
697 	struct echo_device *ed;
698 	struct cl_device   *cd;
699 	struct obd_device  *obd = NULL; /* to keep compiler happy */
700 	struct obd_device  *tgt;
701 	const char *tgt_type_name;
702 	int rc;
703 	int cleanup = 0;
704 
705 	ed = kzalloc(sizeof(*ed), GFP_NOFS);
706 	if (!ed) {
707 		rc = -ENOMEM;
708 		goto out;
709 	}
710 
711 	cleanup = 1;
712 	cd = &ed->ed_cl;
713 	rc = cl_device_init(cd, t);
714 	if (rc)
715 		goto out;
716 
717 	cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
718 	cd->cd_ops = &echo_device_cl_ops;
719 
720 	cleanup = 2;
721 	obd = class_name2obd(lustre_cfg_string(cfg, 0));
722 	LASSERT(obd != NULL);
723 	LASSERT(env != NULL);
724 
725 	tgt = class_name2obd(lustre_cfg_string(cfg, 1));
726 	if (tgt == NULL) {
727 		CERROR("Can not find tgt device %s\n",
728 			lustre_cfg_string(cfg, 1));
729 		rc = -ENODEV;
730 		goto out;
731 	}
732 
733 	next = tgt->obd_lu_dev;
734 	if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
735 		CERROR("echo MDT client must be run on server\n");
736 		rc = -EOPNOTSUPP;
737 		goto out;
738 	}
739 
740 	rc = echo_site_init(env, ed);
741 	if (rc)
742 		goto out;
743 
744 	cleanup = 3;
745 
746 	rc = echo_client_setup(env, obd, cfg);
747 	if (rc)
748 		goto out;
749 
750 	ed->ed_ec = &obd->u.echo_client;
751 	cleanup = 4;
752 
753 	/* if echo client is to be stacked upon ost device, the next is
754 	 * NULL since ost is not a clio device so far */
755 	if (next != NULL && !lu_device_is_cl(next))
756 		next = NULL;
757 
758 	tgt_type_name = tgt->obd_type->typ_name;
759 	if (next != NULL) {
760 		LASSERT(next != NULL);
761 		if (next->ld_site != NULL) {
762 			rc = -EBUSY;
763 			goto out;
764 		}
765 
766 		next->ld_site = &ed->ed_site->cs_lu;
767 		rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
768 						next->ld_type->ldt_name,
769 							      NULL);
770 		if (rc)
771 			goto out;
772 
773 		/* Tricky case, I have to determine the obd type since
774 		 * CLIO uses the different parameters to initialize
775 		 * objects for lov & osc. */
776 		if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
777 			ed->ed_next_islov = 1;
778 		else
779 			LASSERT(strcmp(tgt_type_name,
780 				       LUSTRE_OSC_NAME) == 0);
781 	} else {
782 		LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
783 	}
784 
785 	ed->ed_next = next;
786 	return &cd->cd_lu_dev;
787 out:
788 	switch (cleanup) {
789 	case 4: {
790 		int rc2;
791 
792 		rc2 = echo_client_cleanup(obd);
793 		if (rc2)
794 			CERROR("Cleanup obd device %s error(%d)\n",
795 			       obd->obd_name, rc2);
796 	}
797 
798 	case 3:
799 		echo_site_fini(env, ed);
800 	case 2:
801 		cl_device_fini(&ed->ed_cl);
802 	case 1:
803 		kfree(ed);
804 	case 0:
805 	default:
806 		break;
807 	}
808 	return ERR_PTR(rc);
809 }
810 
/* ldto_device_init: must never be called for echo (it is always the top
 * of the stack); trap with LBUG. */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
			  const char *name, struct lu_device *next)
{
	LBUG();
	return 0;
}
817 
/* ldto_device_fini: walk down the stack finalizing each lower device. */
static struct lu_device *echo_device_fini(const struct lu_env *env,
					  struct lu_device *d)
{
	struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
	struct lu_device *next = ed->ed_next;

	while (next)
		next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
	return NULL;
}
828 
/*
 * Release a lock obtained through the echo enqueue path.
 * Unuses and releases the cl_lock; when @still_used is zero the lock is
 * additionally cancelled and deleted under its mutex.  A temporary
 * reference pins the lock across the whole sequence.
 */
static void echo_lock_release(const struct lu_env *env,
			      struct echo_lock *ecl,
			      int still_used)
{
	struct cl_lock *clk = echo_lock2cl(ecl);

	cl_lock_get(clk);
	cl_unuse(env, clk);
	cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
	if (!still_used) {
		cl_lock_mutex_get(env, clk);
		cl_lock_cancel(env, clk);
		cl_lock_delete(env, clk);
		cl_lock_mutex_put(env, clk);
	}
	cl_lock_put(env, clk);
}
846 
/*
 * ldto_device_free: tear down an echo device.
 * Purges cached objects, marks any survivors deleted, waits (polling once
 * a second) until ec_objects drains, then cleans up the client, the lower
 * stack, the site and the device itself.
 */
static struct lu_device *echo_device_free(const struct lu_env *env,
					  struct lu_device *d)
{
	struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
	struct echo_client_obd *ec   = ed->ed_ec;
	struct echo_object     *eco;
	struct lu_device       *next = ed->ed_next;

	CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
	       ed, next);

	lu_site_purge(env, &ed->ed_site->cs_lu, -1);

	/* check if there are objects still alive.
	 * It shouldn't have any object because lu_site_purge would cleanup
	 * all of cached objects. Anyway, probably the echo device is being
	 * parallelly accessed.
	 */
	spin_lock(&ec->ec_lock);
	list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
		eco->eo_deleted = 1;
	spin_unlock(&ec->ec_lock);

	/* purge again */
	lu_site_purge(env, &ed->ed_site->cs_lu, -1);

	CDEBUG(D_INFO,
	       "Waiting for the reference of echo object to be dropped\n");

	/* Wait for the last reference to be dropped. */
	spin_lock(&ec->ec_lock);
	while (!list_empty(&ec->ec_objects)) {
		spin_unlock(&ec->ec_lock);
		CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1));
		lu_site_purge(env, &ed->ed_site->cs_lu, -1);
		spin_lock(&ec->ec_lock);
	}
	spin_unlock(&ec->ec_lock);

	LASSERT(list_empty(&ec->ec_locks));

	CDEBUG(D_INFO, "No object exists, exiting...\n");

	echo_client_cleanup(d->ld_obd);

	while (next)
		next = next->ld_type->ldt_ops->ldto_device_free(env, next);

	LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
	echo_site_fini(env, ed);
	cl_device_fini(&ed->ed_cl);
	kfree(ed);

	return NULL;
}
904 
/* Device-type operation vector; type hooks come from LU_TYPE_INIT_FINI. */
static const struct lu_device_type_operations echo_device_type_ops = {
	.ldto_init = echo_type_init,
	.ldto_fini = echo_type_fini,

	.ldto_start = echo_type_start,
	.ldto_stop  = echo_type_stop,

	.ldto_device_alloc = echo_device_alloc,
	.ldto_device_free  = echo_device_free,
	.ldto_device_init  = echo_device_init,
	.ldto_device_fini  = echo_device_fini
};
917 
/* The echo client device type, registered with the lu framework. */
static struct lu_device_type echo_device_type = {
	.ldt_tags     = LU_DEVICE_CL,
	.ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
	.ldt_ops      = &echo_device_type_ops,
	.ldt_ctx_tags = LCT_CL_THREAD,
};
924 
925 /** @} echo_init */
926 
927 /** \defgroup echo_exports Exported operations
928  *
929  * exporting functions to echo client
930  *
931  * @{
932  */
933 
934 /* Interfaces to echo client obd device */
/* Interfaces to echo client obd device */
/*
 * Find (or create) the echo object described by *lsmp.
 * The lsm must carry a non-zero id in the echo sequence.  Builds an
 * echo_object_conf referencing *lsmp and looks the object up by the fid
 * derived from the ostid.  Returns the object or an ERR_PTR; on success
 * echo_object_init() may have taken ownership of the lsm (clearing *lsmp).
 */
static struct echo_object *cl_echo_object_find(struct echo_device *d,
					       struct lov_stripe_md **lsmp)
{
	struct lu_env *env;
	struct echo_thread_info *info;
	struct echo_object_conf *conf;
	struct lov_stripe_md    *lsm;
	struct echo_object *eco;
	struct cl_object   *obj;
	struct lu_fid *fid;
	int refcheck;
	int rc;

	LASSERT(lsmp);
	lsm = *lsmp;
	LASSERT(lsm);
	LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
	LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
		 POSTID(&lsm->lsm_oi));

	/* Never return an object if the obd is to be freed. */
	if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
		return ERR_PTR(-ENODEV);

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return (void *)env;

	info = echo_env_info(env);
	conf = &info->eti_conf;
	if (d->ed_next) {
		/* lower layer needs different conf payloads: osc takes an
		 * oinfo, lov takes a lustre_md wrapping the lsm */
		if (!d->ed_next_islov) {
			struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];

			LASSERT(oinfo != NULL);
			oinfo->loi_oi = lsm->lsm_oi;
			conf->eoc_cl.u.coc_oinfo = oinfo;
		} else {
			struct lustre_md *md;

			md = &info->eti_md;
			memset(md, 0, sizeof(*md));
			md->lsm = lsm;
			conf->eoc_cl.u.coc_md = md;
		}
	}
	conf->eoc_md = lsmp;

	fid  = &info->eti_fid;
	rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
	if (rc != 0) {
		eco = ERR_PTR(rc);
		goto out;
	}

	/* In the function below, .hs_keycmp resolves to
	 * lu_obj_hop_keycmp() */
	/* coverity[overrun-buffer-val] */
	obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
	if (IS_ERR(obj)) {
		eco = (void *)obj;
		goto out;
	}

	eco = cl2echo_obj(obj);
	if (eco->eo_deleted) {
		/* object is being torn down; drop it and let caller retry */
		cl_object_put(env, obj);
		eco = ERR_PTR(-EAGAIN);
	}

out:
	cl_env_put(env, &refcheck);
	return eco;
}
1009 
cl_echo_object_put(struct echo_object * eco)1010 static int cl_echo_object_put(struct echo_object *eco)
1011 {
1012 	struct lu_env *env;
1013 	struct cl_object *obj = echo_obj2cl(eco);
1014 	int refcheck;
1015 
1016 	env = cl_env_get(&refcheck);
1017 	if (IS_ERR(env))
1018 		return PTR_ERR(env);
1019 
1020 	/* an external function to kill an object? */
1021 	if (eco->eo_deleted) {
1022 		struct lu_object_header *loh = obj->co_lu.lo_header;
1023 
1024 		LASSERT(&eco->eo_hdr == luh2coh(loh));
1025 		set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1026 	}
1027 
1028 	cl_object_put(env, obj);
1029 	cl_env_put(env, &refcheck);
1030 	return 0;
1031 }
1032 
/*
 * Take a client-side extent lock [start, end] on @eco in @mode (LCK_PW maps
 * to CLM_WRITE, anything else to CLM_READ).
 *
 * On success the echo lock is chained on the device's ec_locks list under a
 * fresh cookie (or its refcount is bumped if already chained) and *cookie
 * identifies it for a later cl_echo_cancel0().  Returns 0 or -errno.
 */
static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
			    u64 start, u64 end, int mode,
			    __u64 *cookie, __u32 enqflags)
{
	struct cl_io *io;
	struct cl_lock *lck;
	struct cl_object *obj;
	struct cl_lock_descr *descr;
	struct echo_thread_info *info;
	int rc = -ENOMEM;

	info = echo_env_info(env);
	io = &info->eti_io;
	descr = &info->eti_descr;
	obj = echo_obj2cl(eco);

	/* describe the byte extent in page-index terms */
	descr->cld_obj   = obj;
	descr->cld_start = cl_index(obj, start);
	descr->cld_end   = cl_index(obj, end);
	descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
	descr->cld_enq_flags = enqflags;
	io->ci_obj = obj;

	lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
	if (lck) {
		struct echo_client_obd *ec = eco->eo_dev->ed_ec;
		struct echo_lock *el;

		rc = cl_wait(env, lck);
		if (rc == 0) {
			el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
			spin_lock(&ec->ec_lock);
			if (list_empty(&el->el_chain)) {
				/* first user: hash under a fresh cookie */
				list_add(&el->el_chain, &ec->ec_locks);
				el->el_cookie = ++ec->ec_unique;
			}
			atomic_inc(&el->el_refcount);
			*cookie = el->el_cookie;
			spin_unlock(&ec->ec_lock);
		} else {
			cl_lock_release(env, lck, "ec enqueue", current);
		}
	}
	return rc;
}
1078 
/*
 * Public wrapper around cl_echo_enqueue0(): sets up a MISC cl_io in its own
 * environment, takes the [start, end] lock in @mode and returns the cookie
 * identifying it.  Returns 0 or a negative errno.
 */
static int cl_echo_enqueue(struct echo_object *eco, u64 start, u64 end,
			   int mode, __u64 *cookie)
{
	struct lu_env *env;
	struct cl_io *io;
	int refcheck;
	int rc;

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return PTR_ERR(env);

	io = &echo_env_info(env)->eti_io;
	io->ci_ignore_layout = 1;

	rc = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
	if (rc < 0)
		goto out_env;
	LASSERT(rc == 0);

	rc = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
	cl_io_fini(env, io);

out_env:
	cl_env_put(env, &refcheck);
	return rc;
}
1108 
/*
 * Cancel the echo lock identified by @cookie on device @ed.
 *
 * The lock is unhashed from ec_locks when this was its last reference;
 * otherwise it stays chained and echo_lock_release() is told it is still
 * in use.  Returns 0, or -ENOENT when no lock matches the cookie.
 */
static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
			   __u64 cookie)
{
	struct echo_client_obd *ec = ed->ed_ec;
	struct echo_lock *ecl = NULL;
	struct list_head *pos;
	int found = 0;
	int still_used = 0;

	LASSERT(ec != NULL);
	spin_lock(&ec->ec_lock);
	list_for_each(pos, &ec->ec_locks) {
		ecl = list_entry(pos, struct echo_lock, el_chain);
		CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
		if (ecl->el_cookie != cookie)
			continue;

		found = 1;
		/* unhash on the last reference, otherwise leave it chained */
		if (atomic_dec_and_test(&ecl->el_refcount))
			list_del_init(&ecl->el_chain);
		else
			still_used = 1;
		break;
	}
	spin_unlock(&ec->ec_lock);

	if (!found)
		return -ENOENT;

	echo_lock_release(env, ecl, still_used);
	return 0;
}
1139 
/*
 * Cancel the echo lock identified by @cookie, acquiring a temporary cl
 * environment for the call.  Returns 0 or a negative errno.
 */
static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
{
	struct lu_env *env;
	int refcheck;
	int rc;

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return PTR_ERR(env);

	rc = cl_echo_cancel0(env, ed, cookie);

	cl_env_put(env, &refcheck);
	return rc;
}
1155 
cl_echo_async_brw(const struct lu_env * env,struct cl_io * io,enum cl_req_type unused,struct cl_2queue * queue)1156 static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1157 			     enum cl_req_type unused, struct cl_2queue *queue)
1158 {
1159 	struct cl_page *clp;
1160 	struct cl_page *temp;
1161 	int result = 0;
1162 
1163 	cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1164 		int rc;
1165 
1166 		rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1167 		if (rc == 0)
1168 			continue;
1169 		result = result ?: rc;
1170 	}
1171 	return result;
1172 }
1173 
/*
 * Read or write @npages pages of @eco starting at page-aligned @offset
 * through the cl_io stack, under a transient PR/PW extent lock covering
 * the whole transfer.  @async selects write-back submission for writes;
 * reads are always synchronous.  Returns 0 or a negative errno.
 */
static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
			      struct page **pages, int npages, int async)
{
	struct lu_env	   *env;
	struct echo_thread_info *info;
	struct cl_object	*obj = echo_obj2cl(eco);
	struct echo_device      *ed  = eco->eo_dev;
	struct cl_2queue	*queue;
	struct cl_io	    *io;
	struct cl_page	  *clp;
	struct lustre_handle    lh = { 0 };
	int page_size = cl_page_size(obj);
	int refcheck;
	int rc;
	int i;

	LASSERT((offset & ~CFS_PAGE_MASK) == 0);
	LASSERT(ed->ed_next != NULL);
	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return PTR_ERR(env);

	info    = echo_env_info(env);
	io      = &info->eti_io;
	queue   = &info->eti_queue;

	cl_2queue_init(queue);

	io->ci_ignore_layout = 1;
	rc = cl_io_init(env, io, CIT_MISC, obj);
	if (rc < 0)
		goto out;
	LASSERT(rc == 0);

	/* take one extent lock covering the whole [offset, end) transfer */
	rc = cl_echo_enqueue0(env, eco, offset,
			      offset + npages * PAGE_CACHE_SIZE - 1,
			      rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
			      CEF_NEVER);
	if (rc < 0)
		goto error_lock;

	/* wrap each caller page in a transient cl_page, own it and clip it
	 * to the full page size, then chain it on the submission queue */
	for (i = 0; i < npages; i++) {
		LASSERT(pages[i]);
		clp = cl_page_find(env, obj, cl_index(obj, offset),
				   pages[i], CPT_TRANSIENT);
		if (IS_ERR(clp)) {
			rc = PTR_ERR(clp);
			break;
		}
		LASSERT(clp->cp_type == CPT_TRANSIENT);

		rc = cl_page_own(env, io, clp);
		if (rc) {
			LASSERT(clp->cp_state == CPS_FREEING);
			cl_page_put(env, clp);
			break;
		}

		cl_2queue_add(queue, clp);

		/* drop the reference count for cl_page_find, so that the page
		 * will be freed in cl_2queue_fini. */
		cl_page_put(env, clp);
		cl_page_clip(env, clp, 0, page_size);

		offset += page_size;
	}

	if (rc == 0) {
		enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;

		/* async submission only applies to writes */
		async = async && (typ == CRT_WRITE);
		if (async)
			rc = cl_echo_async_brw(env, io, typ, queue);
		else
			rc = cl_io_submit_sync(env, io, typ, queue, 0);
		CDEBUG(D_INFO, "echo_client %s write returns %d\n",
		       async ? "async" : "sync", rc);
	}

	cl_echo_cancel0(env, ed, lh.cookie);
error_lock:
	cl_2queue_discard(env, io, queue);
	cl_2queue_disown(env, io, queue);
	cl_2queue_fini(env, queue);
	cl_io_fini(env, io);
out:
	cl_env_put(env, &refcheck);
	return rc;
}
1264 
1265 /** @} echo_exports */
1266 
/* id assigned to the next locally-created echo object (see echo_create_object) */
static u64 last_object_id;
1268 
1269 static int
echo_copyout_lsm(struct lov_stripe_md * lsm,void * _ulsm,int ulsm_nob)1270 echo_copyout_lsm(struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1271 {
1272 	struct lov_stripe_md *ulsm = _ulsm;
1273 	struct lov_oinfo **p;
1274 	int nob, i;
1275 
1276 	nob = offsetof(struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1277 	if (nob > ulsm_nob)
1278 		return -EINVAL;
1279 
1280 	if (copy_to_user(ulsm, lsm, sizeof(*ulsm)))
1281 		return -EFAULT;
1282 
1283 	for (i = 0, p = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, p++) {
1284 		struct lov_oinfo __user *up;
1285 		if (get_user(up, ulsm->lsm_oinfo + i) ||
1286 		    copy_to_user(up, *p, sizeof(struct lov_oinfo)))
1287 			return -EFAULT;
1288 	}
1289 	return 0;
1290 }
1291 
1292 static int
echo_copyin_lsm(struct echo_device * ed,struct lov_stripe_md * lsm,struct lov_stripe_md __user * ulsm,int ulsm_nob)1293 echo_copyin_lsm(struct echo_device *ed, struct lov_stripe_md *lsm,
1294 		struct lov_stripe_md __user *ulsm, int ulsm_nob)
1295 {
1296 	struct echo_client_obd *ec = ed->ed_ec;
1297 	struct lov_oinfo **p;
1298 	int		     i;
1299 
1300 	if (ulsm_nob < sizeof(*lsm))
1301 		return -EINVAL;
1302 
1303 	if (copy_from_user(lsm, ulsm, sizeof(*lsm)))
1304 		return -EFAULT;
1305 
1306 	if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1307 	    lsm->lsm_magic != LOV_MAGIC ||
1308 	    (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1309 	    ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1310 		return -EINVAL;
1311 
1312 	for (i = 0, p = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, p++) {
1313 		struct lov_oinfo __user *up;
1314 		if (get_user(up, ulsm->lsm_oinfo + i) ||
1315 		    copy_from_user(*p, up, sizeof(struct lov_oinfo)))
1316 			return -EFAULT;
1317 	}
1318 	return 0;
1319 }
1320 
/*
 * Create (when @on_target) or merely instantiate an echo object described
 * by @oa and the optional user-space striping @ulsm/@ulsm_nob.
 *
 * On success oa->o_oi holds the object id (OBD_MD_FLID set) and the object
 * has been found/created in the cl-object cache.  Returns 0 or -errno; a
 * target-created object is destroyed again if later steps fail.
 */
static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
			      int on_target, struct obdo *oa, void *ulsm,
			      int ulsm_nob, struct obd_trans_info *oti)
{
	struct echo_object     *eco;
	struct echo_client_obd *ec = ed->ed_ec;
	struct lov_stripe_md   *lsm = NULL;
	int		     rc;
	int		     created = 0;

	if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
	    (on_target ||		       /* set_stripe */
	     ec->ec_nstripes != 0)) {	   /* LOV */
		CERROR("No valid oid\n");
		return -EINVAL;
	}

	rc = echo_alloc_memmd(ed, &lsm);
	if (rc < 0) {
		CERROR("Cannot allocate md: rc = %d\n", rc);
		goto failed;
	}

	if (ulsm != NULL) {
		int i, idx;

		rc = echo_copyin_lsm(ed, lsm, ulsm, ulsm_nob);
		if (rc != 0)
			goto failed;

		/* fill in defaults the user left at zero */
		if (lsm->lsm_stripe_count == 0)
			lsm->lsm_stripe_count = ec->ec_nstripes;

		if (lsm->lsm_stripe_size == 0)
			lsm->lsm_stripe_size = PAGE_CACHE_SIZE;

		/* random base so stripes start on a random OST */
		idx = cfs_rand();

		/* setup stripes: indices + default ids if required */
		for (i = 0; i < lsm->lsm_stripe_count; i++) {
			if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
				lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;

			lsm->lsm_oinfo[i]->loi_ost_idx =
				(idx + i) % ec->ec_nstripes;
		}
	}

	/* setup object ID here for !on_target and LOV hint */
	if (oa->o_valid & OBD_MD_FLID) {
		LASSERT(oa->o_valid & OBD_MD_FLGROUP);
		lsm->lsm_oi = oa->o_oi;
	}

	/* no id supplied: hand out the next local one */
	if (ostid_id(&lsm->lsm_oi) == 0)
		ostid_set_id(&lsm->lsm_oi, ++last_object_id);

	rc = 0;
	if (on_target) {
		/* Only echo objects are allowed to be created */
		LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
			(ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
		rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
		if (rc != 0) {
			CERROR("Cannot create objects: rc = %d\n", rc);
			goto failed;
		}
		created = 1;
	}

	/* See what object ID we were given */
	oa->o_oi = lsm->lsm_oi;
	oa->o_valid |= OBD_MD_FLID;

	/* instantiate it in the cl-object cache */
	eco = cl_echo_object_find(ed, &lsm);
	if (IS_ERR(eco)) {
		rc = PTR_ERR(eco);
		goto failed;
	}
	cl_echo_object_put(eco);

	CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));

 failed:
	/* undo a target-side create if anything after it failed */
	if (created && rc)
		obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL);
	if (lsm)
		echo_free_memmd(ed, &lsm);
	if (rc)
		CERROR("create object failed with: rc = %d\n", rc);
	return rc;
}
1413 
echo_get_object(struct echo_object ** ecop,struct echo_device * ed,struct obdo * oa)1414 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1415 			   struct obdo *oa)
1416 {
1417 	struct lov_stripe_md   *lsm = NULL;
1418 	struct echo_object     *eco;
1419 	int		     rc;
1420 
1421 	if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1422 		/* disallow use of object id 0 */
1423 		CERROR("No valid oid\n");
1424 		return -EINVAL;
1425 	}
1426 
1427 	rc = echo_alloc_memmd(ed, &lsm);
1428 	if (rc < 0)
1429 		return rc;
1430 
1431 	lsm->lsm_oi = oa->o_oi;
1432 	if (!(oa->o_valid & OBD_MD_FLGROUP))
1433 		ostid_set_seq_echo(&lsm->lsm_oi);
1434 
1435 	rc = 0;
1436 	eco = cl_echo_object_find(ed, &lsm);
1437 	if (!IS_ERR(eco))
1438 		*ecop = eco;
1439 	else
1440 		rc = PTR_ERR(eco);
1441 	if (lsm)
1442 		echo_free_memmd(ed, &lsm);
1443 	return rc;
1444 }
1445 
/*
 * Drop a reference obtained via echo_get_object().  A failure to release
 * the underlying cl object is only logged; callers cannot recover from it.
 */
static void echo_put_object(struct echo_object *eco)
{
	if (cl_echo_object_put(eco))
		/* kernel log messages must be newline-terminated, otherwise
		 * this line merges with the next printk */
		CERROR("echo client: drop an object failed\n");
}
1451 
1452 static void
echo_get_stripe_off_id(struct lov_stripe_md * lsm,u64 * offp,u64 * idp)1453 echo_get_stripe_off_id(struct lov_stripe_md *lsm, u64 *offp, u64 *idp)
1454 {
1455 	unsigned long stripe_count;
1456 	unsigned long stripe_size;
1457 	unsigned long width;
1458 	unsigned long woffset;
1459 	int	   stripe_index;
1460 	u64       offset;
1461 
1462 	if (lsm->lsm_stripe_count <= 1)
1463 		return;
1464 
1465 	offset       = *offp;
1466 	stripe_size  = lsm->lsm_stripe_size;
1467 	stripe_count = lsm->lsm_stripe_count;
1468 
1469 	/* width = # bytes in all stripes */
1470 	width = stripe_size * stripe_count;
1471 
1472 	/* woffset = offset within a width; offset = whole number of widths */
1473 	woffset = do_div(offset, width);
1474 
1475 	stripe_index = woffset / stripe_size;
1476 
1477 	*idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
1478 	*offp = offset * stripe_size + woffset % stripe_size;
1479 }
1480 
1481 static void
echo_client_page_debug_setup(struct lov_stripe_md * lsm,struct page * page,int rw,u64 id,u64 offset,u64 count)1482 echo_client_page_debug_setup(struct lov_stripe_md *lsm,
1483 			     struct page *page, int rw, u64 id,
1484 			     u64 offset, u64 count)
1485 {
1486 	char    *addr;
1487 	u64	 stripe_off;
1488 	u64	 stripe_id;
1489 	int      delta;
1490 
1491 	/* no partial pages on the client */
1492 	LASSERT(count == PAGE_CACHE_SIZE);
1493 
1494 	addr = kmap(page);
1495 
1496 	for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1497 		if (rw == OBD_BRW_WRITE) {
1498 			stripe_off = offset + delta;
1499 			stripe_id = id;
1500 			echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1501 		} else {
1502 			stripe_off = 0xdeadbeef00c0ffeeULL;
1503 			stripe_id = 0xdeadbeef00c0ffeeULL;
1504 		}
1505 		block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1506 				  stripe_off, stripe_id);
1507 	}
1508 
1509 	kunmap(page);
1510 }
1511 
/*
 * Verify the echo debug pattern in @page block by block against the
 * expected stripe-local offsets and stripe object id.  Every failing block
 * is logged; the last failure's return code is returned (0 if all match).
 */
static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
					struct page *page, u64 id,
					u64 offset, u64 count)
{
	char *addr;
	int delta;
	int rc = 0;

	/* no partial pages on the client */
	LASSERT(count == PAGE_CACHE_SIZE);

	addr = kmap(page);

	for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
		u64 stripe_off = offset + delta;
		u64 stripe_id = id;
		int rc2;

		echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);

		rc2 = block_debug_check("test_brw",
					addr + delta, OBD_ECHO_BLOCK_SIZE,
					stripe_off, stripe_id);
		if (rc2 != 0) {
			CERROR("Error in echo object %#llx\n", id);
			rc = rc2;
		}
	}

	kunmap(page);
	return rc;
}
1545 
/*
 * Kernel-buffer bulk read/write: allocate @count bytes worth of pages,
 * optionally pattern-fill them (writes with OBD_FL_DEBUG_CHECK), run the
 * transfer through cl_echo_object_brw(), and on a debug-checked read
 * verify each page's pattern afterwards.  @count must be positive and
 * page-aligned.  Returns 0 or a negative errno.
 */
static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
			    struct echo_object *eco, u64 offset,
			    u64 count, int async,
			    struct obd_trans_info *oti)
{
	struct lov_stripe_md   *lsm = eco->eo_lsm;
	u32	       npages;
	struct brw_page	*pga;
	struct brw_page	*pgp;
	struct page	    **pages;
	u64		 off;
	int		     i;
	int		     rc;
	int		     verify;
	gfp_t		     gfp_mask;
	int		     brw_flags = 0;

	/* verify the pattern only when debug checking was requested and
	 * this is not the persistent object */
	verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
		  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
		  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);

	/* alternate lowmem/highmem allocations based on the object id */
	gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;

	LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
	LASSERT(lsm != NULL);
	LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));

	if (count <= 0 ||
	    (count & (~CFS_PAGE_MASK)) != 0)
		return -EINVAL;

	/* XXX think again with misaligned I/O */
	npages = count >> PAGE_CACHE_SHIFT;

	if (rw == OBD_BRW_WRITE)
		brw_flags = OBD_BRW_ASYNC;

	pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
	if (pga == NULL)
		return -ENOMEM;

	pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
	if (pages == NULL) {
		kfree(pga);
		return -ENOMEM;
	}

	/* allocate and describe one brw_page per transferred page */
	for (i = 0, pgp = pga, off = offset;
	     i < npages;
	     i++, pgp++, off += PAGE_CACHE_SIZE) {

		LASSERT(pgp->pg == NULL);      /* for cleanup */

		rc = -ENOMEM;
		pgp->pg = alloc_page(gfp_mask);
		if (pgp->pg == NULL)
			goto out;

		pages[i] = pgp->pg;
		pgp->count = PAGE_CACHE_SIZE;
		pgp->off = off;
		pgp->flag = brw_flags;

		if (verify)
			echo_client_page_debug_setup(lsm, pgp->pg, rw,
						     ostid_id(&oa->o_oi), off,
						     pgp->count);
	}

	/* brw mode can only be used at client */
	LASSERT(ed->ed_next != NULL);
	rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);

 out:
	/* only verify pages that a successful read actually filled */
	if (rc != 0 || rw != OBD_BRW_READ)
		verify = 0;

	for (i = 0, pgp = pga; i < npages; i++, pgp++) {
		if (pgp->pg == NULL)
			continue;

		if (verify) {
			int vrc;

			vrc = echo_client_page_debug_check(lsm, pgp->pg,
							   ostid_id(&oa->o_oi),
							   pgp->off, pgp->count);
			if (vrc != 0 && rc == 0)
				rc = vrc;
		}
		__free_page(pgp->pg);
	}
	kfree(pga);
	kfree(pages);
	return rc;
}
1642 
/*
 * Drive server-side bulk I/O via obd_preprw()/obd_commitrw(), moving
 * @count bytes starting at @offset in batches of at most @batch bytes.
 * Writes with @async are flagged OBD_BRW_ASYNC.  When OBD_FL_DEBUG_CHECK
 * is set on @oa, each page is pattern-filled (writes) or verified (reads).
 * @count must be positive and page-aligned.  Returns 0 or -errno.
 */
static int echo_client_prep_commit(const struct lu_env *env,
				   struct obd_export *exp, int rw,
				   struct obdo *oa, struct echo_object *eco,
				   u64 offset, u64 count,
				   u64 batch, struct obd_trans_info *oti,
				   int async)
{
	struct lov_stripe_md *lsm = eco->eo_lsm;
	struct obd_ioobj ioo;
	struct niobuf_local *lnb;
	struct niobuf_remote *rnb;
	u64 off;
	u64 npages, tot_pages;
	int i, ret = 0, brw_flags = 0;

	if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
	    (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
		return -EINVAL;

	npages = batch >> PAGE_CACHE_SHIFT;
	tot_pages = count >> PAGE_CACHE_SHIFT;

	/* A sub-page batch gives npages == 0: the loop below would then
	 * never terminate (tot_pages -= 0) and kcalloc(0, ...) does not
	 * return a usable buffer.  Reject such a batch up front. */
	if (npages == 0)
		return -EINVAL;

	lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
	rnb = kcalloc(npages, sizeof(struct niobuf_remote), GFP_NOFS);

	if (lnb == NULL || rnb == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	if (rw == OBD_BRW_WRITE && async)
		brw_flags |= OBD_BRW_ASYNC;

	obdo_to_ioobj(oa, &ioo);

	off = offset;

	for (; tot_pages; tot_pages -= npages) {
		int lpages;

		/* the last batch may be short */
		if (tot_pages < npages)
			npages = tot_pages;

		/* describe this batch's extent, one remote niobuf per page */
		for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
			rnb[i].offset = off;
			rnb[i].len = PAGE_CACHE_SIZE;
			rnb[i].flags = brw_flags;
		}

		ioo.ioo_bufcnt = npages;
		oti->oti_transno = 0;

		lpages = npages;
		ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
				 lnb, oti);
		if (ret != 0)
			goto out;
		LASSERT(lpages == npages);

		for (i = 0; i < lpages; i++) {
			struct page *page = lnb[i].page;

			/* read past eof? */
			if (page == NULL && lnb[i].rc == 0)
				continue;

			if (async)
				lnb[i].flags |= OBD_BRW_ASYNC;

			/* pattern fill/check only when debug checking was
			 * requested and this is not the persistent object */
			if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
			    (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
			    (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
				continue;

			if (rw == OBD_BRW_WRITE)
				echo_client_page_debug_setup(lsm, page, rw,
							    ostid_id(&oa->o_oi),
							     rnb[i].offset,
							     rnb[i].len);
			else
				echo_client_page_debug_check(lsm, page,
							    ostid_id(&oa->o_oi),
							     rnb[i].offset,
							     rnb[i].len);
		}

		ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
				   rnb, npages, lnb, oti, ret);
		if (ret != 0)
			goto out;

		/* Reset oti otherwise it would confuse ldiskfs. */
		memset(oti, 0, sizeof(*oti));

		/* Reuse env context. */
		lu_context_exit((struct lu_context *)&env->le_ctx);
		lu_context_enter((struct lu_context *)&env->le_ctx);
	}

out:
	kfree(lnb);
	kfree(rnb);
	return ret;
}
1747 
echo_client_brw_ioctl(const struct lu_env * env,int rw,struct obd_export * exp,struct obd_ioctl_data * data,struct obd_trans_info * dummy_oti)1748 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1749 				 struct obd_export *exp,
1750 				 struct obd_ioctl_data *data,
1751 				 struct obd_trans_info *dummy_oti)
1752 {
1753 	struct obd_device *obd = class_exp2obd(exp);
1754 	struct echo_device *ed = obd2echo_dev(obd);
1755 	struct echo_client_obd *ec = ed->ed_ec;
1756 	struct obdo *oa = &data->ioc_obdo1;
1757 	struct echo_object *eco;
1758 	int rc;
1759 	int async = 1;
1760 	long test_mode;
1761 
1762 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1763 
1764 	rc = echo_get_object(&eco, ed, oa);
1765 	if (rc)
1766 		return rc;
1767 
1768 	oa->o_valid &= ~OBD_MD_FLHANDLE;
1769 
1770 	/* OFD/obdfilter works only via prep/commit */
1771 	test_mode = (long)data->ioc_pbuf1;
1772 	if (test_mode == 1)
1773 		async = 0;
1774 
1775 	if (ed->ed_next == NULL && test_mode != 3) {
1776 		test_mode = 3;
1777 		data->ioc_plen1 = data->ioc_count;
1778 	}
1779 
1780 	/* Truncate batch size to maximum */
1781 	if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1782 		data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1783 
1784 	switch (test_mode) {
1785 	case 1:
1786 		/* fall through */
1787 	case 2:
1788 		rc = echo_client_kbrw(ed, rw, oa,
1789 				      eco, data->ioc_offset,
1790 				      data->ioc_count, async, dummy_oti);
1791 		break;
1792 	case 3:
1793 		rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1794 					     eco, data->ioc_offset,
1795 					     data->ioc_count, data->ioc_plen1,
1796 					     dummy_oti, async);
1797 		break;
1798 	default:
1799 		rc = -EINVAL;
1800 	}
1801 	echo_put_object(eco);
1802 	return rc;
1803 }
1804 
1805 static int
echo_client_enqueue(struct obd_export * exp,struct obdo * oa,int mode,u64 offset,u64 nob)1806 echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
1807 		    int mode, u64 offset, u64 nob)
1808 {
1809 	struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
1810 	struct lustre_handle   *ulh = &oa->o_handle;
1811 	struct echo_object     *eco;
1812 	u64		 end;
1813 	int		     rc;
1814 
1815 	if (ed->ed_next == NULL)
1816 		return -EOPNOTSUPP;
1817 
1818 	if (!(mode == LCK_PR || mode == LCK_PW))
1819 		return -EINVAL;
1820 
1821 	if ((offset & (~CFS_PAGE_MASK)) != 0 ||
1822 	    (nob & (~CFS_PAGE_MASK)) != 0)
1823 		return -EINVAL;
1824 
1825 	rc = echo_get_object(&eco, ed, oa);
1826 	if (rc != 0)
1827 		return rc;
1828 
1829 	end = (nob == 0) ? ((u64) -1) : (offset + nob - 1);
1830 	rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
1831 	if (rc == 0) {
1832 		oa->o_valid |= OBD_MD_FLHANDLE;
1833 		CDEBUG(D_INFO, "Cookie is %#llx\n", ulh->cookie);
1834 	}
1835 	echo_put_object(eco);
1836 	return rc;
1837 }
1838 
1839 static int
echo_client_cancel(struct obd_export * exp,struct obdo * oa)1840 echo_client_cancel(struct obd_export *exp, struct obdo *oa)
1841 {
1842 	struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
1843 	__u64	       cookie = oa->o_handle.cookie;
1844 
1845 	if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
1846 		return -EINVAL;
1847 
1848 	CDEBUG(D_INFO, "Cookie is %#llx\n", cookie);
1849 	return cl_echo_cancel(ed, cookie);
1850 }
1851 
/*
 * Main ioctl entry point for the echo client.
 *
 * Dispatches object create/destroy, getattr/setattr, bulk read/write and
 * lock enqueue/cancel test commands.  Privileged (state-changing) commands
 * require CAP_SYS_ADMIN.  A missing group in the caller's obdo defaults to
 * the echo sequence.  Returns 0 or a negative errno.
 */
static int
echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
		      void *karg, void *uarg)
{
	struct obd_device      *obd = exp->exp_obd;
	struct echo_device     *ed = obd2echo_dev(obd);
	struct echo_client_obd *ec = ed->ed_ec;
	struct echo_object     *eco;
	struct obd_ioctl_data  *data = karg;
	struct obd_trans_info   dummy_oti;
	struct lu_env	  *env;
	struct oti_req_ack_lock *ack_lock;
	struct obdo	    *oa;
	struct lu_fid	   fid;
	int		     rw = OBD_BRW_READ;
	int		     rc = 0;
	int		     i;

	memset(&dummy_oti, 0, sizeof(dummy_oti));

	oa = &data->ioc_obdo1;
	if (!(oa->o_valid & OBD_MD_FLGROUP)) {
		oa->o_valid |= OBD_MD_FLGROUP;
		ostid_set_seq_echo(&oa->o_oi);
	}

	/* This FID is unpacked just for validation at this point */
	rc = ostid_to_fid(&fid, &oa->o_oi, 0);
	if (rc < 0)
		return rc;

	env = kzalloc(sizeof(*env), GFP_NOFS);
	if (!env)
		return -ENOMEM;

	rc = lu_env_init(env, LCT_DT_THREAD);
	if (rc) {
		rc = -ENOMEM;
		goto out;
	}

	switch (cmd) {
	case OBD_IOC_CREATE:		    /* may create echo object */
		if (!capable(CFS_CAP_SYS_ADMIN)) {
			rc = -EPERM;
			goto out;
		}

		rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
					data->ioc_plen1, &dummy_oti);
		goto out;

	case OBD_IOC_DESTROY:
		if (!capable(CFS_CAP_SYS_ADMIN)) {
			rc = -EPERM;
			goto out;
		}

		rc = echo_get_object(&eco, ed, oa);
		if (rc == 0) {
			rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
					 &dummy_oti, NULL);
			/* flag the cached object so the last put kills it */
			if (rc == 0)
				eco->eo_deleted = 1;
			echo_put_object(eco);
		}
		goto out;

	case OBD_IOC_GETATTR:
		rc = echo_get_object(&eco, ed, oa);
		if (rc == 0) {
			struct obd_info oinfo = { };

			oinfo.oi_md = eco->eo_lsm;
			oinfo.oi_oa = oa;
			rc = obd_getattr(env, ec->ec_exp, &oinfo);
			echo_put_object(eco);
		}
		goto out;

	case OBD_IOC_SETATTR:
		if (!capable(CFS_CAP_SYS_ADMIN)) {
			rc = -EPERM;
			goto out;
		}

		rc = echo_get_object(&eco, ed, oa);
		if (rc == 0) {
			struct obd_info oinfo = { };

			oinfo.oi_oa = oa;
			oinfo.oi_md = eco->eo_lsm;

			rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
			echo_put_object(eco);
		}
		goto out;

	case OBD_IOC_BRW_WRITE:
		if (!capable(CFS_CAP_SYS_ADMIN)) {
			rc = -EPERM;
			goto out;
		}

		rw = OBD_BRW_WRITE;
		/* fall through */
	case OBD_IOC_BRW_READ:
		rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
		goto out;

	case ECHO_IOC_GET_STRIPE:
		rc = echo_get_object(&eco, ed, oa);
		if (rc == 0) {
			rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
					      data->ioc_plen1);
			echo_put_object(eco);
		}
		goto out;

	case ECHO_IOC_SET_STRIPE:
		if (!capable(CFS_CAP_SYS_ADMIN)) {
			rc = -EPERM;
			goto out;
		}

		if (data->ioc_pbuf1 == NULL) {  /* unset */
			rc = echo_get_object(&eco, ed, oa);
			if (rc == 0) {
				eco->eo_deleted = 1;
				echo_put_object(eco);
			}
		} else {
			rc = echo_create_object(env, ed, 0, oa,
						data->ioc_pbuf1,
						data->ioc_plen1, &dummy_oti);
		}
		goto out;

	case ECHO_IOC_ENQUEUE:
		if (!capable(CFS_CAP_SYS_ADMIN)) {
			rc = -EPERM;
			goto out;
		}

		rc = echo_client_enqueue(exp, oa,
					 data->ioc_conn1, /* lock mode */
					 data->ioc_offset,
					 data->ioc_count);/*extent*/
		goto out;

	case ECHO_IOC_CANCEL:
		rc = echo_client_cancel(exp, oa);
		goto out;

	default:
		CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
		rc = -ENOTTY;
		goto out;
	}

out:
	lu_env_fini(env);
	kfree(env);

	/* XXX this should be in a helper also called by target_send_reply */
	for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
	     i++, ack_lock++) {
		if (!ack_lock->mode)
			break;
		ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
	}

	return rc;
}
2026 
echo_client_setup(const struct lu_env * env,struct obd_device * obddev,struct lustre_cfg * lcfg)2027 static int echo_client_setup(const struct lu_env *env,
2028 			     struct obd_device *obddev, struct lustre_cfg *lcfg)
2029 {
2030 	struct echo_client_obd *ec = &obddev->u.echo_client;
2031 	struct obd_device *tgt;
2032 	struct obd_uuid echo_uuid = { "ECHO_UUID" };
2033 	struct obd_connect_data *ocd = NULL;
2034 	int rc;
2035 
2036 	if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2037 		CERROR("requires a TARGET OBD name\n");
2038 		return -EINVAL;
2039 	}
2040 
2041 	tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2042 	if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2043 		CERROR("device not attached or not set up (%s)\n",
2044 		       lustre_cfg_string(lcfg, 1));
2045 		return -EINVAL;
2046 	}
2047 
2048 	spin_lock_init(&ec->ec_lock);
2049 	INIT_LIST_HEAD(&ec->ec_objects);
2050 	INIT_LIST_HEAD(&ec->ec_locks);
2051 	ec->ec_unique = 0;
2052 	ec->ec_nstripes = 0;
2053 
2054 	ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
2055 	if (!ocd) {
2056 		CERROR("Can't alloc ocd connecting to %s\n",
2057 		       lustre_cfg_string(lcfg, 1));
2058 		return -ENOMEM;
2059 	}
2060 
2061 	ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2062 				 OBD_CONNECT_BRW_SIZE |
2063 				 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2064 				 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2065 				 OBD_CONNECT_FID;
2066 	ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2067 	ocd->ocd_version = LUSTRE_VERSION_CODE;
2068 	ocd->ocd_group = FID_SEQ_ECHO;
2069 
2070 	rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2071 
2072 	kfree(ocd);
2073 
2074 	if (rc != 0) {
2075 		CERROR("fail to connect to device %s\n",
2076 		       lustre_cfg_string(lcfg, 1));
2077 		return rc;
2078 	}
2079 
2080 	return rc;
2081 }
2082 
echo_client_cleanup(struct obd_device * obddev)2083 static int echo_client_cleanup(struct obd_device *obddev)
2084 {
2085 	struct echo_client_obd *ec = &obddev->u.echo_client;
2086 	int rc;
2087 
2088 	if (!list_empty(&obddev->obd_exports)) {
2089 		CERROR("still has clients!\n");
2090 		return -EBUSY;
2091 	}
2092 
2093 	LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2094 	rc = obd_disconnect(ec->ec_exp);
2095 	if (rc != 0)
2096 		CERROR("fail to disconnect device: %d\n", rc);
2097 
2098 	return rc;
2099 }
2100 
echo_client_connect(const struct lu_env * env,struct obd_export ** exp,struct obd_device * src,struct obd_uuid * cluuid,struct obd_connect_data * data,void * localdata)2101 static int echo_client_connect(const struct lu_env *env,
2102 			       struct obd_export **exp,
2103 			       struct obd_device *src, struct obd_uuid *cluuid,
2104 			       struct obd_connect_data *data, void *localdata)
2105 {
2106 	int		rc;
2107 	struct lustre_handle conn = { 0 };
2108 
2109 	rc = class_connect(&conn, src, cluuid);
2110 	if (rc == 0) {
2111 		*exp = class_conn2export(&conn);
2112 	}
2113 
2114 	return rc;
2115 }
2116 
echo_client_disconnect(struct obd_export * exp)2117 static int echo_client_disconnect(struct obd_export *exp)
2118 {
2119 	int		     rc;
2120 
2121 	if (exp == NULL) {
2122 		rc = -EINVAL;
2123 		goto out;
2124 	}
2125 
2126 	rc = class_disconnect(exp);
2127 	goto out;
2128  out:
2129 	return rc;
2130 }
2131 
/* Operations vector registered for the echo client OBD type. */
static struct obd_ops echo_client_obd_ops = {
	.o_owner       = THIS_MODULE,
	.o_iocontrol   = echo_client_iocontrol,
	.o_connect     = echo_client_connect,
	.o_disconnect  = echo_client_disconnect
};
2138 
echo_client_init(void)2139 static int echo_client_init(void)
2140 {
2141 	int rc;
2142 
2143 	rc = lu_kmem_init(echo_caches);
2144 	if (rc == 0) {
2145 		rc = class_register_type(&echo_client_obd_ops, NULL,
2146 					 LUSTRE_ECHO_CLIENT_NAME,
2147 					 &echo_device_type);
2148 		if (rc)
2149 			lu_kmem_fini(echo_caches);
2150 	}
2151 	return rc;
2152 }
2153 
/* Undo echo_client_init(): unregister the OBD type, then the caches. */
static void echo_client_exit(void)
{
	class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
	lu_kmem_fini(echo_caches);
}
2159 
/* Module entry point: sanity-check block sizing and register the client. */
static int __init obdecho_init(void)
{
	LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");

	/* Echo I/O is done in OBD_ECHO_BLOCK_SIZE units; a page must
	 * hold a whole number of them. */
	LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);

	return echo_client_init();
}
2168 
/* Module exit point: tear the echo client back down. */
static void /*__exit*/ obdecho_exit(void)
{
	echo_client_exit();
}
2174 
/* Module metadata and init/exit hookup. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

module_init(obdecho_init);
module_exit(obdecho_exit);
2182 
2183 /** @} echo_client */
2184