1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/llog.c
37 *
38 * OST<->MDS recovery logging infrastructure.
39 * Invariants in implementation:
40 * - we do not share logs among different OST<->MDS connections, so that
41 * if an OST or MDS fails it need only look at log(s) relevant to itself
42 *
43 * Author: Andreas Dilger <adilger@clusterfs.com>
44 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45 * Author: Mikhail Pershin <tappro@whamcloud.com>
46 */
47
48 #define DEBUG_SUBSYSTEM S_LOG
49
50
51 #include "../include/obd_class.h"
52 #include "../include/lustre_log.h"
53 #include "llog_internal.h"
54
55 /*
56 * Allocate a new log or catalog handle
57 * Used inside llog_open().
58 */
llog_alloc_handle(void)59 static struct llog_handle *llog_alloc_handle(void)
60 {
61 struct llog_handle *loghandle;
62
63 OBD_ALLOC_PTR(loghandle);
64 if (loghandle == NULL)
65 return NULL;
66
67 init_rwsem(&loghandle->lgh_lock);
68 spin_lock_init(&loghandle->lgh_hdr_lock);
69 INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
70 atomic_set(&loghandle->lgh_refcount, 1);
71
72 return loghandle;
73 }
74
75 /*
76 * Free llog handle and header data if exists. Used in llog_close() only
77 */
llog_free_handle(struct llog_handle * loghandle)78 static void llog_free_handle(struct llog_handle *loghandle)
79 {
80 LASSERT(loghandle != NULL);
81
82 /* failed llog_init_handle */
83 if (!loghandle->lgh_hdr)
84 goto out;
85
86 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
87 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
88 else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
89 LASSERT(list_empty(&loghandle->u.chd.chd_head));
90 LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
91 OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
92 out:
93 OBD_FREE_PTR(loghandle);
94 }
95
llog_handle_get(struct llog_handle * loghandle)96 void llog_handle_get(struct llog_handle *loghandle)
97 {
98 atomic_inc(&loghandle->lgh_refcount);
99 }
100
llog_handle_put(struct llog_handle * loghandle)101 void llog_handle_put(struct llog_handle *loghandle)
102 {
103 LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
104 if (atomic_dec_and_test(&loghandle->lgh_refcount))
105 llog_free_handle(loghandle);
106 }
107
108 /* returns negative on error; 0 if success; 1 if success & log destroyed */
llog_cancel_rec(const struct lu_env * env,struct llog_handle * loghandle,int index)109 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
110 int index)
111 {
112 struct llog_log_hdr *llh = loghandle->lgh_hdr;
113 int rc = 0;
114
115 CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n",
116 index, POSTID(&loghandle->lgh_id.lgl_oi));
117
118 if (index == 0) {
119 CERROR("Can't cancel index 0 which is header\n");
120 return -EINVAL;
121 }
122
123 spin_lock(&loghandle->lgh_hdr_lock);
124 if (!ext2_clear_bit(index, llh->llh_bitmap)) {
125 spin_unlock(&loghandle->lgh_hdr_lock);
126 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
127 return -ENOENT;
128 }
129
130 llh->llh_count--;
131
132 if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
133 (llh->llh_count == 1) &&
134 (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
135 spin_unlock(&loghandle->lgh_hdr_lock);
136 rc = llog_destroy(env, loghandle);
137 if (rc < 0) {
138 CERROR("%s: can't destroy empty llog #"DOSTID
139 "#%08x: rc = %d\n",
140 loghandle->lgh_ctxt->loc_obd->obd_name,
141 POSTID(&loghandle->lgh_id.lgl_oi),
142 loghandle->lgh_id.lgl_ogen, rc);
143 goto out_err;
144 }
145 return 1;
146 }
147 spin_unlock(&loghandle->lgh_hdr_lock);
148
149 rc = llog_write(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
150 if (rc < 0) {
151 CERROR("%s: fail to write header for llog #"DOSTID
152 "#%08x: rc = %d\n",
153 loghandle->lgh_ctxt->loc_obd->obd_name,
154 POSTID(&loghandle->lgh_id.lgl_oi),
155 loghandle->lgh_id.lgl_ogen, rc);
156 goto out_err;
157 }
158 return 0;
159 out_err:
160 spin_lock(&loghandle->lgh_hdr_lock);
161 ext2_set_bit(index, llh->llh_bitmap);
162 llh->llh_count++;
163 spin_unlock(&loghandle->lgh_hdr_lock);
164 return rc;
165 }
166 EXPORT_SYMBOL(llog_cancel_rec);
167
llog_read_header(const struct lu_env * env,struct llog_handle * handle,struct obd_uuid * uuid)168 static int llog_read_header(const struct lu_env *env,
169 struct llog_handle *handle,
170 struct obd_uuid *uuid)
171 {
172 struct llog_operations *lop;
173 int rc;
174
175 rc = llog_handle2ops(handle, &lop);
176 if (rc)
177 return rc;
178
179 if (lop->lop_read_header == NULL)
180 return -EOPNOTSUPP;
181
182 rc = lop->lop_read_header(env, handle);
183 if (rc == LLOG_EEMPTY) {
184 struct llog_log_hdr *llh = handle->lgh_hdr;
185
186 handle->lgh_last_idx = 0; /* header is record with index 0 */
187 llh->llh_count = 1; /* for the header record */
188 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
189 llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
190 llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
191 llh->llh_timestamp = get_seconds();
192 if (uuid)
193 memcpy(&llh->llh_tgtuuid, uuid,
194 sizeof(llh->llh_tgtuuid));
195 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
196 ext2_set_bit(0, llh->llh_bitmap);
197 rc = 0;
198 }
199 return rc;
200 }
201
llog_init_handle(const struct lu_env * env,struct llog_handle * handle,int flags,struct obd_uuid * uuid)202 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
203 int flags, struct obd_uuid *uuid)
204 {
205 struct llog_log_hdr *llh;
206 int rc;
207
208 LASSERT(handle->lgh_hdr == NULL);
209
210 OBD_ALLOC_PTR(llh);
211 if (llh == NULL)
212 return -ENOMEM;
213 handle->lgh_hdr = llh;
214 /* first assign flags to use llog_client_ops */
215 llh->llh_flags = flags;
216 rc = llog_read_header(env, handle, uuid);
217 if (rc == 0) {
218 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
219 flags & LLOG_F_IS_CAT) ||
220 (llh->llh_flags & LLOG_F_IS_CAT &&
221 flags & LLOG_F_IS_PLAIN))) {
222 CERROR("%s: llog type is %s but initializing %s\n",
223 handle->lgh_ctxt->loc_obd->obd_name,
224 llh->llh_flags & LLOG_F_IS_CAT ?
225 "catalog" : "plain",
226 flags & LLOG_F_IS_CAT ? "catalog" : "plain");
227 rc = -EINVAL;
228 goto out;
229 } else if (llh->llh_flags &
230 (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
231 /*
232 * it is possible to open llog without specifying llog
233 * type so it is taken from llh_flags
234 */
235 flags = llh->llh_flags;
236 } else {
237 /* for some reason the llh_flags has no type set */
238 CERROR("llog type is not specified!\n");
239 rc = -EINVAL;
240 goto out;
241 }
242 if (unlikely(uuid &&
243 !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
244 CERROR("%s: llog uuid mismatch: %s/%s\n",
245 handle->lgh_ctxt->loc_obd->obd_name,
246 (char *)uuid->uuid,
247 (char *)llh->llh_tgtuuid.uuid);
248 rc = -EEXIST;
249 goto out;
250 }
251 }
252 if (flags & LLOG_F_IS_CAT) {
253 LASSERT(list_empty(&handle->u.chd.chd_head));
254 INIT_LIST_HEAD(&handle->u.chd.chd_head);
255 llh->llh_size = sizeof(struct llog_logid_rec);
256 } else if (!(flags & LLOG_F_IS_PLAIN)) {
257 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
258 handle->lgh_ctxt->loc_obd->obd_name,
259 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
260 rc = -EINVAL;
261 }
262 out:
263 if (rc) {
264 OBD_FREE_PTR(llh);
265 handle->lgh_hdr = NULL;
266 }
267 return rc;
268 }
269 EXPORT_SYMBOL(llog_init_handle);
270
llog_process_thread(void * arg)271 static int llog_process_thread(void *arg)
272 {
273 struct llog_process_info *lpi = arg;
274 struct llog_handle *loghandle = lpi->lpi_loghandle;
275 struct llog_log_hdr *llh = loghandle->lgh_hdr;
276 struct llog_process_cat_data *cd = lpi->lpi_catdata;
277 char *buf;
278 __u64 cur_offset = LLOG_CHUNK_SIZE;
279 __u64 last_offset;
280 int rc = 0, index = 1, last_index;
281 int saved_index = 0;
282 int last_called_index = 0;
283
284 LASSERT(llh);
285
286 OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
287 if (!buf) {
288 lpi->lpi_rc = -ENOMEM;
289 return 0;
290 }
291
292 if (cd != NULL) {
293 last_called_index = cd->lpcd_first_idx;
294 index = cd->lpcd_first_idx + 1;
295 }
296 if (cd != NULL && cd->lpcd_last_idx)
297 last_index = cd->lpcd_last_idx;
298 else
299 last_index = LLOG_BITMAP_BYTES * 8 - 1;
300
301 while (rc == 0) {
302 struct llog_rec_hdr *rec;
303
304 /* skip records not set in bitmap */
305 while (index <= last_index &&
306 !ext2_test_bit(index, llh->llh_bitmap))
307 ++index;
308
309 LASSERT(index <= last_index + 1);
310 if (index == last_index + 1)
311 break;
312 repeat:
313 CDEBUG(D_OTHER, "index: %d last_index %d\n",
314 index, last_index);
315
316 /* get the buf with our target record; avoid old garbage */
317 memset(buf, 0, LLOG_CHUNK_SIZE);
318 last_offset = cur_offset;
319 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
320 index, &cur_offset, buf, LLOG_CHUNK_SIZE);
321 if (rc)
322 goto out;
323
324 /* NB: when rec->lrh_len is accessed it is already swabbed
325 * since it is used at the "end" of the loop and the rec
326 * swabbing is done at the beginning of the loop. */
327 for (rec = (struct llog_rec_hdr *)buf;
328 (char *)rec < buf + LLOG_CHUNK_SIZE;
329 rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
330
331 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
332 rec, rec->lrh_type);
333
334 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
335 lustre_swab_llog_rec(rec);
336
337 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
338 rec->lrh_type, rec->lrh_index);
339
340 if (rec->lrh_index == 0) {
341 /* probably another rec just got added? */
342 rc = 0;
343 if (index <= loghandle->lgh_last_idx)
344 goto repeat;
345 goto out; /* no more records */
346 }
347 if (rec->lrh_len == 0 ||
348 rec->lrh_len > LLOG_CHUNK_SIZE) {
349 CWARN("invalid length %d in llog record for index %d/%d\n",
350 rec->lrh_len,
351 rec->lrh_index, index);
352 rc = -EINVAL;
353 goto out;
354 }
355
356 if (rec->lrh_index < index) {
357 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
358 rec->lrh_index);
359 continue;
360 }
361
362 CDEBUG(D_OTHER,
363 "lrh_index: %d lrh_len: %d (%d remains)\n",
364 rec->lrh_index, rec->lrh_len,
365 (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
366
367 loghandle->lgh_cur_idx = rec->lrh_index;
368 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
369 last_offset;
370
371 /* if set, process the callback on this record */
372 if (ext2_test_bit(index, llh->llh_bitmap)) {
373 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
374 lpi->lpi_cbdata);
375 last_called_index = index;
376 if (rc == LLOG_PROC_BREAK) {
377 goto out;
378 } else if (rc == LLOG_DEL_RECORD) {
379 llog_cancel_rec(lpi->lpi_env,
380 loghandle,
381 rec->lrh_index);
382 rc = 0;
383 }
384 if (rc)
385 goto out;
386 } else {
387 CDEBUG(D_OTHER, "Skipped index %d\n", index);
388 }
389
390 /* next record, still in buffer? */
391 ++index;
392 if (index > last_index) {
393 rc = 0;
394 goto out;
395 }
396 }
397 }
398
399 out:
400 if (cd != NULL)
401 cd->lpcd_last_idx = last_called_index;
402
403 OBD_FREE(buf, LLOG_CHUNK_SIZE);
404 lpi->lpi_rc = rc;
405 return 0;
406 }
407
llog_process_thread_daemonize(void * arg)408 static int llog_process_thread_daemonize(void *arg)
409 {
410 struct llog_process_info *lpi = arg;
411 struct lu_env env;
412 int rc;
413
414 unshare_fs_struct();
415
416 /* client env has no keys, tags is just 0 */
417 rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
418 if (rc)
419 goto out;
420 lpi->lpi_env = &env;
421
422 rc = llog_process_thread(arg);
423
424 lu_env_fini(&env);
425 out:
426 complete(&lpi->lpi_completion);
427 return rc;
428 }
429
llog_process_or_fork(const struct lu_env * env,struct llog_handle * loghandle,llog_cb_t cb,void * data,void * catdata,bool fork)430 int llog_process_or_fork(const struct lu_env *env,
431 struct llog_handle *loghandle,
432 llog_cb_t cb, void *data, void *catdata, bool fork)
433 {
434 struct llog_process_info *lpi;
435 int rc;
436
437 OBD_ALLOC_PTR(lpi);
438 if (lpi == NULL) {
439 CERROR("cannot alloc pointer\n");
440 return -ENOMEM;
441 }
442 lpi->lpi_loghandle = loghandle;
443 lpi->lpi_cb = cb;
444 lpi->lpi_cbdata = data;
445 lpi->lpi_catdata = catdata;
446
447 if (fork) {
448 /* The new thread can't use parent env,
449 * init the new one in llog_process_thread_daemonize. */
450 lpi->lpi_env = NULL;
451 init_completion(&lpi->lpi_completion);
452 rc = PTR_ERR(kthread_run(llog_process_thread_daemonize, lpi,
453 "llog_process_thread"));
454 if (IS_ERR_VALUE(rc)) {
455 CERROR("%s: cannot start thread: rc = %d\n",
456 loghandle->lgh_ctxt->loc_obd->obd_name, rc);
457 OBD_FREE_PTR(lpi);
458 return rc;
459 }
460 wait_for_completion(&lpi->lpi_completion);
461 } else {
462 lpi->lpi_env = env;
463 llog_process_thread(lpi);
464 }
465 rc = lpi->lpi_rc;
466 OBD_FREE_PTR(lpi);
467 return rc;
468 }
469 EXPORT_SYMBOL(llog_process_or_fork);
470
llog_process(const struct lu_env * env,struct llog_handle * loghandle,llog_cb_t cb,void * data,void * catdata)471 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
472 llog_cb_t cb, void *data, void *catdata)
473 {
474 return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
475 }
476 EXPORT_SYMBOL(llog_process);
477
llog_reverse_process(const struct lu_env * env,struct llog_handle * loghandle,llog_cb_t cb,void * data,void * catdata)478 int llog_reverse_process(const struct lu_env *env,
479 struct llog_handle *loghandle, llog_cb_t cb,
480 void *data, void *catdata)
481 {
482 struct llog_log_hdr *llh = loghandle->lgh_hdr;
483 struct llog_process_cat_data *cd = catdata;
484 void *buf;
485 int rc = 0, first_index = 1, index, idx;
486
487 OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
488 if (!buf)
489 return -ENOMEM;
490
491 if (cd != NULL)
492 first_index = cd->lpcd_first_idx + 1;
493 if (cd != NULL && cd->lpcd_last_idx)
494 index = cd->lpcd_last_idx;
495 else
496 index = LLOG_BITMAP_BYTES * 8 - 1;
497
498 while (rc == 0) {
499 struct llog_rec_hdr *rec;
500 struct llog_rec_tail *tail;
501
502 /* skip records not set in bitmap */
503 while (index >= first_index &&
504 !ext2_test_bit(index, llh->llh_bitmap))
505 --index;
506
507 LASSERT(index >= first_index - 1);
508 if (index == first_index - 1)
509 break;
510
511 /* get the buf with our target record; avoid old garbage */
512 memset(buf, 0, LLOG_CHUNK_SIZE);
513 rc = llog_prev_block(env, loghandle, index, buf,
514 LLOG_CHUNK_SIZE);
515 if (rc)
516 goto out;
517
518 rec = buf;
519 idx = rec->lrh_index;
520 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
521 while (idx < index) {
522 rec = (void *)rec + rec->lrh_len;
523 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
524 lustre_swab_llog_rec(rec);
525 idx ++;
526 }
527 LASSERT(idx == index);
528 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
529
530 /* process records in buffer, starting where we found one */
531 while ((void *)tail > buf) {
532 if (tail->lrt_index == 0) {
533 /* no more records */
534 rc = 0;
535 goto out;
536 }
537
538 /* if set, process the callback on this record */
539 if (ext2_test_bit(index, llh->llh_bitmap)) {
540 rec = (void *)tail - tail->lrt_len +
541 sizeof(*tail);
542
543 rc = cb(env, loghandle, rec, data);
544 if (rc == LLOG_PROC_BREAK) {
545 goto out;
546 } else if (rc == LLOG_DEL_RECORD) {
547 llog_cancel_rec(env, loghandle,
548 tail->lrt_index);
549 rc = 0;
550 }
551 if (rc)
552 goto out;
553 }
554
555 /* previous record, still in buffer? */
556 --index;
557 if (index < first_index) {
558 rc = 0;
559 goto out;
560 }
561 tail = (void *)tail - tail->lrt_len;
562 }
563 }
564
565 out:
566 if (buf)
567 OBD_FREE(buf, LLOG_CHUNK_SIZE);
568 return rc;
569 }
570 EXPORT_SYMBOL(llog_reverse_process);
571
572 /**
573 * new llog API
574 *
575 * API functions:
576 * llog_open - open llog, may not exist
577 * llog_exist - check if llog exists
578 * llog_close - close opened llog, pair for open, frees llog_handle
579 * llog_declare_create - declare llog creation
580 * llog_create - create new llog on disk, need transaction handle
581 * llog_declare_write_rec - declaration of llog write
582 * llog_write_rec - write llog record on disk, need transaction handle
583 * llog_declare_add - declare llog catalog record addition
584 * llog_add - add llog record in catalog, need transaction handle
585 */
llog_exist(struct llog_handle * loghandle)586 int llog_exist(struct llog_handle *loghandle)
587 {
588 struct llog_operations *lop;
589 int rc;
590
591 rc = llog_handle2ops(loghandle, &lop);
592 if (rc)
593 return rc;
594 if (lop->lop_exist == NULL)
595 return -EOPNOTSUPP;
596
597 rc = lop->lop_exist(loghandle);
598 return rc;
599 }
600 EXPORT_SYMBOL(llog_exist);
601
llog_declare_create(const struct lu_env * env,struct llog_handle * loghandle,struct thandle * th)602 int llog_declare_create(const struct lu_env *env,
603 struct llog_handle *loghandle, struct thandle *th)
604 {
605 struct llog_operations *lop;
606 int raised, rc;
607
608 rc = llog_handle2ops(loghandle, &lop);
609 if (rc)
610 return rc;
611 if (lop->lop_declare_create == NULL)
612 return -EOPNOTSUPP;
613
614 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
615 if (!raised)
616 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
617 rc = lop->lop_declare_create(env, loghandle, th);
618 if (!raised)
619 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
620 return rc;
621 }
622 EXPORT_SYMBOL(llog_declare_create);
623
llog_create(const struct lu_env * env,struct llog_handle * handle,struct thandle * th)624 int llog_create(const struct lu_env *env, struct llog_handle *handle,
625 struct thandle *th)
626 {
627 struct llog_operations *lop;
628 int raised, rc;
629
630 rc = llog_handle2ops(handle, &lop);
631 if (rc)
632 return rc;
633 if (lop->lop_create == NULL)
634 return -EOPNOTSUPP;
635
636 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
637 if (!raised)
638 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
639 rc = lop->lop_create(env, handle, th);
640 if (!raised)
641 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
642 return rc;
643 }
644 EXPORT_SYMBOL(llog_create);
645
llog_declare_write_rec(const struct lu_env * env,struct llog_handle * handle,struct llog_rec_hdr * rec,int idx,struct thandle * th)646 int llog_declare_write_rec(const struct lu_env *env,
647 struct llog_handle *handle,
648 struct llog_rec_hdr *rec, int idx,
649 struct thandle *th)
650 {
651 struct llog_operations *lop;
652 int raised, rc;
653
654 rc = llog_handle2ops(handle, &lop);
655 if (rc)
656 return rc;
657 LASSERT(lop);
658 if (lop->lop_declare_write_rec == NULL)
659 return -EOPNOTSUPP;
660
661 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
662 if (!raised)
663 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
664 rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
665 if (!raised)
666 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
667 return rc;
668 }
669 EXPORT_SYMBOL(llog_declare_write_rec);
670
llog_write_rec(const struct lu_env * env,struct llog_handle * handle,struct llog_rec_hdr * rec,struct llog_cookie * logcookies,int numcookies,void * buf,int idx,struct thandle * th)671 int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
672 struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
673 int numcookies, void *buf, int idx, struct thandle *th)
674 {
675 struct llog_operations *lop;
676 int raised, rc, buflen;
677
678 rc = llog_handle2ops(handle, &lop);
679 if (rc)
680 return rc;
681
682 LASSERT(lop);
683 if (lop->lop_write_rec == NULL)
684 return -EOPNOTSUPP;
685
686 if (buf)
687 buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) +
688 sizeof(struct llog_rec_tail);
689 else
690 buflen = rec->lrh_len;
691 LASSERT(cfs_size_round(buflen) == buflen);
692
693 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
694 if (!raised)
695 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
696 rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies,
697 buf, idx, th);
698 if (!raised)
699 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
700 return rc;
701 }
702 EXPORT_SYMBOL(llog_write_rec);
703
llog_add(const struct lu_env * env,struct llog_handle * lgh,struct llog_rec_hdr * rec,struct llog_cookie * logcookies,void * buf,struct thandle * th)704 int llog_add(const struct lu_env *env, struct llog_handle *lgh,
705 struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
706 void *buf, struct thandle *th)
707 {
708 int raised, rc;
709
710 if (lgh->lgh_logops->lop_add == NULL)
711 return -EOPNOTSUPP;
712
713 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
714 if (!raised)
715 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
716 rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, buf, th);
717 if (!raised)
718 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
719 return rc;
720 }
721 EXPORT_SYMBOL(llog_add);
722
llog_declare_add(const struct lu_env * env,struct llog_handle * lgh,struct llog_rec_hdr * rec,struct thandle * th)723 int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
724 struct llog_rec_hdr *rec, struct thandle *th)
725 {
726 int raised, rc;
727
728 if (lgh->lgh_logops->lop_declare_add == NULL)
729 return -EOPNOTSUPP;
730
731 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
732 if (!raised)
733 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
734 rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
735 if (!raised)
736 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
737 return rc;
738 }
739 EXPORT_SYMBOL(llog_declare_add);
740
741 /**
742 * Helper function to open llog or create it if doesn't exist.
743 * It hides all transaction handling from caller.
744 */
llog_open_create(const struct lu_env * env,struct llog_ctxt * ctxt,struct llog_handle ** res,struct llog_logid * logid,char * name)745 int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
746 struct llog_handle **res, struct llog_logid *logid,
747 char *name)
748 {
749 struct dt_device *d;
750 struct thandle *th;
751 int rc;
752
753 rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
754 if (rc)
755 return rc;
756
757 if (llog_exist(*res))
758 return 0;
759
760 LASSERT((*res)->lgh_obj != NULL);
761
762 d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
763
764 th = dt_trans_create(env, d);
765 if (IS_ERR(th)) {
766 rc = PTR_ERR(th);
767 goto out;
768 }
769
770 rc = llog_declare_create(env, *res, th);
771 if (rc == 0) {
772 rc = dt_trans_start_local(env, d, th);
773 if (rc == 0)
774 rc = llog_create(env, *res, th);
775 }
776 dt_trans_stop(env, d, th);
777 out:
778 if (rc)
779 llog_close(env, *res);
780 return rc;
781 }
782 EXPORT_SYMBOL(llog_open_create);
783
784 /**
785 * Helper function to delete existent llog.
786 */
llog_erase(const struct lu_env * env,struct llog_ctxt * ctxt,struct llog_logid * logid,char * name)787 int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
788 struct llog_logid *logid, char *name)
789 {
790 struct llog_handle *handle;
791 int rc = 0, rc2;
792
793 /* nothing to erase */
794 if (name == NULL && logid == NULL)
795 return 0;
796
797 rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
798 if (rc < 0)
799 return rc;
800
801 rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
802 if (rc == 0)
803 rc = llog_destroy(env, handle);
804
805 rc2 = llog_close(env, handle);
806 if (rc == 0)
807 rc = rc2;
808 return rc;
809 }
810 EXPORT_SYMBOL(llog_erase);
811
812 /*
813 * Helper function for write record in llog.
814 * It hides all transaction handling from caller.
815 * Valid only with local llog.
816 */
llog_write(const struct lu_env * env,struct llog_handle * loghandle,struct llog_rec_hdr * rec,struct llog_cookie * reccookie,int cookiecount,void * buf,int idx)817 int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
818 struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
819 int cookiecount, void *buf, int idx)
820 {
821 struct dt_device *dt;
822 struct thandle *th;
823 int rc;
824
825 LASSERT(loghandle);
826 LASSERT(loghandle->lgh_ctxt);
827 LASSERT(loghandle->lgh_obj != NULL);
828
829 dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
830
831 th = dt_trans_create(env, dt);
832 if (IS_ERR(th))
833 return PTR_ERR(th);
834
835 rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
836 if (rc)
837 goto out_trans;
838
839 rc = dt_trans_start_local(env, dt, th);
840 if (rc)
841 goto out_trans;
842
843 down_write(&loghandle->lgh_lock);
844 rc = llog_write_rec(env, loghandle, rec, reccookie,
845 cookiecount, buf, idx, th);
846 up_write(&loghandle->lgh_lock);
847 out_trans:
848 dt_trans_stop(env, dt, th);
849 return rc;
850 }
851 EXPORT_SYMBOL(llog_write);
852
llog_open(const struct lu_env * env,struct llog_ctxt * ctxt,struct llog_handle ** lgh,struct llog_logid * logid,char * name,enum llog_open_param open_param)853 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
854 struct llog_handle **lgh, struct llog_logid *logid,
855 char *name, enum llog_open_param open_param)
856 {
857 int raised;
858 int rc;
859
860 LASSERT(ctxt);
861 LASSERT(ctxt->loc_logops);
862
863 if (ctxt->loc_logops->lop_open == NULL) {
864 *lgh = NULL;
865 return -EOPNOTSUPP;
866 }
867
868 *lgh = llog_alloc_handle();
869 if (*lgh == NULL)
870 return -ENOMEM;
871 (*lgh)->lgh_ctxt = ctxt;
872 (*lgh)->lgh_logops = ctxt->loc_logops;
873
874 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
875 if (!raised)
876 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
877 rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
878 if (!raised)
879 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
880 if (rc) {
881 llog_free_handle(*lgh);
882 *lgh = NULL;
883 }
884 return rc;
885 }
886 EXPORT_SYMBOL(llog_open);
887
llog_close(const struct lu_env * env,struct llog_handle * loghandle)888 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
889 {
890 struct llog_operations *lop;
891 int rc;
892
893 rc = llog_handle2ops(loghandle, &lop);
894 if (rc)
895 goto out;
896 if (lop->lop_close == NULL) {
897 rc = -EOPNOTSUPP;
898 goto out;
899 }
900 rc = lop->lop_close(env, loghandle);
901 out:
902 llog_handle_put(loghandle);
903 return rc;
904 }
905 EXPORT_SYMBOL(llog_close);
906
llog_is_empty(const struct lu_env * env,struct llog_ctxt * ctxt,char * name)907 int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt,
908 char *name)
909 {
910 struct llog_handle *llh;
911 int rc = 0;
912
913 rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
914 if (rc < 0) {
915 if (likely(rc == -ENOENT))
916 rc = 0;
917 goto out;
918 }
919
920 rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
921 if (rc)
922 goto out_close;
923 rc = llog_get_size(llh);
924
925 out_close:
926 llog_close(env, llh);
927 out:
928 /* header is record 1 */
929 return rc <= 1;
930 }
931 EXPORT_SYMBOL(llog_is_empty);
932
llog_copy_handler(const struct lu_env * env,struct llog_handle * llh,struct llog_rec_hdr * rec,void * data)933 int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
934 struct llog_rec_hdr *rec, void *data)
935 {
936 struct llog_handle *copy_llh = data;
937
938 /* Append all records */
939 return llog_write(env, copy_llh, rec, NULL, 0, NULL, -1);
940 }
941 EXPORT_SYMBOL(llog_copy_handler);
942
943 /* backup plain llog */
llog_backup(const struct lu_env * env,struct obd_device * obd,struct llog_ctxt * ctxt,struct llog_ctxt * bctxt,char * name,char * backup)944 int llog_backup(const struct lu_env *env, struct obd_device *obd,
945 struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
946 char *name, char *backup)
947 {
948 struct llog_handle *llh, *bllh;
949 int rc;
950
951
952
953 /* open original log */
954 rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
955 if (rc < 0) {
956 /* the -ENOENT case is also reported to the caller
957 * but silently so it should handle that if needed.
958 */
959 if (rc != -ENOENT)
960 CERROR("%s: failed to open log %s: rc = %d\n",
961 obd->obd_name, name, rc);
962 return rc;
963 }
964
965 rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
966 if (rc)
967 goto out_close;
968
969 /* Make sure there's no old backup log */
970 rc = llog_erase(env, bctxt, NULL, backup);
971 if (rc < 0 && rc != -ENOENT)
972 goto out_close;
973
974 /* open backup log */
975 rc = llog_open_create(env, bctxt, &bllh, NULL, backup);
976 if (rc) {
977 CERROR("%s: failed to open backup logfile %s: rc = %d\n",
978 obd->obd_name, backup, rc);
979 goto out_close;
980 }
981
982 /* check that backup llog is not the same object as original one */
983 if (llh->lgh_obj == bllh->lgh_obj) {
984 CERROR("%s: backup llog %s to itself (%s), objects %p/%p\n",
985 obd->obd_name, name, backup, llh->lgh_obj,
986 bllh->lgh_obj);
987 rc = -EEXIST;
988 goto out_backup;
989 }
990
991 rc = llog_init_handle(env, bllh, LLOG_F_IS_PLAIN, NULL);
992 if (rc)
993 goto out_backup;
994
995 /* Copy log record by record */
996 rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
997 NULL, false);
998 if (rc)
999 CERROR("%s: failed to backup log %s: rc = %d\n",
1000 obd->obd_name, name, rc);
1001 out_backup:
1002 llog_close(env, bllh);
1003 out_close:
1004 llog_close(env, llh);
1005 return rc;
1006 }
1007 EXPORT_SYMBOL(llog_backup);
1008