1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41 
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45 
46 spinlock_t obd_types_lock;
47 
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52 
53 static struct list_head      obd_zombie_imports;
54 static struct list_head      obd_zombie_exports;
55 static spinlock_t  obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 			      const char *status, int locks);
61 
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64 
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
obd_device_alloc(void)69 static struct obd_device *obd_device_alloc(void)
70 {
71 	struct obd_device *obd;
72 
73 	OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74 	if (obd != NULL) {
75 		obd->obd_magic = OBD_DEVICE_MAGIC;
76 	}
77 	return obd;
78 }
79 
obd_device_free(struct obd_device * obd)80 static void obd_device_free(struct obd_device *obd)
81 {
82 	LASSERT(obd != NULL);
83 	LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84 		 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85 	if (obd->obd_namespace != NULL) {
86 		CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87 		       obd, obd->obd_namespace, obd->obd_force);
88 		LBUG();
89 	}
90 	lu_ref_fini(&obd->obd_reference);
91 	OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93 
class_search_type(const char * name)94 struct obd_type *class_search_type(const char *name)
95 {
96 	struct list_head *tmp;
97 	struct obd_type *type;
98 
99 	spin_lock(&obd_types_lock);
100 	list_for_each(tmp, &obd_types) {
101 		type = list_entry(tmp, struct obd_type, typ_chain);
102 		if (strcmp(type->typ_name, name) == 0) {
103 			spin_unlock(&obd_types_lock);
104 			return type;
105 		}
106 	}
107 	spin_unlock(&obd_types_lock);
108 	return NULL;
109 }
110 EXPORT_SYMBOL(class_search_type);
111 
class_get_type(const char * name)112 struct obd_type *class_get_type(const char *name)
113 {
114 	struct obd_type *type = class_search_type(name);
115 
116 	if (!type) {
117 		const char *modname = name;
118 
119 		if (strcmp(modname, "obdfilter") == 0)
120 			modname = "ofd";
121 
122 		if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 			modname = LUSTRE_OSP_NAME;
124 
125 		if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 			modname = LUSTRE_MDT_NAME;
127 
128 		if (!request_module("%s", modname)) {
129 			CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130 			type = class_search_type(name);
131 		} else {
132 			LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133 					   modname);
134 		}
135 	}
136 	if (type) {
137 		spin_lock(&type->obd_type_lock);
138 		type->typ_refcnt++;
139 		try_module_get(type->typ_dt_ops->o_owner);
140 		spin_unlock(&type->obd_type_lock);
141 	}
142 	return type;
143 }
144 EXPORT_SYMBOL(class_get_type);
145 
class_put_type(struct obd_type * type)146 void class_put_type(struct obd_type *type)
147 {
148 	LASSERT(type);
149 	spin_lock(&type->obd_type_lock);
150 	type->typ_refcnt--;
151 	module_put(type->typ_dt_ops->o_owner);
152 	spin_unlock(&type->obd_type_lock);
153 }
154 EXPORT_SYMBOL(class_put_type);
155 
156 #define CLASS_MAX_NAME 1024
157 
class_register_type(struct obd_ops * dt_ops,struct md_ops * md_ops,struct lprocfs_vars * vars,const char * name,struct lu_device_type * ldt)158 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
159 			struct lprocfs_vars *vars, const char *name,
160 			struct lu_device_type *ldt)
161 {
162 	struct obd_type *type;
163 	int rc = 0;
164 
165 	/* sanity check */
166 	LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
167 
168 	if (class_search_type(name)) {
169 		CDEBUG(D_IOCTL, "Type %s already registered\n", name);
170 		return -EEXIST;
171 	}
172 
173 	rc = -ENOMEM;
174 	OBD_ALLOC(type, sizeof(*type));
175 	if (type == NULL)
176 		return rc;
177 
178 	OBD_ALLOC_PTR(type->typ_dt_ops);
179 	OBD_ALLOC_PTR(type->typ_md_ops);
180 	OBD_ALLOC(type->typ_name, strlen(name) + 1);
181 
182 	if (type->typ_dt_ops == NULL ||
183 	    type->typ_md_ops == NULL ||
184 	    type->typ_name == NULL)
185 		goto failed;
186 
187 	*(type->typ_dt_ops) = *dt_ops;
188 	/* md_ops is optional */
189 	if (md_ops)
190 		*(type->typ_md_ops) = *md_ops;
191 	strcpy(type->typ_name, name);
192 	spin_lock_init(&type->obd_type_lock);
193 
194 	type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195 					      vars, type);
196 	if (IS_ERR(type->typ_procroot)) {
197 		rc = PTR_ERR(type->typ_procroot);
198 		type->typ_procroot = NULL;
199 		goto failed;
200 	}
201 
202 	if (ldt != NULL) {
203 		type->typ_lu = ldt;
204 		rc = lu_device_type_init(ldt);
205 		if (rc != 0)
206 			goto failed;
207 	}
208 
209 	spin_lock(&obd_types_lock);
210 	list_add(&type->typ_chain, &obd_types);
211 	spin_unlock(&obd_types_lock);
212 
213 	return 0;
214 
215  failed:
216 	if (type->typ_name != NULL)
217 		OBD_FREE(type->typ_name, strlen(name) + 1);
218 	if (type->typ_md_ops != NULL)
219 		OBD_FREE_PTR(type->typ_md_ops);
220 	if (type->typ_dt_ops != NULL)
221 		OBD_FREE_PTR(type->typ_dt_ops);
222 	OBD_FREE(type, sizeof(*type));
223 	return rc;
224 }
225 EXPORT_SYMBOL(class_register_type);
226 
class_unregister_type(const char * name)227 int class_unregister_type(const char *name)
228 {
229 	struct obd_type *type = class_search_type(name);
230 
231 	if (!type) {
232 		CERROR("unknown obd type\n");
233 		return -EINVAL;
234 	}
235 
236 	if (type->typ_refcnt) {
237 		CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238 		/* This is a bad situation, let's make the best of it */
239 		/* Remove ops, but leave the name for debugging */
240 		OBD_FREE_PTR(type->typ_dt_ops);
241 		OBD_FREE_PTR(type->typ_md_ops);
242 		return -EBUSY;
243 	}
244 
245 	if (type->typ_procroot) {
246 		lprocfs_remove(&type->typ_procroot);
247 	}
248 
249 	if (type->typ_lu)
250 		lu_device_type_fini(type->typ_lu);
251 
252 	spin_lock(&obd_types_lock);
253 	list_del(&type->typ_chain);
254 	spin_unlock(&obd_types_lock);
255 	OBD_FREE(type->typ_name, strlen(name) + 1);
256 	if (type->typ_dt_ops != NULL)
257 		OBD_FREE_PTR(type->typ_dt_ops);
258 	if (type->typ_md_ops != NULL)
259 		OBD_FREE_PTR(type->typ_md_ops);
260 	OBD_FREE(type, sizeof(*type));
261 	return 0;
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
264 
265 /**
266  * Create a new obd device.
267  *
268  * Find an empty slot in ::obd_devs[], create a new obd device in it.
269  *
270  * \param[in] type_name obd device type string.
271  * \param[in] name      obd device name.
272  *
273  * \retval NULL if create fails, otherwise return the obd device
274  *	 pointer created.
275  */
class_newdev(const char * type_name,const char * name)276 struct obd_device *class_newdev(const char *type_name, const char *name)
277 {
278 	struct obd_device *result = NULL;
279 	struct obd_device *newdev;
280 	struct obd_type *type = NULL;
281 	int i;
282 	int new_obd_minor = 0;
283 
284 	if (strlen(name) >= MAX_OBD_NAME) {
285 		CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286 		return ERR_PTR(-EINVAL);
287 	}
288 
289 	type = class_get_type(type_name);
290 	if (type == NULL){
291 		CERROR("OBD: unknown type: %s\n", type_name);
292 		return ERR_PTR(-ENODEV);
293 	}
294 
295 	newdev = obd_device_alloc();
296 	if (newdev == NULL) {
297 		result = ERR_PTR(-ENOMEM);
298 		goto out_type;
299 	}
300 
301 	LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302 
303 	write_lock(&obd_dev_lock);
304 	for (i = 0; i < class_devno_max(); i++) {
305 		struct obd_device *obd = class_num2obd(i);
306 
307 		if (obd && (strcmp(name, obd->obd_name) == 0)) {
308 			CERROR("Device %s already exists at %d, won't add\n",
309 			       name, i);
310 			if (result) {
311 				LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 					 "%p obd_magic %08x != %08x\n", result,
313 					 result->obd_magic, OBD_DEVICE_MAGIC);
314 				LASSERTF(result->obd_minor == new_obd_minor,
315 					 "%p obd_minor %d != %d\n", result,
316 					 result->obd_minor, new_obd_minor);
317 
318 				obd_devs[result->obd_minor] = NULL;
319 				result->obd_name[0] = '\0';
320 			 }
321 			result = ERR_PTR(-EEXIST);
322 			break;
323 		}
324 		if (!result && !obd) {
325 			result = newdev;
326 			result->obd_minor = i;
327 			new_obd_minor = i;
328 			result->obd_type = type;
329 			strncpy(result->obd_name, name,
330 				sizeof(result->obd_name) - 1);
331 			obd_devs[i] = result;
332 		}
333 	}
334 	write_unlock(&obd_dev_lock);
335 
336 	if (result == NULL && i >= class_devno_max()) {
337 		CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338 		       class_devno_max());
339 		result = ERR_PTR(-EOVERFLOW);
340 		goto out;
341 	}
342 
343 	if (IS_ERR(result))
344 		goto out;
345 
346 	CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 	       result->obd_name, result);
348 
349 	return result;
350 out:
351 	obd_device_free(newdev);
352 out_type:
353 	class_put_type(type);
354 	return result;
355 }
356 
class_release_dev(struct obd_device * obd)357 void class_release_dev(struct obd_device *obd)
358 {
359 	struct obd_type *obd_type = obd->obd_type;
360 
361 	LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
362 		 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
363 	LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
364 		 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
365 	LASSERT(obd_type != NULL);
366 
367 	CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
368 	       obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
369 
370 	write_lock(&obd_dev_lock);
371 	obd_devs[obd->obd_minor] = NULL;
372 	write_unlock(&obd_dev_lock);
373 	obd_device_free(obd);
374 
375 	class_put_type(obd_type);
376 }
377 
class_name2dev(const char * name)378 int class_name2dev(const char *name)
379 {
380 	int i;
381 
382 	if (!name)
383 		return -1;
384 
385 	read_lock(&obd_dev_lock);
386 	for (i = 0; i < class_devno_max(); i++) {
387 		struct obd_device *obd = class_num2obd(i);
388 
389 		if (obd && strcmp(name, obd->obd_name) == 0) {
390 			/* Make sure we finished attaching before we give
391 			   out any references */
392 			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
393 			if (obd->obd_attached) {
394 				read_unlock(&obd_dev_lock);
395 				return i;
396 			}
397 			break;
398 		}
399 	}
400 	read_unlock(&obd_dev_lock);
401 
402 	return -1;
403 }
404 EXPORT_SYMBOL(class_name2dev);
405 
class_name2obd(const char * name)406 struct obd_device *class_name2obd(const char *name)
407 {
408 	int dev = class_name2dev(name);
409 
410 	if (dev < 0 || dev > class_devno_max())
411 		return NULL;
412 	return class_num2obd(dev);
413 }
414 EXPORT_SYMBOL(class_name2obd);
415 
class_uuid2dev(struct obd_uuid * uuid)416 int class_uuid2dev(struct obd_uuid *uuid)
417 {
418 	int i;
419 
420 	read_lock(&obd_dev_lock);
421 	for (i = 0; i < class_devno_max(); i++) {
422 		struct obd_device *obd = class_num2obd(i);
423 
424 		if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
425 			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
426 			read_unlock(&obd_dev_lock);
427 			return i;
428 		}
429 	}
430 	read_unlock(&obd_dev_lock);
431 
432 	return -1;
433 }
434 EXPORT_SYMBOL(class_uuid2dev);
435 
class_uuid2obd(struct obd_uuid * uuid)436 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
437 {
438 	int dev = class_uuid2dev(uuid);
439 	if (dev < 0)
440 		return NULL;
441 	return class_num2obd(dev);
442 }
443 EXPORT_SYMBOL(class_uuid2obd);
444 
445 /**
446  * Get obd device from ::obd_devs[]
447  *
448  * \param num [in] array index
449  *
450  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
451  *	 otherwise return the obd device there.
452  */
class_num2obd(int num)453 struct obd_device *class_num2obd(int num)
454 {
455 	struct obd_device *obd = NULL;
456 
457 	if (num < class_devno_max()) {
458 		obd = obd_devs[num];
459 		if (obd == NULL)
460 			return NULL;
461 
462 		LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
463 			 "%p obd_magic %08x != %08x\n",
464 			 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
465 		LASSERTF(obd->obd_minor == num,
466 			 "%p obd_minor %0d != %0d\n",
467 			 obd, obd->obd_minor, num);
468 	}
469 
470 	return obd;
471 }
472 EXPORT_SYMBOL(class_num2obd);
473 
474 /**
475  * Get obd devices count. Device in any
476  *    state are counted
477  * \retval obd device count
478  */
get_devices_count(void)479 int get_devices_count(void)
480 {
481 	int index, max_index = class_devno_max(), dev_count = 0;
482 
483 	read_lock(&obd_dev_lock);
484 	for (index = 0; index <= max_index; index++) {
485 		struct obd_device *obd = class_num2obd(index);
486 		if (obd != NULL)
487 			dev_count++;
488 	}
489 	read_unlock(&obd_dev_lock);
490 
491 	return dev_count;
492 }
493 EXPORT_SYMBOL(get_devices_count);
494 
class_obd_list(void)495 void class_obd_list(void)
496 {
497 	char *status;
498 	int i;
499 
500 	read_lock(&obd_dev_lock);
501 	for (i = 0; i < class_devno_max(); i++) {
502 		struct obd_device *obd = class_num2obd(i);
503 
504 		if (obd == NULL)
505 			continue;
506 		if (obd->obd_stopping)
507 			status = "ST";
508 		else if (obd->obd_set_up)
509 			status = "UP";
510 		else if (obd->obd_attached)
511 			status = "AT";
512 		else
513 			status = "--";
514 		LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
515 			 i, status, obd->obd_type->typ_name,
516 			 obd->obd_name, obd->obd_uuid.uuid,
517 			 atomic_read(&obd->obd_refcount));
518 	}
519 	read_unlock(&obd_dev_lock);
520 	return;
521 }
522 
523 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
524    specified, then only the client with that uuid is returned,
525    otherwise any client connected to the tgt is returned. */
class_find_client_obd(struct obd_uuid * tgt_uuid,const char * typ_name,struct obd_uuid * grp_uuid)526 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
527 					  const char *typ_name,
528 					  struct obd_uuid *grp_uuid)
529 {
530 	int i;
531 
532 	read_lock(&obd_dev_lock);
533 	for (i = 0; i < class_devno_max(); i++) {
534 		struct obd_device *obd = class_num2obd(i);
535 
536 		if (obd == NULL)
537 			continue;
538 		if ((strncmp(obd->obd_type->typ_name, typ_name,
539 			     strlen(typ_name)) == 0)) {
540 			if (obd_uuid_equals(tgt_uuid,
541 					    &obd->u.cli.cl_target_uuid) &&
542 			    ((grp_uuid)? obd_uuid_equals(grp_uuid,
543 							 &obd->obd_uuid) : 1)) {
544 				read_unlock(&obd_dev_lock);
545 				return obd;
546 			}
547 		}
548 	}
549 	read_unlock(&obd_dev_lock);
550 
551 	return NULL;
552 }
553 EXPORT_SYMBOL(class_find_client_obd);
554 
555 /* Iterate the obd_device list looking devices have grp_uuid. Start
556    searching at *next, and if a device is found, the next index to look
557    at is saved in *next. If next is NULL, then the first matching device
558    will always be returned. */
class_devices_in_group(struct obd_uuid * grp_uuid,int * next)559 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
560 {
561 	int i;
562 
563 	if (next == NULL)
564 		i = 0;
565 	else if (*next >= 0 && *next < class_devno_max())
566 		i = *next;
567 	else
568 		return NULL;
569 
570 	read_lock(&obd_dev_lock);
571 	for (; i < class_devno_max(); i++) {
572 		struct obd_device *obd = class_num2obd(i);
573 
574 		if (obd == NULL)
575 			continue;
576 		if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
577 			if (next != NULL)
578 				*next = i+1;
579 			read_unlock(&obd_dev_lock);
580 			return obd;
581 		}
582 	}
583 	read_unlock(&obd_dev_lock);
584 
585 	return NULL;
586 }
587 EXPORT_SYMBOL(class_devices_in_group);
588 
589 /**
590  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
591  * adjust sptlrpc settings accordingly.
592  */
class_notify_sptlrpc_conf(const char * fsname,int namelen)593 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
594 {
595 	struct obd_device  *obd;
596 	const char	 *type;
597 	int		 i, rc = 0, rc2;
598 
599 	LASSERT(namelen > 0);
600 
601 	read_lock(&obd_dev_lock);
602 	for (i = 0; i < class_devno_max(); i++) {
603 		obd = class_num2obd(i);
604 
605 		if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
606 			continue;
607 
608 		/* only notify mdc, osc, mdt, ost */
609 		type = obd->obd_type->typ_name;
610 		if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
611 		    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
612 		    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
613 		    strcmp(type, LUSTRE_OST_NAME) != 0)
614 			continue;
615 
616 		if (strncmp(obd->obd_name, fsname, namelen))
617 			continue;
618 
619 		class_incref(obd, __func__, obd);
620 		read_unlock(&obd_dev_lock);
621 		rc2 = obd_set_info_async(NULL, obd->obd_self_export,
622 					 sizeof(KEY_SPTLRPC_CONF),
623 					 KEY_SPTLRPC_CONF, 0, NULL, NULL);
624 		rc = rc ? rc : rc2;
625 		class_decref(obd, __func__, obd);
626 		read_lock(&obd_dev_lock);
627 	}
628 	read_unlock(&obd_dev_lock);
629 	return rc;
630 }
631 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
632 
obd_cleanup_caches(void)633 void obd_cleanup_caches(void)
634 {
635 	if (obd_device_cachep) {
636 		kmem_cache_destroy(obd_device_cachep);
637 		obd_device_cachep = NULL;
638 	}
639 	if (obdo_cachep) {
640 		kmem_cache_destroy(obdo_cachep);
641 		obdo_cachep = NULL;
642 	}
643 	if (import_cachep) {
644 		kmem_cache_destroy(import_cachep);
645 		import_cachep = NULL;
646 	}
647 	if (capa_cachep) {
648 		kmem_cache_destroy(capa_cachep);
649 		capa_cachep = NULL;
650 	}
651 }
652 
obd_init_caches(void)653 int obd_init_caches(void)
654 {
655 	LASSERT(obd_device_cachep == NULL);
656 	obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
657 						 sizeof(struct obd_device),
658 						 0, 0, NULL);
659 	if (!obd_device_cachep)
660 		goto out;
661 
662 	LASSERT(obdo_cachep == NULL);
663 	obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
664 					   0, 0, NULL);
665 	if (!obdo_cachep)
666 		goto out;
667 
668 	LASSERT(import_cachep == NULL);
669 	import_cachep = kmem_cache_create("ll_import_cache",
670 					     sizeof(struct obd_import),
671 					     0, 0, NULL);
672 	if (!import_cachep)
673 		goto out;
674 
675 	LASSERT(capa_cachep == NULL);
676 	capa_cachep = kmem_cache_create("capa_cache",
677 					   sizeof(struct obd_capa), 0, 0, NULL);
678 	if (!capa_cachep)
679 		goto out;
680 
681 	return 0;
682  out:
683 	obd_cleanup_caches();
684 	return -ENOMEM;
685 
686 }
687 
688 /* map connection to client */
class_conn2export(struct lustre_handle * conn)689 struct obd_export *class_conn2export(struct lustre_handle *conn)
690 {
691 	struct obd_export *export;
692 
693 	if (!conn) {
694 		CDEBUG(D_CACHE, "looking for null handle\n");
695 		return NULL;
696 	}
697 
698 	if (conn->cookie == -1) {  /* this means assign a new connection */
699 		CDEBUG(D_CACHE, "want a new connection\n");
700 		return NULL;
701 	}
702 
703 	CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
704 	export = class_handle2object(conn->cookie);
705 	return export;
706 }
707 EXPORT_SYMBOL(class_conn2export);
708 
class_exp2obd(struct obd_export * exp)709 struct obd_device *class_exp2obd(struct obd_export *exp)
710 {
711 	if (exp)
712 		return exp->exp_obd;
713 	return NULL;
714 }
715 EXPORT_SYMBOL(class_exp2obd);
716 
class_conn2obd(struct lustre_handle * conn)717 struct obd_device *class_conn2obd(struct lustre_handle *conn)
718 {
719 	struct obd_export *export;
720 	export = class_conn2export(conn);
721 	if (export) {
722 		struct obd_device *obd = export->exp_obd;
723 		class_export_put(export);
724 		return obd;
725 	}
726 	return NULL;
727 }
728 EXPORT_SYMBOL(class_conn2obd);
729 
class_exp2cliimp(struct obd_export * exp)730 struct obd_import *class_exp2cliimp(struct obd_export *exp)
731 {
732 	struct obd_device *obd = exp->exp_obd;
733 	if (obd == NULL)
734 		return NULL;
735 	return obd->u.cli.cl_import;
736 }
737 EXPORT_SYMBOL(class_exp2cliimp);
738 
class_conn2cliimp(struct lustre_handle * conn)739 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
740 {
741 	struct obd_device *obd = class_conn2obd(conn);
742 	if (obd == NULL)
743 		return NULL;
744 	return obd->u.cli.cl_import;
745 }
746 EXPORT_SYMBOL(class_conn2cliimp);
747 
748 /* Export management functions */
class_export_destroy(struct obd_export * exp)749 static void class_export_destroy(struct obd_export *exp)
750 {
751 	struct obd_device *obd = exp->exp_obd;
752 
753 	LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
754 	LASSERT(obd != NULL);
755 
756 	CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
757 	       exp->exp_client_uuid.uuid, obd->obd_name);
758 
759 	/* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
760 	if (exp->exp_connection)
761 		ptlrpc_put_connection_superhack(exp->exp_connection);
762 
763 	LASSERT(list_empty(&exp->exp_outstanding_replies));
764 	LASSERT(list_empty(&exp->exp_uncommitted_replies));
765 	LASSERT(list_empty(&exp->exp_req_replay_queue));
766 	LASSERT(list_empty(&exp->exp_hp_rpcs));
767 	obd_destroy_export(exp);
768 	class_decref(obd, "export", exp);
769 
770 	OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
771 }
772 
export_handle_addref(void * export)773 static void export_handle_addref(void *export)
774 {
775 	class_export_get(export);
776 }
777 
778 static struct portals_handle_ops export_handle_ops = {
779 	.hop_addref = export_handle_addref,
780 	.hop_free   = NULL,
781 };
782 
class_export_get(struct obd_export * exp)783 struct obd_export *class_export_get(struct obd_export *exp)
784 {
785 	atomic_inc(&exp->exp_refcount);
786 	CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
787 	       atomic_read(&exp->exp_refcount));
788 	return exp;
789 }
790 EXPORT_SYMBOL(class_export_get);
791 
class_export_put(struct obd_export * exp)792 void class_export_put(struct obd_export *exp)
793 {
794 	LASSERT(exp != NULL);
795 	LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
796 	CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
797 	       atomic_read(&exp->exp_refcount) - 1);
798 
799 	if (atomic_dec_and_test(&exp->exp_refcount)) {
800 		LASSERT(!list_empty(&exp->exp_obd_chain));
801 		CDEBUG(D_IOCTL, "final put %p/%s\n",
802 		       exp, exp->exp_client_uuid.uuid);
803 
804 		/* release nid stat refererence */
805 		lprocfs_exp_cleanup(exp);
806 
807 		obd_zombie_export_add(exp);
808 	}
809 }
810 EXPORT_SYMBOL(class_export_put);
811 
812 /* Creates a new export, adds it to the hash table, and returns a
813  * pointer to it. The refcount is 2: one for the hash reference, and
814  * one for the pointer returned by this function. */
class_new_export(struct obd_device * obd,struct obd_uuid * cluuid)815 struct obd_export *class_new_export(struct obd_device *obd,
816 				    struct obd_uuid *cluuid)
817 {
818 	struct obd_export *export;
819 	struct cfs_hash *hash = NULL;
820 	int rc = 0;
821 
822 	OBD_ALLOC_PTR(export);
823 	if (!export)
824 		return ERR_PTR(-ENOMEM);
825 
826 	export->exp_conn_cnt = 0;
827 	export->exp_lock_hash = NULL;
828 	export->exp_flock_hash = NULL;
829 	atomic_set(&export->exp_refcount, 2);
830 	atomic_set(&export->exp_rpc_count, 0);
831 	atomic_set(&export->exp_cb_count, 0);
832 	atomic_set(&export->exp_locks_count, 0);
833 #if LUSTRE_TRACKS_LOCK_EXP_REFS
834 	INIT_LIST_HEAD(&export->exp_locks_list);
835 	spin_lock_init(&export->exp_locks_list_guard);
836 #endif
837 	atomic_set(&export->exp_replay_count, 0);
838 	export->exp_obd = obd;
839 	INIT_LIST_HEAD(&export->exp_outstanding_replies);
840 	spin_lock_init(&export->exp_uncommitted_replies_lock);
841 	INIT_LIST_HEAD(&export->exp_uncommitted_replies);
842 	INIT_LIST_HEAD(&export->exp_req_replay_queue);
843 	INIT_LIST_HEAD(&export->exp_handle.h_link);
844 	INIT_LIST_HEAD(&export->exp_hp_rpcs);
845 	class_handle_hash(&export->exp_handle, &export_handle_ops);
846 	export->exp_last_request_time = get_seconds();
847 	spin_lock_init(&export->exp_lock);
848 	spin_lock_init(&export->exp_rpc_lock);
849 	INIT_HLIST_NODE(&export->exp_uuid_hash);
850 	INIT_HLIST_NODE(&export->exp_nid_hash);
851 	spin_lock_init(&export->exp_bl_list_lock);
852 	INIT_LIST_HEAD(&export->exp_bl_list);
853 
854 	export->exp_sp_peer = LUSTRE_SP_ANY;
855 	export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
856 	export->exp_client_uuid = *cluuid;
857 	obd_init_export(export);
858 
859 	spin_lock(&obd->obd_dev_lock);
860 	/* shouldn't happen, but might race */
861 	if (obd->obd_stopping) {
862 		rc = -ENODEV;
863 		goto exit_unlock;
864 	}
865 
866 	hash = cfs_hash_getref(obd->obd_uuid_hash);
867 	if (hash == NULL) {
868 		rc = -ENODEV;
869 		goto exit_unlock;
870 	}
871 	spin_unlock(&obd->obd_dev_lock);
872 
873 	if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
874 		rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
875 		if (rc != 0) {
876 			LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
877 				      obd->obd_name, cluuid->uuid, rc);
878 			rc = -EALREADY;
879 			goto exit_err;
880 		}
881 	}
882 
883 	spin_lock(&obd->obd_dev_lock);
884 	if (obd->obd_stopping) {
885 		cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
886 		rc = -ENODEV;
887 		goto exit_unlock;
888 	}
889 
890 	class_incref(obd, "export", export);
891 	list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
892 	list_add_tail(&export->exp_obd_chain_timed,
893 			  &export->exp_obd->obd_exports_timed);
894 	export->exp_obd->obd_num_exports++;
895 	spin_unlock(&obd->obd_dev_lock);
896 	cfs_hash_putref(hash);
897 	return export;
898 
899 exit_unlock:
900 	spin_unlock(&obd->obd_dev_lock);
901 exit_err:
902 	if (hash)
903 		cfs_hash_putref(hash);
904 	class_handle_unhash(&export->exp_handle);
905 	LASSERT(hlist_unhashed(&export->exp_uuid_hash));
906 	obd_destroy_export(export);
907 	OBD_FREE_PTR(export);
908 	return ERR_PTR(rc);
909 }
910 EXPORT_SYMBOL(class_new_export);
911 
class_unlink_export(struct obd_export * exp)912 void class_unlink_export(struct obd_export *exp)
913 {
914 	class_handle_unhash(&exp->exp_handle);
915 
916 	spin_lock(&exp->exp_obd->obd_dev_lock);
917 	/* delete an uuid-export hashitem from hashtables */
918 	if (!hlist_unhashed(&exp->exp_uuid_hash))
919 		cfs_hash_del(exp->exp_obd->obd_uuid_hash,
920 			     &exp->exp_client_uuid,
921 			     &exp->exp_uuid_hash);
922 
923 	list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
924 	list_del_init(&exp->exp_obd_chain_timed);
925 	exp->exp_obd->obd_num_exports--;
926 	spin_unlock(&exp->exp_obd->obd_dev_lock);
927 	class_export_put(exp);
928 }
929 EXPORT_SYMBOL(class_unlink_export);
930 
931 /* Import management functions */
class_import_destroy(struct obd_import * imp)932 static void class_import_destroy(struct obd_import *imp)
933 {
934 	CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
935 		imp->imp_obd->obd_name);
936 
937 	LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
938 
939 	ptlrpc_put_connection_superhack(imp->imp_connection);
940 
941 	while (!list_empty(&imp->imp_conn_list)) {
942 		struct obd_import_conn *imp_conn;
943 
944 		imp_conn = list_entry(imp->imp_conn_list.next,
945 					  struct obd_import_conn, oic_item);
946 		list_del_init(&imp_conn->oic_item);
947 		ptlrpc_put_connection_superhack(imp_conn->oic_conn);
948 		OBD_FREE(imp_conn, sizeof(*imp_conn));
949 	}
950 
951 	LASSERT(imp->imp_sec == NULL);
952 	class_decref(imp->imp_obd, "import", imp);
953 	OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
954 }
955 
import_handle_addref(void * import)956 static void import_handle_addref(void *import)
957 {
958 	class_import_get(import);
959 }
960 
961 static struct portals_handle_ops import_handle_ops = {
962 	.hop_addref = import_handle_addref,
963 	.hop_free   = NULL,
964 };
965 
class_import_get(struct obd_import * import)966 struct obd_import *class_import_get(struct obd_import *import)
967 {
968 	atomic_inc(&import->imp_refcount);
969 	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
970 	       atomic_read(&import->imp_refcount),
971 	       import->imp_obd->obd_name);
972 	return import;
973 }
974 EXPORT_SYMBOL(class_import_get);
975 
class_import_put(struct obd_import * imp)976 void class_import_put(struct obd_import *imp)
977 {
978 	LASSERT(list_empty(&imp->imp_zombie_chain));
979 	LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
980 
981 	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
982 	       atomic_read(&imp->imp_refcount) - 1,
983 	       imp->imp_obd->obd_name);
984 
985 	if (atomic_dec_and_test(&imp->imp_refcount)) {
986 		CDEBUG(D_INFO, "final put import %p\n", imp);
987 		obd_zombie_import_add(imp);
988 	}
989 
990 	/* catch possible import put race */
991 	LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
992 }
993 EXPORT_SYMBOL(class_import_put);
994 
init_imp_at(struct imp_at * at)995 static void init_imp_at(struct imp_at *at) {
996 	int i;
997 	at_init(&at->iat_net_latency, 0, 0);
998 	for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
999 		/* max service estimates are tracked on the server side, so
1000 		   don't use the AT history here, just use the last reported
1001 		   val. (But keep hist for proc histogram, worst_ever) */
1002 		at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1003 			AT_FLG_NOHIST);
1004 	}
1005 }
1006 
class_new_import(struct obd_device * obd)1007 struct obd_import *class_new_import(struct obd_device *obd)
1008 {
1009 	struct obd_import *imp;
1010 
1011 	OBD_ALLOC(imp, sizeof(*imp));
1012 	if (imp == NULL)
1013 		return NULL;
1014 
1015 	INIT_LIST_HEAD(&imp->imp_pinger_chain);
1016 	INIT_LIST_HEAD(&imp->imp_zombie_chain);
1017 	INIT_LIST_HEAD(&imp->imp_replay_list);
1018 	INIT_LIST_HEAD(&imp->imp_sending_list);
1019 	INIT_LIST_HEAD(&imp->imp_delayed_list);
1020 	INIT_LIST_HEAD(&imp->imp_committed_list);
1021 	imp->imp_replay_cursor = &imp->imp_committed_list;
1022 	spin_lock_init(&imp->imp_lock);
1023 	imp->imp_last_success_conn = 0;
1024 	imp->imp_state = LUSTRE_IMP_NEW;
1025 	imp->imp_obd = class_incref(obd, "import", imp);
1026 	mutex_init(&imp->imp_sec_mutex);
1027 	init_waitqueue_head(&imp->imp_recovery_waitq);
1028 
1029 	atomic_set(&imp->imp_refcount, 2);
1030 	atomic_set(&imp->imp_unregistering, 0);
1031 	atomic_set(&imp->imp_inflight, 0);
1032 	atomic_set(&imp->imp_replay_inflight, 0);
1033 	atomic_set(&imp->imp_inval_count, 0);
1034 	INIT_LIST_HEAD(&imp->imp_conn_list);
1035 	INIT_LIST_HEAD(&imp->imp_handle.h_link);
1036 	class_handle_hash(&imp->imp_handle, &import_handle_ops);
1037 	init_imp_at(&imp->imp_at);
1038 
1039 	/* the default magic is V2, will be used in connect RPC, and
1040 	 * then adjusted according to the flags in request/reply. */
1041 	imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1042 
1043 	return imp;
1044 }
1045 EXPORT_SYMBOL(class_new_import);
1046 
class_destroy_import(struct obd_import * import)1047 void class_destroy_import(struct obd_import *import)
1048 {
1049 	LASSERT(import != NULL);
1050 	LASSERT(import != LP_POISON);
1051 
1052 	class_handle_unhash(&import->imp_handle);
1053 
1054 	spin_lock(&import->imp_lock);
1055 	import->imp_generation++;
1056 	spin_unlock(&import->imp_lock);
1057 	class_import_put(import);
1058 }
1059 EXPORT_SYMBOL(class_destroy_import);
1060 
1061 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1062 
__class_export_add_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)1063 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1064 {
1065 	spin_lock(&exp->exp_locks_list_guard);
1066 
1067 	LASSERT(lock->l_exp_refs_nr >= 0);
1068 
1069 	if (lock->l_exp_refs_target != NULL &&
1070 	    lock->l_exp_refs_target != exp) {
1071 		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1072 			      exp, lock, lock->l_exp_refs_target);
1073 	}
1074 	if ((lock->l_exp_refs_nr ++) == 0) {
1075 		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1076 		lock->l_exp_refs_target = exp;
1077 	}
1078 	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1079 	       lock, exp, lock->l_exp_refs_nr);
1080 	spin_unlock(&exp->exp_locks_list_guard);
1081 }
1082 EXPORT_SYMBOL(__class_export_add_lock_ref);
1083 
__class_export_del_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)1084 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1085 {
1086 	spin_lock(&exp->exp_locks_list_guard);
1087 	LASSERT(lock->l_exp_refs_nr > 0);
1088 	if (lock->l_exp_refs_target != exp) {
1089 		LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1090 			      lock, lock->l_exp_refs_target, exp);
1091 	}
1092 	if (-- lock->l_exp_refs_nr == 0) {
1093 		list_del_init(&lock->l_exp_refs_link);
1094 		lock->l_exp_refs_target = NULL;
1095 	}
1096 	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1097 	       lock, exp, lock->l_exp_refs_nr);
1098 	spin_unlock(&exp->exp_locks_list_guard);
1099 }
1100 EXPORT_SYMBOL(__class_export_del_lock_ref);
1101 #endif
1102 
1103 /* A connection defines an export context in which preallocation can
1104    be managed. This releases the export pointer reference, and returns
1105    the export handle, so the export refcount is 1 when this function
1106    returns. */
class_connect(struct lustre_handle * conn,struct obd_device * obd,struct obd_uuid * cluuid)1107 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1108 		  struct obd_uuid *cluuid)
1109 {
1110 	struct obd_export *export;
1111 	LASSERT(conn != NULL);
1112 	LASSERT(obd != NULL);
1113 	LASSERT(cluuid != NULL);
1114 
1115 	export = class_new_export(obd, cluuid);
1116 	if (IS_ERR(export))
1117 		return PTR_ERR(export);
1118 
1119 	conn->cookie = export->exp_handle.h_cookie;
1120 	class_export_put(export);
1121 
1122 	CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1123 	       cluuid->uuid, conn->cookie);
1124 	return 0;
1125 }
1126 EXPORT_SYMBOL(class_connect);
1127 
1128 /* if export is involved in recovery then clean up related things */
class_export_recovery_cleanup(struct obd_export * exp)1129 static void class_export_recovery_cleanup(struct obd_export *exp)
1130 {
1131 	struct obd_device *obd = exp->exp_obd;
1132 
1133 	spin_lock(&obd->obd_recovery_task_lock);
1134 	if (exp->exp_delayed)
1135 		obd->obd_delayed_clients--;
1136 	if (obd->obd_recovering) {
1137 		if (exp->exp_in_recovery) {
1138 			spin_lock(&exp->exp_lock);
1139 			exp->exp_in_recovery = 0;
1140 			spin_unlock(&exp->exp_lock);
1141 			LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1142 			atomic_dec(&obd->obd_connected_clients);
1143 		}
1144 
1145 		/* if called during recovery then should update
1146 		 * obd_stale_clients counter,
1147 		 * lightweight exports are not counted */
1148 		if (exp->exp_failed &&
1149 		    (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1150 			exp->exp_obd->obd_stale_clients++;
1151 	}
1152 	spin_unlock(&obd->obd_recovery_task_lock);
1153 
1154 	spin_lock(&exp->exp_lock);
1155 	/** Cleanup req replay fields */
1156 	if (exp->exp_req_replay_needed) {
1157 		exp->exp_req_replay_needed = 0;
1158 
1159 		LASSERT(atomic_read(&obd->obd_req_replay_clients));
1160 		atomic_dec(&obd->obd_req_replay_clients);
1161 	}
1162 
1163 	/** Cleanup lock replay data */
1164 	if (exp->exp_lock_replay_needed) {
1165 		exp->exp_lock_replay_needed = 0;
1166 
1167 		LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1168 		atomic_dec(&obd->obd_lock_replay_clients);
1169 	}
1170 	spin_unlock(&exp->exp_lock);
1171 }
1172 
1173 /* This function removes 1-3 references from the export:
1174  * 1 - for export pointer passed
1175  * and if disconnect really need
1176  * 2 - removing from hash
1177  * 3 - in client_unlink_export
1178  * The export pointer passed to this function can destroyed */
class_disconnect(struct obd_export * export)1179 int class_disconnect(struct obd_export *export)
1180 {
1181 	int already_disconnected;
1182 
1183 	if (export == NULL) {
1184 		CWARN("attempting to free NULL export %p\n", export);
1185 		return -EINVAL;
1186 	}
1187 
1188 	spin_lock(&export->exp_lock);
1189 	already_disconnected = export->exp_disconnected;
1190 	export->exp_disconnected = 1;
1191 	spin_unlock(&export->exp_lock);
1192 
1193 	/* class_cleanup(), abort_recovery(), and class_fail_export()
1194 	 * all end up in here, and if any of them race we shouldn't
1195 	 * call extra class_export_puts(). */
1196 	if (already_disconnected) {
1197 		LASSERT(hlist_unhashed(&export->exp_nid_hash));
1198 		goto no_disconn;
1199 	}
1200 
1201 	CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1202 	       export->exp_handle.h_cookie);
1203 
1204 	if (!hlist_unhashed(&export->exp_nid_hash))
1205 		cfs_hash_del(export->exp_obd->obd_nid_hash,
1206 			     &export->exp_connection->c_peer.nid,
1207 			     &export->exp_nid_hash);
1208 
1209 	class_export_recovery_cleanup(export);
1210 	class_unlink_export(export);
1211 no_disconn:
1212 	class_export_put(export);
1213 	return 0;
1214 }
1215 EXPORT_SYMBOL(class_disconnect);
1216 
1217 /* Return non-zero for a fully connected export */
class_connected_export(struct obd_export * exp)1218 int class_connected_export(struct obd_export *exp)
1219 {
1220 	if (exp) {
1221 		int connected;
1222 		spin_lock(&exp->exp_lock);
1223 		connected = exp->exp_conn_cnt > 0;
1224 		spin_unlock(&exp->exp_lock);
1225 		return connected;
1226 	}
1227 	return 0;
1228 }
1229 EXPORT_SYMBOL(class_connected_export);
1230 
class_disconnect_export_list(struct list_head * list,enum obd_option flags)1231 static void class_disconnect_export_list(struct list_head *list,
1232 					 enum obd_option flags)
1233 {
1234 	int rc;
1235 	struct obd_export *exp;
1236 
1237 	/* It's possible that an export may disconnect itself, but
1238 	 * nothing else will be added to this list. */
1239 	while (!list_empty(list)) {
1240 		exp = list_entry(list->next, struct obd_export,
1241 				     exp_obd_chain);
1242 		/* need for safe call CDEBUG after obd_disconnect */
1243 		class_export_get(exp);
1244 
1245 		spin_lock(&exp->exp_lock);
1246 		exp->exp_flags = flags;
1247 		spin_unlock(&exp->exp_lock);
1248 
1249 		if (obd_uuid_equals(&exp->exp_client_uuid,
1250 				    &exp->exp_obd->obd_uuid)) {
1251 			CDEBUG(D_HA,
1252 			       "exp %p export uuid == obd uuid, don't discon\n",
1253 			       exp);
1254 			/* Need to delete this now so we don't end up pointing
1255 			 * to work_list later when this export is cleaned up. */
1256 			list_del_init(&exp->exp_obd_chain);
1257 			class_export_put(exp);
1258 			continue;
1259 		}
1260 
1261 		class_export_get(exp);
1262 		CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1263 		       exp->exp_obd->obd_name, obd_export_nid2str(exp),
1264 		       exp, exp->exp_last_request_time);
1265 		/* release one export reference anyway */
1266 		rc = obd_disconnect(exp);
1267 
1268 		CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1269 		       obd_export_nid2str(exp), exp, rc);
1270 		class_export_put(exp);
1271 	}
1272 }
1273 
class_disconnect_exports(struct obd_device * obd)1274 void class_disconnect_exports(struct obd_device *obd)
1275 {
1276 	struct list_head work_list;
1277 
1278 	/* Move all of the exports from obd_exports to a work list, en masse. */
1279 	INIT_LIST_HEAD(&work_list);
1280 	spin_lock(&obd->obd_dev_lock);
1281 	list_splice_init(&obd->obd_exports, &work_list);
1282 	list_splice_init(&obd->obd_delayed_exports, &work_list);
1283 	spin_unlock(&obd->obd_dev_lock);
1284 
1285 	if (!list_empty(&work_list)) {
1286 		CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1287 		       obd->obd_minor, obd);
1288 		class_disconnect_export_list(&work_list,
1289 					     exp_flags_from_obd(obd));
1290 	} else
1291 		CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1292 		       obd->obd_minor, obd);
1293 }
1294 EXPORT_SYMBOL(class_disconnect_exports);
1295 
1296 /* Remove exports that have not completed recovery.
1297  */
class_disconnect_stale_exports(struct obd_device * obd,int (* test_export)(struct obd_export *))1298 void class_disconnect_stale_exports(struct obd_device *obd,
1299 				    int (*test_export)(struct obd_export *))
1300 {
1301 	struct list_head work_list;
1302 	struct obd_export *exp, *n;
1303 	int evicted = 0;
1304 
1305 	INIT_LIST_HEAD(&work_list);
1306 	spin_lock(&obd->obd_dev_lock);
1307 	list_for_each_entry_safe(exp, n, &obd->obd_exports,
1308 				     exp_obd_chain) {
1309 		/* don't count self-export as client */
1310 		if (obd_uuid_equals(&exp->exp_client_uuid,
1311 				    &exp->exp_obd->obd_uuid))
1312 			continue;
1313 
1314 		/* don't evict clients which have no slot in last_rcvd
1315 		 * (e.g. lightweight connection) */
1316 		if (exp->exp_target_data.ted_lr_idx == -1)
1317 			continue;
1318 
1319 		spin_lock(&exp->exp_lock);
1320 		if (exp->exp_failed || test_export(exp)) {
1321 			spin_unlock(&exp->exp_lock);
1322 			continue;
1323 		}
1324 		exp->exp_failed = 1;
1325 		spin_unlock(&exp->exp_lock);
1326 
1327 		list_move(&exp->exp_obd_chain, &work_list);
1328 		evicted++;
1329 		CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1330 		       obd->obd_name, exp->exp_client_uuid.uuid,
1331 		       exp->exp_connection == NULL ? "<unknown>" :
1332 		       libcfs_nid2str(exp->exp_connection->c_peer.nid));
1333 		print_export_data(exp, "EVICTING", 0);
1334 	}
1335 	spin_unlock(&obd->obd_dev_lock);
1336 
1337 	if (evicted)
1338 		LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1339 			      obd->obd_name, evicted);
1340 
1341 	class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1342 						 OBD_OPT_ABORT_RECOV);
1343 }
1344 EXPORT_SYMBOL(class_disconnect_stale_exports);
1345 
class_fail_export(struct obd_export * exp)1346 void class_fail_export(struct obd_export *exp)
1347 {
1348 	int rc, already_failed;
1349 
1350 	spin_lock(&exp->exp_lock);
1351 	already_failed = exp->exp_failed;
1352 	exp->exp_failed = 1;
1353 	spin_unlock(&exp->exp_lock);
1354 
1355 	if (already_failed) {
1356 		CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1357 		       exp, exp->exp_client_uuid.uuid);
1358 		return;
1359 	}
1360 
1361 	CDEBUG(D_HA, "disconnecting export %p/%s\n",
1362 	       exp, exp->exp_client_uuid.uuid);
1363 
1364 	if (obd_dump_on_timeout)
1365 		libcfs_debug_dumplog();
1366 
1367 	/* need for safe call CDEBUG after obd_disconnect */
1368 	class_export_get(exp);
1369 
1370 	/* Most callers into obd_disconnect are removing their own reference
1371 	 * (request, for example) in addition to the one from the hash table.
1372 	 * We don't have such a reference here, so make one. */
1373 	class_export_get(exp);
1374 	rc = obd_disconnect(exp);
1375 	if (rc)
1376 		CERROR("disconnecting export %p failed: %d\n", exp, rc);
1377 	else
1378 		CDEBUG(D_HA, "disconnected export %p/%s\n",
1379 		       exp, exp->exp_client_uuid.uuid);
1380 	class_export_put(exp);
1381 }
1382 EXPORT_SYMBOL(class_fail_export);
1383 
obd_export_nid2str(struct obd_export * exp)1384 char *obd_export_nid2str(struct obd_export *exp)
1385 {
1386 	if (exp->exp_connection != NULL)
1387 		return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1388 
1389 	return "(no nid)";
1390 }
1391 EXPORT_SYMBOL(obd_export_nid2str);
1392 
obd_export_evict_by_nid(struct obd_device * obd,const char * nid)1393 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1394 {
1395 	struct cfs_hash *nid_hash;
1396 	struct obd_export *doomed_exp = NULL;
1397 	int exports_evicted = 0;
1398 
1399 	lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1400 
1401 	spin_lock(&obd->obd_dev_lock);
1402 	/* umount has run already, so evict thread should leave
1403 	 * its task to umount thread now */
1404 	if (obd->obd_stopping) {
1405 		spin_unlock(&obd->obd_dev_lock);
1406 		return exports_evicted;
1407 	}
1408 	nid_hash = obd->obd_nid_hash;
1409 	cfs_hash_getref(nid_hash);
1410 	spin_unlock(&obd->obd_dev_lock);
1411 
1412 	do {
1413 		doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1414 		if (doomed_exp == NULL)
1415 			break;
1416 
1417 		LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1418 			 "nid %s found, wanted nid %s, requested nid %s\n",
1419 			 obd_export_nid2str(doomed_exp),
1420 			 libcfs_nid2str(nid_key), nid);
1421 		LASSERTF(doomed_exp != obd->obd_self_export,
1422 			 "self-export is hashed by NID?\n");
1423 		exports_evicted++;
1424 		LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1425 			      obd->obd_name,
1426 			      obd_uuid2str(&doomed_exp->exp_client_uuid),
1427 			      obd_export_nid2str(doomed_exp));
1428 		class_fail_export(doomed_exp);
1429 		class_export_put(doomed_exp);
1430 	} while (1);
1431 
1432 	cfs_hash_putref(nid_hash);
1433 
1434 	if (!exports_evicted)
1435 		CDEBUG(D_HA,
1436 		       "%s: can't disconnect NID '%s': no exports found\n",
1437 		       obd->obd_name, nid);
1438 	return exports_evicted;
1439 }
1440 EXPORT_SYMBOL(obd_export_evict_by_nid);
1441 
obd_export_evict_by_uuid(struct obd_device * obd,const char * uuid)1442 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1443 {
1444 	struct cfs_hash *uuid_hash;
1445 	struct obd_export *doomed_exp = NULL;
1446 	struct obd_uuid doomed_uuid;
1447 	int exports_evicted = 0;
1448 
1449 	spin_lock(&obd->obd_dev_lock);
1450 	if (obd->obd_stopping) {
1451 		spin_unlock(&obd->obd_dev_lock);
1452 		return exports_evicted;
1453 	}
1454 	uuid_hash = obd->obd_uuid_hash;
1455 	cfs_hash_getref(uuid_hash);
1456 	spin_unlock(&obd->obd_dev_lock);
1457 
1458 	obd_str2uuid(&doomed_uuid, uuid);
1459 	if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1460 		CERROR("%s: can't evict myself\n", obd->obd_name);
1461 		cfs_hash_putref(uuid_hash);
1462 		return exports_evicted;
1463 	}
1464 
1465 	doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1466 
1467 	if (doomed_exp == NULL) {
1468 		CERROR("%s: can't disconnect %s: no exports found\n",
1469 		       obd->obd_name, uuid);
1470 	} else {
1471 		CWARN("%s: evicting %s at administrative request\n",
1472 		       obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1473 		class_fail_export(doomed_exp);
1474 		class_export_put(doomed_exp);
1475 		exports_evicted++;
1476 	}
1477 	cfs_hash_putref(uuid_hash);
1478 
1479 	return exports_evicted;
1480 }
1481 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1482 
1483 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1484 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1485 EXPORT_SYMBOL(class_export_dump_hook);
1486 #endif
1487 
print_export_data(struct obd_export * exp,const char * status,int locks)1488 static void print_export_data(struct obd_export *exp, const char *status,
1489 			      int locks)
1490 {
1491 	struct ptlrpc_reply_state *rs;
1492 	struct ptlrpc_reply_state *first_reply = NULL;
1493 	int nreplies = 0;
1494 
1495 	spin_lock(&exp->exp_lock);
1496 	list_for_each_entry(rs, &exp->exp_outstanding_replies,
1497 				rs_exp_list) {
1498 		if (nreplies == 0)
1499 			first_reply = rs;
1500 		nreplies++;
1501 	}
1502 	spin_unlock(&exp->exp_lock);
1503 
1504 	CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1505 	       exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1506 	       obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1507 	       atomic_read(&exp->exp_rpc_count),
1508 	       atomic_read(&exp->exp_cb_count),
1509 	       atomic_read(&exp->exp_locks_count),
1510 	       exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1511 	       nreplies, first_reply, nreplies > 3 ? "..." : "",
1512 	       exp->exp_last_committed);
1513 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1514 	if (locks && class_export_dump_hook != NULL)
1515 		class_export_dump_hook(exp);
1516 #endif
1517 }
1518 
dump_exports(struct obd_device * obd,int locks)1519 void dump_exports(struct obd_device *obd, int locks)
1520 {
1521 	struct obd_export *exp;
1522 
1523 	spin_lock(&obd->obd_dev_lock);
1524 	list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1525 		print_export_data(exp, "ACTIVE", locks);
1526 	list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1527 		print_export_data(exp, "UNLINKED", locks);
1528 	list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1529 		print_export_data(exp, "DELAYED", locks);
1530 	spin_unlock(&obd->obd_dev_lock);
1531 	spin_lock(&obd_zombie_impexp_lock);
1532 	list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1533 		print_export_data(exp, "ZOMBIE", locks);
1534 	spin_unlock(&obd_zombie_impexp_lock);
1535 }
1536 EXPORT_SYMBOL(dump_exports);
1537 
obd_exports_barrier(struct obd_device * obd)1538 void obd_exports_barrier(struct obd_device *obd)
1539 {
1540 	int waited = 2;
1541 	LASSERT(list_empty(&obd->obd_exports));
1542 	spin_lock(&obd->obd_dev_lock);
1543 	while (!list_empty(&obd->obd_unlinked_exports)) {
1544 		spin_unlock(&obd->obd_dev_lock);
1545 		set_current_state(TASK_UNINTERRUPTIBLE);
1546 		schedule_timeout(cfs_time_seconds(waited));
1547 		if (waited > 5 && IS_PO2(waited)) {
1548 			LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1549 				      obd->obd_name, waited,
1550 				      atomic_read(&obd->obd_refcount));
1551 			dump_exports(obd, 1);
1552 		}
1553 		waited *= 2;
1554 		spin_lock(&obd->obd_dev_lock);
1555 	}
1556 	spin_unlock(&obd->obd_dev_lock);
1557 }
1558 EXPORT_SYMBOL(obd_exports_barrier);
1559 
1560 /* Total amount of zombies to be destroyed */
1561 static int zombies_count;
1562 
1563 /**
1564  * kill zombie imports and exports
1565  */
obd_zombie_impexp_cull(void)1566 void obd_zombie_impexp_cull(void)
1567 {
1568 	struct obd_import *import;
1569 	struct obd_export *export;
1570 
1571 	do {
1572 		spin_lock(&obd_zombie_impexp_lock);
1573 
1574 		import = NULL;
1575 		if (!list_empty(&obd_zombie_imports)) {
1576 			import = list_entry(obd_zombie_imports.next,
1577 						struct obd_import,
1578 						imp_zombie_chain);
1579 			list_del_init(&import->imp_zombie_chain);
1580 		}
1581 
1582 		export = NULL;
1583 		if (!list_empty(&obd_zombie_exports)) {
1584 			export = list_entry(obd_zombie_exports.next,
1585 						struct obd_export,
1586 						exp_obd_chain);
1587 			list_del_init(&export->exp_obd_chain);
1588 		}
1589 
1590 		spin_unlock(&obd_zombie_impexp_lock);
1591 
1592 		if (import != NULL) {
1593 			class_import_destroy(import);
1594 			spin_lock(&obd_zombie_impexp_lock);
1595 			zombies_count--;
1596 			spin_unlock(&obd_zombie_impexp_lock);
1597 		}
1598 
1599 		if (export != NULL) {
1600 			class_export_destroy(export);
1601 			spin_lock(&obd_zombie_impexp_lock);
1602 			zombies_count--;
1603 			spin_unlock(&obd_zombie_impexp_lock);
1604 		}
1605 
1606 		cond_resched();
1607 	} while (import != NULL || export != NULL);
1608 }
1609 
1610 static struct completion	obd_zombie_start;
1611 static struct completion	obd_zombie_stop;
1612 static unsigned long		obd_zombie_flags;
1613 static wait_queue_head_t		obd_zombie_waitq;
1614 static pid_t			obd_zombie_pid;
1615 
1616 enum {
1617 	OBD_ZOMBIE_STOP		= 0x0001,
1618 };
1619 
1620 /**
1621  * check for work for kill zombie import/export thread.
1622  */
obd_zombie_impexp_check(void * arg)1623 static int obd_zombie_impexp_check(void *arg)
1624 {
1625 	int rc;
1626 
1627 	spin_lock(&obd_zombie_impexp_lock);
1628 	rc = (zombies_count == 0) &&
1629 	     !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1630 	spin_unlock(&obd_zombie_impexp_lock);
1631 
1632 	return rc;
1633 }
1634 
1635 /**
1636  * Add export to the obd_zombie thread and notify it.
1637  */
obd_zombie_export_add(struct obd_export * exp)1638 static void obd_zombie_export_add(struct obd_export *exp) {
1639 	spin_lock(&exp->exp_obd->obd_dev_lock);
1640 	LASSERT(!list_empty(&exp->exp_obd_chain));
1641 	list_del_init(&exp->exp_obd_chain);
1642 	spin_unlock(&exp->exp_obd->obd_dev_lock);
1643 	spin_lock(&obd_zombie_impexp_lock);
1644 	zombies_count++;
1645 	list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1646 	spin_unlock(&obd_zombie_impexp_lock);
1647 
1648 	obd_zombie_impexp_notify();
1649 }
1650 
1651 /**
1652  * Add import to the obd_zombie thread and notify it.
1653  */
obd_zombie_import_add(struct obd_import * imp)1654 static void obd_zombie_import_add(struct obd_import *imp) {
1655 	LASSERT(imp->imp_sec == NULL);
1656 	LASSERT(imp->imp_rq_pool == NULL);
1657 	spin_lock(&obd_zombie_impexp_lock);
1658 	LASSERT(list_empty(&imp->imp_zombie_chain));
1659 	zombies_count++;
1660 	list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1661 	spin_unlock(&obd_zombie_impexp_lock);
1662 
1663 	obd_zombie_impexp_notify();
1664 }
1665 
1666 /**
1667  * notify import/export destroy thread about new zombie.
1668  */
obd_zombie_impexp_notify(void)1669 static void obd_zombie_impexp_notify(void)
1670 {
1671 	/*
1672 	 * Make sure obd_zombie_impexp_thread get this notification.
1673 	 * It is possible this signal only get by obd_zombie_barrier, and
1674 	 * barrier gulps this notification and sleeps away and hangs ensues
1675 	 */
1676 	wake_up_all(&obd_zombie_waitq);
1677 }
1678 
1679 /**
1680  * check whether obd_zombie is idle
1681  */
obd_zombie_is_idle(void)1682 static int obd_zombie_is_idle(void)
1683 {
1684 	int rc;
1685 
1686 	LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1687 	spin_lock(&obd_zombie_impexp_lock);
1688 	rc = (zombies_count == 0);
1689 	spin_unlock(&obd_zombie_impexp_lock);
1690 	return rc;
1691 }
1692 
1693 /**
1694  * wait when obd_zombie import/export queues become empty
1695  */
obd_zombie_barrier(void)1696 void obd_zombie_barrier(void)
1697 {
1698 	struct l_wait_info lwi = { 0 };
1699 
1700 	if (obd_zombie_pid == current_pid())
1701 		/* don't wait for myself */
1702 		return;
1703 	l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1704 }
1705 EXPORT_SYMBOL(obd_zombie_barrier);
1706 
1707 
1708 /**
1709  * destroy zombie export/import thread.
1710  */
obd_zombie_impexp_thread(void * unused)1711 static int obd_zombie_impexp_thread(void *unused)
1712 {
1713 	unshare_fs_struct();
1714 	complete(&obd_zombie_start);
1715 
1716 	obd_zombie_pid = current_pid();
1717 
1718 	while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1719 		struct l_wait_info lwi = { 0 };
1720 
1721 		l_wait_event(obd_zombie_waitq,
1722 			     !obd_zombie_impexp_check(NULL), &lwi);
1723 		obd_zombie_impexp_cull();
1724 
1725 		/*
1726 		 * Notify obd_zombie_barrier callers that queues
1727 		 * may be empty.
1728 		 */
1729 		wake_up(&obd_zombie_waitq);
1730 	}
1731 
1732 	complete(&obd_zombie_stop);
1733 
1734 	return 0;
1735 }
1736 
1737 
1738 /**
1739  * start destroy zombie import/export thread
1740  */
obd_zombie_impexp_init(void)1741 int obd_zombie_impexp_init(void)
1742 {
1743 	struct task_struct *task;
1744 
1745 	INIT_LIST_HEAD(&obd_zombie_imports);
1746 	INIT_LIST_HEAD(&obd_zombie_exports);
1747 	spin_lock_init(&obd_zombie_impexp_lock);
1748 	init_completion(&obd_zombie_start);
1749 	init_completion(&obd_zombie_stop);
1750 	init_waitqueue_head(&obd_zombie_waitq);
1751 	obd_zombie_pid = 0;
1752 
1753 	task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1754 	if (IS_ERR(task))
1755 		return PTR_ERR(task);
1756 
1757 	wait_for_completion(&obd_zombie_start);
1758 	return 0;
1759 }
1760 /**
1761  * stop destroy zombie import/export thread
1762  */
obd_zombie_impexp_stop(void)1763 void obd_zombie_impexp_stop(void)
1764 {
1765 	set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1766 	obd_zombie_impexp_notify();
1767 	wait_for_completion(&obd_zombie_stop);
1768 }
1769 
1770 /***** Kernel-userspace comm helpers *******/
1771 
1772 /* Get length of entire message, including header */
kuc_len(int payload_len)1773 int kuc_len(int payload_len)
1774 {
1775 	return sizeof(struct kuc_hdr) + payload_len;
1776 }
1777 EXPORT_SYMBOL(kuc_len);
1778 
1779 /* Get a pointer to kuc header, given a ptr to the payload
1780  * @param p Pointer to payload area
1781  * @returns Pointer to kuc header
1782  */
kuc_ptr(void * p)1783 struct kuc_hdr *kuc_ptr(void *p)
1784 {
1785 	struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1786 	LASSERT(lh->kuc_magic == KUC_MAGIC);
1787 	return lh;
1788 }
1789 EXPORT_SYMBOL(kuc_ptr);
1790 
1791 /* Test if payload is part of kuc message
1792  * @param p Pointer to payload area
1793  * @returns boolean
1794  */
kuc_ispayload(void * p)1795 int kuc_ispayload(void *p)
1796 {
1797 	struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1798 
1799 	if (kh->kuc_magic == KUC_MAGIC)
1800 		return 1;
1801 	else
1802 		return 0;
1803 }
1804 EXPORT_SYMBOL(kuc_ispayload);
1805 
1806 /* Alloc space for a message, and fill in header
1807  * @return Pointer to payload area
1808  */
kuc_alloc(int payload_len,int transport,int type)1809 void *kuc_alloc(int payload_len, int transport, int type)
1810 {
1811 	struct kuc_hdr *lh;
1812 	int len = kuc_len(payload_len);
1813 
1814 	OBD_ALLOC(lh, len);
1815 	if (lh == NULL)
1816 		return ERR_PTR(-ENOMEM);
1817 
1818 	lh->kuc_magic = KUC_MAGIC;
1819 	lh->kuc_transport = transport;
1820 	lh->kuc_msgtype = type;
1821 	lh->kuc_msglen = len;
1822 
1823 	return (void *)(lh + 1);
1824 }
1825 EXPORT_SYMBOL(kuc_alloc);
1826 
1827 /* Takes pointer to payload area */
kuc_free(void * p,int payload_len)1828 inline void kuc_free(void *p, int payload_len)
1829 {
1830 	struct kuc_hdr *lh = kuc_ptr(p);
1831 	OBD_FREE(lh, kuc_len(payload_len));
1832 }
1833 EXPORT_SYMBOL(kuc_free);
1834