1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45  * Author: Mikhail Pershin <tappro@whamcloud.com>
46  */
47 
48 #define DEBUG_SUBSYSTEM S_LOG
49 
50 
51 #include "../include/obd_class.h"
52 #include "../include/lustre_log.h"
53 #include "llog_internal.h"
54 
55 /*
56  * Allocate a new log or catalog handle
57  * Used inside llog_open().
58  */
llog_alloc_handle(void)59 static struct llog_handle *llog_alloc_handle(void)
60 {
61 	struct llog_handle *loghandle;
62 
63 	OBD_ALLOC_PTR(loghandle);
64 	if (loghandle == NULL)
65 		return NULL;
66 
67 	init_rwsem(&loghandle->lgh_lock);
68 	spin_lock_init(&loghandle->lgh_hdr_lock);
69 	INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
70 	atomic_set(&loghandle->lgh_refcount, 1);
71 
72 	return loghandle;
73 }
74 
75 /*
76  * Free llog handle and header data if exists. Used in llog_close() only
77  */
llog_free_handle(struct llog_handle * loghandle)78 static void llog_free_handle(struct llog_handle *loghandle)
79 {
80 	LASSERT(loghandle != NULL);
81 
82 	/* failed llog_init_handle */
83 	if (!loghandle->lgh_hdr)
84 		goto out;
85 
86 	if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
87 		LASSERT(list_empty(&loghandle->u.phd.phd_entry));
88 	else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
89 		LASSERT(list_empty(&loghandle->u.chd.chd_head));
90 	LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
91 	OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
92 out:
93 	OBD_FREE_PTR(loghandle);
94 }
95 
llog_handle_get(struct llog_handle * loghandle)96 void llog_handle_get(struct llog_handle *loghandle)
97 {
98 	atomic_inc(&loghandle->lgh_refcount);
99 }
100 
llog_handle_put(struct llog_handle * loghandle)101 void llog_handle_put(struct llog_handle *loghandle)
102 {
103 	LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
104 	if (atomic_dec_and_test(&loghandle->lgh_refcount))
105 		llog_free_handle(loghandle);
106 }
107 
108 /* returns negative on error; 0 if success; 1 if success & log destroyed */
llog_cancel_rec(const struct lu_env * env,struct llog_handle * loghandle,int index)109 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
110 		    int index)
111 {
112 	struct llog_log_hdr *llh = loghandle->lgh_hdr;
113 	int rc = 0;
114 
115 	CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n",
116 	       index, POSTID(&loghandle->lgh_id.lgl_oi));
117 
118 	if (index == 0) {
119 		CERROR("Can't cancel index 0 which is header\n");
120 		return -EINVAL;
121 	}
122 
123 	spin_lock(&loghandle->lgh_hdr_lock);
124 	if (!ext2_clear_bit(index, llh->llh_bitmap)) {
125 		spin_unlock(&loghandle->lgh_hdr_lock);
126 		CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
127 		return -ENOENT;
128 	}
129 
130 	llh->llh_count--;
131 
132 	if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
133 	    (llh->llh_count == 1) &&
134 	    (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
135 		spin_unlock(&loghandle->lgh_hdr_lock);
136 		rc = llog_destroy(env, loghandle);
137 		if (rc < 0) {
138 			CERROR("%s: can't destroy empty llog #"DOSTID
139 			       "#%08x: rc = %d\n",
140 			       loghandle->lgh_ctxt->loc_obd->obd_name,
141 			       POSTID(&loghandle->lgh_id.lgl_oi),
142 			       loghandle->lgh_id.lgl_ogen, rc);
143 			goto out_err;
144 		}
145 		return 1;
146 	}
147 	spin_unlock(&loghandle->lgh_hdr_lock);
148 
149 	rc = llog_write(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
150 	if (rc < 0) {
151 		CERROR("%s: fail to write header for llog #"DOSTID
152 		       "#%08x: rc = %d\n",
153 		       loghandle->lgh_ctxt->loc_obd->obd_name,
154 		       POSTID(&loghandle->lgh_id.lgl_oi),
155 		       loghandle->lgh_id.lgl_ogen, rc);
156 		goto out_err;
157 	}
158 	return 0;
159 out_err:
160 	spin_lock(&loghandle->lgh_hdr_lock);
161 	ext2_set_bit(index, llh->llh_bitmap);
162 	llh->llh_count++;
163 	spin_unlock(&loghandle->lgh_hdr_lock);
164 	return rc;
165 }
166 EXPORT_SYMBOL(llog_cancel_rec);
167 
llog_read_header(const struct lu_env * env,struct llog_handle * handle,struct obd_uuid * uuid)168 static int llog_read_header(const struct lu_env *env,
169 			    struct llog_handle *handle,
170 			    struct obd_uuid *uuid)
171 {
172 	struct llog_operations *lop;
173 	int rc;
174 
175 	rc = llog_handle2ops(handle, &lop);
176 	if (rc)
177 		return rc;
178 
179 	if (lop->lop_read_header == NULL)
180 		return -EOPNOTSUPP;
181 
182 	rc = lop->lop_read_header(env, handle);
183 	if (rc == LLOG_EEMPTY) {
184 		struct llog_log_hdr *llh = handle->lgh_hdr;
185 
186 		handle->lgh_last_idx = 0; /* header is record with index 0 */
187 		llh->llh_count = 1;	 /* for the header record */
188 		llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
189 		llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
190 		llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
191 		llh->llh_timestamp = get_seconds();
192 		if (uuid)
193 			memcpy(&llh->llh_tgtuuid, uuid,
194 			       sizeof(llh->llh_tgtuuid));
195 		llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
196 		ext2_set_bit(0, llh->llh_bitmap);
197 		rc = 0;
198 	}
199 	return rc;
200 }
201 
llog_init_handle(const struct lu_env * env,struct llog_handle * handle,int flags,struct obd_uuid * uuid)202 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
203 		     int flags, struct obd_uuid *uuid)
204 {
205 	struct llog_log_hdr	*llh;
206 	int			 rc;
207 
208 	LASSERT(handle->lgh_hdr == NULL);
209 
210 	OBD_ALLOC_PTR(llh);
211 	if (llh == NULL)
212 		return -ENOMEM;
213 	handle->lgh_hdr = llh;
214 	/* first assign flags to use llog_client_ops */
215 	llh->llh_flags = flags;
216 	rc = llog_read_header(env, handle, uuid);
217 	if (rc == 0) {
218 		if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
219 			      flags & LLOG_F_IS_CAT) ||
220 			     (llh->llh_flags & LLOG_F_IS_CAT &&
221 			      flags & LLOG_F_IS_PLAIN))) {
222 			CERROR("%s: llog type is %s but initializing %s\n",
223 			       handle->lgh_ctxt->loc_obd->obd_name,
224 			       llh->llh_flags & LLOG_F_IS_CAT ?
225 			       "catalog" : "plain",
226 			       flags & LLOG_F_IS_CAT ? "catalog" : "plain");
227 			rc = -EINVAL;
228 			goto out;
229 		} else if (llh->llh_flags &
230 			   (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
231 			/*
232 			 * it is possible to open llog without specifying llog
233 			 * type so it is taken from llh_flags
234 			 */
235 			flags = llh->llh_flags;
236 		} else {
237 			/* for some reason the llh_flags has no type set */
238 			CERROR("llog type is not specified!\n");
239 			rc = -EINVAL;
240 			goto out;
241 		}
242 		if (unlikely(uuid &&
243 			     !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
244 			CERROR("%s: llog uuid mismatch: %s/%s\n",
245 			       handle->lgh_ctxt->loc_obd->obd_name,
246 			       (char *)uuid->uuid,
247 			       (char *)llh->llh_tgtuuid.uuid);
248 			rc = -EEXIST;
249 			goto out;
250 		}
251 	}
252 	if (flags & LLOG_F_IS_CAT) {
253 		LASSERT(list_empty(&handle->u.chd.chd_head));
254 		INIT_LIST_HEAD(&handle->u.chd.chd_head);
255 		llh->llh_size = sizeof(struct llog_logid_rec);
256 	} else if (!(flags & LLOG_F_IS_PLAIN)) {
257 		CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
258 		       handle->lgh_ctxt->loc_obd->obd_name,
259 		       flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
260 		rc = -EINVAL;
261 	}
262 out:
263 	if (rc) {
264 		OBD_FREE_PTR(llh);
265 		handle->lgh_hdr = NULL;
266 	}
267 	return rc;
268 }
269 EXPORT_SYMBOL(llog_init_handle);
270 
llog_process_thread(void * arg)271 static int llog_process_thread(void *arg)
272 {
273 	struct llog_process_info	*lpi = arg;
274 	struct llog_handle		*loghandle = lpi->lpi_loghandle;
275 	struct llog_log_hdr		*llh = loghandle->lgh_hdr;
276 	struct llog_process_cat_data	*cd  = lpi->lpi_catdata;
277 	char				*buf;
278 	__u64				 cur_offset = LLOG_CHUNK_SIZE;
279 	__u64				 last_offset;
280 	int				 rc = 0, index = 1, last_index;
281 	int				 saved_index = 0;
282 	int				 last_called_index = 0;
283 
284 	LASSERT(llh);
285 
286 	OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
287 	if (!buf) {
288 		lpi->lpi_rc = -ENOMEM;
289 		return 0;
290 	}
291 
292 	if (cd != NULL) {
293 		last_called_index = cd->lpcd_first_idx;
294 		index = cd->lpcd_first_idx + 1;
295 	}
296 	if (cd != NULL && cd->lpcd_last_idx)
297 		last_index = cd->lpcd_last_idx;
298 	else
299 		last_index = LLOG_BITMAP_BYTES * 8 - 1;
300 
301 	while (rc == 0) {
302 		struct llog_rec_hdr *rec;
303 
304 		/* skip records not set in bitmap */
305 		while (index <= last_index &&
306 		       !ext2_test_bit(index, llh->llh_bitmap))
307 			++index;
308 
309 		LASSERT(index <= last_index + 1);
310 		if (index == last_index + 1)
311 			break;
312 repeat:
313 		CDEBUG(D_OTHER, "index: %d last_index %d\n",
314 		       index, last_index);
315 
316 		/* get the buf with our target record; avoid old garbage */
317 		memset(buf, 0, LLOG_CHUNK_SIZE);
318 		last_offset = cur_offset;
319 		rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
320 				     index, &cur_offset, buf, LLOG_CHUNK_SIZE);
321 		if (rc)
322 			goto out;
323 
324 		/* NB: when rec->lrh_len is accessed it is already swabbed
325 		 * since it is used at the "end" of the loop and the rec
326 		 * swabbing is done at the beginning of the loop. */
327 		for (rec = (struct llog_rec_hdr *)buf;
328 		     (char *)rec < buf + LLOG_CHUNK_SIZE;
329 		     rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
330 
331 			CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
332 			       rec, rec->lrh_type);
333 
334 			if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
335 				lustre_swab_llog_rec(rec);
336 
337 			CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
338 			       rec->lrh_type, rec->lrh_index);
339 
340 			if (rec->lrh_index == 0) {
341 				/* probably another rec just got added? */
342 				rc = 0;
343 				if (index <= loghandle->lgh_last_idx)
344 					goto repeat;
345 				goto out; /* no more records */
346 			}
347 			if (rec->lrh_len == 0 ||
348 			    rec->lrh_len > LLOG_CHUNK_SIZE) {
349 				CWARN("invalid length %d in llog record for index %d/%d\n",
350 				      rec->lrh_len,
351 				      rec->lrh_index, index);
352 				rc = -EINVAL;
353 				goto out;
354 			}
355 
356 			if (rec->lrh_index < index) {
357 				CDEBUG(D_OTHER, "skipping lrh_index %d\n",
358 				       rec->lrh_index);
359 				continue;
360 			}
361 
362 			CDEBUG(D_OTHER,
363 			       "lrh_index: %d lrh_len: %d (%d remains)\n",
364 			       rec->lrh_index, rec->lrh_len,
365 			       (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
366 
367 			loghandle->lgh_cur_idx = rec->lrh_index;
368 			loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
369 						    last_offset;
370 
371 			/* if set, process the callback on this record */
372 			if (ext2_test_bit(index, llh->llh_bitmap)) {
373 				rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
374 						 lpi->lpi_cbdata);
375 				last_called_index = index;
376 				if (rc == LLOG_PROC_BREAK) {
377 					goto out;
378 				} else if (rc == LLOG_DEL_RECORD) {
379 					llog_cancel_rec(lpi->lpi_env,
380 							loghandle,
381 							rec->lrh_index);
382 					rc = 0;
383 				}
384 				if (rc)
385 					goto out;
386 			} else {
387 				CDEBUG(D_OTHER, "Skipped index %d\n", index);
388 			}
389 
390 			/* next record, still in buffer? */
391 			++index;
392 			if (index > last_index) {
393 				rc = 0;
394 				goto out;
395 			}
396 		}
397 	}
398 
399 out:
400 	if (cd != NULL)
401 		cd->lpcd_last_idx = last_called_index;
402 
403 	OBD_FREE(buf, LLOG_CHUNK_SIZE);
404 	lpi->lpi_rc = rc;
405 	return 0;
406 }
407 
llog_process_thread_daemonize(void * arg)408 static int llog_process_thread_daemonize(void *arg)
409 {
410 	struct llog_process_info	*lpi = arg;
411 	struct lu_env			 env;
412 	int				 rc;
413 
414 	unshare_fs_struct();
415 
416 	/* client env has no keys, tags is just 0 */
417 	rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
418 	if (rc)
419 		goto out;
420 	lpi->lpi_env = &env;
421 
422 	rc = llog_process_thread(arg);
423 
424 	lu_env_fini(&env);
425 out:
426 	complete(&lpi->lpi_completion);
427 	return rc;
428 }
429 
/*
 * Process the records of @loghandle with callback @cb, either in the
 * calling context or in a newly started kernel thread.
 *
 * @data is passed through to @cb; @catdata (may be NULL) is handed to
 * llog_process_thread() as the struct llog_process_cat_data range
 * description.  With @fork set, a "llog_process_thread" kthread is started
 * and the caller waits for it to complete; that thread creates its own env,
 * so @env is not used.  Without @fork the walk runs directly with @env.
 *
 * Returns -ENOMEM if the bookkeeping structure cannot be allocated, the
 * kthread_run() error if the thread cannot be started, otherwise the
 * processing result stored in lpi_rc.
 */
int llog_process_or_fork(const struct lu_env *env,
			 struct llog_handle *loghandle,
			 llog_cb_t cb, void *data, void *catdata, bool fork)
{
	struct llog_process_info *lpi;
	int		      rc;

	OBD_ALLOC_PTR(lpi);
	if (lpi == NULL) {
		CERROR("cannot alloc pointer\n");
		return -ENOMEM;
	}
	lpi->lpi_loghandle = loghandle;
	lpi->lpi_cb	= cb;
	lpi->lpi_cbdata    = data;
	lpi->lpi_catdata   = catdata;

	if (fork) {
		struct task_struct *task;

		/* The new thread can't use parent env,
		 * init the new one in llog_process_thread_daemonize. */
		lpi->lpi_env = NULL;
		init_completion(&lpi->lpi_completion);
		task = kthread_run(llog_process_thread_daemonize, lpi,
				   "llog_process_thread");
		/*
		 * Test the pointer itself: truncating a valid pointer to
		 * int first could make it look like an errno value.
		 */
		if (IS_ERR(task)) {
			rc = PTR_ERR(task);
			CERROR("%s: cannot start thread: rc = %d\n",
			       loghandle->lgh_ctxt->loc_obd->obd_name, rc);
			OBD_FREE_PTR(lpi);
			return rc;
		}
		wait_for_completion(&lpi->lpi_completion);
	} else {
		lpi->lpi_env = env;
		llog_process_thread(lpi);
	}
	rc = lpi->lpi_rc;
	OBD_FREE_PTR(lpi);
	return rc;
}
EXPORT_SYMBOL(llog_process_or_fork);
470 
/*
 * Process the records of @loghandle with @cb.  This is
 * llog_process_or_fork() with forking enabled; see there for the meaning
 * of the arguments and the return value.
 */
int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
		 llog_cb_t cb, void *data, void *catdata)
{
	const bool fork = true;

	return llog_process_or_fork(env, loghandle, cb, data, catdata, fork);
}
EXPORT_SYMBOL(llog_process);
477 
/*
 * Walk the records of @loghandle in decreasing index order and call @cb
 * (with @data) on every record whose bit is set in the header bitmap.
 *
 * @catdata, if not NULL, is a struct llog_process_cat_data: the walk starts
 * at lpcd_last_idx (when non-zero, otherwise at the last bitmap index) and
 * does not go below lpcd_first_idx + 1.
 *
 * Callback results: LLOG_PROC_BREAK stops the walk and is returned as is,
 * LLOG_DEL_RECORD cancels the record and the walk goes on, any other
 * non-zero value stops the walk and is returned.
 *
 * Returns 0 when the walk completes, -ENOMEM if the block buffer cannot be
 * allocated, or the error from llog_prev_block() / the callback.
 */
int llog_reverse_process(const struct lu_env *env,
			 struct llog_handle *loghandle, llog_cb_t cb,
			 void *data, void *catdata)
{
	struct llog_log_hdr *llh = loghandle->lgh_hdr;
	struct llog_process_cat_data *cd = catdata;
	void *chunk;
	int first_index = 1;
	int index = LLOG_BITMAP_BYTES * 8 - 1;
	int rec_idx;
	int rc = 0;

	OBD_ALLOC(chunk, LLOG_CHUNK_SIZE);
	if (chunk == NULL)
		return -ENOMEM;

	/* narrow the index range if the caller asked for it */
	if (cd != NULL) {
		first_index = cd->lpcd_first_idx + 1;
		if (cd->lpcd_last_idx)
			index = cd->lpcd_last_idx;
	}

	while (rc == 0) {
		struct llog_rec_hdr *rec;
		struct llog_rec_tail *tail;

		/* step back to the closest index set in the bitmap */
		while (index >= first_index &&
		       !ext2_test_bit(index, llh->llh_bitmap))
			--index;

		LASSERT(index >= first_index - 1);
		if (index == first_index - 1)
			break;

		/* fetch the block holding the record; start from zeroes so
		 * that no stale data is interpreted */
		memset(chunk, 0, LLOG_CHUNK_SIZE);
		rc = llog_prev_block(env, loghandle, index, chunk,
				     LLOG_CHUNK_SIZE);
		if (rc)
			goto out;

		/* walk forward inside the block up to the wanted record */
		rec = chunk;
		rec_idx = rec->lrh_index;
		CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, rec_idx);
		for (; rec_idx < index; rec_idx++) {
			rec = (void *)rec + rec->lrh_len;
			if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
				lustre_swab_llog_rec(rec);
		}
		LASSERT(rec_idx == index);
		tail = (void *)rec + rec->lrh_len - sizeof(*tail);

		/* then go backwards, record tail by record tail */
		while ((void *)tail > chunk) {
			if (tail->lrt_index == 0) {
				/* no more records */
				rc = 0;
				goto out;
			}

			/* only records still set in the bitmap are handed
			 * to the callback */
			if (ext2_test_bit(index, llh->llh_bitmap)) {
				rec = (void *)tail - tail->lrt_len +
				      sizeof(*tail);

				rc = cb(env, loghandle, rec, data);
				if (rc == LLOG_PROC_BREAK)
					goto out;
				if (rc == LLOG_DEL_RECORD) {
					llog_cancel_rec(env, loghandle,
							tail->lrt_index);
					rc = 0;
				}
				if (rc)
					goto out;
			}

			/* previous record; stop once the range is done */
			--index;
			if (index < first_index) {
				rc = 0;
				goto out;
			}
			tail = (void *)tail - tail->lrt_len;
		}
	}

out:
	OBD_FREE(chunk, LLOG_CHUNK_SIZE);
	return rc;
}
EXPORT_SYMBOL(llog_reverse_process);
571 
572 /**
573  * new llog API
574  *
575  * API functions:
576  *      llog_open - open llog, may not exist
577  *      llog_exist - check if llog exists
578  *      llog_close - close opened llog, pair for open, frees llog_handle
579  *      llog_declare_create - declare llog creation
580  *      llog_create - create new llog on disk, need transaction handle
581  *      llog_declare_write_rec - declaration of llog write
582  *      llog_write_rec - write llog record on disk, need transaction handle
583  *      llog_declare_add - declare llog catalog record addition
584  *      llog_add - add llog record in catalog, need transaction handle
585  */
llog_exist(struct llog_handle * loghandle)586 int llog_exist(struct llog_handle *loghandle)
587 {
588 	struct llog_operations	*lop;
589 	int			 rc;
590 
591 	rc = llog_handle2ops(loghandle, &lop);
592 	if (rc)
593 		return rc;
594 	if (lop->lop_exist == NULL)
595 		return -EOPNOTSUPP;
596 
597 	rc = lop->lop_exist(loghandle);
598 	return rc;
599 }
600 EXPORT_SYMBOL(llog_exist);
601 
llog_declare_create(const struct lu_env * env,struct llog_handle * loghandle,struct thandle * th)602 int llog_declare_create(const struct lu_env *env,
603 			struct llog_handle *loghandle, struct thandle *th)
604 {
605 	struct llog_operations	*lop;
606 	int			 raised, rc;
607 
608 	rc = llog_handle2ops(loghandle, &lop);
609 	if (rc)
610 		return rc;
611 	if (lop->lop_declare_create == NULL)
612 		return -EOPNOTSUPP;
613 
614 	raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
615 	if (!raised)
616 		cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
617 	rc = lop->lop_declare_create(env, loghandle, th);
618 	if (!raised)
619 		cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
620 	return rc;
621 }
622 EXPORT_SYMBOL(llog_declare_create);
623 
llog_create(const struct lu_env * env,struct llog_handle * handle,struct thandle * th)624 int llog_create(const struct lu_env *env, struct llog_handle *handle,
625 		struct thandle *th)
626 {
627 	struct llog_operations	*lop;
628 	int			 raised, rc;
629 
630 	rc = llog_handle2ops(handle, &lop);
631 	if (rc)
632 		return rc;
633 	if (lop->lop_create == NULL)
634 		return -EOPNOTSUPP;
635 
636 	raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
637 	if (!raised)
638 		cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
639 	rc = lop->lop_create(env, handle, th);
640 	if (!raised)
641 		cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
642 	return rc;
643 }
644 EXPORT_SYMBOL(llog_create);
645 
llog_declare_write_rec(const struct lu_env * env,struct llog_handle * handle,struct llog_rec_hdr * rec,int idx,struct thandle * th)646 int llog_declare_write_rec(const struct lu_env *env,
647 			   struct llog_handle *handle,
648 			   struct llog_rec_hdr *rec, int idx,
649 			   struct thandle *th)
650 {
651 	struct llog_operations	*lop;
652 	int			 raised, rc;
653 
654 	rc = llog_handle2ops(handle, &lop);
655 	if (rc)
656 		return rc;
657 	LASSERT(lop);
658 	if (lop->lop_declare_write_rec == NULL)
659 		return -EOPNOTSUPP;
660 
661 	raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
662 	if (!raised)
663 		cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
664 	rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
665 	if (!raised)
666 		cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
667 	return rc;
668 }
669 EXPORT_SYMBOL(llog_declare_write_rec);
670 
llog_write_rec(const struct lu_env * env,struct llog_handle * handle,struct llog_rec_hdr * rec,struct llog_cookie * logcookies,int numcookies,void * buf,int idx,struct thandle * th)671 int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
672 		   struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
673 		   int numcookies, void *buf, int idx, struct thandle *th)
674 {
675 	struct llog_operations	*lop;
676 	int			 raised, rc, buflen;
677 
678 	rc = llog_handle2ops(handle, &lop);
679 	if (rc)
680 		return rc;
681 
682 	LASSERT(lop);
683 	if (lop->lop_write_rec == NULL)
684 		return -EOPNOTSUPP;
685 
686 	if (buf)
687 		buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) +
688 			 sizeof(struct llog_rec_tail);
689 	else
690 		buflen = rec->lrh_len;
691 	LASSERT(cfs_size_round(buflen) == buflen);
692 
693 	raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
694 	if (!raised)
695 		cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
696 	rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies,
697 				buf, idx, th);
698 	if (!raised)
699 		cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
700 	return rc;
701 }
702 EXPORT_SYMBOL(llog_write_rec);
703 
llog_add(const struct lu_env * env,struct llog_handle * lgh,struct llog_rec_hdr * rec,struct llog_cookie * logcookies,void * buf,struct thandle * th)704 int llog_add(const struct lu_env *env, struct llog_handle *lgh,
705 	     struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
706 	     void *buf, struct thandle *th)
707 {
708 	int raised, rc;
709 
710 	if (lgh->lgh_logops->lop_add == NULL)
711 		return -EOPNOTSUPP;
712 
713 	raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
714 	if (!raised)
715 		cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
716 	rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, buf, th);
717 	if (!raised)
718 		cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
719 	return rc;
720 }
721 EXPORT_SYMBOL(llog_add);
722 
llog_declare_add(const struct lu_env * env,struct llog_handle * lgh,struct llog_rec_hdr * rec,struct thandle * th)723 int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
724 		     struct llog_rec_hdr *rec, struct thandle *th)
725 {
726 	int raised, rc;
727 
728 	if (lgh->lgh_logops->lop_declare_add == NULL)
729 		return -EOPNOTSUPP;
730 
731 	raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
732 	if (!raised)
733 		cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
734 	rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
735 	if (!raised)
736 		cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
737 	return rc;
738 }
739 EXPORT_SYMBOL(llog_declare_add);
740 
741 /**
742  * Helper function to open llog or create it if doesn't exist.
743  * It hides all transaction handling from caller.
744  */
llog_open_create(const struct lu_env * env,struct llog_ctxt * ctxt,struct llog_handle ** res,struct llog_logid * logid,char * name)745 int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
746 		     struct llog_handle **res, struct llog_logid *logid,
747 		     char *name)
748 {
749 	struct dt_device	*d;
750 	struct thandle		*th;
751 	int			 rc;
752 
753 	rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
754 	if (rc)
755 		return rc;
756 
757 	if (llog_exist(*res))
758 		return 0;
759 
760 	LASSERT((*res)->lgh_obj != NULL);
761 
762 	d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
763 
764 	th = dt_trans_create(env, d);
765 	if (IS_ERR(th)) {
766 		rc = PTR_ERR(th);
767 		goto out;
768 	}
769 
770 	rc = llog_declare_create(env, *res, th);
771 	if (rc == 0) {
772 		rc = dt_trans_start_local(env, d, th);
773 		if (rc == 0)
774 			rc = llog_create(env, *res, th);
775 	}
776 	dt_trans_stop(env, d, th);
777 out:
778 	if (rc)
779 		llog_close(env, *res);
780 	return rc;
781 }
782 EXPORT_SYMBOL(llog_open_create);
783 
784 /**
785  * Helper function to delete existent llog.
786  */
llog_erase(const struct lu_env * env,struct llog_ctxt * ctxt,struct llog_logid * logid,char * name)787 int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
788 	       struct llog_logid *logid, char *name)
789 {
790 	struct llog_handle	*handle;
791 	int			 rc = 0, rc2;
792 
793 	/* nothing to erase */
794 	if (name == NULL && logid == NULL)
795 		return 0;
796 
797 	rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
798 	if (rc < 0)
799 		return rc;
800 
801 	rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
802 	if (rc == 0)
803 		rc = llog_destroy(env, handle);
804 
805 	rc2 = llog_close(env, handle);
806 	if (rc == 0)
807 		rc = rc2;
808 	return rc;
809 }
810 EXPORT_SYMBOL(llog_erase);
811 
812 /*
813  * Helper function for write record in llog.
814  * It hides all transaction handling from caller.
815  * Valid only with local llog.
816  */
llog_write(const struct lu_env * env,struct llog_handle * loghandle,struct llog_rec_hdr * rec,struct llog_cookie * reccookie,int cookiecount,void * buf,int idx)817 int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
818 	       struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
819 	       int cookiecount, void *buf, int idx)
820 {
821 	struct dt_device	*dt;
822 	struct thandle		*th;
823 	int			 rc;
824 
825 	LASSERT(loghandle);
826 	LASSERT(loghandle->lgh_ctxt);
827 	LASSERT(loghandle->lgh_obj != NULL);
828 
829 	dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
830 
831 	th = dt_trans_create(env, dt);
832 	if (IS_ERR(th))
833 		return PTR_ERR(th);
834 
835 	rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
836 	if (rc)
837 		goto out_trans;
838 
839 	rc = dt_trans_start_local(env, dt, th);
840 	if (rc)
841 		goto out_trans;
842 
843 	down_write(&loghandle->lgh_lock);
844 	rc = llog_write_rec(env, loghandle, rec, reccookie,
845 			    cookiecount, buf, idx, th);
846 	up_write(&loghandle->lgh_lock);
847 out_trans:
848 	dt_trans_stop(env, dt, th);
849 	return rc;
850 }
851 EXPORT_SYMBOL(llog_write);
852 
llog_open(const struct lu_env * env,struct llog_ctxt * ctxt,struct llog_handle ** lgh,struct llog_logid * logid,char * name,enum llog_open_param open_param)853 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
854 	      struct llog_handle **lgh, struct llog_logid *logid,
855 	      char *name, enum llog_open_param open_param)
856 {
857 	int	 raised;
858 	int	 rc;
859 
860 	LASSERT(ctxt);
861 	LASSERT(ctxt->loc_logops);
862 
863 	if (ctxt->loc_logops->lop_open == NULL) {
864 		*lgh = NULL;
865 		return -EOPNOTSUPP;
866 	}
867 
868 	*lgh = llog_alloc_handle();
869 	if (*lgh == NULL)
870 		return -ENOMEM;
871 	(*lgh)->lgh_ctxt = ctxt;
872 	(*lgh)->lgh_logops = ctxt->loc_logops;
873 
874 	raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
875 	if (!raised)
876 		cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
877 	rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
878 	if (!raised)
879 		cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
880 	if (rc) {
881 		llog_free_handle(*lgh);
882 		*lgh = NULL;
883 	}
884 	return rc;
885 }
886 EXPORT_SYMBOL(llog_open);
887 
llog_close(const struct lu_env * env,struct llog_handle * loghandle)888 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
889 {
890 	struct llog_operations	*lop;
891 	int			 rc;
892 
893 	rc = llog_handle2ops(loghandle, &lop);
894 	if (rc)
895 		goto out;
896 	if (lop->lop_close == NULL) {
897 		rc = -EOPNOTSUPP;
898 		goto out;
899 	}
900 	rc = lop->lop_close(env, loghandle);
901 out:
902 	llog_handle_put(loghandle);
903 	return rc;
904 }
905 EXPORT_SYMBOL(llog_close);
906 
llog_is_empty(const struct lu_env * env,struct llog_ctxt * ctxt,char * name)907 int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt,
908 		  char *name)
909 {
910 	struct llog_handle	*llh;
911 	int			 rc = 0;
912 
913 	rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
914 	if (rc < 0) {
915 		if (likely(rc == -ENOENT))
916 			rc = 0;
917 		goto out;
918 	}
919 
920 	rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
921 	if (rc)
922 		goto out_close;
923 	rc = llog_get_size(llh);
924 
925 out_close:
926 	llog_close(env, llh);
927 out:
928 	/* header is record 1 */
929 	return rc <= 1;
930 }
931 EXPORT_SYMBOL(llog_is_empty);
932 
llog_copy_handler(const struct lu_env * env,struct llog_handle * llh,struct llog_rec_hdr * rec,void * data)933 int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
934 		      struct llog_rec_hdr *rec, void *data)
935 {
936 	struct llog_handle	*copy_llh = data;
937 
938 	/* Append all records */
939 	return llog_write(env, copy_llh, rec, NULL, 0, NULL, -1);
940 }
941 EXPORT_SYMBOL(llog_copy_handler);
942 
943 /* backup plain llog */
llog_backup(const struct lu_env * env,struct obd_device * obd,struct llog_ctxt * ctxt,struct llog_ctxt * bctxt,char * name,char * backup)944 int llog_backup(const struct lu_env *env, struct obd_device *obd,
945 		struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
946 		char *name, char *backup)
947 {
948 	struct llog_handle	*llh, *bllh;
949 	int			 rc;
950 
951 
952 
953 	/* open original log */
954 	rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
955 	if (rc < 0) {
956 		/* the -ENOENT case is also reported to the caller
957 		 * but silently so it should handle that if needed.
958 		 */
959 		if (rc != -ENOENT)
960 			CERROR("%s: failed to open log %s: rc = %d\n",
961 			       obd->obd_name, name, rc);
962 		return rc;
963 	}
964 
965 	rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
966 	if (rc)
967 		goto out_close;
968 
969 	/* Make sure there's no old backup log */
970 	rc = llog_erase(env, bctxt, NULL, backup);
971 	if (rc < 0 && rc != -ENOENT)
972 		goto out_close;
973 
974 	/* open backup log */
975 	rc = llog_open_create(env, bctxt, &bllh, NULL, backup);
976 	if (rc) {
977 		CERROR("%s: failed to open backup logfile %s: rc = %d\n",
978 		       obd->obd_name, backup, rc);
979 		goto out_close;
980 	}
981 
982 	/* check that backup llog is not the same object as original one */
983 	if (llh->lgh_obj == bllh->lgh_obj) {
984 		CERROR("%s: backup llog %s to itself (%s), objects %p/%p\n",
985 		       obd->obd_name, name, backup, llh->lgh_obj,
986 		       bllh->lgh_obj);
987 		rc = -EEXIST;
988 		goto out_backup;
989 	}
990 
991 	rc = llog_init_handle(env, bllh, LLOG_F_IS_PLAIN, NULL);
992 	if (rc)
993 		goto out_backup;
994 
995 	/* Copy log record by record */
996 	rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
997 				  NULL, false);
998 	if (rc)
999 		CERROR("%s: failed to backup log %s: rc = %d\n",
1000 		       obd->obd_name, name, rc);
1001 out_backup:
1002 	llog_close(env, bllh);
1003 out_close:
1004 	llog_close(env, llh);
1005 	return rc;
1006 }
1007 EXPORT_SYMBOL(llog_backup);
1008