4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * Implementation of cl_io for LOV layer.
34 * Author: Nikita Danilov <nikita.danilov@sun.com>
35 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
38 #define DEBUG_SUBSYSTEM S_LOV
40 #include "lov_cl_internal.h"
/*
 * Tear down one per-stripe sub-io.
 *
 * Finalizes the sub cl_io if it was initialized, releases the slot in the
 * parent lov_io bookkeeping, and puts the cl environment unless it was
 * borrowed from elsewhere.
 *
 * NOTE(review): several interior lines (braces, kfree of dynamically
 * allocated sub_io, etc.) are not visible in this chunk; comments cover
 * only the visible statements.
 */
46 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
47 struct lov_io_sub *sub)
50 if (sub->sub_io_initialized) {
51 cl_io_fini(sub->sub_env, sub->sub_io)
52 sub->sub_io_initialized = 0;
53 lio->lis_active_subios--;
/* This stripe used the pre-allocated lis_single_subio slot; mark it free. */
55 if (sub->sub_stripe == lio->lis_single_subio_index)
56 lio->lis_single_subio_index = -1;
57 else if (!sub->sub_borrowed)
/* Only put environments we obtained ourselves (see lov_io_sub_init). */
61 if (!IS_ERR_OR_NULL(sub->sub_env)) {
62 if (!sub->sub_borrowed)
63 cl_env_put(sub->sub_env, &sub->sub_refcheck);
/*
 * Copy the relevant parameters from the parent (file-level) io into a
 * per-stripe sub-io, translating file offsets/indices into stripe-local
 * ones where needed.  [start, end) are the stripe-local byte bounds for
 * this stripe computed by the caller.
 *
 * NOTE(review): the case labels for the setattr, fault and read/write
 * branches are among the lines elided from this chunk; the branch
 * comments below identify them by the union member they touch.
 */
68 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
69 int stripe, loff_t start, loff_t end)
71 struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
72 struct cl_io *parent = lio->lis_cl.cis_io;
74 switch (io->ci_type) {
/* setattr: inherit attributes verbatim, but record which stripe this is */
76 io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
77 io->u.ci_setattr.sa_attr_flags =
78 parent->u.ci_setattr.sa_attr_flags;
79 io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
80 io->u.ci_setattr.sa_stripe_index = stripe;
81 io->u.ci_setattr.sa_parent_fid =
82 parent->u.ci_setattr.sa_parent_fid;
83 if (cl_io_is_trunc(io)) {
84 loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
/* truncate: convert file size to this stripe's local size */
86 new_size = lov_size_to_stripe(lsm, new_size, stripe);
87 io->u.ci_setattr.sa_attr.lvb_size = new_size;
/* data_version: each sub-io starts from 0 and is summed up at end */
91 case CIT_DATA_VERSION: {
92 io->u.ci_data_version.dv_data_version = 0;
93 io->u.ci_data_version.dv_flags =
94 parent->u.ci_data_version.dv_flags;
/* fault: translate the faulting page index into a stripe-local index */
98 struct cl_object *obj = parent->ci_obj;
99 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
101 io->u.ci_fault = parent->u.ci_fault;
102 off = lov_size_to_stripe(lsm, off, stripe);
103 io->u.ci_fault.ft_index = cl_index(obj, off);
/* fsync: [start, end) are already stripe-local; fid/mode pass through */
107 io->u.ci_fsync.fi_start = start;
108 io->u.ci_fsync.fi_end = end;
109 io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
110 io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
/* read/write: propagate sync/append flags, set stripe-local extent */
115 io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
116 if (cl_io_is_append(parent)) {
117 io->u.ci_wr.wr_append = 1;
119 io->u.ci_rw.crw_pos = start;
120 io->u.ci_rw.crw_count = end - start;
/*
 * Initialize the sub-io for one stripe: obtain a cl environment, pick (or
 * allocate) a cl_io structure, seed it from the parent io and call
 * cl_io_sub_init() on the stripe's sub-object.
 *
 * Returns 0 on success or a negative errno; on failure the partially set
 * up sub is torn down via lov_io_sub_fini().
 *
 * NOTE(review): error-path lines and some braces are elided from this
 * chunk (e.g. the -EIO return for a missing stripe target and the kzalloc
 * failure handling); do not infer their exact form from here.
 */
129 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
130 struct lov_io_sub *sub)
132 struct lov_object *lov = lio->lis_object;
133 struct cl_io *sub_io;
134 struct cl_object *sub_obj;
135 struct cl_io *io = lio->lis_cl.cis_io;
136 int stripe = sub->sub_stripe;
139 LASSERT(!sub->sub_io);
140 LASSERT(!sub->sub_env);
141 LASSERT(sub->sub_stripe < lio->lis_stripe_count);
/* stripe target may be absent (LOV EA hole) */
143 if (unlikely(!lov_r0(lov)->lo_sub[stripe]))
146 sub->sub_io_initialized = 0;
147 sub->sub_borrowed = 0;
149 /* obtain new environment */
150 sub->sub_env = cl_env_get(&sub->sub_refcheck);
151 if (IS_ERR(sub->sub_env)) {
152 rc = PTR_ERR(sub->sub_env);
157 * First sub-io. Use ->lis_single_subio to
158 * avoid dynamic allocation.
160 if (lio->lis_active_subios == 0) {
161 sub->sub_io = &lio->lis_single_subio;
162 lio->lis_single_subio_index = stripe;
/* subsequent sub-ios are allocated dynamically */
164 sub->sub_io = kzalloc(sizeof(*sub->sub_io),
172 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
173 sub_io = sub->sub_io;
/* seed the sub-io from the parent before the generic sub_init */
175 sub_io->ci_obj = sub_obj;
176 sub_io->ci_result = 0;
177 sub_io->ci_parent = io;
178 sub_io->ci_lockreq = io->ci_lockreq;
179 sub_io->ci_type = io->ci_type;
180 sub_io->ci_no_srvlock = io->ci_no_srvlock;
181 sub_io->ci_noatime = io->ci_noatime;
183 rc = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
185 lio->lis_active_subios++;
186 sub->sub_io_initialized = 1;
/* failure: undo whatever was set up above */
191 lov_io_sub_fini(env, lio, sub);
/*
 * Look up the sub-io for @stripe, lazily initializing it on first use.
 * Returns the lov_io_sub on success; the elided tail presumably returns
 * ERR_PTR(rc) on init failure -- TODO confirm against the full source.
 */
195 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
196 struct lov_io *lio, int stripe)
199 struct lov_io_sub *sub = &lio->lis_subs[stripe];
201 LASSERT(stripe < lio->lis_stripe_count);
/* first access to this stripe: set it up now */
203 if (!sub->sub_io_initialized) {
204 sub->sub_stripe = stripe;
205 rc = lov_io_sub_init(env, lio, sub);
215 /*****************************************************************************
/*
 * Return the stripe number a cl_page belongs to, by locating the LOV
 * layer's page slice and reading the stripe recorded at page creation.
 */
221 int lov_page_stripe(const struct cl_page *page)
223 const struct cl_page_slice *slice;
225 slice = cl_page_at(page, &lov_device_type);
226 LASSERT(slice->cpl_obj);
228 return cl2lov_page(slice)->lps_stripe;
/*
 * Allocate the per-stripe sub-io array (lis_subs) for this io and reset
 * the sub-io bookkeeping counters.
 *
 * NOTE(review): the assignment target of the kvzalloc call and the
 * error/return lines are elided from this chunk.
 */
231 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
234 struct lov_stripe_md *lsm;
237 LASSERT(lio->lis_object);
238 lsm = lio->lis_object->lo_lsm;
241 * Need to be optimized, we can't afford to allocate a piece of memory
242 * when writing a page. -jay
245 libcfs_kvzalloc(lsm->lsm_stripe_count *
246 sizeof(lio->lis_subs[0]),
/* no sub-io active yet; -1 means the single-subio slot is unused */
249 lio->lis_nr_subios = lio->lis_stripe_count;
250 lio->lis_single_subio_index = -1;
251 lio->lis_active_subios = 0;
/*
 * Compute the file-level extent [lis_pos, lis_endpos) that this io covers,
 * based on the io type.  Also records the object and its stripe count in
 * the lov_io.
 *
 * NOTE(review): the case labels (CIT_READ/WRITE, CIT_SETATTR, CIT_FAULT,
 * CIT_FSYNC, ...) and several break statements are elided from this chunk.
 */
259 static int lov_io_slice_init(struct lov_io *lio, struct lov_object *obj,
263 lio->lis_object = obj;
265 lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;
267 switch (io->ci_type) {
/* read/write: extent comes straight from the rw descriptor */
270 lio->lis_pos = io->u.ci_rw.crw_pos;
271 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
272 lio->lis_io_endpos = lio->lis_endpos;
273 if (cl_io_is_append(io)) {
274 LASSERT(io->ci_type == CIT_WRITE);
277 * If there is a LOV EA hole, then we may not be able to locate
278 * the current file-tail exactly.
280 if (unlikely(obj->lo_lsm->lsm_pattern &
/* append: end position unknown until the write lands */
285 lio->lis_endpos = OBD_OBJECT_EOF;
/* setattr: a truncate starts at the new size, and extends to EOF */
290 if (cl_io_is_trunc(io))
291 lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
294 lio->lis_endpos = OBD_OBJECT_EOF;
297 case CIT_DATA_VERSION:
299 lio->lis_endpos = OBD_OBJECT_EOF;
/* fault: exactly one page */
303 pgoff_t index = io->u.ci_fault.ft_index;
305 lio->lis_pos = cl_offset(io->ci_obj, index);
306 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
/* fsync: extent from the fsync descriptor */
311 lio->lis_pos = io->u.ci_fsync.fi_start;
312 lio->lis_endpos = io->u.ci_fsync.fi_end;
318 lio->lis_endpos = OBD_OBJECT_EOF;
/*
 * cl_io_operations::cio_fini() for LOV: finalize and free all per-stripe
 * sub-ios, then drop this io's reference on the object's active-io count,
 * waking anyone waiting for the object to become idle (e.g. layout change).
 */
327 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
329 struct lov_io *lio = cl2lov_io(env, ios);
330 struct lov_object *lov = cl2lov(ios->cis_obj);
334 for (i = 0; i < lio->lis_nr_subios; i++)
335 lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
/* kvfree(NULL) is a no-op, so this is safe even if allocation failed */
336 kvfree(lio->lis_subs);
337 lio->lis_nr_subios = 0;
340 LASSERT(atomic_read(&lov->lo_active_ios) > 0);
341 if (atomic_dec_and_test(&lov->lo_active_ios))
342 wake_up_all(&lov->lo_waitq);
/*
 * Add @delta to @val, except that OBD_OBJECT_EOF is a sentinel and is
 * passed through unchanged.  (The addition and return lines are elided
 * from this chunk.)
 */
345 static u64 lov_offset_mod(u64 val, int delta)
347 if (val != OBD_OBJECT_EOF)
/*
 * cio_iter_init(): for every stripe intersecting [lis_pos, lis_endpos),
 * obtain/initialize its sub-io, inherit parameters from the parent io and
 * run the sub-io's own iter_init.  Successfully initialized sub-ios are
 * linked on lio->lis_active for the subsequent lock/start/end phases.
 *
 * NOTE(review): the continue/error handling between the visible lines is
 * elided from this chunk.
 */
352 static int lov_io_iter_init(const struct lu_env *env,
353 const struct cl_io_slice *ios)
355 struct lov_io *lio = cl2lov_io(env, ios);
356 struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
357 struct lov_io_sub *sub;
/* make the end inclusive for the intersection test (EOF passes through) */
364 endpos = lov_offset_mod(lio->lis_endpos, -1);
365 for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
366 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
367 endpos, &start, &end))
/*
 * Stripe target missing (LOV EA hole): data-touching io types cannot
 * proceed on this stripe; presumably others just skip it -- TODO confirm.
 */
370 if (unlikely(!lov_r0(lio->lis_object)->lo_sub[stripe])) {
371 if (ios->cis_io->ci_type == CIT_READ ||
372 ios->cis_io->ci_type == CIT_WRITE ||
373 ios->cis_io->ci_type == CIT_FAULT)
/* back to an exclusive end for the sub-io extent */
379 end = lov_offset_mod(end, 1);
380 sub = lov_sub_get(env, lio, stripe);
386 lov_io_sub_inherit(sub->sub_io, lio, stripe, start, end);
387 rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
389 cl_io_iter_fini(sub->sub_env, sub->sub_io);
392 CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
395 list_add_tail(&sub->sub_linkage, &lio->lis_active);
/*
 * cio_iter_init() for read/write: clip the current iteration to a single
 * stripe so that the generic lov_io_iter_init() below deals with exactly
 * one stripe, then set ci_continue if more of the io remains past the
 * stripe boundary.
 */
400 static int lov_io_rw_iter_init(const struct lu_env *env,
401 const struct cl_io_slice *ios)
403 struct lov_io *lio = cl2lov_io(env, ios);
404 struct cl_io *io = ios->cis_io;
405 struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
406 __u64 start = io->u.ci_rw.crw_pos;
408 unsigned long ssize = lsm->lsm_stripe_size;
410 LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
412 /* fast path for common case. */
413 if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
/* start becomes the stripe index; next = file offset of next stripe unit */
414 lov_do_div64(start, ssize);
415 next = (start + 1) * ssize;
/* overflow guard: (start + 1) * ssize wrapped around */
416 if (next <= start * ssize)
419 io->ci_continue = next < lio->lis_io_endpos;
420 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
421 next) - io->u.ci_rw.crw_pos;
422 lio->lis_pos = io->u.ci_rw.crw_pos;
423 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
424 CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) %llu\n",
425 (__u64)start, lio->lis_pos, lio->lis_endpos,
426 (__u64)lio->lis_io_endpos);
429 * XXX The following call should be optimized: we know, that
430 * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
432 return lov_io_iter_init(env, ios);
/*
 * Apply @iofunc to every active sub-io, propagating the first sub-io
 * failure into the parent io's ci_result.  The loop-break on error and
 * the return are among the elided lines.
 */
435 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
436 int (*iofunc)(const struct lu_env *, struct cl_io *))
438 struct cl_io *parent = lio->lis_cl.cis_io;
439 struct lov_io_sub *sub;
442 list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
443 rc = iofunc(sub->sub_env, sub->sub_io);
/* remember the first sub-io error on the parent */
447 if (parent->ci_result == 0)
448 parent->ci_result = sub->sub_io->ci_result;
/* cio_lock(): forward cl_io_lock() to every active sub-io. */
453 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
455 return lov_io_call(env, cl2lov_io(env, ios), cl_io_lock);
/* cio_start(): forward cl_io_start() to every active sub-io. */
458 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
460 return lov_io_call(env, cl2lov_io(env, ios), cl_io_start);
/*
 * End one sub-io: run cl_io_end() only if the sub-io actually reached the
 * CIS_IO_GOING state, but always advance it to CIS_IO_FINISHED so the
 * state machine stays consistent.
 */
463 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
466 * It's possible that lov_io_start() wasn't called against this
467 * sub-io, either because previous sub-io failed, or upper layer
470 if (io->ci_state == CIS_IO_GOING)
473 io->ci_state = CIS_IO_FINISHED;
/*
 * cio_end() for CIT_DATA_VERSION: end each active sub-io and accumulate
 * the per-stripe data versions into the parent's dv_data_version (each
 * sub-io started from 0, see lov_io_sub_inherit()).  The first sub-io
 * error is propagated to the parent.
 */
478 lov_io_data_version_end(const struct lu_env *env, const struct cl_io_slice *ios)
480 struct lov_io *lio = cl2lov_io(env, ios);
481 struct cl_io *parent = lio->lis_cl.cis_io;
482 struct lov_io_sub *sub;
484 list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
485 lov_io_end_wrapper(env, sub->sub_io);
487 parent->u.ci_data_version.dv_data_version +=
488 sub->sub_io->u.ci_data_version.dv_data_version;
490 if (!parent->ci_result)
491 parent->ci_result = sub->sub_io->ci_result;
/* Adapter: give void cl_io_iter_fini() the int signature lov_io_call() needs. */
495 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
497 cl_io_iter_fini(env, io);
/* Adapter: give void cl_io_unlock() the int signature lov_io_call() needs. */
501 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
503 cl_io_unlock(env, io);
/* cio_end(): end every active sub-io via lov_io_end_wrapper(). */
507 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
511 rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
/*
 * cio_iter_fini(): run iter_fini on every active sub-io, then empty the
 * active list (the sub-ios themselves stay allocated until lov_io_fini()).
 */
515 static void lov_io_iter_fini(const struct lu_env *env,
516 const struct cl_io_slice *ios)
518 struct lov_io *lio = cl2lov_io(env, ios);
521 rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
523 while (!list_empty(&lio->lis_active))
524 list_del_init(lio->lis_active.next);
/* cio_unlock(): forward cl_io_unlock() to every active sub-io. */
527 static void lov_io_unlock(const struct lu_env *env,
528 const struct cl_io_slice *ios)
532 rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
/*
 * cio_read_ahead(): delegate read-ahead to the stripe containing @start,
 * then translate the sub-level answer (cra_end, a stripe-local page index)
 * back to a file-level page index, clamped so read-ahead never crosses
 * the stripe boundary.
 *
 * NOTE(review): early-return lines for the error paths are elided from
 * this chunk.
 */
536 static int lov_io_read_ahead(const struct lu_env *env,
537 const struct cl_io_slice *ios,
538 pgoff_t start, struct cl_read_ahead *ra)
540 struct lov_io *lio = cl2lov_io(env, ios);
541 struct lov_object *loo = lio->lis_object;
542 struct cl_object *obj = lov2cl(loo);
543 struct lov_layout_raid0 *r0 = lov_r0(loo);
544 unsigned int pps; /* pages per stripe */
545 struct lov_io_sub *sub;
551 stripe = lov_stripe_number(loo->lo_lsm, cl_offset(obj, start));
/* no read-ahead through a LOV EA hole */
552 if (unlikely(!r0->lo_sub[stripe]))
555 sub = lov_sub_get(env, lio, stripe);
/* convert the file offset to the stripe-local offset for the sub call */
559 lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff);
560 rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
561 cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
564 CDEBUG(D_READA, DFID " cra_end = %lu, stripes = %d, rc = %d\n",
565 PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, r0->lo_nr, rc);
570 * Adjust the stripe index by layout of raid0. ra->cra_end is
571 * the maximum page index covered by an underlying DLM lock.
572 * This function converts cra_end from stripe level to file
573 * level, and make sure it's not beyond stripe boundary.
575 if (r0->lo_nr == 1) /* single stripe file */
578 /* cra_end is stripe level, convert it into file level */
579 ra_end = ra->cra_end;
580 if (ra_end != CL_PAGE_EOF)
581 ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe);
583 pps = loo->lo_lsm->lsm_stripe_size >> PAGE_SHIFT;
585 CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, stripe_size = %u, stripe no = %u, start index = %lu\n",
586 PFID(lu_object_fid(lov2lu(loo))), ra_end, pps,
587 loo->lo_lsm->lsm_stripe_size, stripe, start);
589 /* never exceed the end of the stripe */
590 ra->cra_end = min_t(pgoff_t, ra_end, start + pps - start % pps - 1);
595 * lov implementation of cl_operations::cio_submit() method. It takes a list
596 * of pages in \a queue, splits it into per-stripe sub-lists, invokes
597 * cl_io_submit() on underlying devices to submit sub-lists, and then splices
600 * Major complication of this function is a need to handle memory cleansing:
601 * cl_io_submit() is called to write out pages as a part of VM memory
602 * reclamation, and hence it may not fail due to memory shortages (system
603 * dead-locks otherwise). To deal with this, some resources (sub-lists,
604 * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
605 * not-memory cleansing context), and in case of memory shortage, these
606 * pre-allocated resources are used by lov_io_submit() under
607 * lov_device::ld_mutex mutex.
/*
 * cio_submit(): split queue->c2_qin into per-stripe runs and submit each
 * run to the owning stripe's sub-io.  Pages already handled are parked on
 * a temporary list (plist) and spliced back into qin at the end; pages
 * accepted by the sub-ios accumulate in queue->c2_qout.
 *
 * Fast path: with exactly one active sub-io everything goes to it
 * directly, no splitting needed.
 *
 * NOTE(review): several break/error-handling lines inside the loops are
 * elided from this chunk.
 */
609 static int lov_io_submit(const struct lu_env *env,
610 const struct cl_io_slice *ios,
611 enum cl_req_type crt, struct cl_2queue *queue)
613 struct cl_page_list *qin = &queue->c2_qin;
614 struct lov_io *lio = cl2lov_io(env, ios);
615 struct lov_io_sub *sub;
616 struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
617 struct cl_page *page;
/* fast path: a single active sub-io gets the whole queue */
622 if (lio->lis_active_subios == 1) {
623 int idx = lio->lis_single_subio_index;
625 LASSERT(idx < lio->lis_nr_subios);
626 sub = lov_sub_get(env, lio, idx);
627 LASSERT(!IS_ERR(sub));
628 LASSERT(sub->sub_io == &lio->lis_single_subio);
629 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
634 LASSERT(lio->lis_subs);
636 cl_page_list_init(plist);
637 while (qin->pl_nr > 0) {
638 struct cl_2queue *cl2q = &lov_env_info(env)->lti_cl2q;
640 cl_2queue_init(cl2q);
/* seed cl2q with the first page; its stripe defines the run */
642 page = cl_page_list_first(qin);
643 cl_page_list_move(&cl2q->c2_qin, qin, page);
645 stripe = lov_page_stripe(page);
/* pull all consecutive pages of the same stripe into this run */
646 while (qin->pl_nr > 0) {
647 page = cl_page_list_first(qin);
648 if (stripe != lov_page_stripe(page))
651 cl_page_list_move(&cl2q->c2_qin, qin, page);
654 sub = lov_sub_get(env, lio, stripe);
656 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
/* keep processed pages on plist, accepted ones on the shared qout */
662 cl_page_list_splice(&cl2q->c2_qin, plist);
663 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
664 cl_2queue_fini(env, cl2q);
/* restore unsubmitted pages back into the caller's qin */
670 cl_page_list_splice(plist, qin);
671 cl_page_list_fini(env, plist);
/*
 * cio_commit_async(): like lov_io_submit(), but for committing a page
 * list.  Pages are grouped into per-stripe runs; @from applies to the
 * first page of a run and @to to the last, except that a run which is
 * followed by more pages commits its last page fully (stripe_to =
 * PAGE_SIZE).  On error, uncommitted pages are pushed back onto the head
 * of @queue in their original order.
 *
 * NOTE(review): break/error lines inside the loops and the @from reset
 * between runs are elided from this chunk.
 */
676 static int lov_io_commit_async(const struct lu_env *env,
677 const struct cl_io_slice *ios,
678 struct cl_page_list *queue, int from, int to,
681 struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
682 struct lov_io *lio = cl2lov_io(env, ios);
683 struct lov_io_sub *sub;
684 struct cl_page *page;
/* fast path: a single active sub-io commits the whole queue */
687 if (lio->lis_active_subios == 1) {
688 int idx = lio->lis_single_subio_index;
690 LASSERT(idx < lio->lis_nr_subios);
691 sub = lov_sub_get(env, lio, idx);
692 LASSERT(!IS_ERR(sub));
693 LASSERT(sub->sub_io == &lio->lis_single_subio);
694 rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
699 LASSERT(lio->lis_subs);
701 cl_page_list_init(plist);
702 while (queue->pl_nr > 0) {
706 LASSERT(plist->pl_nr == 0);
707 page = cl_page_list_first(queue);
708 cl_page_list_move(plist, queue, page);
/* collect the run of consecutive pages on the same stripe */
710 stripe = lov_page_stripe(page);
711 while (queue->pl_nr > 0) {
712 page = cl_page_list_first(queue);
713 if (stripe != lov_page_stripe(page))
716 cl_page_list_move(plist, queue, page);
/* not the final run: its last page is committed in full */
719 if (queue->pl_nr > 0) /* still has more pages */
720 stripe_to = PAGE_SIZE;
722 sub = lov_sub_get(env, lio, stripe);
724 rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
725 plist, from, stripe_to, cb);
731 if (plist->pl_nr > 0) /* short write */
737 /* for error case, add the page back into the qin list */
738 LASSERT(ergo(rc == 0, plist->pl_nr == 0));
739 while (plist->pl_nr > 0) {
740 /* error occurred, add the uncommitted pages back into queue */
741 page = cl_page_list_last(plist);
742 cl_page_list_move_head(queue, plist, page);
/*
 * cio_start() for CIT_FAULT: copy the requested byte count into the
 * sub-io of the stripe owning the faulting page, then run the common
 * start path.
 */
748 static int lov_io_fault_start(const struct lu_env *env,
749 const struct cl_io_slice *ios)
751 struct cl_fault_io *fio;
753 struct lov_io_sub *sub;
755 fio = &ios->cis_io->u.ci_fault;
756 lio = cl2lov_io(env, ios);
757 sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
760 sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
761 return lov_io_start(env, ios);
/*
 * cio_end() for CIT_FSYNC: end every active sub-io and total the number
 * of pages each stripe wrote back into the parent's fi_nr_written.
 */
764 static void lov_io_fsync_end(const struct lu_env *env,
765 const struct cl_io_slice *ios)
767 struct lov_io *lio = cl2lov_io(env, ios);
768 struct lov_io_sub *sub;
769 unsigned int *written = &ios->cis_io->u.ci_fsync.fi_nr_written;
772 list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
773 struct cl_io *subio = sub->sub_io;
775 lov_io_end_wrapper(sub->sub_env, subio);
/* only count pages from sub-ios that completed successfully */
777 if (subio->ci_result == 0)
778 *written += subio->u.ci_fsync.fi_nr_written;
/*
 * cl_io operation vector for striped (raid0) objects: per-io-type slots
 * plus the page-level submit/commit/read-ahead methods.
 *
 * NOTE(review): the [CIT_xxx] index labels for most slots are among the
 * elided lines; only [CIT_DATA_VERSION] is visible below.
 */
782 static const struct cl_io_operations lov_io_ops = {
785 .cio_fini = lov_io_fini,
786 .cio_iter_init = lov_io_rw_iter_init,
787 .cio_iter_fini = lov_io_iter_fini,
788 .cio_lock = lov_io_lock,
789 .cio_unlock = lov_io_unlock,
790 .cio_start = lov_io_start,
791 .cio_end = lov_io_end
794 .cio_fini = lov_io_fini,
795 .cio_iter_init = lov_io_rw_iter_init,
796 .cio_iter_fini = lov_io_iter_fini,
797 .cio_lock = lov_io_lock,
798 .cio_unlock = lov_io_unlock,
799 .cio_start = lov_io_start,
800 .cio_end = lov_io_end
803 .cio_fini = lov_io_fini,
804 .cio_iter_init = lov_io_iter_init,
805 .cio_iter_fini = lov_io_iter_fini,
806 .cio_lock = lov_io_lock,
807 .cio_unlock = lov_io_unlock,
808 .cio_start = lov_io_start,
809 .cio_end = lov_io_end
811 [CIT_DATA_VERSION] = {
812 .cio_fini = lov_io_fini,
813 .cio_iter_init = lov_io_iter_init,
814 .cio_iter_fini = lov_io_iter_fini,
815 .cio_lock = lov_io_lock,
816 .cio_unlock = lov_io_unlock,
817 .cio_start = lov_io_start,
818 .cio_end = lov_io_data_version_end,
821 .cio_fini = lov_io_fini,
822 .cio_iter_init = lov_io_iter_init,
823 .cio_iter_fini = lov_io_iter_fini,
824 .cio_lock = lov_io_lock,
825 .cio_unlock = lov_io_unlock,
826 .cio_start = lov_io_fault_start,
827 .cio_end = lov_io_end
830 .cio_fini = lov_io_fini,
831 .cio_iter_init = lov_io_iter_init,
832 .cio_iter_fini = lov_io_iter_fini,
833 .cio_lock = lov_io_lock,
834 .cio_unlock = lov_io_unlock,
835 .cio_start = lov_io_start,
836 .cio_end = lov_io_fsync_end
839 .cio_fini = lov_io_fini
/* page-level methods, shared by all io types */
842 .cio_read_ahead = lov_io_read_ahead,
843 .cio_submit = lov_io_submit,
844 .cio_commit_async = lov_io_commit_async,
847 /*****************************************************************************
849 * Empty lov io operations.
/*
 * cio_fini() for objects without stripes: no sub-ios to clean up, just
 * drop the active-io count and wake waiters.
 */
853 static void lov_empty_io_fini(const struct lu_env *env,
854 const struct cl_io_slice *ios)
856 struct lov_object *lov = cl2lov(ios->cis_obj);
858 if (atomic_dec_and_test(&lov->lo_active_ios))
859 wake_up_all(&lov->lo_waitq);
/*
 * cio_submit() for stripeless objects; the (elided) body presumably just
 * returns an error -- TODO confirm against the full source.
 */
862 static int lov_empty_io_submit(const struct lu_env *env,
863 const struct cl_io_slice *ios,
864 enum cl_req_type crt, struct cl_2queue *queue)
/*
 * Placeholder for operations that must never be reached on a stripeless
 * object; installed in the op tables via the cast macro below.
 */
869 static void lov_empty_impossible(const struct lu_env *env,
870 struct cl_io_slice *ios)
875 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
878 * An io operation vector for files without stripes.
/*
 * NOTE(review): the [CIT_xxx] index labels are among the elided lines;
 * slots that cannot legitimately run on a stripeless file are wired to
 * LOV_EMPTY_IMPOSSIBLE.
 */
880 static const struct cl_io_operations lov_empty_io_ops = {
883 .cio_fini = lov_empty_io_fini,
886 .cio_fini = lov_empty_io_fini,
887 .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
888 .cio_lock = LOV_EMPTY_IMPOSSIBLE,
889 .cio_start = LOV_EMPTY_IMPOSSIBLE,
890 .cio_end = LOV_EMPTY_IMPOSSIBLE
893 .cio_fini = lov_empty_io_fini,
894 .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
895 .cio_lock = LOV_EMPTY_IMPOSSIBLE,
896 .cio_start = LOV_EMPTY_IMPOSSIBLE,
897 .cio_end = LOV_EMPTY_IMPOSSIBLE
900 .cio_fini = lov_empty_io_fini,
901 .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
902 .cio_lock = LOV_EMPTY_IMPOSSIBLE,
903 .cio_start = LOV_EMPTY_IMPOSSIBLE,
904 .cio_end = LOV_EMPTY_IMPOSSIBLE
907 .cio_fini = lov_empty_io_fini
910 .cio_fini = lov_empty_io_fini
913 .cio_submit = lov_empty_io_submit,
914 .cio_commit_async = LOV_EMPTY_IMPOSSIBLE
/*
 * Entry point for initializing an io against a striped (raid0) object:
 * compute the io extent, allocate the sub-io array, register the LOV io
 * slice and take an active-io reference on the object.  Returns 0 or a
 * negative errno via io->ci_result.
 */
917 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
920 struct lov_io *lio = lov_env_io(env);
921 struct lov_object *lov = cl2lov(obj);
923 INIT_LIST_HEAD(&lio->lis_active);
924 io->ci_result = lov_io_slice_init(lio, lov, io);
925 if (io->ci_result == 0) {
926 io->ci_result = lov_io_subio_init(env, lio, io);
927 if (io->ci_result == 0) {
928 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
/* paired with the atomic_dec in lov_io_fini() */
929 atomic_inc(&lov->lo_active_ios);
932 return io->ci_result;
/*
 * Entry point for initializing an io against a file without stripes:
 * only io types that make sense on an empty layout are accepted; a page
 * fault on such a file is an error.  Installs lov_empty_io_ops.
 *
 * NOTE(review): the case labels and the result assignments between the
 * visible lines are elided from this chunk.
 */
935 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
938 struct lov_object *lov = cl2lov(obj);
939 struct lov_io *lio = lov_env_io(env);
942 lio->lis_object = lov;
943 switch (io->ci_type) {
952 case CIT_DATA_VERSION:
/* faulting in a page of a stripeless file can never succeed */
960 CERROR("Page fault on a file without stripes: " DFID "\n",
961 PFID(lu_object_fid(&obj->co_lu)));
965 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
/* paired with the atomic_dec in lov_empty_io_fini() */
966 atomic_inc(&lov->lo_active_ios);
969 io->ci_result = result < 0 ? result : 0;
/*
 * Entry point for initializing an io against an HSM-released file (data
 * archived, layout released).  Ios that need the data set
 * ci_restore_needed so the upper layer triggers a restore from the
 * archive; a truncate to 0 is handled by the MDT and needs no restore.
 *
 * NOTE(review): the case labels and several break/assignment lines are
 * elided from this chunk.
 */
973 int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
976 struct lov_object *lov = cl2lov(obj);
977 struct lov_io *lio = lov_env_io(env);
980 LASSERT(lov->lo_lsm);
981 lio->lis_object = lov;
983 switch (io->ci_type) {
985 LASSERTF(0, "invalid type %d\n", io->ci_type);
986 result = -EOPNOTSUPP;
990 case CIT_DATA_VERSION:
994 /* the truncate to 0 is managed by MDT:
995 * - in open, for open O_TRUNC
996 * - in setattr, for truncate
998 /* the truncate is for size > 0 so triggers a restore */
999 if (cl_io_is_trunc(io)) {
1000 io->ci_restore_needed = 1;
/* data-touching io on a released file: ask for a restore */
1009 io->ci_restore_needed = 1;
1014 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
/* paired with the atomic_dec in lov_empty_io_fini() */
1015 atomic_inc(&lov->lo_active_ios);
1018 io->ci_result = result < 0 ? result : 0;