1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/fid/fid_request.c
37  *
38  * Lustre Sequence Manager
39  *
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42 
43 #define DEBUG_SUBSYSTEM S_FID
44 
45 #include "../../include/linux/libcfs/libcfs.h"
46 #include <linux/module.h>
47 
48 #include "../include/obd.h"
49 #include "../include/obd_class.h"
50 #include "../include/obd_support.h"
51 #include "../include/lustre_fid.h"
52 /* mdc RPC locks */
53 #include "../include/lustre_mdc.h"
54 #include "fid_internal.h"
55 
/*
 * Send a SEQ_QUERY request carrying operation code @opc through the export
 * of @seq and copy the range found in the reply into @output.
 *
 * @seq:     client sequence manager; lcs_exp supplies the import, lcs_type
 *           selects the range flag and the request portal
 * @output:  receives the range sent back by the server
 * @opc:     operation code stored in the request
 * @opcname: printable name of @opc, used only in the final debug message
 *
 * Note that @output is overwritten with the server's range before the range
 * is validated, so it is modified even when -EINVAL is returned.
 *
 * Returns 0 on success, -ENOMEM when the request cannot be allocated,
 * -EPROTO when the reply carries no range, -EINVAL when the received range
 * is not sane or is exhausted, or the error from ptlrpc_queue_wait().
 */
static int seq_client_rpc(struct lu_client_seq *seq,
			  struct lu_seq_range *output, __u32 opc,
			  const char *opcname)
{
	struct obd_export     *exp = seq->lcs_exp;
	struct ptlrpc_request *req;
	struct lu_seq_range   *out, *in;
	__u32                 *op;
	unsigned int           debug_mask;
	int                    rc;

	req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
					LUSTRE_MDS_VERSION, SEQ_QUERY);
	if (req == NULL)
		return -ENOMEM;

	/* Init operation code */
	op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
	*op = opc;

	/* Zero out input range, this is not recovery yet. */
	in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
	range_init(in);

	ptlrpc_request_set_replen(req);

	/* Tell the server which index and which kind of target we are. */
	in->lsr_index = seq->lcs_space.lsr_index;
	if (seq->lcs_type == LUSTRE_SEQ_METADATA)
		fld_range_set_mdt(in);
	else
		fld_range_set_ost(in);

	if (opc == SEQ_ALLOC_SUPER) {
		req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
		req->rq_reply_portal = MDC_REPLY_PORTAL;
		/* While allocating a super sequence for data objects, the
		 * current thread might hold the export of MDT0 (MDT0
		 * precreating objects on this OST) and the request is sent
		 * to MDT0 here. Do not keep resending it, otherwise a failed
		 * (umounted) MDT0 could never release its export. */
		if (seq->lcs_type == LUSTRE_SEQ_DATA)
			req->rq_no_delay = req->rq_no_resend = 1;
		debug_mask = D_CONSOLE;
	} else {
		if (seq->lcs_type == LUSTRE_SEQ_METADATA)
			req->rq_request_portal = SEQ_METADATA_PORTAL;
		else
			req->rq_request_portal = SEQ_DATA_PORTAL;
		debug_mask = D_INFO;
	}

	ptlrpc_at_set_req_timeout(req);

	/* Metadata requests are sent under the client's mdc RPC lock. */
	if (seq->lcs_type == LUSTRE_SEQ_METADATA)
		mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
	rc = ptlrpc_queue_wait(req);
	if (seq->lcs_type == LUSTRE_SEQ_METADATA)
		mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
	if (rc)
		goto out_req;

	out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
	if (out == NULL) {
		/* A reply without the range field must not be dereferenced. */
		CERROR("%s: Missing sequence range in server reply\n",
		       seq->lcs_name);
		rc = -EPROTO;
		goto out_req;
	}
	*output = *out;

	if (!range_is_sane(output)) {
		CERROR("%s: Invalid range received from server: "
		       DRANGE"\n", seq->lcs_name, PRANGE(output));
		rc = -EINVAL;
		goto out_req;
	}

	if (range_is_exhausted(output)) {
		CERROR("%s: Range received from server is exhausted: "
		       DRANGE"]\n", seq->lcs_name, PRANGE(output));
		rc = -EINVAL;
		goto out_req;
	}

	CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n",
		     seq->lcs_name, opcname, PRANGE(output));

out_req:
	ptlrpc_req_finished(req);
	return rc;
}
142 
143 /* Request sequence-controller node to allocate new super-sequence. */
seq_client_alloc_super(struct lu_client_seq * seq,const struct lu_env * env)144 int seq_client_alloc_super(struct lu_client_seq *seq,
145 			   const struct lu_env *env)
146 {
147 	int rc;
148 
149 	mutex_lock(&seq->lcs_mutex);
150 
151 	if (seq->lcs_srv) {
152 		rc = 0;
153 	} else {
154 		/* Check whether the connection to seq controller has been
155 		 * setup (lcs_exp != NULL) */
156 		if (seq->lcs_exp == NULL) {
157 			mutex_unlock(&seq->lcs_mutex);
158 			return -EINPROGRESS;
159 		}
160 
161 		rc = seq_client_rpc(seq, &seq->lcs_space,
162 				    SEQ_ALLOC_SUPER, "super");
163 	}
164 	mutex_unlock(&seq->lcs_mutex);
165 	return rc;
166 }
167 
168 /* Request sequence-controller node to allocate new meta-sequence. */
seq_client_alloc_meta(const struct lu_env * env,struct lu_client_seq * seq)169 static int seq_client_alloc_meta(const struct lu_env *env,
170 				 struct lu_client_seq *seq)
171 {
172 	int rc;
173 
174 	if (seq->lcs_srv) {
175 		rc = 0;
176 	} else {
177 		do {
178 			/* If meta server return -EINPROGRESS or EAGAIN,
179 			 * it means meta server might not be ready to
180 			 * allocate super sequence from sequence controller
181 			 * (MDT0)yet */
182 			rc = seq_client_rpc(seq, &seq->lcs_space,
183 					    SEQ_ALLOC_META, "meta");
184 		} while (rc == -EINPROGRESS || rc == -EAGAIN);
185 	}
186 
187 	return rc;
188 }
189 
190 /* Allocate new sequence for client. */
seq_client_alloc_seq(const struct lu_env * env,struct lu_client_seq * seq,u64 * seqnr)191 static int seq_client_alloc_seq(const struct lu_env *env,
192 				struct lu_client_seq *seq, u64 *seqnr)
193 {
194 	int rc;
195 
196 	LASSERT(range_is_sane(&seq->lcs_space));
197 
198 	if (range_is_exhausted(&seq->lcs_space)) {
199 		rc = seq_client_alloc_meta(env, seq);
200 		if (rc) {
201 			CERROR("%s: Can't allocate new meta-sequence, rc %d\n",
202 			       seq->lcs_name, rc);
203 			return rc;
204 		} else {
205 			CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
206 			       seq->lcs_name, PRANGE(&seq->lcs_space));
207 		}
208 	} else {
209 		rc = 0;
210 	}
211 
212 	LASSERT(!range_is_exhausted(&seq->lcs_space));
213 	*seqnr = seq->lcs_space.lsr_start;
214 	seq->lcs_space.lsr_start += 1;
215 
216 	CDEBUG(D_INFO, "%s: Allocated sequence [%#llx]\n", seq->lcs_name,
217 	       *seqnr);
218 
219 	return rc;
220 }
221 
/*
 * Try to claim the "update in progress" marker of @seq. The function
 * unlocks seq->lcs_mutex on both paths, so it is entered with it held.
 *
 * Returns 0 after incrementing lcs_update; lcs_mutex has been released.
 * Returns -EAGAIN when lcs_update was already set: the task queued @link on
 * lcs_waitq, slept with the mutex dropped, and holds lcs_mutex again on
 * return so the caller can retry.
 */
static int seq_fid_alloc_prep(struct lu_client_seq *seq,
			      wait_queue_t *link)
{
	if (!seq->lcs_update) {
		++seq->lcs_update;
		mutex_unlock(&seq->lcs_mutex);
		return 0;
	}

	/* Queue up and mark the task sleeping before dropping the mutex. */
	add_wait_queue(&seq->lcs_waitq, link);
	set_current_state(TASK_UNINTERRUPTIBLE);
	mutex_unlock(&seq->lcs_mutex);

	schedule();

	mutex_lock(&seq->lcs_mutex);
	remove_wait_queue(&seq->lcs_waitq, link);
	set_current_state(TASK_RUNNING);

	return -EAGAIN;
}
241 
seq_fid_alloc_fini(struct lu_client_seq * seq)242 static void seq_fid_alloc_fini(struct lu_client_seq *seq)
243 {
244 	LASSERT(seq->lcs_update == 1);
245 	mutex_lock(&seq->lcs_mutex);
246 	--seq->lcs_update;
247 	wake_up(&seq->lcs_waitq);
248 }
249 
250 /**
251  * Allocate the whole seq to the caller.
252  **/
seq_client_get_seq(const struct lu_env * env,struct lu_client_seq * seq,u64 * seqnr)253 int seq_client_get_seq(const struct lu_env *env,
254 		       struct lu_client_seq *seq, u64 *seqnr)
255 {
256 	wait_queue_t link;
257 	int rc;
258 
259 	LASSERT(seqnr != NULL);
260 	mutex_lock(&seq->lcs_mutex);
261 	init_waitqueue_entry(&link, current);
262 
263 	while (1) {
264 		rc = seq_fid_alloc_prep(seq, &link);
265 		if (rc == 0)
266 			break;
267 	}
268 
269 	rc = seq_client_alloc_seq(env, seq, seqnr);
270 	if (rc) {
271 		CERROR("%s: Can't allocate new sequence, rc %d\n",
272 		       seq->lcs_name, rc);
273 		seq_fid_alloc_fini(seq);
274 		mutex_unlock(&seq->lcs_mutex);
275 		return rc;
276 	}
277 
278 	CDEBUG(D_INFO, "%s: allocate sequence [0x%16.16Lx]\n",
279 	       seq->lcs_name, *seqnr);
280 
281 	/* Since the caller require the whole seq,
282 	 * so marked this seq to be used */
283 	if (seq->lcs_type == LUSTRE_SEQ_METADATA)
284 		seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH;
285 	else
286 		seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
287 
288 	seq->lcs_fid.f_seq = *seqnr;
289 	seq->lcs_fid.f_ver = 0;
290 	/*
291 	 * Inform caller that sequence switch is performed to allow it
292 	 * to setup FLD for it.
293 	 */
294 	seq_fid_alloc_fini(seq);
295 	mutex_unlock(&seq->lcs_mutex);
296 
297 	return rc;
298 }
299 EXPORT_SYMBOL(seq_client_get_seq);
300 
301 /* Allocate new fid on passed client @seq and save it to @fid. */
seq_client_alloc_fid(const struct lu_env * env,struct lu_client_seq * seq,struct lu_fid * fid)302 int seq_client_alloc_fid(const struct lu_env *env,
303 			 struct lu_client_seq *seq, struct lu_fid *fid)
304 {
305 	wait_queue_t link;
306 	int rc;
307 
308 	LASSERT(seq != NULL);
309 	LASSERT(fid != NULL);
310 
311 	init_waitqueue_entry(&link, current);
312 	mutex_lock(&seq->lcs_mutex);
313 
314 	if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST))
315 		seq->lcs_fid.f_oid = seq->lcs_width;
316 
317 	while (1) {
318 		u64 seqnr;
319 
320 		if (!fid_is_zero(&seq->lcs_fid) &&
321 		    fid_oid(&seq->lcs_fid) < seq->lcs_width) {
322 			/* Just bump last allocated fid and return to caller. */
323 			seq->lcs_fid.f_oid += 1;
324 			rc = 0;
325 			break;
326 		}
327 
328 		rc = seq_fid_alloc_prep(seq, &link);
329 		if (rc)
330 			continue;
331 
332 		rc = seq_client_alloc_seq(env, seq, &seqnr);
333 		if (rc) {
334 			CERROR("%s: Can't allocate new sequence, rc %d\n",
335 			       seq->lcs_name, rc);
336 			seq_fid_alloc_fini(seq);
337 			mutex_unlock(&seq->lcs_mutex);
338 			return rc;
339 		}
340 
341 		CDEBUG(D_INFO, "%s: Switch to sequence [0x%16.16Lx]\n",
342 		       seq->lcs_name, seqnr);
343 
344 		seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
345 		seq->lcs_fid.f_seq = seqnr;
346 		seq->lcs_fid.f_ver = 0;
347 
348 		/*
349 		 * Inform caller that sequence switch is performed to allow it
350 		 * to setup FLD for it.
351 		 */
352 		rc = 1;
353 
354 		seq_fid_alloc_fini(seq);
355 		break;
356 	}
357 
358 	*fid = seq->lcs_fid;
359 	mutex_unlock(&seq->lcs_mutex);
360 
361 	CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
362 	return rc;
363 }
364 EXPORT_SYMBOL(seq_client_alloc_fid);
365 
366 /*
367  * Finish the current sequence due to disconnect.
368  * See mdc_import_event()
369  */
seq_client_flush(struct lu_client_seq * seq)370 void seq_client_flush(struct lu_client_seq *seq)
371 {
372 	wait_queue_t link;
373 
374 	LASSERT(seq != NULL);
375 	init_waitqueue_entry(&link, current);
376 	mutex_lock(&seq->lcs_mutex);
377 
378 	while (seq->lcs_update) {
379 		add_wait_queue(&seq->lcs_waitq, &link);
380 		set_current_state(TASK_UNINTERRUPTIBLE);
381 		mutex_unlock(&seq->lcs_mutex);
382 
383 		schedule();
384 
385 		mutex_lock(&seq->lcs_mutex);
386 		remove_wait_queue(&seq->lcs_waitq, &link);
387 		set_current_state(TASK_RUNNING);
388 	}
389 
390 	fid_zero(&seq->lcs_fid);
391 	/**
392 	 * this id shld not be used for seq range allocation.
393 	 * set to -1 for dgb check.
394 	 */
395 
396 	seq->lcs_space.lsr_index = -1;
397 
398 	range_init(&seq->lcs_space);
399 	mutex_unlock(&seq->lcs_mutex);
400 }
401 EXPORT_SYMBOL(seq_client_flush);
402 
/*
 * Remove the proc directory of @seq, if any, and clear lcs_proc_dir.
 * An error pointer stored in lcs_proc_dir is only cleared, not removed.
 * Does nothing when CONFIG_PROC_FS is not defined.
 */
static void seq_client_proc_fini(struct lu_client_seq *seq)
{
#ifdef CONFIG_PROC_FS
	if (!seq->lcs_proc_dir)
		return;

	if (!IS_ERR(seq->lcs_proc_dir))
		lprocfs_remove(&seq->lcs_proc_dir);

	seq->lcs_proc_dir = NULL;
#endif /* CONFIG_PROC_FS */
}
413 
/*
 * Register a proc directory named after @seq under seq_type_proc_dir and
 * populate it from seq_client_proc_list.
 *
 * Returns 0 on success (always 0 when CONFIG_PROC_FS is not defined), the
 * error encoded in the pointer returned by lprocfs_register(), or the error
 * from lprocfs_add_vars(); in the latter case the directory is torn down
 * again.
 */
static int seq_client_proc_init(struct lu_client_seq *seq)
{
#ifdef CONFIG_PROC_FS
	int rc;

	seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
					     seq_type_proc_dir,
					     NULL, NULL);
	if (IS_ERR(seq->lcs_proc_dir)) {
		CERROR("%s: LProcFS failed in seq-init\n",
		       seq->lcs_name);
		return PTR_ERR(seq->lcs_proc_dir);
	}

	rc = lprocfs_add_vars(seq->lcs_proc_dir,
			      seq_client_proc_list, seq);
	if (rc == 0)
		return 0;

	CERROR("%s: Can't init sequence manager proc, rc %d\n",
	       seq->lcs_name, rc);
	seq_client_proc_fini(seq);
	return rc;
#else /* !CONFIG_PROC_FS */
	return 0;
#endif
}
448 
seq_client_init(struct lu_client_seq * seq,struct obd_export * exp,enum lu_cli_type type,const char * prefix,struct lu_server_seq * srv)449 int seq_client_init(struct lu_client_seq *seq,
450 		    struct obd_export *exp,
451 		    enum lu_cli_type type,
452 		    const char *prefix,
453 		    struct lu_server_seq *srv)
454 {
455 	int rc;
456 
457 	LASSERT(seq != NULL);
458 	LASSERT(prefix != NULL);
459 
460 	seq->lcs_srv = srv;
461 	seq->lcs_type = type;
462 
463 	mutex_init(&seq->lcs_mutex);
464 	if (type == LUSTRE_SEQ_METADATA)
465 		seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH;
466 	else
467 		seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
468 
469 	init_waitqueue_head(&seq->lcs_waitq);
470 	/* Make sure that things are clear before work is started. */
471 	seq_client_flush(seq);
472 
473 	if (exp != NULL)
474 		seq->lcs_exp = class_export_get(exp);
475 	else if (type == LUSTRE_SEQ_METADATA)
476 		LASSERT(seq->lcs_srv != NULL);
477 
478 	snprintf(seq->lcs_name, sizeof(seq->lcs_name),
479 		 "cli-%s", prefix);
480 
481 	rc = seq_client_proc_init(seq);
482 	if (rc)
483 		seq_client_fini(seq);
484 	return rc;
485 }
486 EXPORT_SYMBOL(seq_client_init);
487 
seq_client_fini(struct lu_client_seq * seq)488 void seq_client_fini(struct lu_client_seq *seq)
489 {
490 	seq_client_proc_fini(seq);
491 
492 	if (seq->lcs_exp != NULL) {
493 		class_export_put(seq->lcs_exp);
494 		seq->lcs_exp = NULL;
495 	}
496 
497 	seq->lcs_srv = NULL;
498 }
499 EXPORT_SYMBOL(seq_client_fini);
500 
client_fid_init(struct obd_device * obd,struct obd_export * exp,enum lu_cli_type type)501 int client_fid_init(struct obd_device *obd,
502 		    struct obd_export *exp, enum lu_cli_type type)
503 {
504 	struct client_obd *cli = &obd->u.cli;
505 	char *prefix;
506 	int rc;
507 
508 	OBD_ALLOC_PTR(cli->cl_seq);
509 	if (cli->cl_seq == NULL)
510 		return -ENOMEM;
511 
512 	OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
513 	if (prefix == NULL) {
514 		rc = -ENOMEM;
515 		goto out_free_seq;
516 	}
517 
518 	snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name);
519 
520 	/* Init client side sequence-manager */
521 	rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL);
522 	OBD_FREE(prefix, MAX_OBD_NAME + 5);
523 	if (rc)
524 		goto out_free_seq;
525 
526 	return rc;
527 out_free_seq:
528 	OBD_FREE_PTR(cli->cl_seq);
529 	cli->cl_seq = NULL;
530 	return rc;
531 }
532 EXPORT_SYMBOL(client_fid_init);
533 
client_fid_fini(struct obd_device * obd)534 int client_fid_fini(struct obd_device *obd)
535 {
536 	struct client_obd *cli = &obd->u.cli;
537 
538 	if (cli->cl_seq != NULL) {
539 		seq_client_fini(cli->cl_seq);
540 		OBD_FREE_PTR(cli->cl_seq);
541 		cli->cl_seq = NULL;
542 	}
543 
544 	return 0;
545 }
546 EXPORT_SYMBOL(client_fid_fini);
547 
548 struct proc_dir_entry *seq_type_proc_dir;
549 
fid_mod_init(void)550 static int __init fid_mod_init(void)
551 {
552 	seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
553 					     proc_lustre_root,
554 					     NULL, NULL);
555 	return PTR_ERR_OR_ZERO(seq_type_proc_dir);
556 }
557 
fid_mod_exit(void)558 static void __exit fid_mod_exit(void)
559 {
560 	if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
561 		lprocfs_remove(&seq_type_proc_dir);
562 		seq_type_proc_dir = NULL;
563 	}
564 }
565 
566 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
567 MODULE_DESCRIPTION("Lustre FID Module");
568 MODULE_LICENSE("GPL");
569 MODULE_VERSION("0.1.0");
570 
571 module_init(fid_mod_init);
572 module_exit(fid_mod_exit);
573