1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/genops.c
37 *
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
40 */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
46 spinlock_t obd_types_lock;
47
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks);
61
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64
65 /*
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
68 */
obd_device_alloc(void)69 static struct obd_device *obd_device_alloc(void)
70 {
71 struct obd_device *obd;
72
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74 if (obd != NULL) {
75 obd->obd_magic = OBD_DEVICE_MAGIC;
76 }
77 return obd;
78 }
79
obd_device_free(struct obd_device * obd)80 static void obd_device_free(struct obd_device *obd)
81 {
82 LASSERT(obd != NULL);
83 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85 if (obd->obd_namespace != NULL) {
86 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87 obd, obd->obd_namespace, obd->obd_force);
88 LBUG();
89 }
90 lu_ref_fini(&obd->obd_reference);
91 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
class_search_type(const char * name)94 struct obd_type *class_search_type(const char *name)
95 {
96 struct list_head *tmp;
97 struct obd_type *type;
98
99 spin_lock(&obd_types_lock);
100 list_for_each(tmp, &obd_types) {
101 type = list_entry(tmp, struct obd_type, typ_chain);
102 if (strcmp(type->typ_name, name) == 0) {
103 spin_unlock(&obd_types_lock);
104 return type;
105 }
106 }
107 spin_unlock(&obd_types_lock);
108 return NULL;
109 }
110 EXPORT_SYMBOL(class_search_type);
111
class_get_type(const char * name)112 struct obd_type *class_get_type(const char *name)
113 {
114 struct obd_type *type = class_search_type(name);
115
116 if (!type) {
117 const char *modname = name;
118
119 if (strcmp(modname, "obdfilter") == 0)
120 modname = "ofd";
121
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
124
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127
128 if (!request_module("%s", modname)) {
129 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130 type = class_search_type(name);
131 } else {
132 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133 modname);
134 }
135 }
136 if (type) {
137 spin_lock(&type->obd_type_lock);
138 type->typ_refcnt++;
139 try_module_get(type->typ_dt_ops->o_owner);
140 spin_unlock(&type->obd_type_lock);
141 }
142 return type;
143 }
144 EXPORT_SYMBOL(class_get_type);
145
class_put_type(struct obd_type * type)146 void class_put_type(struct obd_type *type)
147 {
148 LASSERT(type);
149 spin_lock(&type->obd_type_lock);
150 type->typ_refcnt--;
151 module_put(type->typ_dt_ops->o_owner);
152 spin_unlock(&type->obd_type_lock);
153 }
154 EXPORT_SYMBOL(class_put_type);
155
156 #define CLASS_MAX_NAME 1024
157
class_register_type(struct obd_ops * dt_ops,struct md_ops * md_ops,struct lprocfs_vars * vars,const char * name,struct lu_device_type * ldt)158 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
159 struct lprocfs_vars *vars, const char *name,
160 struct lu_device_type *ldt)
161 {
162 struct obd_type *type;
163 int rc = 0;
164
165 /* sanity check */
166 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
167
168 if (class_search_type(name)) {
169 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
170 return -EEXIST;
171 }
172
173 rc = -ENOMEM;
174 OBD_ALLOC(type, sizeof(*type));
175 if (type == NULL)
176 return rc;
177
178 OBD_ALLOC_PTR(type->typ_dt_ops);
179 OBD_ALLOC_PTR(type->typ_md_ops);
180 OBD_ALLOC(type->typ_name, strlen(name) + 1);
181
182 if (type->typ_dt_ops == NULL ||
183 type->typ_md_ops == NULL ||
184 type->typ_name == NULL)
185 goto failed;
186
187 *(type->typ_dt_ops) = *dt_ops;
188 /* md_ops is optional */
189 if (md_ops)
190 *(type->typ_md_ops) = *md_ops;
191 strcpy(type->typ_name, name);
192 spin_lock_init(&type->obd_type_lock);
193
194 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195 vars, type);
196 if (IS_ERR(type->typ_procroot)) {
197 rc = PTR_ERR(type->typ_procroot);
198 type->typ_procroot = NULL;
199 goto failed;
200 }
201
202 if (ldt != NULL) {
203 type->typ_lu = ldt;
204 rc = lu_device_type_init(ldt);
205 if (rc != 0)
206 goto failed;
207 }
208
209 spin_lock(&obd_types_lock);
210 list_add(&type->typ_chain, &obd_types);
211 spin_unlock(&obd_types_lock);
212
213 return 0;
214
215 failed:
216 if (type->typ_name != NULL)
217 OBD_FREE(type->typ_name, strlen(name) + 1);
218 if (type->typ_md_ops != NULL)
219 OBD_FREE_PTR(type->typ_md_ops);
220 if (type->typ_dt_ops != NULL)
221 OBD_FREE_PTR(type->typ_dt_ops);
222 OBD_FREE(type, sizeof(*type));
223 return rc;
224 }
225 EXPORT_SYMBOL(class_register_type);
226
class_unregister_type(const char * name)227 int class_unregister_type(const char *name)
228 {
229 struct obd_type *type = class_search_type(name);
230
231 if (!type) {
232 CERROR("unknown obd type\n");
233 return -EINVAL;
234 }
235
236 if (type->typ_refcnt) {
237 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238 /* This is a bad situation, let's make the best of it */
239 /* Remove ops, but leave the name for debugging */
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE_PTR(type->typ_md_ops);
242 return -EBUSY;
243 }
244
245 if (type->typ_procroot) {
246 lprocfs_remove(&type->typ_procroot);
247 }
248
249 if (type->typ_lu)
250 lu_device_type_fini(type->typ_lu);
251
252 spin_lock(&obd_types_lock);
253 list_del(&type->typ_chain);
254 spin_unlock(&obd_types_lock);
255 OBD_FREE(type->typ_name, strlen(name) + 1);
256 if (type->typ_dt_ops != NULL)
257 OBD_FREE_PTR(type->typ_dt_ops);
258 if (type->typ_md_ops != NULL)
259 OBD_FREE_PTR(type->typ_md_ops);
260 OBD_FREE(type, sizeof(*type));
261 return 0;
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
264
265 /**
266 * Create a new obd device.
267 *
268 * Find an empty slot in ::obd_devs[], create a new obd device in it.
269 *
270 * \param[in] type_name obd device type string.
271 * \param[in] name obd device name.
272 *
273 * \retval NULL if create fails, otherwise return the obd device
274 * pointer created.
275 */
class_newdev(const char * type_name,const char * name)276 struct obd_device *class_newdev(const char *type_name, const char *name)
277 {
278 struct obd_device *result = NULL;
279 struct obd_device *newdev;
280 struct obd_type *type = NULL;
281 int i;
282 int new_obd_minor = 0;
283
284 if (strlen(name) >= MAX_OBD_NAME) {
285 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286 return ERR_PTR(-EINVAL);
287 }
288
289 type = class_get_type(type_name);
290 if (type == NULL){
291 CERROR("OBD: unknown type: %s\n", type_name);
292 return ERR_PTR(-ENODEV);
293 }
294
295 newdev = obd_device_alloc();
296 if (newdev == NULL) {
297 result = ERR_PTR(-ENOMEM);
298 goto out_type;
299 }
300
301 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303 write_lock(&obd_dev_lock);
304 for (i = 0; i < class_devno_max(); i++) {
305 struct obd_device *obd = class_num2obd(i);
306
307 if (obd && (strcmp(name, obd->obd_name) == 0)) {
308 CERROR("Device %s already exists at %d, won't add\n",
309 name, i);
310 if (result) {
311 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 "%p obd_magic %08x != %08x\n", result,
313 result->obd_magic, OBD_DEVICE_MAGIC);
314 LASSERTF(result->obd_minor == new_obd_minor,
315 "%p obd_minor %d != %d\n", result,
316 result->obd_minor, new_obd_minor);
317
318 obd_devs[result->obd_minor] = NULL;
319 result->obd_name[0] = '\0';
320 }
321 result = ERR_PTR(-EEXIST);
322 break;
323 }
324 if (!result && !obd) {
325 result = newdev;
326 result->obd_minor = i;
327 new_obd_minor = i;
328 result->obd_type = type;
329 strncpy(result->obd_name, name,
330 sizeof(result->obd_name) - 1);
331 obd_devs[i] = result;
332 }
333 }
334 write_unlock(&obd_dev_lock);
335
336 if (result == NULL && i >= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338 class_devno_max());
339 result = ERR_PTR(-EOVERFLOW);
340 goto out;
341 }
342
343 if (IS_ERR(result))
344 goto out;
345
346 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 result->obd_name, result);
348
349 return result;
350 out:
351 obd_device_free(newdev);
352 out_type:
353 class_put_type(type);
354 return result;
355 }
356
class_release_dev(struct obd_device * obd)357 void class_release_dev(struct obd_device *obd)
358 {
359 struct obd_type *obd_type = obd->obd_type;
360
361 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
362 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
363 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
364 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
365 LASSERT(obd_type != NULL);
366
367 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
368 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
369
370 write_lock(&obd_dev_lock);
371 obd_devs[obd->obd_minor] = NULL;
372 write_unlock(&obd_dev_lock);
373 obd_device_free(obd);
374
375 class_put_type(obd_type);
376 }
377
class_name2dev(const char * name)378 int class_name2dev(const char *name)
379 {
380 int i;
381
382 if (!name)
383 return -1;
384
385 read_lock(&obd_dev_lock);
386 for (i = 0; i < class_devno_max(); i++) {
387 struct obd_device *obd = class_num2obd(i);
388
389 if (obd && strcmp(name, obd->obd_name) == 0) {
390 /* Make sure we finished attaching before we give
391 out any references */
392 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
393 if (obd->obd_attached) {
394 read_unlock(&obd_dev_lock);
395 return i;
396 }
397 break;
398 }
399 }
400 read_unlock(&obd_dev_lock);
401
402 return -1;
403 }
404 EXPORT_SYMBOL(class_name2dev);
405
class_name2obd(const char * name)406 struct obd_device *class_name2obd(const char *name)
407 {
408 int dev = class_name2dev(name);
409
410 if (dev < 0 || dev > class_devno_max())
411 return NULL;
412 return class_num2obd(dev);
413 }
414 EXPORT_SYMBOL(class_name2obd);
415
class_uuid2dev(struct obd_uuid * uuid)416 int class_uuid2dev(struct obd_uuid *uuid)
417 {
418 int i;
419
420 read_lock(&obd_dev_lock);
421 for (i = 0; i < class_devno_max(); i++) {
422 struct obd_device *obd = class_num2obd(i);
423
424 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
425 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
426 read_unlock(&obd_dev_lock);
427 return i;
428 }
429 }
430 read_unlock(&obd_dev_lock);
431
432 return -1;
433 }
434 EXPORT_SYMBOL(class_uuid2dev);
435
class_uuid2obd(struct obd_uuid * uuid)436 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
437 {
438 int dev = class_uuid2dev(uuid);
439 if (dev < 0)
440 return NULL;
441 return class_num2obd(dev);
442 }
443 EXPORT_SYMBOL(class_uuid2obd);
444
445 /**
446 * Get obd device from ::obd_devs[]
447 *
448 * \param num [in] array index
449 *
450 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
451 * otherwise return the obd device there.
452 */
class_num2obd(int num)453 struct obd_device *class_num2obd(int num)
454 {
455 struct obd_device *obd = NULL;
456
457 if (num < class_devno_max()) {
458 obd = obd_devs[num];
459 if (obd == NULL)
460 return NULL;
461
462 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
463 "%p obd_magic %08x != %08x\n",
464 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
465 LASSERTF(obd->obd_minor == num,
466 "%p obd_minor %0d != %0d\n",
467 obd, obd->obd_minor, num);
468 }
469
470 return obd;
471 }
472 EXPORT_SYMBOL(class_num2obd);
473
474 /**
475 * Get obd devices count. Device in any
476 * state are counted
477 * \retval obd device count
478 */
get_devices_count(void)479 int get_devices_count(void)
480 {
481 int index, max_index = class_devno_max(), dev_count = 0;
482
483 read_lock(&obd_dev_lock);
484 for (index = 0; index <= max_index; index++) {
485 struct obd_device *obd = class_num2obd(index);
486 if (obd != NULL)
487 dev_count++;
488 }
489 read_unlock(&obd_dev_lock);
490
491 return dev_count;
492 }
493 EXPORT_SYMBOL(get_devices_count);
494
class_obd_list(void)495 void class_obd_list(void)
496 {
497 char *status;
498 int i;
499
500 read_lock(&obd_dev_lock);
501 for (i = 0; i < class_devno_max(); i++) {
502 struct obd_device *obd = class_num2obd(i);
503
504 if (obd == NULL)
505 continue;
506 if (obd->obd_stopping)
507 status = "ST";
508 else if (obd->obd_set_up)
509 status = "UP";
510 else if (obd->obd_attached)
511 status = "AT";
512 else
513 status = "--";
514 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
515 i, status, obd->obd_type->typ_name,
516 obd->obd_name, obd->obd_uuid.uuid,
517 atomic_read(&obd->obd_refcount));
518 }
519 read_unlock(&obd_dev_lock);
520 return;
521 }
522
523 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
524 specified, then only the client with that uuid is returned,
525 otherwise any client connected to the tgt is returned. */
class_find_client_obd(struct obd_uuid * tgt_uuid,const char * typ_name,struct obd_uuid * grp_uuid)526 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
527 const char *typ_name,
528 struct obd_uuid *grp_uuid)
529 {
530 int i;
531
532 read_lock(&obd_dev_lock);
533 for (i = 0; i < class_devno_max(); i++) {
534 struct obd_device *obd = class_num2obd(i);
535
536 if (obd == NULL)
537 continue;
538 if ((strncmp(obd->obd_type->typ_name, typ_name,
539 strlen(typ_name)) == 0)) {
540 if (obd_uuid_equals(tgt_uuid,
541 &obd->u.cli.cl_target_uuid) &&
542 ((grp_uuid)? obd_uuid_equals(grp_uuid,
543 &obd->obd_uuid) : 1)) {
544 read_unlock(&obd_dev_lock);
545 return obd;
546 }
547 }
548 }
549 read_unlock(&obd_dev_lock);
550
551 return NULL;
552 }
553 EXPORT_SYMBOL(class_find_client_obd);
554
555 /* Iterate the obd_device list looking devices have grp_uuid. Start
556 searching at *next, and if a device is found, the next index to look
557 at is saved in *next. If next is NULL, then the first matching device
558 will always be returned. */
class_devices_in_group(struct obd_uuid * grp_uuid,int * next)559 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
560 {
561 int i;
562
563 if (next == NULL)
564 i = 0;
565 else if (*next >= 0 && *next < class_devno_max())
566 i = *next;
567 else
568 return NULL;
569
570 read_lock(&obd_dev_lock);
571 for (; i < class_devno_max(); i++) {
572 struct obd_device *obd = class_num2obd(i);
573
574 if (obd == NULL)
575 continue;
576 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
577 if (next != NULL)
578 *next = i+1;
579 read_unlock(&obd_dev_lock);
580 return obd;
581 }
582 }
583 read_unlock(&obd_dev_lock);
584
585 return NULL;
586 }
587 EXPORT_SYMBOL(class_devices_in_group);
588
589 /**
590 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
591 * adjust sptlrpc settings accordingly.
592 */
class_notify_sptlrpc_conf(const char * fsname,int namelen)593 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
594 {
595 struct obd_device *obd;
596 const char *type;
597 int i, rc = 0, rc2;
598
599 LASSERT(namelen > 0);
600
601 read_lock(&obd_dev_lock);
602 for (i = 0; i < class_devno_max(); i++) {
603 obd = class_num2obd(i);
604
605 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
606 continue;
607
608 /* only notify mdc, osc, mdt, ost */
609 type = obd->obd_type->typ_name;
610 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
611 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
612 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
613 strcmp(type, LUSTRE_OST_NAME) != 0)
614 continue;
615
616 if (strncmp(obd->obd_name, fsname, namelen))
617 continue;
618
619 class_incref(obd, __func__, obd);
620 read_unlock(&obd_dev_lock);
621 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
622 sizeof(KEY_SPTLRPC_CONF),
623 KEY_SPTLRPC_CONF, 0, NULL, NULL);
624 rc = rc ? rc : rc2;
625 class_decref(obd, __func__, obd);
626 read_lock(&obd_dev_lock);
627 }
628 read_unlock(&obd_dev_lock);
629 return rc;
630 }
631 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
632
obd_cleanup_caches(void)633 void obd_cleanup_caches(void)
634 {
635 if (obd_device_cachep) {
636 kmem_cache_destroy(obd_device_cachep);
637 obd_device_cachep = NULL;
638 }
639 if (obdo_cachep) {
640 kmem_cache_destroy(obdo_cachep);
641 obdo_cachep = NULL;
642 }
643 if (import_cachep) {
644 kmem_cache_destroy(import_cachep);
645 import_cachep = NULL;
646 }
647 if (capa_cachep) {
648 kmem_cache_destroy(capa_cachep);
649 capa_cachep = NULL;
650 }
651 }
652
obd_init_caches(void)653 int obd_init_caches(void)
654 {
655 LASSERT(obd_device_cachep == NULL);
656 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
657 sizeof(struct obd_device),
658 0, 0, NULL);
659 if (!obd_device_cachep)
660 goto out;
661
662 LASSERT(obdo_cachep == NULL);
663 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
664 0, 0, NULL);
665 if (!obdo_cachep)
666 goto out;
667
668 LASSERT(import_cachep == NULL);
669 import_cachep = kmem_cache_create("ll_import_cache",
670 sizeof(struct obd_import),
671 0, 0, NULL);
672 if (!import_cachep)
673 goto out;
674
675 LASSERT(capa_cachep == NULL);
676 capa_cachep = kmem_cache_create("capa_cache",
677 sizeof(struct obd_capa), 0, 0, NULL);
678 if (!capa_cachep)
679 goto out;
680
681 return 0;
682 out:
683 obd_cleanup_caches();
684 return -ENOMEM;
685
686 }
687
688 /* map connection to client */
class_conn2export(struct lustre_handle * conn)689 struct obd_export *class_conn2export(struct lustre_handle *conn)
690 {
691 struct obd_export *export;
692
693 if (!conn) {
694 CDEBUG(D_CACHE, "looking for null handle\n");
695 return NULL;
696 }
697
698 if (conn->cookie == -1) { /* this means assign a new connection */
699 CDEBUG(D_CACHE, "want a new connection\n");
700 return NULL;
701 }
702
703 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
704 export = class_handle2object(conn->cookie);
705 return export;
706 }
707 EXPORT_SYMBOL(class_conn2export);
708
class_exp2obd(struct obd_export * exp)709 struct obd_device *class_exp2obd(struct obd_export *exp)
710 {
711 if (exp)
712 return exp->exp_obd;
713 return NULL;
714 }
715 EXPORT_SYMBOL(class_exp2obd);
716
class_conn2obd(struct lustre_handle * conn)717 struct obd_device *class_conn2obd(struct lustre_handle *conn)
718 {
719 struct obd_export *export;
720 export = class_conn2export(conn);
721 if (export) {
722 struct obd_device *obd = export->exp_obd;
723 class_export_put(export);
724 return obd;
725 }
726 return NULL;
727 }
728 EXPORT_SYMBOL(class_conn2obd);
729
class_exp2cliimp(struct obd_export * exp)730 struct obd_import *class_exp2cliimp(struct obd_export *exp)
731 {
732 struct obd_device *obd = exp->exp_obd;
733 if (obd == NULL)
734 return NULL;
735 return obd->u.cli.cl_import;
736 }
737 EXPORT_SYMBOL(class_exp2cliimp);
738
class_conn2cliimp(struct lustre_handle * conn)739 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
740 {
741 struct obd_device *obd = class_conn2obd(conn);
742 if (obd == NULL)
743 return NULL;
744 return obd->u.cli.cl_import;
745 }
746 EXPORT_SYMBOL(class_conn2cliimp);
747
748 /* Export management functions */
class_export_destroy(struct obd_export * exp)749 static void class_export_destroy(struct obd_export *exp)
750 {
751 struct obd_device *obd = exp->exp_obd;
752
753 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
754 LASSERT(obd != NULL);
755
756 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
757 exp->exp_client_uuid.uuid, obd->obd_name);
758
759 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
760 if (exp->exp_connection)
761 ptlrpc_put_connection_superhack(exp->exp_connection);
762
763 LASSERT(list_empty(&exp->exp_outstanding_replies));
764 LASSERT(list_empty(&exp->exp_uncommitted_replies));
765 LASSERT(list_empty(&exp->exp_req_replay_queue));
766 LASSERT(list_empty(&exp->exp_hp_rpcs));
767 obd_destroy_export(exp);
768 class_decref(obd, "export", exp);
769
770 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
771 }
772
export_handle_addref(void * export)773 static void export_handle_addref(void *export)
774 {
775 class_export_get(export);
776 }
777
778 static struct portals_handle_ops export_handle_ops = {
779 .hop_addref = export_handle_addref,
780 .hop_free = NULL,
781 };
782
class_export_get(struct obd_export * exp)783 struct obd_export *class_export_get(struct obd_export *exp)
784 {
785 atomic_inc(&exp->exp_refcount);
786 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
787 atomic_read(&exp->exp_refcount));
788 return exp;
789 }
790 EXPORT_SYMBOL(class_export_get);
791
class_export_put(struct obd_export * exp)792 void class_export_put(struct obd_export *exp)
793 {
794 LASSERT(exp != NULL);
795 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
796 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
797 atomic_read(&exp->exp_refcount) - 1);
798
799 if (atomic_dec_and_test(&exp->exp_refcount)) {
800 LASSERT(!list_empty(&exp->exp_obd_chain));
801 CDEBUG(D_IOCTL, "final put %p/%s\n",
802 exp, exp->exp_client_uuid.uuid);
803
804 /* release nid stat refererence */
805 lprocfs_exp_cleanup(exp);
806
807 obd_zombie_export_add(exp);
808 }
809 }
810 EXPORT_SYMBOL(class_export_put);
811
812 /* Creates a new export, adds it to the hash table, and returns a
813 * pointer to it. The refcount is 2: one for the hash reference, and
814 * one for the pointer returned by this function. */
class_new_export(struct obd_device * obd,struct obd_uuid * cluuid)815 struct obd_export *class_new_export(struct obd_device *obd,
816 struct obd_uuid *cluuid)
817 {
818 struct obd_export *export;
819 struct cfs_hash *hash = NULL;
820 int rc = 0;
821
822 OBD_ALLOC_PTR(export);
823 if (!export)
824 return ERR_PTR(-ENOMEM);
825
826 export->exp_conn_cnt = 0;
827 export->exp_lock_hash = NULL;
828 export->exp_flock_hash = NULL;
829 atomic_set(&export->exp_refcount, 2);
830 atomic_set(&export->exp_rpc_count, 0);
831 atomic_set(&export->exp_cb_count, 0);
832 atomic_set(&export->exp_locks_count, 0);
833 #if LUSTRE_TRACKS_LOCK_EXP_REFS
834 INIT_LIST_HEAD(&export->exp_locks_list);
835 spin_lock_init(&export->exp_locks_list_guard);
836 #endif
837 atomic_set(&export->exp_replay_count, 0);
838 export->exp_obd = obd;
839 INIT_LIST_HEAD(&export->exp_outstanding_replies);
840 spin_lock_init(&export->exp_uncommitted_replies_lock);
841 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
842 INIT_LIST_HEAD(&export->exp_req_replay_queue);
843 INIT_LIST_HEAD(&export->exp_handle.h_link);
844 INIT_LIST_HEAD(&export->exp_hp_rpcs);
845 class_handle_hash(&export->exp_handle, &export_handle_ops);
846 export->exp_last_request_time = get_seconds();
847 spin_lock_init(&export->exp_lock);
848 spin_lock_init(&export->exp_rpc_lock);
849 INIT_HLIST_NODE(&export->exp_uuid_hash);
850 INIT_HLIST_NODE(&export->exp_nid_hash);
851 spin_lock_init(&export->exp_bl_list_lock);
852 INIT_LIST_HEAD(&export->exp_bl_list);
853
854 export->exp_sp_peer = LUSTRE_SP_ANY;
855 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
856 export->exp_client_uuid = *cluuid;
857 obd_init_export(export);
858
859 spin_lock(&obd->obd_dev_lock);
860 /* shouldn't happen, but might race */
861 if (obd->obd_stopping) {
862 rc = -ENODEV;
863 goto exit_unlock;
864 }
865
866 hash = cfs_hash_getref(obd->obd_uuid_hash);
867 if (hash == NULL) {
868 rc = -ENODEV;
869 goto exit_unlock;
870 }
871 spin_unlock(&obd->obd_dev_lock);
872
873 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
874 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
875 if (rc != 0) {
876 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
877 obd->obd_name, cluuid->uuid, rc);
878 rc = -EALREADY;
879 goto exit_err;
880 }
881 }
882
883 spin_lock(&obd->obd_dev_lock);
884 if (obd->obd_stopping) {
885 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
886 rc = -ENODEV;
887 goto exit_unlock;
888 }
889
890 class_incref(obd, "export", export);
891 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
892 list_add_tail(&export->exp_obd_chain_timed,
893 &export->exp_obd->obd_exports_timed);
894 export->exp_obd->obd_num_exports++;
895 spin_unlock(&obd->obd_dev_lock);
896 cfs_hash_putref(hash);
897 return export;
898
899 exit_unlock:
900 spin_unlock(&obd->obd_dev_lock);
901 exit_err:
902 if (hash)
903 cfs_hash_putref(hash);
904 class_handle_unhash(&export->exp_handle);
905 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
906 obd_destroy_export(export);
907 OBD_FREE_PTR(export);
908 return ERR_PTR(rc);
909 }
910 EXPORT_SYMBOL(class_new_export);
911
class_unlink_export(struct obd_export * exp)912 void class_unlink_export(struct obd_export *exp)
913 {
914 class_handle_unhash(&exp->exp_handle);
915
916 spin_lock(&exp->exp_obd->obd_dev_lock);
917 /* delete an uuid-export hashitem from hashtables */
918 if (!hlist_unhashed(&exp->exp_uuid_hash))
919 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
920 &exp->exp_client_uuid,
921 &exp->exp_uuid_hash);
922
923 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
924 list_del_init(&exp->exp_obd_chain_timed);
925 exp->exp_obd->obd_num_exports--;
926 spin_unlock(&exp->exp_obd->obd_dev_lock);
927 class_export_put(exp);
928 }
929 EXPORT_SYMBOL(class_unlink_export);
930
931 /* Import management functions */
class_import_destroy(struct obd_import * imp)932 static void class_import_destroy(struct obd_import *imp)
933 {
934 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
935 imp->imp_obd->obd_name);
936
937 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
938
939 ptlrpc_put_connection_superhack(imp->imp_connection);
940
941 while (!list_empty(&imp->imp_conn_list)) {
942 struct obd_import_conn *imp_conn;
943
944 imp_conn = list_entry(imp->imp_conn_list.next,
945 struct obd_import_conn, oic_item);
946 list_del_init(&imp_conn->oic_item);
947 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
948 OBD_FREE(imp_conn, sizeof(*imp_conn));
949 }
950
951 LASSERT(imp->imp_sec == NULL);
952 class_decref(imp->imp_obd, "import", imp);
953 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
954 }
955
import_handle_addref(void * import)956 static void import_handle_addref(void *import)
957 {
958 class_import_get(import);
959 }
960
961 static struct portals_handle_ops import_handle_ops = {
962 .hop_addref = import_handle_addref,
963 .hop_free = NULL,
964 };
965
class_import_get(struct obd_import * import)966 struct obd_import *class_import_get(struct obd_import *import)
967 {
968 atomic_inc(&import->imp_refcount);
969 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
970 atomic_read(&import->imp_refcount),
971 import->imp_obd->obd_name);
972 return import;
973 }
974 EXPORT_SYMBOL(class_import_get);
975
class_import_put(struct obd_import * imp)976 void class_import_put(struct obd_import *imp)
977 {
978 LASSERT(list_empty(&imp->imp_zombie_chain));
979 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
980
981 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
982 atomic_read(&imp->imp_refcount) - 1,
983 imp->imp_obd->obd_name);
984
985 if (atomic_dec_and_test(&imp->imp_refcount)) {
986 CDEBUG(D_INFO, "final put import %p\n", imp);
987 obd_zombie_import_add(imp);
988 }
989
990 /* catch possible import put race */
991 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
992 }
993 EXPORT_SYMBOL(class_import_put);
994
init_imp_at(struct imp_at * at)995 static void init_imp_at(struct imp_at *at) {
996 int i;
997 at_init(&at->iat_net_latency, 0, 0);
998 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
999 /* max service estimates are tracked on the server side, so
1000 don't use the AT history here, just use the last reported
1001 val. (But keep hist for proc histogram, worst_ever) */
1002 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1003 AT_FLG_NOHIST);
1004 }
1005 }
1006
class_new_import(struct obd_device * obd)1007 struct obd_import *class_new_import(struct obd_device *obd)
1008 {
1009 struct obd_import *imp;
1010
1011 OBD_ALLOC(imp, sizeof(*imp));
1012 if (imp == NULL)
1013 return NULL;
1014
1015 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1016 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1017 INIT_LIST_HEAD(&imp->imp_replay_list);
1018 INIT_LIST_HEAD(&imp->imp_sending_list);
1019 INIT_LIST_HEAD(&imp->imp_delayed_list);
1020 INIT_LIST_HEAD(&imp->imp_committed_list);
1021 imp->imp_replay_cursor = &imp->imp_committed_list;
1022 spin_lock_init(&imp->imp_lock);
1023 imp->imp_last_success_conn = 0;
1024 imp->imp_state = LUSTRE_IMP_NEW;
1025 imp->imp_obd = class_incref(obd, "import", imp);
1026 mutex_init(&imp->imp_sec_mutex);
1027 init_waitqueue_head(&imp->imp_recovery_waitq);
1028
1029 atomic_set(&imp->imp_refcount, 2);
1030 atomic_set(&imp->imp_unregistering, 0);
1031 atomic_set(&imp->imp_inflight, 0);
1032 atomic_set(&imp->imp_replay_inflight, 0);
1033 atomic_set(&imp->imp_inval_count, 0);
1034 INIT_LIST_HEAD(&imp->imp_conn_list);
1035 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1036 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1037 init_imp_at(&imp->imp_at);
1038
1039 /* the default magic is V2, will be used in connect RPC, and
1040 * then adjusted according to the flags in request/reply. */
1041 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1042
1043 return imp;
1044 }
1045 EXPORT_SYMBOL(class_new_import);
1046
class_destroy_import(struct obd_import * import)1047 void class_destroy_import(struct obd_import *import)
1048 {
1049 LASSERT(import != NULL);
1050 LASSERT(import != LP_POISON);
1051
1052 class_handle_unhash(&import->imp_handle);
1053
1054 spin_lock(&import->imp_lock);
1055 import->imp_generation++;
1056 spin_unlock(&import->imp_lock);
1057 class_import_put(import);
1058 }
1059 EXPORT_SYMBOL(class_destroy_import);
1060
1061 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1062
__class_export_add_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)1063 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1064 {
1065 spin_lock(&exp->exp_locks_list_guard);
1066
1067 LASSERT(lock->l_exp_refs_nr >= 0);
1068
1069 if (lock->l_exp_refs_target != NULL &&
1070 lock->l_exp_refs_target != exp) {
1071 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1072 exp, lock, lock->l_exp_refs_target);
1073 }
1074 if ((lock->l_exp_refs_nr ++) == 0) {
1075 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1076 lock->l_exp_refs_target = exp;
1077 }
1078 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1079 lock, exp, lock->l_exp_refs_nr);
1080 spin_unlock(&exp->exp_locks_list_guard);
1081 }
1082 EXPORT_SYMBOL(__class_export_add_lock_ref);
1083
__class_export_del_lock_ref(struct obd_export * exp,struct ldlm_lock * lock)1084 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1085 {
1086 spin_lock(&exp->exp_locks_list_guard);
1087 LASSERT(lock->l_exp_refs_nr > 0);
1088 if (lock->l_exp_refs_target != exp) {
1089 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1090 lock, lock->l_exp_refs_target, exp);
1091 }
1092 if (-- lock->l_exp_refs_nr == 0) {
1093 list_del_init(&lock->l_exp_refs_link);
1094 lock->l_exp_refs_target = NULL;
1095 }
1096 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1097 lock, exp, lock->l_exp_refs_nr);
1098 spin_unlock(&exp->exp_locks_list_guard);
1099 }
1100 EXPORT_SYMBOL(__class_export_del_lock_ref);
1101 #endif
1102
1103 /* A connection defines an export context in which preallocation can
1104 be managed. This releases the export pointer reference, and returns
1105 the export handle, so the export refcount is 1 when this function
1106 returns. */
class_connect(struct lustre_handle * conn,struct obd_device * obd,struct obd_uuid * cluuid)1107 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1108 struct obd_uuid *cluuid)
1109 {
1110 struct obd_export *export;
1111 LASSERT(conn != NULL);
1112 LASSERT(obd != NULL);
1113 LASSERT(cluuid != NULL);
1114
1115 export = class_new_export(obd, cluuid);
1116 if (IS_ERR(export))
1117 return PTR_ERR(export);
1118
1119 conn->cookie = export->exp_handle.h_cookie;
1120 class_export_put(export);
1121
1122 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1123 cluuid->uuid, conn->cookie);
1124 return 0;
1125 }
1126 EXPORT_SYMBOL(class_connect);
1127
1128 /* if export is involved in recovery then clean up related things */
class_export_recovery_cleanup(struct obd_export * exp)1129 static void class_export_recovery_cleanup(struct obd_export *exp)
1130 {
1131 struct obd_device *obd = exp->exp_obd;
1132
1133 spin_lock(&obd->obd_recovery_task_lock);
1134 if (exp->exp_delayed)
1135 obd->obd_delayed_clients--;
1136 if (obd->obd_recovering) {
1137 if (exp->exp_in_recovery) {
1138 spin_lock(&exp->exp_lock);
1139 exp->exp_in_recovery = 0;
1140 spin_unlock(&exp->exp_lock);
1141 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1142 atomic_dec(&obd->obd_connected_clients);
1143 }
1144
1145 /* if called during recovery then should update
1146 * obd_stale_clients counter,
1147 * lightweight exports are not counted */
1148 if (exp->exp_failed &&
1149 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1150 exp->exp_obd->obd_stale_clients++;
1151 }
1152 spin_unlock(&obd->obd_recovery_task_lock);
1153
1154 spin_lock(&exp->exp_lock);
1155 /** Cleanup req replay fields */
1156 if (exp->exp_req_replay_needed) {
1157 exp->exp_req_replay_needed = 0;
1158
1159 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1160 atomic_dec(&obd->obd_req_replay_clients);
1161 }
1162
1163 /** Cleanup lock replay data */
1164 if (exp->exp_lock_replay_needed) {
1165 exp->exp_lock_replay_needed = 0;
1166
1167 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1168 atomic_dec(&obd->obd_lock_replay_clients);
1169 }
1170 spin_unlock(&exp->exp_lock);
1171 }
1172
1173 /* This function removes 1-3 references from the export:
1174 * 1 - for export pointer passed
1175 * and if disconnect really need
1176 * 2 - removing from hash
1177 * 3 - in client_unlink_export
1178 * The export pointer passed to this function can destroyed */
class_disconnect(struct obd_export * export)1179 int class_disconnect(struct obd_export *export)
1180 {
1181 int already_disconnected;
1182
1183 if (export == NULL) {
1184 CWARN("attempting to free NULL export %p\n", export);
1185 return -EINVAL;
1186 }
1187
1188 spin_lock(&export->exp_lock);
1189 already_disconnected = export->exp_disconnected;
1190 export->exp_disconnected = 1;
1191 spin_unlock(&export->exp_lock);
1192
1193 /* class_cleanup(), abort_recovery(), and class_fail_export()
1194 * all end up in here, and if any of them race we shouldn't
1195 * call extra class_export_puts(). */
1196 if (already_disconnected) {
1197 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1198 goto no_disconn;
1199 }
1200
1201 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1202 export->exp_handle.h_cookie);
1203
1204 if (!hlist_unhashed(&export->exp_nid_hash))
1205 cfs_hash_del(export->exp_obd->obd_nid_hash,
1206 &export->exp_connection->c_peer.nid,
1207 &export->exp_nid_hash);
1208
1209 class_export_recovery_cleanup(export);
1210 class_unlink_export(export);
1211 no_disconn:
1212 class_export_put(export);
1213 return 0;
1214 }
1215 EXPORT_SYMBOL(class_disconnect);
1216
1217 /* Return non-zero for a fully connected export */
class_connected_export(struct obd_export * exp)1218 int class_connected_export(struct obd_export *exp)
1219 {
1220 if (exp) {
1221 int connected;
1222 spin_lock(&exp->exp_lock);
1223 connected = exp->exp_conn_cnt > 0;
1224 spin_unlock(&exp->exp_lock);
1225 return connected;
1226 }
1227 return 0;
1228 }
1229 EXPORT_SYMBOL(class_connected_export);
1230
class_disconnect_export_list(struct list_head * list,enum obd_option flags)1231 static void class_disconnect_export_list(struct list_head *list,
1232 enum obd_option flags)
1233 {
1234 int rc;
1235 struct obd_export *exp;
1236
1237 /* It's possible that an export may disconnect itself, but
1238 * nothing else will be added to this list. */
1239 while (!list_empty(list)) {
1240 exp = list_entry(list->next, struct obd_export,
1241 exp_obd_chain);
1242 /* need for safe call CDEBUG after obd_disconnect */
1243 class_export_get(exp);
1244
1245 spin_lock(&exp->exp_lock);
1246 exp->exp_flags = flags;
1247 spin_unlock(&exp->exp_lock);
1248
1249 if (obd_uuid_equals(&exp->exp_client_uuid,
1250 &exp->exp_obd->obd_uuid)) {
1251 CDEBUG(D_HA,
1252 "exp %p export uuid == obd uuid, don't discon\n",
1253 exp);
1254 /* Need to delete this now so we don't end up pointing
1255 * to work_list later when this export is cleaned up. */
1256 list_del_init(&exp->exp_obd_chain);
1257 class_export_put(exp);
1258 continue;
1259 }
1260
1261 class_export_get(exp);
1262 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1263 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1264 exp, exp->exp_last_request_time);
1265 /* release one export reference anyway */
1266 rc = obd_disconnect(exp);
1267
1268 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1269 obd_export_nid2str(exp), exp, rc);
1270 class_export_put(exp);
1271 }
1272 }
1273
class_disconnect_exports(struct obd_device * obd)1274 void class_disconnect_exports(struct obd_device *obd)
1275 {
1276 struct list_head work_list;
1277
1278 /* Move all of the exports from obd_exports to a work list, en masse. */
1279 INIT_LIST_HEAD(&work_list);
1280 spin_lock(&obd->obd_dev_lock);
1281 list_splice_init(&obd->obd_exports, &work_list);
1282 list_splice_init(&obd->obd_delayed_exports, &work_list);
1283 spin_unlock(&obd->obd_dev_lock);
1284
1285 if (!list_empty(&work_list)) {
1286 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1287 obd->obd_minor, obd);
1288 class_disconnect_export_list(&work_list,
1289 exp_flags_from_obd(obd));
1290 } else
1291 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1292 obd->obd_minor, obd);
1293 }
1294 EXPORT_SYMBOL(class_disconnect_exports);
1295
1296 /* Remove exports that have not completed recovery.
1297 */
class_disconnect_stale_exports(struct obd_device * obd,int (* test_export)(struct obd_export *))1298 void class_disconnect_stale_exports(struct obd_device *obd,
1299 int (*test_export)(struct obd_export *))
1300 {
1301 struct list_head work_list;
1302 struct obd_export *exp, *n;
1303 int evicted = 0;
1304
1305 INIT_LIST_HEAD(&work_list);
1306 spin_lock(&obd->obd_dev_lock);
1307 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1308 exp_obd_chain) {
1309 /* don't count self-export as client */
1310 if (obd_uuid_equals(&exp->exp_client_uuid,
1311 &exp->exp_obd->obd_uuid))
1312 continue;
1313
1314 /* don't evict clients which have no slot in last_rcvd
1315 * (e.g. lightweight connection) */
1316 if (exp->exp_target_data.ted_lr_idx == -1)
1317 continue;
1318
1319 spin_lock(&exp->exp_lock);
1320 if (exp->exp_failed || test_export(exp)) {
1321 spin_unlock(&exp->exp_lock);
1322 continue;
1323 }
1324 exp->exp_failed = 1;
1325 spin_unlock(&exp->exp_lock);
1326
1327 list_move(&exp->exp_obd_chain, &work_list);
1328 evicted++;
1329 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1330 obd->obd_name, exp->exp_client_uuid.uuid,
1331 exp->exp_connection == NULL ? "<unknown>" :
1332 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1333 print_export_data(exp, "EVICTING", 0);
1334 }
1335 spin_unlock(&obd->obd_dev_lock);
1336
1337 if (evicted)
1338 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1339 obd->obd_name, evicted);
1340
1341 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1342 OBD_OPT_ABORT_RECOV);
1343 }
1344 EXPORT_SYMBOL(class_disconnect_stale_exports);
1345
class_fail_export(struct obd_export * exp)1346 void class_fail_export(struct obd_export *exp)
1347 {
1348 int rc, already_failed;
1349
1350 spin_lock(&exp->exp_lock);
1351 already_failed = exp->exp_failed;
1352 exp->exp_failed = 1;
1353 spin_unlock(&exp->exp_lock);
1354
1355 if (already_failed) {
1356 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1357 exp, exp->exp_client_uuid.uuid);
1358 return;
1359 }
1360
1361 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1362 exp, exp->exp_client_uuid.uuid);
1363
1364 if (obd_dump_on_timeout)
1365 libcfs_debug_dumplog();
1366
1367 /* need for safe call CDEBUG after obd_disconnect */
1368 class_export_get(exp);
1369
1370 /* Most callers into obd_disconnect are removing their own reference
1371 * (request, for example) in addition to the one from the hash table.
1372 * We don't have such a reference here, so make one. */
1373 class_export_get(exp);
1374 rc = obd_disconnect(exp);
1375 if (rc)
1376 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1377 else
1378 CDEBUG(D_HA, "disconnected export %p/%s\n",
1379 exp, exp->exp_client_uuid.uuid);
1380 class_export_put(exp);
1381 }
1382 EXPORT_SYMBOL(class_fail_export);
1383
obd_export_nid2str(struct obd_export * exp)1384 char *obd_export_nid2str(struct obd_export *exp)
1385 {
1386 if (exp->exp_connection != NULL)
1387 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1388
1389 return "(no nid)";
1390 }
1391 EXPORT_SYMBOL(obd_export_nid2str);
1392
obd_export_evict_by_nid(struct obd_device * obd,const char * nid)1393 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1394 {
1395 struct cfs_hash *nid_hash;
1396 struct obd_export *doomed_exp = NULL;
1397 int exports_evicted = 0;
1398
1399 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1400
1401 spin_lock(&obd->obd_dev_lock);
1402 /* umount has run already, so evict thread should leave
1403 * its task to umount thread now */
1404 if (obd->obd_stopping) {
1405 spin_unlock(&obd->obd_dev_lock);
1406 return exports_evicted;
1407 }
1408 nid_hash = obd->obd_nid_hash;
1409 cfs_hash_getref(nid_hash);
1410 spin_unlock(&obd->obd_dev_lock);
1411
1412 do {
1413 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1414 if (doomed_exp == NULL)
1415 break;
1416
1417 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1418 "nid %s found, wanted nid %s, requested nid %s\n",
1419 obd_export_nid2str(doomed_exp),
1420 libcfs_nid2str(nid_key), nid);
1421 LASSERTF(doomed_exp != obd->obd_self_export,
1422 "self-export is hashed by NID?\n");
1423 exports_evicted++;
1424 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1425 obd->obd_name,
1426 obd_uuid2str(&doomed_exp->exp_client_uuid),
1427 obd_export_nid2str(doomed_exp));
1428 class_fail_export(doomed_exp);
1429 class_export_put(doomed_exp);
1430 } while (1);
1431
1432 cfs_hash_putref(nid_hash);
1433
1434 if (!exports_evicted)
1435 CDEBUG(D_HA,
1436 "%s: can't disconnect NID '%s': no exports found\n",
1437 obd->obd_name, nid);
1438 return exports_evicted;
1439 }
1440 EXPORT_SYMBOL(obd_export_evict_by_nid);
1441
obd_export_evict_by_uuid(struct obd_device * obd,const char * uuid)1442 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1443 {
1444 struct cfs_hash *uuid_hash;
1445 struct obd_export *doomed_exp = NULL;
1446 struct obd_uuid doomed_uuid;
1447 int exports_evicted = 0;
1448
1449 spin_lock(&obd->obd_dev_lock);
1450 if (obd->obd_stopping) {
1451 spin_unlock(&obd->obd_dev_lock);
1452 return exports_evicted;
1453 }
1454 uuid_hash = obd->obd_uuid_hash;
1455 cfs_hash_getref(uuid_hash);
1456 spin_unlock(&obd->obd_dev_lock);
1457
1458 obd_str2uuid(&doomed_uuid, uuid);
1459 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1460 CERROR("%s: can't evict myself\n", obd->obd_name);
1461 cfs_hash_putref(uuid_hash);
1462 return exports_evicted;
1463 }
1464
1465 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1466
1467 if (doomed_exp == NULL) {
1468 CERROR("%s: can't disconnect %s: no exports found\n",
1469 obd->obd_name, uuid);
1470 } else {
1471 CWARN("%s: evicting %s at administrative request\n",
1472 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1473 class_fail_export(doomed_exp);
1474 class_export_put(doomed_exp);
1475 exports_evicted++;
1476 }
1477 cfs_hash_putref(uuid_hash);
1478
1479 return exports_evicted;
1480 }
1481 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1482
1483 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1484 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1485 EXPORT_SYMBOL(class_export_dump_hook);
1486 #endif
1487
print_export_data(struct obd_export * exp,const char * status,int locks)1488 static void print_export_data(struct obd_export *exp, const char *status,
1489 int locks)
1490 {
1491 struct ptlrpc_reply_state *rs;
1492 struct ptlrpc_reply_state *first_reply = NULL;
1493 int nreplies = 0;
1494
1495 spin_lock(&exp->exp_lock);
1496 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1497 rs_exp_list) {
1498 if (nreplies == 0)
1499 first_reply = rs;
1500 nreplies++;
1501 }
1502 spin_unlock(&exp->exp_lock);
1503
1504 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1505 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1506 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1507 atomic_read(&exp->exp_rpc_count),
1508 atomic_read(&exp->exp_cb_count),
1509 atomic_read(&exp->exp_locks_count),
1510 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1511 nreplies, first_reply, nreplies > 3 ? "..." : "",
1512 exp->exp_last_committed);
1513 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1514 if (locks && class_export_dump_hook != NULL)
1515 class_export_dump_hook(exp);
1516 #endif
1517 }
1518
dump_exports(struct obd_device * obd,int locks)1519 void dump_exports(struct obd_device *obd, int locks)
1520 {
1521 struct obd_export *exp;
1522
1523 spin_lock(&obd->obd_dev_lock);
1524 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1525 print_export_data(exp, "ACTIVE", locks);
1526 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1527 print_export_data(exp, "UNLINKED", locks);
1528 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1529 print_export_data(exp, "DELAYED", locks);
1530 spin_unlock(&obd->obd_dev_lock);
1531 spin_lock(&obd_zombie_impexp_lock);
1532 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1533 print_export_data(exp, "ZOMBIE", locks);
1534 spin_unlock(&obd_zombie_impexp_lock);
1535 }
1536 EXPORT_SYMBOL(dump_exports);
1537
obd_exports_barrier(struct obd_device * obd)1538 void obd_exports_barrier(struct obd_device *obd)
1539 {
1540 int waited = 2;
1541 LASSERT(list_empty(&obd->obd_exports));
1542 spin_lock(&obd->obd_dev_lock);
1543 while (!list_empty(&obd->obd_unlinked_exports)) {
1544 spin_unlock(&obd->obd_dev_lock);
1545 set_current_state(TASK_UNINTERRUPTIBLE);
1546 schedule_timeout(cfs_time_seconds(waited));
1547 if (waited > 5 && IS_PO2(waited)) {
1548 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1549 obd->obd_name, waited,
1550 atomic_read(&obd->obd_refcount));
1551 dump_exports(obd, 1);
1552 }
1553 waited *= 2;
1554 spin_lock(&obd->obd_dev_lock);
1555 }
1556 spin_unlock(&obd->obd_dev_lock);
1557 }
1558 EXPORT_SYMBOL(obd_exports_barrier);
1559
1560 /* Total amount of zombies to be destroyed */
1561 static int zombies_count;
1562
1563 /**
1564 * kill zombie imports and exports
1565 */
obd_zombie_impexp_cull(void)1566 void obd_zombie_impexp_cull(void)
1567 {
1568 struct obd_import *import;
1569 struct obd_export *export;
1570
1571 do {
1572 spin_lock(&obd_zombie_impexp_lock);
1573
1574 import = NULL;
1575 if (!list_empty(&obd_zombie_imports)) {
1576 import = list_entry(obd_zombie_imports.next,
1577 struct obd_import,
1578 imp_zombie_chain);
1579 list_del_init(&import->imp_zombie_chain);
1580 }
1581
1582 export = NULL;
1583 if (!list_empty(&obd_zombie_exports)) {
1584 export = list_entry(obd_zombie_exports.next,
1585 struct obd_export,
1586 exp_obd_chain);
1587 list_del_init(&export->exp_obd_chain);
1588 }
1589
1590 spin_unlock(&obd_zombie_impexp_lock);
1591
1592 if (import != NULL) {
1593 class_import_destroy(import);
1594 spin_lock(&obd_zombie_impexp_lock);
1595 zombies_count--;
1596 spin_unlock(&obd_zombie_impexp_lock);
1597 }
1598
1599 if (export != NULL) {
1600 class_export_destroy(export);
1601 spin_lock(&obd_zombie_impexp_lock);
1602 zombies_count--;
1603 spin_unlock(&obd_zombie_impexp_lock);
1604 }
1605
1606 cond_resched();
1607 } while (import != NULL || export != NULL);
1608 }
1609
1610 static struct completion obd_zombie_start;
1611 static struct completion obd_zombie_stop;
1612 static unsigned long obd_zombie_flags;
1613 static wait_queue_head_t obd_zombie_waitq;
1614 static pid_t obd_zombie_pid;
1615
1616 enum {
1617 OBD_ZOMBIE_STOP = 0x0001,
1618 };
1619
1620 /**
1621 * check for work for kill zombie import/export thread.
1622 */
obd_zombie_impexp_check(void * arg)1623 static int obd_zombie_impexp_check(void *arg)
1624 {
1625 int rc;
1626
1627 spin_lock(&obd_zombie_impexp_lock);
1628 rc = (zombies_count == 0) &&
1629 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1630 spin_unlock(&obd_zombie_impexp_lock);
1631
1632 return rc;
1633 }
1634
1635 /**
1636 * Add export to the obd_zombie thread and notify it.
1637 */
obd_zombie_export_add(struct obd_export * exp)1638 static void obd_zombie_export_add(struct obd_export *exp) {
1639 spin_lock(&exp->exp_obd->obd_dev_lock);
1640 LASSERT(!list_empty(&exp->exp_obd_chain));
1641 list_del_init(&exp->exp_obd_chain);
1642 spin_unlock(&exp->exp_obd->obd_dev_lock);
1643 spin_lock(&obd_zombie_impexp_lock);
1644 zombies_count++;
1645 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1646 spin_unlock(&obd_zombie_impexp_lock);
1647
1648 obd_zombie_impexp_notify();
1649 }
1650
1651 /**
1652 * Add import to the obd_zombie thread and notify it.
1653 */
obd_zombie_import_add(struct obd_import * imp)1654 static void obd_zombie_import_add(struct obd_import *imp) {
1655 LASSERT(imp->imp_sec == NULL);
1656 LASSERT(imp->imp_rq_pool == NULL);
1657 spin_lock(&obd_zombie_impexp_lock);
1658 LASSERT(list_empty(&imp->imp_zombie_chain));
1659 zombies_count++;
1660 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1661 spin_unlock(&obd_zombie_impexp_lock);
1662
1663 obd_zombie_impexp_notify();
1664 }
1665
1666 /**
1667 * notify import/export destroy thread about new zombie.
1668 */
obd_zombie_impexp_notify(void)1669 static void obd_zombie_impexp_notify(void)
1670 {
1671 /*
1672 * Make sure obd_zombie_impexp_thread get this notification.
1673 * It is possible this signal only get by obd_zombie_barrier, and
1674 * barrier gulps this notification and sleeps away and hangs ensues
1675 */
1676 wake_up_all(&obd_zombie_waitq);
1677 }
1678
1679 /**
1680 * check whether obd_zombie is idle
1681 */
obd_zombie_is_idle(void)1682 static int obd_zombie_is_idle(void)
1683 {
1684 int rc;
1685
1686 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1687 spin_lock(&obd_zombie_impexp_lock);
1688 rc = (zombies_count == 0);
1689 spin_unlock(&obd_zombie_impexp_lock);
1690 return rc;
1691 }
1692
1693 /**
1694 * wait when obd_zombie import/export queues become empty
1695 */
obd_zombie_barrier(void)1696 void obd_zombie_barrier(void)
1697 {
1698 struct l_wait_info lwi = { 0 };
1699
1700 if (obd_zombie_pid == current_pid())
1701 /* don't wait for myself */
1702 return;
1703 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1704 }
1705 EXPORT_SYMBOL(obd_zombie_barrier);
1706
1707
1708 /**
1709 * destroy zombie export/import thread.
1710 */
obd_zombie_impexp_thread(void * unused)1711 static int obd_zombie_impexp_thread(void *unused)
1712 {
1713 unshare_fs_struct();
1714 complete(&obd_zombie_start);
1715
1716 obd_zombie_pid = current_pid();
1717
1718 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1719 struct l_wait_info lwi = { 0 };
1720
1721 l_wait_event(obd_zombie_waitq,
1722 !obd_zombie_impexp_check(NULL), &lwi);
1723 obd_zombie_impexp_cull();
1724
1725 /*
1726 * Notify obd_zombie_barrier callers that queues
1727 * may be empty.
1728 */
1729 wake_up(&obd_zombie_waitq);
1730 }
1731
1732 complete(&obd_zombie_stop);
1733
1734 return 0;
1735 }
1736
1737
1738 /**
1739 * start destroy zombie import/export thread
1740 */
obd_zombie_impexp_init(void)1741 int obd_zombie_impexp_init(void)
1742 {
1743 struct task_struct *task;
1744
1745 INIT_LIST_HEAD(&obd_zombie_imports);
1746 INIT_LIST_HEAD(&obd_zombie_exports);
1747 spin_lock_init(&obd_zombie_impexp_lock);
1748 init_completion(&obd_zombie_start);
1749 init_completion(&obd_zombie_stop);
1750 init_waitqueue_head(&obd_zombie_waitq);
1751 obd_zombie_pid = 0;
1752
1753 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1754 if (IS_ERR(task))
1755 return PTR_ERR(task);
1756
1757 wait_for_completion(&obd_zombie_start);
1758 return 0;
1759 }
1760 /**
1761 * stop destroy zombie import/export thread
1762 */
obd_zombie_impexp_stop(void)1763 void obd_zombie_impexp_stop(void)
1764 {
1765 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1766 obd_zombie_impexp_notify();
1767 wait_for_completion(&obd_zombie_stop);
1768 }
1769
1770 /***** Kernel-userspace comm helpers *******/
1771
1772 /* Get length of entire message, including header */
kuc_len(int payload_len)1773 int kuc_len(int payload_len)
1774 {
1775 return sizeof(struct kuc_hdr) + payload_len;
1776 }
1777 EXPORT_SYMBOL(kuc_len);
1778
1779 /* Get a pointer to kuc header, given a ptr to the payload
1780 * @param p Pointer to payload area
1781 * @returns Pointer to kuc header
1782 */
kuc_ptr(void * p)1783 struct kuc_hdr *kuc_ptr(void *p)
1784 {
1785 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1786 LASSERT(lh->kuc_magic == KUC_MAGIC);
1787 return lh;
1788 }
1789 EXPORT_SYMBOL(kuc_ptr);
1790
1791 /* Test if payload is part of kuc message
1792 * @param p Pointer to payload area
1793 * @returns boolean
1794 */
kuc_ispayload(void * p)1795 int kuc_ispayload(void *p)
1796 {
1797 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1798
1799 if (kh->kuc_magic == KUC_MAGIC)
1800 return 1;
1801 else
1802 return 0;
1803 }
1804 EXPORT_SYMBOL(kuc_ispayload);
1805
1806 /* Alloc space for a message, and fill in header
1807 * @return Pointer to payload area
1808 */
kuc_alloc(int payload_len,int transport,int type)1809 void *kuc_alloc(int payload_len, int transport, int type)
1810 {
1811 struct kuc_hdr *lh;
1812 int len = kuc_len(payload_len);
1813
1814 OBD_ALLOC(lh, len);
1815 if (lh == NULL)
1816 return ERR_PTR(-ENOMEM);
1817
1818 lh->kuc_magic = KUC_MAGIC;
1819 lh->kuc_transport = transport;
1820 lh->kuc_msgtype = type;
1821 lh->kuc_msglen = len;
1822
1823 return (void *)(lh + 1);
1824 }
1825 EXPORT_SYMBOL(kuc_alloc);
1826
1827 /* Takes pointer to payload area */
kuc_free(void * p,int payload_len)1828 inline void kuc_free(void *p, int payload_len)
1829 {
1830 struct kuc_hdr *lh = kuc_ptr(p);
1831 OBD_FREE(lh, kuc_len(payload_len));
1832 }
1833 EXPORT_SYMBOL(kuc_free);
1834