4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/llog.c
34 * OST<->MDS recovery logging infrastructure.
35 * Invariants in implementation:
36 * - we do not share logs among different OST<->MDS connections, so that
37 * if an OST or MDS fails it need only look at log(s) relevant to itself
39 * Author: Andreas Dilger <adilger@clusterfs.com>
40 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41 * Author: Mikhail Pershin <tappro@whamcloud.com>
44 #define DEBUG_SUBSYSTEM S_LOG
46 #include "../include/obd_class.h"
47 #include "../include/lustre_log.h"
48 #include "llog_internal.h"
51 * Allocate a new log or catalog handle
52 * Used inside llog_open().
54 static struct llog_handle *llog_alloc_handle(void)
56 struct llog_handle *loghandle;
58 loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
62 init_rwsem(&loghandle->lgh_lock);
63 spin_lock_init(&loghandle->lgh_hdr_lock);
64 INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
65 atomic_set(&loghandle->lgh_refcount, 1);
71 * Free llog handle and header data if exists. Used in llog_close() only
73 static void llog_free_handle(struct llog_handle *loghandle)
75 /* failed llog_init_handle */
76 if (!loghandle->lgh_hdr)
79 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
80 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
81 else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
82 LASSERT(list_empty(&loghandle->u.chd.chd_head));
83 LASSERT(sizeof(*loghandle->lgh_hdr) == LLOG_CHUNK_SIZE);
84 kfree(loghandle->lgh_hdr);
89 void llog_handle_get(struct llog_handle *loghandle)
91 atomic_inc(&loghandle->lgh_refcount);
94 void llog_handle_put(struct llog_handle *loghandle)
96 LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
97 if (atomic_dec_and_test(&loghandle->lgh_refcount))
98 llog_free_handle(loghandle);
101 static int llog_read_header(const struct lu_env *env,
102 struct llog_handle *handle,
103 struct obd_uuid *uuid)
105 struct llog_operations *lop;
108 rc = llog_handle2ops(handle, &lop);
112 if (!lop->lop_read_header)
115 rc = lop->lop_read_header(env, handle);
116 if (rc == LLOG_EEMPTY) {
117 struct llog_log_hdr *llh = handle->lgh_hdr;
119 handle->lgh_last_idx = 0; /* header is record with index 0 */
120 llh->llh_count = 1; /* for the header record */
121 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
122 llh->llh_hdr.lrh_len = LLOG_CHUNK_SIZE;
123 llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
124 llh->llh_hdr.lrh_index = 0;
125 llh->llh_tail.lrt_index = 0;
126 llh->llh_timestamp = ktime_get_real_seconds();
128 memcpy(&llh->llh_tgtuuid, uuid,
129 sizeof(llh->llh_tgtuuid));
130 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
131 ext2_set_bit(0, llh->llh_bitmap);
137 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
138 int flags, struct obd_uuid *uuid)
140 enum llog_flag fmt = flags & LLOG_F_EXT_MASK;
141 struct llog_log_hdr *llh;
144 LASSERT(!handle->lgh_hdr);
146 llh = kzalloc(sizeof(*llh), GFP_NOFS);
149 handle->lgh_hdr = llh;
150 /* first assign flags to use llog_client_ops */
151 llh->llh_flags = flags;
152 rc = llog_read_header(env, handle, uuid);
154 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
155 flags & LLOG_F_IS_CAT) ||
156 (llh->llh_flags & LLOG_F_IS_CAT &&
157 flags & LLOG_F_IS_PLAIN))) {
158 CERROR("%s: llog type is %s but initializing %s\n",
159 handle->lgh_ctxt->loc_obd->obd_name,
160 llh->llh_flags & LLOG_F_IS_CAT ?
162 flags & LLOG_F_IS_CAT ? "catalog" : "plain");
165 } else if (llh->llh_flags &
166 (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
168 * it is possible to open llog without specifying llog
169 * type so it is taken from llh_flags
171 flags = llh->llh_flags;
173 /* for some reason the llh_flags has no type set */
174 CERROR("llog type is not specified!\n");
179 !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
180 CERROR("%s: llog uuid mismatch: %s/%s\n",
181 handle->lgh_ctxt->loc_obd->obd_name,
183 (char *)llh->llh_tgtuuid.uuid);
188 if (flags & LLOG_F_IS_CAT) {
189 LASSERT(list_empty(&handle->u.chd.chd_head));
190 INIT_LIST_HEAD(&handle->u.chd.chd_head);
191 llh->llh_size = sizeof(struct llog_logid_rec);
192 } else if (!(flags & LLOG_F_IS_PLAIN)) {
193 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
194 handle->lgh_ctxt->loc_obd->obd_name,
195 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
198 llh->llh_flags |= fmt;
202 handle->lgh_hdr = NULL;
206 EXPORT_SYMBOL(llog_init_handle);
208 static int llog_process_thread(void *arg)
210 struct llog_process_info *lpi = arg;
211 struct llog_handle *loghandle = lpi->lpi_loghandle;
212 struct llog_log_hdr *llh = loghandle->lgh_hdr;
213 struct llog_process_cat_data *cd = lpi->lpi_catdata;
215 __u64 cur_offset = LLOG_CHUNK_SIZE;
217 int rc = 0, index = 1, last_index;
219 int last_called_index = 0;
223 buf = kzalloc(LLOG_CHUNK_SIZE, GFP_NOFS);
225 lpi->lpi_rc = -ENOMEM;
230 last_called_index = cd->lpcd_first_idx;
231 index = cd->lpcd_first_idx + 1;
233 if (cd && cd->lpcd_last_idx)
234 last_index = cd->lpcd_last_idx;
236 last_index = LLOG_BITMAP_BYTES * 8 - 1;
238 /* Record is not in this buffer. */
239 if (index > last_index)
243 struct llog_rec_hdr *rec;
245 /* skip records not set in bitmap */
246 while (index <= last_index &&
247 !ext2_test_bit(index, llh->llh_bitmap))
250 LASSERT(index <= last_index + 1);
251 if (index == last_index + 1)
254 CDEBUG(D_OTHER, "index: %d last_index %d\n",
257 /* get the buf with our target record; avoid old garbage */
258 memset(buf, 0, LLOG_CHUNK_SIZE);
259 last_offset = cur_offset;
260 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
261 index, &cur_offset, buf, LLOG_CHUNK_SIZE);
265 /* NB: when rec->lrh_len is accessed it is already swabbed
266 * since it is used at the "end" of the loop and the rec
267 * swabbing is done at the beginning of the loop.
269 for (rec = (struct llog_rec_hdr *)buf;
270 (char *)rec < buf + LLOG_CHUNK_SIZE;
271 rec = llog_rec_hdr_next(rec)) {
272 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
275 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
276 lustre_swab_llog_rec(rec);
278 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
279 rec->lrh_type, rec->lrh_index);
281 if (rec->lrh_index == 0) {
282 /* probably another rec just got added? */
284 if (index <= loghandle->lgh_last_idx)
286 goto out; /* no more records */
288 if (rec->lrh_len == 0 ||
289 rec->lrh_len > LLOG_CHUNK_SIZE) {
290 CWARN("invalid length %d in llog record for index %d/%d\n",
292 rec->lrh_index, index);
297 if (rec->lrh_index < index) {
298 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
304 "lrh_index: %d lrh_len: %d (%d remains)\n",
305 rec->lrh_index, rec->lrh_len,
306 (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
308 loghandle->lgh_cur_idx = rec->lrh_index;
309 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
312 /* if set, process the callback on this record */
313 if (ext2_test_bit(index, llh->llh_bitmap)) {
314 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
316 last_called_index = index;
320 CDEBUG(D_OTHER, "Skipped index %d\n", index);
323 /* next record, still in buffer? */
325 if (index > last_index) {
334 cd->lpcd_last_idx = last_called_index;
341 static int llog_process_thread_daemonize(void *arg)
343 struct llog_process_info *lpi = arg;
349 /* client env has no keys, tags is just 0 */
350 rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
355 rc = llog_process_thread(arg);
359 complete(&lpi->lpi_completion);
363 int llog_process_or_fork(const struct lu_env *env,
364 struct llog_handle *loghandle,
365 llog_cb_t cb, void *data, void *catdata, bool fork)
367 struct llog_process_info *lpi;
370 lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
373 lpi->lpi_loghandle = loghandle;
375 lpi->lpi_cbdata = data;
376 lpi->lpi_catdata = catdata;
379 struct task_struct *task;
381 /* The new thread can't use parent env,
382 * init the new one in llog_process_thread_daemonize.
385 init_completion(&lpi->lpi_completion);
386 task = kthread_run(llog_process_thread_daemonize, lpi,
387 "llog_process_thread");
390 CERROR("%s: cannot start thread: rc = %d\n",
391 loghandle->lgh_ctxt->loc_obd->obd_name, rc);
394 wait_for_completion(&lpi->lpi_completion);
397 llog_process_thread(lpi);
404 EXPORT_SYMBOL(llog_process_or_fork);
406 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
407 llog_cb_t cb, void *data, void *catdata)
409 return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
411 EXPORT_SYMBOL(llog_process);
413 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
414 struct llog_handle **lgh, struct llog_logid *logid,
415 char *name, enum llog_open_param open_param)
421 LASSERT(ctxt->loc_logops);
423 if (!ctxt->loc_logops->lop_open) {
428 *lgh = llog_alloc_handle();
431 (*lgh)->lgh_ctxt = ctxt;
432 (*lgh)->lgh_logops = ctxt->loc_logops;
434 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
436 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
437 rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
439 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
441 llog_free_handle(*lgh);
446 EXPORT_SYMBOL(llog_open);
448 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
450 struct llog_operations *lop;
453 rc = llog_handle2ops(loghandle, &lop);
456 if (!lop->lop_close) {
460 rc = lop->lop_close(env, loghandle);
462 llog_handle_put(loghandle);
465 EXPORT_SYMBOL(llog_close);