4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <linux/libcfs/libcfs.h>
37 #include <lustre_dlm.h>
38 #include <lustre_net.h>
39 #include <uapi/linux/lustre/lustre_idl.h>
40 #include <obd_cksum.h>
42 #include <lustre_ha.h>
43 #include <lprocfs_status.h>
44 #include <uapi/linux/lustre/lustre_ioctl.h>
45 #include <lustre_debug.h>
46 #include <lustre_obdo.h>
47 #include <uapi/linux/lustre/lustre_param.h>
48 #include <lustre_fid.h>
49 #include <obd_class.h>
51 #include "osc_internal.h"
52 #include "osc_cl_internal.h"
/*
 * Shared OST request-pool state: the pool itself, a counter of requests
 * currently drawn from it, and the configured per-pool request ceiling.
 */
54 atomic_t osc_pool_req_count;
55 unsigned int osc_reqpool_maxreqcount;
56 struct ptlrpc_request_pool *osc_rq_pool;
58 /* max memory used for request pool, unit is MB */
59 static unsigned int osc_reqpool_mem_max = 5;
/* read-only module parameter (0444): tunable only at module load time */
60 module_param(osc_reqpool_mem_max, uint, 0444);
/*
 * Per-RPC argument blocks stashed in ptlrpc_request::rq_async_args and
 * recovered by the matching ->rq_interpret_reply callbacks.
 * NOTE(review): several members of these structs are elided in this copy
 * of the file — do not treat the lists below as complete.
 */
62 struct osc_brw_async_args {
68 struct brw_page **aa_ppga;	/* page array for the bulk transfer */
69 struct client_obd *aa_cli;
70 struct list_head aa_oaps;
71 struct list_head aa_exts;
74 struct osc_async_args {
75 struct obd_info *aa_oi;
78 struct osc_setattr_args {
80 obd_enqueue_update_f sa_upcall;	/* completion upcall + its cookie */
84 struct osc_fsync_args {
85 struct osc_object *fa_obj;
87 obd_enqueue_update_f fa_upcall;
91 struct osc_enqueue_args {
92 struct obd_export *oa_exp;
93 enum ldlm_type oa_type;
94 enum ldlm_mode oa_mode;
96 osc_enqueue_upcall_f oa_upcall;
98 struct ost_lvb *oa_lvb;
99 struct lustre_handle oa_lockh;
100 unsigned int oa_agl:1;	/* asynchronous glimpse lock request */
/* Forward declarations for helpers defined later in this file. */
103 static void osc_release_ppga(struct brw_page **ppga, u32 count);
104 static int brw_interpret(const struct lu_env *env,
105 struct ptlrpc_request *req, void *data, int rc);
/*
 * Pack the caller's obdo into the request's OST body, converting to wire
 * format according to the import's negotiated connect data.
 * NOTE(review): parameter list and closing lines elided in this copy.
 */
107 static inline void osc_pack_req_body(struct ptlrpc_request *req,
110 struct ost_body *body;
112 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
115 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/*
 * Synchronous OST_GETATTR: allocate and pack the request, send it with
 * ptlrpc_queue_wait(), then unpack the returned attributes back into @oa.
 * The client-side BRW size is filled in locally (OBD_MD_FLBLKSZ) since the
 * server does not provide it.  Error-path lines are elided in this copy.
 */
118 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
121 struct ptlrpc_request *req;
122 struct ost_body *body;
125 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
129 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
131 ptlrpc_request_free(req);
135 osc_pack_req_body(req, oa);
137 ptlrpc_request_set_replen(req);
139 rc = ptlrpc_queue_wait(req);
143 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
149 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
150 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa,
/* client-local attribute: preferred I/O size for this OBD */
153 oa->o_blksize = cli_brw_size(exp->exp_obd);
154 oa->o_valid |= OBD_MD_FLBLKSZ;
157 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: pack @oa into the request, wait for the reply,
 * and copy the server's view of the attributes back into @oa.
 * Caller must have set the group in the obdo (asserted below).
 */
161 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
164 struct ptlrpc_request *req;
165 struct ost_body *body;
168 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
170 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
174 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
176 ptlrpc_request_free(req);
180 osc_pack_req_body(req, oa);
182 ptlrpc_request_set_replen(req);
184 rc = ptlrpc_queue_wait(req);
188 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa,
198 ptlrpc_req_finished(req);
/*
 * Reply callback for asynchronous setattr/punch RPCs: unpack the returned
 * attributes into the saved obdo, then invoke the caller's upcall with the
 * final status.  Error-check lines are elided in this copy.
 */
202 static int osc_setattr_interpret(const struct lu_env *env,
203 struct ptlrpc_request *req,
204 struct osc_setattr_args *sa, int rc)
206 struct ost_body *body;
211 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
217 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
220 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR.  Builds the request, stashes the upcall/cookie
 * in rq_async_args for osc_setattr_interpret(), and hands the request to
 * either ptlrpcd (PTLRPCD_SET) or the caller-provided request set.
 * NOTE(review): the branch structure around the two ptlrpcd_add_req()/
 * ptlrpc_set_add_req() submission paths is elided in this copy — confirm
 * against the full source before reasoning about the control flow here.
 */
224 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
225 obd_enqueue_update_f upcall, void *cookie,
226 struct ptlrpc_request_set *rqset)
228 struct ptlrpc_request *req;
229 struct osc_setattr_args *sa;
232 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
236 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
238 ptlrpc_request_free(req);
242 osc_pack_req_body(req, oa);
244 ptlrpc_request_set_replen(req);
246 /* do mds to ost setattr asynchronously */
248 /* Do not wait for response. */
249 ptlrpcd_add_req(req);
251 req->rq_interpret_reply =
252 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async-args block must fit in the request's embedded storage */
254 BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
255 sa = ptlrpc_req_async_args(req);
257 sa->sa_upcall = upcall;
258 sa->sa_cookie = cookie;
260 if (rqset == PTLRPCD_SET)
261 ptlrpcd_add_req(req);
263 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE.  Only used for echo-client objects here (note the
 * fid_seq_is_echo assertion); regular objects are precreated elsewhere.
 * On success the server-assigned attributes are copied back into @oa and
 * the local BRW size is filled in.
 */
269 static int osc_create(const struct lu_env *env, struct obd_export *exp,
272 struct ptlrpc_request *req;
273 struct ost_body *body;
277 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
278 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
280 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
286 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
288 ptlrpc_request_free(req);
292 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
295 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
297 ptlrpc_request_set_replen(req);
299 rc = ptlrpc_queue_wait(req);
303 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
309 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
310 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
312 oa->o_blksize = cli_brw_size(exp->exp_obd);
313 oa->o_valid |= OBD_MD_FLBLKSZ;
315 CDEBUG(D_HA, "transno: %lld\n",
316 lustre_msg_get_transno(req->rq_repmsg));
318 ptlrpc_req_finished(req);
/*
 * Asynchronous OST_PUNCH (truncate/hole-punch).  Reuses the setattr
 * interpret callback and async-args structure; the request is routed to the
 * OST I/O portal (see bug 7198) and submitted either via ptlrpcd or the
 * caller's request set.
 */
323 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
324 obd_enqueue_update_f upcall, void *cookie,
325 struct ptlrpc_request_set *rqset)
327 struct ptlrpc_request *req;
328 struct osc_setattr_args *sa;
329 struct ost_body *body;
332 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
336 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
338 ptlrpc_request_free(req);
341 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
342 ptlrpc_at_set_req_timeout(req);
344 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
346 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
349 ptlrpc_request_set_replen(req);
351 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
352 BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
353 sa = ptlrpc_req_async_args(req);
355 sa->sa_upcall = upcall;
356 sa->sa_cookie = cookie;
357 if (rqset == PTLRPCD_SET)
358 ptlrpcd_add_req(req);
360 ptlrpc_set_add_req(rqset, req);
/*
 * Reply callback for OST_SYNC: copy the returned obdo to the caller's
 * buffer, refresh the osc object's cached blocks attribute under the
 * attr lock, then fire the caller's upcall with the final status.
 */
365 static int osc_sync_interpret(const struct lu_env *env,
366 struct ptlrpc_request *req,
369 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
370 struct osc_fsync_args *fa = arg;
371 unsigned long valid = 0;
372 struct ost_body *body;
373 struct cl_object *obj;
378 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
380 CERROR("can't unpack ost_body\n");
385 *fa->fa_oa = body->oa;
386 obj = osc2cl(fa->fa_obj);
388 /* Update osc object's blocks attribute */
389 cl_object_attr_lock(obj);
390 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
391 attr->cat_blocks = body->oa.o_blocks;
396 cl_object_attr_update(env, obj, attr, valid);
397 cl_object_attr_unlock(obj);
400 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Asynchronous OST_SYNC for @obj.  The byte range to sync is carried in the
 * obdo's size/blocks fields (see the "overload" comment below).  Completion
 * is handled by osc_sync_interpret(); submission goes via ptlrpcd or the
 * caller's request set.
 */
404 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
405 obd_enqueue_update_f upcall, void *cookie,
406 struct ptlrpc_request_set *rqset)
408 struct obd_export *exp = osc_export(obj);
409 struct ptlrpc_request *req;
410 struct ost_body *body;
411 struct osc_fsync_args *fa;
414 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
418 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
420 ptlrpc_request_free(req);
424 /* overload the size and blocks fields in the oa with start/end */
425 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
427 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
430 ptlrpc_request_set_replen(req);
431 req->rq_interpret_reply = osc_sync_interpret;
433 BUILD_BUG_ON(sizeof(*fa) > sizeof(req->rq_async_args));
434 fa = ptlrpc_req_async_args(req);
437 fa->fa_upcall = upcall;
438 fa->fa_cookie = cookie;
440 if (rqset == PTLRPCD_SET)
441 ptlrpcd_add_req(req);
443 ptlrpc_set_add_req(rqset, req);
448 /* Find and cancel locally locks matched by @mode in the resource found by
449 * @objid. Found locks are added into @cancel list. Returns the amount of
450 * locks added to @cancels list.
452 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
453 struct list_head *cancels,
454 enum ldlm_mode mode, __u64 lock_flags)
456 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
457 struct ldlm_res_id res_id;
458 struct ldlm_resource *res;
461 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
462 * export) but disabled through procfs (flag in NS).
464 * This distinguishes from a case when ELC is not supported originally,
465 * when we still want to cancel locks in advance and just cancel them
466 * locally, without sending any RPC.
468 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* build the LDLM resource name from the object id and look it up */
471 ostid_build_res_name(&oa->o_oi, &res_id);
472 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug reference across the local cancel scan */
476 LDLM_RESOURCE_ADDREF(res);
477 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
478 lock_flags, 0, NULL);
479 LDLM_RESOURCE_DELREF(res);
480 ldlm_resource_putref(res);
/*
 * Reply callback for OST_DESTROY: release this RPC's slot in the
 * destroy-in-flight throttle and wake any waiter in osc_destroy().
 */
484 static int osc_destroy_interpret(const struct lu_env *env,
485 struct ptlrpc_request *req, void *data,
488 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
490 atomic_dec(&cli->cl_destroy_in_flight);
491 wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a destroy-RPC slot, bounded by cl_max_rpcs_in_flight.
 * The inc/dec pair is deliberately lock-free; the second check and wake_up
 * close the race where another thread released a slot between the two
 * atomic operations.  Return-value lines are elided in this copy.
 */
495 static int osc_can_send_destroy(struct client_obd *cli)
497 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
498 cli->cl_max_rpcs_in_flight) {
499 /* The destroy request can be sent */
502 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
503 cli->cl_max_rpcs_in_flight) {
505 * The counter has been modified between the two atomic
508 wake_up(&cli->cl_destroy_waitq);
/*
 * OST_DESTROY: cancel matching local PW locks first (early lock cancel,
 * packed into the same RPC via ldlm_prep_elc_req), throttle against
 * cl_max_rpcs_in_flight, then fire the request via ptlrpcd without waiting
 * for the reply — completion bookkeeping happens in osc_destroy_interpret().
 */
513 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
516 struct client_obd *cli = &exp->exp_obd->u.cli;
517 struct ptlrpc_request *req;
518 struct ost_body *body;
523 CDEBUG(D_INFO, "oa NULL\n");
/* gather local PW locks on this object so data is discarded with them */
527 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
528 LDLM_FL_DISCARD_DATA);
530 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
532 ldlm_lock_list_put(&cancels, l_bl_ast, count);
536 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
539 ptlrpc_request_free(req);
543 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
544 ptlrpc_at_set_req_timeout(req);
546 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
550 ptlrpc_request_set_replen(req);
552 req->rq_interpret_reply = osc_destroy_interpret;
553 if (!osc_can_send_destroy(cli)) {
554 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
557 * Wait until the number of on-going destroy RPCs drops
558 * under max_rpc_in_flight
560 l_wait_event_exclusive(cli->cl_destroy_waitq,
561 osc_can_send_destroy(cli), &lwi);
564 /* Do not wait for response */
565 ptlrpcd_add_req(req);
/*
 * Fill the grant-related fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) from client_obd state, under cl_loi_list_lock, so the server
 * learns how much cache/grant this client holds.  The CERROR branches are
 * sanity checks for accounting going out of range.  The caller must not
 * have set the OBD_MD_FLBLOCKS/FLGRANT bits already (asserted below).
 */
569 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
572 u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
574 LASSERT(!(oa->o_valid & bits));
577 spin_lock(&cli->cl_loi_list_lock);
578 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
579 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
580 cli->cl_dirty_max_pages)) {
581 CERROR("dirty %lu - %lu > dirty_max %lu\n",
582 cli->cl_dirty_pages, cli->cl_dirty_transit,
583 cli->cl_dirty_max_pages);
585 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
586 atomic_long_read(&obd_dirty_transit_pages) >
587 (long)(obd_max_dirty_pages + 1))) {
588 /* The atomic_read() allowing the atomic_inc() are
589 * not covered by a lock thus they may safely race and trip
590 * this CERROR() unless we add in a small fudge factor (+1).
592 CERROR("%s: dirty %ld + %ld > system dirty_max %ld\n",
593 cli_name(cli), atomic_long_read(&obd_dirty_pages),
594 atomic_long_read(&obd_dirty_transit_pages),
595 obd_max_dirty_pages);
597 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
599 CERROR("dirty %lu - dirty_max %lu too big???\n",
600 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
603 unsigned long max_in_flight;
605 max_in_flight = (cli->cl_max_pages_per_rpc << PAGE_SHIFT) *
606 (cli->cl_max_rpcs_in_flight + 1);
607 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_SHIFT,
610 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
611 oa->o_dropped = cli->cl_lost_grant;
/* lost grant is reported to the server exactly once */
612 cli->cl_lost_grant = 0;
613 spin_unlock(&cli->cl_loi_list_lock);
614 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
615 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check one shrink interval from now. */
618 void osc_update_next_shrink(struct client_obd *cli)
620 cli->cl_next_shrink_grant =
621 cfs_time_shift(cli->cl_grant_shrink_interval);
622 CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
623 cli->cl_next_shrink_grant);
/* Add @grant to the client's available grant, under cl_loi_list_lock. */
626 static void __osc_update_grant(struct client_obd *cli, u64 grant)
628 spin_lock(&cli->cl_loi_list_lock);
629 cli->cl_avail_grant += grant;
630 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
633 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
635 if (body->oa.o_valid & OBD_MD_FLGRANT) {
636 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
637 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration; used below by osc_shrink_grant_to_target(). */
641 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
642 u32 keylen, void *key, u32 vallen,
643 void *val, struct ptlrpc_request_set *set);
/*
 * Reply callback for a grant-shrink set_info RPC.  On failure the grant we
 * tried to give back is restored locally (__osc_update_grant); on success
 * any grant the server returned is absorbed.  The obdo was heap-allocated
 * by the sender and is freed here in both cases.
 */
645 static int osc_shrink_grant_interpret(const struct lu_env *env,
646 struct ptlrpc_request *req,
649 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
650 struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
651 struct ost_body *body;
654 __osc_update_grant(cli, oa->o_grant);
658 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
660 osc_update_grant(cli, body);
662 kmem_cache_free(obdo_cachep, oa);
/*
 * Piggyback a small grant release (a quarter of what's available) on an
 * outgoing BRW request: move the amount from cl_avail_grant into
 * oa->o_grant and tag the obdo with OBD_FL_SHRINK_GRANT.
 */
666 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
668 spin_lock(&cli->cl_loi_list_lock);
669 oa->o_grant = cli->cl_avail_grant / 4;
670 cli->cl_avail_grant -= oa->o_grant;
671 spin_unlock(&cli->cl_loi_list_lock);
672 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
673 oa->o_valid |= OBD_MD_FLFLAGS;
676 oa->o_flags |= OBD_FL_SHRINK_GRANT;
677 osc_update_next_shrink(cli);
680 /* Shrink the current grant, either from some large amount to enough for a
681 * full set of in-flight RPCs, or if we have already shrunk to that limit
682 * then to enough for a single RPC. This avoids keeping more grant than
683 * needed, and avoids shrinking the grant piecemeal.
685 static int osc_shrink_grant(struct client_obd *cli)
687 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
688 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
690 spin_lock(&cli->cl_loi_list_lock);
/* already at the in-flight target: drop down to a single-RPC target */
691 if (cli->cl_avail_grant <= target_bytes)
692 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
693 spin_unlock(&cli->cl_loi_list_lock);
695 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Give grant back to the server down to @target_bytes (clamped to at least
 * one RPC's worth).  The release amount is carried in an ost_body sent via
 * a KEY_GRANT_SHRINK set_info RPC; on send failure the grant is restored
 * locally.  No-op if we already hold no more than the target.
 */
698 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
701 struct ost_body *body;
703 spin_lock(&cli->cl_loi_list_lock);
704 /* Don't shrink if we are already above or below the desired limit
705 * We don't want to shrink below a single RPC, as that will negatively
706 * impact block allocation and long-term performance.
708 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
709 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
711 if (target_bytes >= cli->cl_avail_grant) {
712 spin_unlock(&cli->cl_loi_list_lock);
715 spin_unlock(&cli->cl_loi_list_lock);
717 body = kzalloc(sizeof(*body), GFP_NOFS);
721 osc_announce_cached(cli, &body->oa, 0);
723 spin_lock(&cli->cl_loi_list_lock);
724 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
725 cli->cl_avail_grant = target_bytes;
726 spin_unlock(&cli->cl_loi_list_lock);
727 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
728 body->oa.o_valid |= OBD_MD_FLFLAGS;
729 body->oa.o_flags = 0;
731 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
732 osc_update_next_shrink(cli);
734 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
735 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
736 sizeof(*body), body, NULL);
/* on failure, put the grant we tried to release back in the pool */
738 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to shrink this client's grant: the server must
 * support OBD_CONNECT_GRANT_SHRINK, the shrink deadline must have (nearly)
 * arrived, the import must be FULL, and we must hold more grant than one
 * RPC needs.  Return statements are elided in this copy.
 */
743 static int osc_should_shrink_grant(struct client_obd *client)
745 unsigned long time = cfs_time_current();
746 unsigned long next_shrink = client->cl_next_shrink_grant;
748 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
749 OBD_CONNECT_GRANT_SHRINK) == 0)
752 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
753 /* Get the current RPC size directly, instead of going via:
754 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
755 * Keep comment here so that it can be found by searching.
757 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
759 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
760 client->cl_avail_grant > brw_size)
763 osc_update_next_shrink(client);
/*
 * Timeout-list callback: walk every client on the grant-shrink list and
 * shrink the grant of those that are due.
 */
768 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
770 struct client_obd *client;
772 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
773 if (osc_should_shrink_grant(client))
774 osc_shrink_grant(client);
/*
 * Register this client on the periodic grant-shrink timeout list and arm
 * its first shrink deadline.
 */
779 static int osc_add_shrink_grant(struct client_obd *client)
783 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
785 osc_grant_shrink_grant_cb, NULL,
786 &client->cl_grant_shrink_list);
788 CERROR("add grant client %s error %d\n", cli_name(client), rc);
791 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
792 osc_update_next_shrink(client);
/* Unregister this client from the grant-shrink timeout list. */
796 static int osc_del_shrink_grant(struct client_obd *client)
798 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize grant accounting from the server's connect data: set
 * cl_avail_grant from ocd_grant (discounting pages already dirty unless we
 * were just evicted), derive the extent chunk size from the server block
 * size, and enroll in periodic grant shrinking if the server supports it.
 */
802 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
805 * ocd_grant is the total grant amount we're expect to hold: if we've
806 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
807 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
810 * race is tolerable here: if we're evicted, but imp_state already
811 * left EVICTED state, then cl_dirty_pages must be 0 already.
813 spin_lock(&cli->cl_loi_list_lock);
814 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
815 cli->cl_avail_grant = ocd->ocd_grant;
817 cli->cl_avail_grant = ocd->ocd_grant -
818 (cli->cl_dirty_pages << PAGE_SHIFT);
820 /* determine the appropriate chunk size used by osc_extent. */
821 cli->cl_chunkbits = max_t(int, PAGE_SHIFT, ocd->ocd_blocksize);
822 spin_unlock(&cli->cl_loi_list_lock);
824 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
825 cli_name(cli), cli->cl_avail_grant, cli->cl_lost_grant,
828 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
829 list_empty(&cli->cl_grant_shrink_list))
830 osc_add_shrink_grant(cli);
833 /* We assume that the reason this OSC got a short read is because it read
834 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
835 * via the LOV, and it _knows_ it's reading inside the file, it's just that
836 * this stripe never got written at or beyond this stripe offset yet.
838 static void handle_short_read(int nob_read, u32 page_count,
839 struct brw_page **pga)
844 /* skip bytes read OK */
845 while (nob_read > 0) {
846 LASSERT(page_count > 0);
848 if (pga[i]->count > nob_read) {
849 /* EOF inside this page */
850 ptr = kmap(pga[i]->pg) +
851 (pga[i]->off & ~PAGE_MASK);
/* zero the unread tail of the partially-filled page */
852 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
859 nob_read -= pga[i]->count;
864 /* zero remaining pages */
865 while (page_count-- > 0) {
866 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
867 memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf return codes in a BRW_WRITE reply: the RC vector
 * must be present and correctly sized, every entry must be zero (negative
 * entries propagate as the error), and the bulk transfer must have moved
 * exactly the number of bytes we requested.
 */
873 static int check_write_rcs(struct ptlrpc_request *req,
874 int requested_nob, int niocount,
875 u32 page_count, struct brw_page **pga)
880 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
881 sizeof(*remote_rcs) *
884 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
888 /* return error if any niobuf was in error */
889 for (i = 0; i < niocount; i++) {
890 if ((int)remote_rcs[i] < 0)
891 return remote_rcs[i];
893 if (remote_rcs[i] != 0) {
894 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
895 i, remote_rcs[i], req);
900 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
901 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
902 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can share one remote niobuf when they are byte-contiguous
 * and their BRW flags agree (modulo a known-compatible mask).  Unknown
 * flag differences are logged so they get reported upstream.
 */
909 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
911 if (p1->flag != p2->flag) {
912 unsigned int mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
913 OBD_BRW_SYNC | OBD_BRW_ASYNC |
914 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
916 /* warn if we try to combine flags that we don't know to be
919 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
920 CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
926 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk-data checksum over @nob bytes spread across @pga using
 * the libcfs crypto hash selected by @cksum_type.  Contains two fault-
 * injection hooks: corrupt received data (reads) before hashing, and
 * return a deliberately wrong checksum (writes) after hashing.
 * NOTE(review): the return on success is elided in this copy; on hash-init
 * failure the PTR_ERR of the descriptor is returned.
 */
929 static u32 osc_checksum_bulk(int nob, u32 pg_count,
930 struct brw_page **pga, int opc,
931 enum cksum_type cksum_type)
935 struct cfs_crypto_hash_desc *hdesc;
936 unsigned int bufsize;
937 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
939 LASSERT(pg_count > 0);
941 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
943 CERROR("Unable to initialize checksum hash %s\n",
944 cfs_crypto_hash_name(cfs_alg));
945 return PTR_ERR(hdesc);
948 while (nob > 0 && pg_count > 0) {
949 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
951 /* corrupt the data before we compute the checksum, to
952 * simulate an OST->client data error
954 if (i == 0 && opc == OST_READ &&
955 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
956 unsigned char *ptr = kmap(pga[i]->pg);
957 int off = pga[i]->off & ~PAGE_MASK;
959 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
962 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
963 pga[i]->off & ~PAGE_MASK,
966 "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
967 pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
968 (long)pga[i]->pg->flags, page_count(pga[i]->pg),
969 page_private(pga[i]->pg),
970 (int)(pga[i]->off & ~PAGE_MASK));
972 nob -= pga[i]->count;
977 bufsize = sizeof(cksum);
978 cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
980 /* For sending we only compute the wrong checksum instead
981 * of corrupting the data so it is still correct on a redo
983 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a complete BRW (bulk read/write) request for @page_count pages:
 * allocate the request (from the shared pool for writes), count mergeable
 * niobufs, set up the bulk descriptor, pack the obdo/ioobj/niobuf triple,
 * announce cached-grant state, compute the write checksum when enabled,
 * and stash the osc_brw_async_args for the interpret callback.
 * NOTE(review): many interior lines (error paths, some assignments) are
 * elided in this copy — verify control flow against the full source.
 */
989 static int osc_brw_prep_request(int cmd, struct client_obd *cli,
990 struct obdo *oa, u32 page_count,
991 struct brw_page **pga,
992 struct ptlrpc_request **reqp,
996 struct ptlrpc_request *req;
997 struct ptlrpc_bulk_desc *desc;
998 struct ost_body *body;
999 struct obd_ioobj *ioobj;
1000 struct niobuf_remote *niobuf;
1001 int niocount, i, requested_nob, opc, rc;
1002 struct osc_brw_async_args *aa;
1003 struct req_capsule *pill;
1004 struct brw_page *pg_prev;
1006 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1007 return -ENOMEM; /* Recoverable */
1008 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1009 return -EINVAL; /* Fatal */
/* writes draw from the shared request pool; reads allocate directly */
1011 if ((cmd & OBD_BRW_WRITE) != 0) {
1013 req = ptlrpc_request_alloc_pool(cli->cl_import,
1015 &RQF_OST_BRW_WRITE);
1018 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* one niobuf per run of contiguous, flag-compatible pages */
1023 for (niocount = i = 1; i < page_count; i++) {
1024 if (!can_merge_pages(pga[i - 1], pga[i]))
1028 pill = &req->rq_pill;
1029 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1031 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1032 niocount * sizeof(*niobuf));
1034 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1036 ptlrpc_request_free(req);
1039 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1040 ptlrpc_at_set_req_timeout(req);
1041 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1044 req->rq_no_retry_einprogress = 1;
1046 desc = ptlrpc_prep_bulk_imp(req, page_count,
1047 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1048 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1049 PTLRPC_BULK_PUT_SINK) | PTLRPC_BULK_BUF_KIOV, OST_BULK_PORTAL,
1050 &ptlrpc_bulk_kiov_pin_ops);
1056 /* NB request now owns desc and will free it when it gets freed */
1058 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1059 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1060 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1061 LASSERT(body && ioobj && niobuf);
1063 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1065 obdo_to_ioobj(oa, ioobj);
1066 ioobj->ioo_bufcnt = niocount;
1067 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1068 * that might be send for this request. The actual number is decided
1069 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1070 * "max - 1" for old client compatibility sending "0", and also so the
1071 * the actual maximum is a power-of-two number, not one less. LU-1431
1073 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1074 LASSERT(page_count > 0);
/* add each page to the bulk and build/merge the remote niobufs */
1076 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1077 struct brw_page *pg = pga[i];
1078 int poff = pg->off & ~PAGE_MASK;
1080 LASSERT(pg->count > 0);
1081 /* make sure there is no gap in the middle of page array */
1082 LASSERTF(page_count == 1 ||
1083 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1084 ergo(i > 0 && i < page_count - 1,
1085 poff == 0 && pg->count == PAGE_SIZE) &&
1086 ergo(i == page_count - 1, poff == 0)),
1087 "i: %d/%d pg: %p off: %llu, count: %u\n",
1088 i, page_count, pg, pg->off, pg->count);
1089 LASSERTF(i == 0 || pg->off > pg_prev->off,
1090 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1092 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1093 pg_prev->pg, page_private(pg_prev->pg),
1094 pg_prev->pg->index, pg_prev->off);
1095 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1096 (pg->flag & OBD_BRW_SRVLOCK));
1098 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1099 requested_nob += pg->count;
1101 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1103 niobuf->rnb_len += pg->count;
1105 niobuf->rnb_offset = pg->off;
1106 niobuf->rnb_len = pg->count;
1107 niobuf->rnb_flags = pg->flag;
1112 LASSERTF((void *)(niobuf - niocount) ==
1113 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1114 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1115 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1117 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1119 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1120 body->oa.o_valid |= OBD_MD_FLFLAGS;
1121 body->oa.o_flags = 0;
1123 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1126 if (osc_should_shrink_grant(cli))
1127 osc_shrink_grant_local(cli, &body->oa);
1129 /* size[REQ_REC_OFF] still sizeof (*body) */
1130 if (opc == OST_WRITE) {
1131 if (cli->cl_checksum &&
1132 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1133 /* store cl_cksum_type in a local variable since
1134 * it can be changed via lprocfs
1136 enum cksum_type cksum_type = cli->cl_cksum_type;
1138 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1139 oa->o_flags &= OBD_FL_LOCAL_MASK;
1140 body->oa.o_flags = 0;
1142 body->oa.o_flags |= cksum_type_pack(cksum_type);
1143 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1144 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1148 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1150 /* save this in 'oa', too, for later checking */
1151 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1152 oa->o_flags |= cksum_type_pack(cksum_type);
1154 /* clear out the checksum flag, in case this is a
1155 * resend but cl_checksum is no longer set. b=11238
1157 oa->o_valid &= ~OBD_MD_FLCKSUM;
1159 oa->o_cksum = body->oa.o_cksum;
1160 /* 1 RC per niobuf */
1161 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1162 sizeof(__u32) * niocount);
1164 if (cli->cl_checksum &&
1165 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1166 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1167 body->oa.o_flags = 0;
1168 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1169 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1172 ptlrpc_request_set_replen(req);
1174 BUILD_BUG_ON(sizeof(*aa) > sizeof(req->rq_async_args));
1175 aa = ptlrpc_req_async_args(req);
1177 aa->aa_requested_nob = requested_nob;
1178 aa->aa_nio_count = niocount;
1179 aa->aa_page_count = page_count;
1183 INIT_LIST_HEAD(&aa->aa_oaps);
1186 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1187 CDEBUG(D_RPCTRACE, "brw rpc %p - object " DOSTID " offset %lld<>%lld\n",
1188 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1189 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1194 ptlrpc_req_finished(req);
/*
 * Diagnose a write-checksum mismatch between client and server.  If the
 * server confirmed our checksum, nothing to do.  Otherwise recompute the
 * checksum over the pages we still hold (with the server's checksum type)
 * and classify where the corruption most plausibly occurred, logging a
 * console error with the object/extent details.
 */
1198 static int check_write_checksum(struct obdo *oa,
1199 const struct lnet_process_id *peer,
1200 __u32 client_cksum, __u32 server_cksum, int nob,
1201 u32 page_count, struct brw_page **pga,
1202 enum cksum_type client_cksum_type)
1206 enum cksum_type cksum_type;
1208 if (server_cksum == client_cksum) {
1209 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1213 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1215 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1218 if (cksum_type != client_cksum_type)
1219 msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
1221 else if (new_cksum == server_cksum)
1222 msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1224 else if (new_cksum == client_cksum)
1225 msg = "changed in transit before arrival at OST";
1227 msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1230 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
1231 msg, libcfs_nid2str(peer->nid),
1232 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1233 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1234 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1235 POSTID(&oa->o_oi), pga[0]->off,
1236 pga[page_count - 1]->off +
1237 pga[page_count - 1]->count - 1);
1238 CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1239 client_cksum, client_cksum_type,
1240 server_cksum, cksum_type, new_cksum);
/*
 * Post-process a completed BRW (bulk read/write) RPC reply.
 * NOTE(review): this extract elides some original lines (gaps in the
 * source numbering), so the visible control flow is incomplete.
 */
1244 /* Note rc enters this function as number of bytes transferred */
1245 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1247 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1248 const struct lnet_process_id *peer =
1249 &req->rq_import->imp_connection->c_peer;
1250 struct client_obd *cli = aa->aa_cli;
1251 struct ost_body *body;
1252 __u32 client_cksum = 0;
/* -EDQUOT is handled below via the per-uid/gid quota flags, not here */
1254 if (rc < 0 && rc != -EDQUOT) {
1255 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1259 LASSERTF(req->rq_repmsg, "rc = %d\n", rc);
1260 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1262 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1266 /* set/clear over quota flag for a uid/gid */
1267 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1268 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1269 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1271 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1272 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1274 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
/* refresh the client's grant accounting from the server reply */
1277 osc_update_grant(cli, body);
1282 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1283 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* OST_WRITE path: verify per-page RCs and the write checksum */
1285 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1287 CERROR("Unexpected +ve rc %d\n", rc);
1290 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1292 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1295 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1296 check_write_checksum(&body->oa, peer, client_cksum,
1297 body->oa.o_cksum, aa->aa_requested_nob,
1298 aa->aa_page_count, aa->aa_ppga,
1299 cksum_type_unpack(aa->aa_oa->o_flags)))
1302 rc = check_write_rcs(req, aa->aa_requested_nob,
1304 aa->aa_page_count, aa->aa_ppga);
1308 /* The rest of this function executes only for OST_READs */
1310 /* if unwrap_bulk failed, return -EAGAIN to retry */
1311 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
/* sanity-check the byte count the server claims to have sent */
1317 if (rc > aa->aa_requested_nob) {
1318 CERROR("Unexpected rc %d (%d requested)\n", rc,
1319 aa->aa_requested_nob);
1323 if (rc != req->rq_bulk->bd_nob_transferred) {
1324 CERROR("Unexpected rc %d (%d transferred)\n",
1325 rc, req->rq_bulk->bd_nob_transferred);
1329 if (rc < aa->aa_requested_nob)
1330 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* read checksum verification: recompute locally and compare */
1332 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1333 static int cksum_counter;
1334 __u32 server_cksum = body->oa.o_cksum;
1337 enum cksum_type cksum_type;
1339 cksum_type = cksum_type_unpack(body->oa.o_valid &
1341 body->oa.o_flags : 0);
1342 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1343 aa->aa_ppga, OST_READ,
/* bulk may have been routed; report the router in the error below */
1346 if (peer->nid != req->rq_bulk->bd_sender) {
1348 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1351 if (server_cksum != client_cksum) {
1352 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
1353 req->rq_import->imp_obd->obd_name,
1354 libcfs_nid2str(peer->nid),
1356 body->oa.o_valid & OBD_MD_FLFID ?
1357 body->oa.o_parent_seq : (__u64)0,
1358 body->oa.o_valid & OBD_MD_FLFID ?
1359 body->oa.o_parent_oid : 0,
1360 body->oa.o_valid & OBD_MD_FLFID ?
1361 body->oa.o_parent_ver : 0,
1362 POSTID(&body->oa.o_oi),
1363 aa->aa_ppga[0]->off,
1364 aa->aa_ppga[aa->aa_page_count-1]->off +
1365 aa->aa_ppga[aa->aa_page_count-1]->count -
1367 CERROR("client %x, server %x, cksum_type %x\n",
1368 client_cksum, server_cksum, cksum_type);
1370 aa->aa_oa->o_cksum = client_cksum;
1374 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* server was asked for a checksum but did not send one */
1377 } else if (unlikely(client_cksum)) {
1378 static int cksum_missed;
/* rate-limit: log only when cksum_missed is a power of two */
1381 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1382 CERROR("Checksum %u requested from %s but not sent\n",
1383 cksum_missed, libcfs_nid2str(peer->nid));
/* copy attributes from the wire obdo back into the local one */
1389 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1390 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW RPC after a recoverable failure; the new
 * request takes over the page array, async pages and extents of the old
 * one.  NOTE(review): some original lines are elided in this extract.
 */
1395 static int osc_brw_redo_request(struct ptlrpc_request *request,
1396 struct osc_brw_async_args *aa, int rc)
1398 struct ptlrpc_request *new_req;
1399 struct osc_brw_async_args *new_aa;
1400 struct osc_async_page *oap;
/* -EINPROGRESS is an expected retry condition, log it quietly */
1402 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1403 "redo for recoverable error %d", rc);
1405 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1406 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1407 aa->aa_cli, aa->aa_oa,
1408 aa->aa_page_count, aa->aa_ppga,
/* abort the resend if any page's IO was interrupted */
1413 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1414 if (oap->oap_request) {
1415 LASSERTF(request == oap->oap_request,
1416 "request %p != oap_request %p\n",
1417 request, oap->oap_request);
1418 if (oap->oap_interrupted) {
1419 ptlrpc_req_finished(new_req);
1424 /* New request takes over pga and oaps from old request.
1425 * Note that copying a list_head doesn't work, need to move it...
1428 new_req->rq_interpret_reply = request->rq_interpret_reply;
1429 new_req->rq_async_args = request->rq_async_args;
1430 new_req->rq_commit_cb = request->rq_commit_cb;
1431 /* cap resend delay to the current request timeout, this is similar to
1432 * what ptlrpc does (see after_reply())
1434 if (aa->aa_resends > new_req->rq_timeout)
1435 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1437 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1438 new_req->rq_generation_set = 1;
1439 new_req->rq_import_generation = request->rq_import_generation;
1441 new_aa = ptlrpc_req_async_args(new_req);
/* move (not copy) the oap and extent lists onto the new args */
1443 INIT_LIST_HEAD(&new_aa->aa_oaps);
1444 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1445 INIT_LIST_HEAD(&new_aa->aa_exts);
1446 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1447 new_aa->aa_resends = aa->aa_resends;
/* re-point every async page's request reference at the new request */
1449 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1450 if (oap->oap_request) {
1451 ptlrpc_req_finished(oap->oap_request);
1452 oap->oap_request = ptlrpc_request_addref(new_req);
1456 /* XXX: This code will run into problem if we're going to support
1457 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1458 * and wait for all of them to be finished. We should inherit request
1459 * set from old request.
1461 ptlrpcd_add_req(new_req);
1463 DEBUG_REQ(D_INFO, new_req, "new request");
1468 * We want disk allocation on the target to happen in offset order, so we
1469 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1470 * fine for our small page arrays and doesn't require allocation. It's an
1471 * insertion sort that swaps elements that are strides apart, shrinking the
1472 * stride down until it's '1' and the array is sorted.
1474 static void sort_brw_pages(struct brw_page **array, int num)
1477 struct brw_page *tmp;
/* grow the stride using the 3h+1 (Knuth) sequence up to num */
1481 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* gapped insertion sort, keyed on each brw_page's file offset */
1486 for (i = stride ; i < num ; i++) {
1489 while (j >= stride && array[j - stride]->off > tmp->off) {
1490 array[j] = array[j - stride];
1495 } while (stride > 1);
/*
 * Release a brw_page pointer array of @count entries.
 * NOTE(review): the body is elided in this extract; presumably it drops
 * the page references and frees the array -- confirm against full source.
 */
1498 static void osc_release_ppga(struct brw_page **ppga, u32 count)
/*
 * Reply-interpret callback for a BRW RPC: finish the request, retry on
 * recoverable errors, propagate server attributes to the cl_object, and
 * tear down the extents/pages/in-flight accounting.
 * NOTE(review): some original lines are elided in this extract.
 */
1504 static int brw_interpret(const struct lu_env *env,
1505 struct ptlrpc_request *req, void *data, int rc)
1507 struct osc_brw_async_args *aa = data;
1508 struct osc_extent *ext;
1509 struct osc_extent *tmp;
1510 struct client_obd *cli = aa->aa_cli;
1512 rc = osc_brw_fini_request(req, rc);
1513 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1514 /* When server return -EINPROGRESS, client should always retry
1515 * regardless of the number of times the bulk was resent already.
1517 if (osc_recoverable_error(rc)) {
/* import generation changed => client was evicted; don't resend */
1518 if (req->rq_import_generation !=
1519 req->rq_import->imp_generation) {
1520 CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
1521 req->rq_import->imp_obd->obd_name,
1522 POSTID(&aa->aa_oa->o_oi), rc);
1523 } else if (rc == -EINPROGRESS ||
1524 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1525 rc = osc_brw_redo_request(req, aa, rc);
1527 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1528 req->rq_import->imp_obd->obd_name,
1529 POSTID(&aa->aa_oa->o_oi), rc);
1534 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* success path: fold server-returned attributes into the object */
1539 struct obdo *oa = aa->aa_oa;
1540 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1541 unsigned long valid = 0;
1542 struct cl_object *obj;
1543 struct osc_async_page *last;
1545 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1546 obj = osc2cl(last->oap_obj);
1548 cl_object_attr_lock(obj);
1549 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1550 attr->cat_blocks = oa->o_blocks;
1551 valid |= CAT_BLOCKS;
1553 if (oa->o_valid & OBD_MD_FLMTIME) {
1554 attr->cat_mtime = oa->o_mtime;
1557 if (oa->o_valid & OBD_MD_FLATIME) {
1558 attr->cat_atime = oa->o_atime;
1561 if (oa->o_valid & OBD_MD_FLCTIME) {
1562 attr->cat_ctime = oa->o_ctime;
1566 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1567 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1568 loff_t last_off = last->oap_count + last->oap_obj_off +
1571 /* Change file size if this is an out of quota or
1572 * direct IO write and it extends the file size
1574 if (loi->loi_lvb.lvb_size < last_off) {
1575 attr->cat_size = last_off;
1578 /* Extend KMS if it's not a lockless write */
1579 if (loi->loi_kms < last_off &&
1580 oap2osc_page(last)->ops_srvlock == 0) {
1581 attr->cat_kms = last_off;
1587 cl_object_attr_update(env, obj, attr, valid);
1588 cl_object_attr_unlock(obj);
1590 kmem_cache_free(obdo_cachep, aa->aa_oa);
/* successful write: pages are "unstable" until the server commits */
1592 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1593 osc_inc_unstable_pages(req);
1595 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1596 list_del_init(&ext->oe_link);
1597 osc_extent_finish(env, ext, 1, rc);
1599 LASSERT(list_empty(&aa->aa_exts));
1600 LASSERT(list_empty(&aa->aa_oaps));
1602 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1603 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1605 spin_lock(&cli->cl_loi_list_lock);
1606 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1607 * is called so we know whether to go to sync BRWs or wait for more
1610 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1611 cli->cl_w_in_flight--;
1613 cli->cl_r_in_flight--;
1614 osc_wake_cache_waiters(cli);
1615 spin_unlock(&cli->cl_loi_list_lock);
/* kick the IO engine now that an RPC slot has freed up */
1617 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW requests: mark the request committed, and if the
 * unstable-page accounting already ran, undo it here to avoid a leak.
 */
1621 static void brw_commit(struct ptlrpc_request *req)
1624 * If osc_inc_unstable_pages (via osc_extent_finish) races with
1625 * this called via the rq_commit_cb, I need to ensure
1626 * osc_dec_unstable_pages is still called. Otherwise unstable
1627 * pages may be leaked.
/* rq_lock serializes rq_unstable/rq_committed against the race above */
1629 spin_lock(&req->rq_lock);
1630 if (unlikely(req->rq_unstable)) {
1631 req->rq_unstable = 0;
1632 spin_unlock(&req->rq_lock);
1633 osc_dec_unstable_pages(req);
1635 req->rq_committed = 1;
1636 spin_unlock(&req->rq_lock);
/*
 * NOTE(review): this extract elides some original lines (gaps in the
 * source numbering); the visible flow is incomplete.
 */
1641 * Build an RPC by the list of extent @ext_list. The caller must ensure
1642 * that the total pages in this list are NOT over max pages per RPC.
1643 * Extents in the list must be in OES_RPC state.
1645 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1646 struct list_head *ext_list, int cmd)
1648 struct ptlrpc_request *req = NULL;
1649 struct osc_extent *ext;
1650 struct brw_page **pga = NULL;
1651 struct osc_brw_async_args *aa = NULL;
1652 struct obdo *oa = NULL;
1653 struct osc_async_page *oap;
1654 struct osc_object *obj = NULL;
1655 struct cl_req_attr *crattr = NULL;
1656 u64 starting_offset = OBD_OBJECT_EOF;
1657 u64 ending_offset = 0;
1661 bool soft_sync = false;
1662 bool interrupted = false;
1665 struct ost_body *body;
1666 LIST_HEAD(rpc_list);
1668 LASSERT(!list_empty(ext_list));
1670 /* add pages into rpc_list to build BRW rpc */
1671 list_for_each_entry(ext, ext_list, oe_link) {
1672 LASSERT(ext->oe_state == OES_RPC);
1673 mem_tight |= ext->oe_memalloc;
1674 page_count += ext->oe_nr_pages;
1679 soft_sync = osc_over_unstable_soft_limit(cli);
1681 mpflag = cfs_memory_pressure_get_and_set();
1683 pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
1689 oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
/* flatten every extent's pages into pga[] / rpc_list, tracking the
 * overall [starting_offset, ending_offset] range of the RPC */
1696 list_for_each_entry(ext, ext_list, oe_link) {
1697 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1699 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1701 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1702 pga[i] = &oap->oap_brw_page;
1703 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1706 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1707 if (starting_offset == OBD_OBJECT_EOF ||
1708 starting_offset > oap->oap_obj_off)
1709 starting_offset = oap->oap_obj_off;
1711 LASSERT(!oap->oap_page_off);
1712 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1713 ending_offset = oap->oap_obj_off +
1716 LASSERT(oap->oap_page_off + oap->oap_count ==
1718 if (oap->oap_interrupted)
1723 /* first page in the list */
1724 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1726 crattr = &osc_env_info(env)->oti_req_attr;
1727 memset(crattr, 0, sizeof(*crattr));
1728 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1729 crattr->cra_flags = ~0ULL;
1730 crattr->cra_page = oap2cl_page(oap);
1731 crattr->cra_oa = oa;
1732 cl_req_attr_set(env, osc2cl(obj), crattr);
/* sort by file offset so the OST allocates disk blocks in order */
1734 sort_brw_pages(pga, page_count);
1735 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 1, 0);
1737 CERROR("prep_req failed: %d\n", rc);
1741 req->rq_commit_cb = brw_commit;
1742 req->rq_interpret_reply = brw_interpret;
1744 req->rq_memalloc = mem_tight != 0;
1745 oap->oap_request = ptlrpc_request_addref(req);
1746 if (interrupted && !req->rq_intr)
1747 ptlrpc_mark_interrupted(req);
1749 /* Need to update the timestamps after the request is built in case
1750 * we race with setattr (locally or in queue at OST). If OST gets
1751 * later setattr before earlier BRW (as determined by the request xid),
1752 * the OST will not use BRW timestamps. Sadly, there is no obvious
1753 * way to do this in a single call. bug 10150
1755 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1756 crattr->cra_oa = &body->oa;
1757 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
1758 cl_req_attr_set(env, osc2cl(obj), crattr);
1759 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1761 BUILD_BUG_ON(sizeof(*aa) > sizeof(req->rq_async_args));
1762 aa = ptlrpc_req_async_args(req);
/* async args take ownership of the page list and extent list */
1763 INIT_LIST_HEAD(&aa->aa_oaps);
1764 list_splice_init(&rpc_list, &aa->aa_oaps);
1765 INIT_LIST_HEAD(&aa->aa_exts);
1766 list_splice_init(ext_list, &aa->aa_exts);
/* account the RPC in flight and feed the read/write histograms */
1768 spin_lock(&cli->cl_loi_list_lock);
1769 starting_offset >>= PAGE_SHIFT;
1770 if (cmd == OBD_BRW_READ) {
1771 cli->cl_r_in_flight++;
1772 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1773 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1774 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1775 starting_offset + 1);
1777 cli->cl_w_in_flight++;
1778 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1779 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1780 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1781 starting_offset + 1);
1783 spin_unlock(&cli->cl_loi_list_lock);
1785 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%dw in flight",
1786 page_count, aa, cli->cl_r_in_flight,
1787 cli->cl_w_in_flight);
1788 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1790 ptlrpcd_add_req(req);
1795 cfs_memory_pressure_restore(mpflag);
/* error path: free the obdo and fail all extents we were given */
1801 kmem_cache_free(obdo_cachep, oa);
1803 /* this should happen rarely and is pretty bad, it makes the
1804 * pending list not follow the dirty order
1806 while (!list_empty(ext_list)) {
1807 ext = list_entry(ext_list->next, struct osc_extent,
1809 list_del_init(&ext->oe_link);
1810 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data to @lock's l_ast_data if it is unset; under the lock's
 * resource lock.  Returns success when l_ast_data ends up equal to @data
 * (exact return values elided in this extract).
 */
1816 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1822 lock_res_and_lock(lock);
1824 if (!lock->l_ast_data)
1825 lock->l_ast_data = data;
1826 if (lock->l_ast_data == data)
1829 unlock_res_and_lock(lock);
/*
 * Common completion for a lock enqueue: translate an intent reply,
 * invoke the caller's upcall, and drop the enqueue reference.
 */
1834 static int osc_enqueue_fini(struct ptlrpc_request *req,
1835 osc_enqueue_upcall_f upcall, void *cookie,
1836 struct lustre_handle *lockh, enum ldlm_mode mode,
1837 __u64 *flags, int agl, int errcode)
1839 bool intent = *flags & LDLM_FL_HAS_INTENT;
1842 /* The request was created before ldlm_cli_enqueue call. */
1843 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1844 struct ldlm_reply *rep;
1846 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
/* lock_policy_res1 carries the intent status in network byte order */
1848 rep->lock_policy_res1 =
1849 ptlrpc_status_ntoh(rep->lock_policy_res1);
1850 if (rep->lock_policy_res1)
1851 errcode = rep->lock_policy_res1;
1853 *flags |= LDLM_FL_LVB_READY;
1854 } else if (errcode == ELDLM_OK) {
1855 *flags |= LDLM_FL_LVB_READY;
1858 /* Call the update callback. */
1859 rc = (*upcall)(cookie, lockh, errcode);
1860 /* release the reference taken in ldlm_cli_enqueue() */
1861 if (errcode == ELDLM_LOCK_MATCHED)
1863 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1864 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for an async lock enqueue: finish the LDLM
 * side, then run the OSC upcall via osc_enqueue_fini().
 * NOTE(review): some original lines are elided in this extract.
 */
1869 static int osc_enqueue_interpret(const struct lu_env *env,
1870 struct ptlrpc_request *req,
1871 struct osc_enqueue_args *aa, int rc)
1873 struct ldlm_lock *lock;
1874 struct lustre_handle *lockh = &aa->oa_lockh;
1875 enum ldlm_mode mode = aa->oa_mode;
1876 struct ost_lvb *lvb = aa->oa_lvb;
1877 __u32 lvb_len = sizeof(*lvb);
1881 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1884 lock = ldlm_handle2lock(lockh);
1885 LASSERTF(lock, "lockh %llx, req %p, aa %p - client evicted?\n",
1886 lockh->cookie, req, aa);
1888 /* Take an additional reference so that a blocking AST that
1889 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1890 * to arrive after an upcall has been executed by
1891 * osc_enqueue_fini().
1893 ldlm_lock_addref(lockh, mode);
1895 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1896 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1898 /* Let CP AST to grant the lock first. */
1899 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL path (presumably -- the guarding condition is elided here):
 * use a local flags word since none was supplied */
1902 LASSERT(!aa->oa_lvb);
1903 LASSERT(!aa->oa_flags);
1904 aa->oa_flags = &flags;
1907 /* Complete obtaining the lock procedure. */
1908 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1909 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1911 /* Complete osc stuff. */
1912 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1913 aa->oa_flags, aa->oa_agl, rc);
1915 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above */
1917 ldlm_lock_decref(lockh, mode);
1918 LDLM_LOCK_PUT(lock);
/* Sentinel rqset value meaning "send via the ptlrpcd daemon set" --
 * compared by address in osc_enqueue_base(), never dereferenced. */
1922 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1924 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1925 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1926 * other synchronous requests, however keeping some locks and trying to obtain
1927 * others may take a considerable amount of time in a case of ost failure; and
1928 * when other sync requests do not get released lock from a client, the client
1929 * is evicted from the cluster -- such scenaries make the life difficult, so
1930 * release locks just after they are obtained.
/* NOTE(review): some original lines are elided in this extract. */
1932 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1933 __u64 *flags, union ldlm_policy_data *policy,
1934 struct ost_lvb *lvb, int kms_valid,
1935 osc_enqueue_upcall_f upcall, void *cookie,
1936 struct ldlm_enqueue_info *einfo,
1937 struct ptlrpc_request_set *rqset, int async, int agl)
1939 struct obd_device *obd = exp->exp_obd;
1940 struct lustre_handle lockh = { 0 };
1941 struct ptlrpc_request *req = NULL;
1942 int intent = *flags & LDLM_FL_HAS_INTENT;
1943 __u64 match_flags = *flags;
1944 enum ldlm_mode mode;
1947 /* Filesystem lock extents are extended to page boundaries so that
1948 * dealing with the page cache is a little smoother.
1950 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1951 policy->l_extent.end |= ~PAGE_MASK;
1954 * kms is not valid when either object is completely fresh (so that no
1955 * locks are cached), or object was evicted. In the latter case cached
1956 * lock cannot be used, because it would prime inode state with
1957 * potentially stale LVB.
1962 /* Next, search for already existing extent locks that will cover us */
1963 /* If we're trying to read, we also search for an existing PW lock. The
1964 * VFS and page cache already protect us locally, so lots of readers/
1965 * writers can share a single PW lock.
1967 * There are problems with conversion deadlocks, so instead of
1968 * converting a read lock to a write lock, we'll just enqueue a new
1971 * At some point we should cancel the read lock instead of making them
1972 * send us a blocking callback, but there are problems with canceling
1973 * locks out from other users right now, too.
1975 mode = einfo->ei_mode;
1976 if (einfo->ei_mode == LCK_PR)
1979 match_flags |= LDLM_FL_LVB_READY;
1981 match_flags |= LDLM_FL_BLOCK_GRANTED;
1982 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
1983 einfo->ei_type, policy, mode, &lockh, 0);
1985 struct ldlm_lock *matched;
1987 if (*flags & LDLM_FL_TEST_LOCK)
1990 matched = ldlm_handle2lock(&lockh);
1992 /* AGL enqueues DLM locks speculatively. Therefore if
1993 * it already exists a DLM lock, it wll just inform the
1994 * caller to cancel the AGL process for this stripe.
1996 ldlm_lock_decref(&lockh, mode);
1997 LDLM_LOCK_PUT(matched);
1999 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2000 *flags |= LDLM_FL_LVB_READY;
2001 /* We already have a lock, and it's referenced. */
2002 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2004 ldlm_lock_decref(&lockh, mode);
2005 LDLM_LOCK_PUT(matched);
2008 ldlm_lock_decref(&lockh, mode);
2009 LDLM_LOCK_PUT(matched);
2014 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* no matching lock: build a real enqueue RPC with an LVB reply buffer */
2017 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2018 &RQF_LDLM_ENQUEUE_LVB);
2022 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2024 ptlrpc_request_free(req);
2028 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2030 ptlrpc_request_set_replen(req);
2033 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2034 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2036 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2037 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* async path: stash state in rq_async_args and hand off to ptlrpcd */
2040 struct osc_enqueue_args *aa;
2042 BUILD_BUG_ON(sizeof(*aa) > sizeof(req->rq_async_args));
2043 aa = ptlrpc_req_async_args(req);
2045 aa->oa_mode = einfo->ei_mode;
2046 aa->oa_type = einfo->ei_type;
2047 lustre_handle_copy(&aa->oa_lockh, &lockh);
2048 aa->oa_upcall = upcall;
2049 aa->oa_cookie = cookie;
2052 aa->oa_flags = flags;
2055 /* AGL is essentially to enqueue an DLM lock
2056 * in advance, so we don't care about the
2057 * result of AGL enqueue.
2060 aa->oa_flags = NULL;
2063 req->rq_interpret_reply =
2064 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2065 if (rqset == PTLRPCD_SET)
2066 ptlrpcd_add_req(req);
2068 ptlrpc_set_add_req(rqset, req);
2069 } else if (intent) {
2070 ptlrpc_req_finished(req);
/* sync path: run the completion inline */
2075 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2078 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock covering @policy; on a match,
 * attach @data to the lock via osc_set_lock_data().  Returns the matched
 * mode (0 on no match), per ldlm_lock_match() convention.
 */
2083 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2084 enum ldlm_type type, union ldlm_policy_data *policy,
2085 enum ldlm_mode mode, __u64 *flags, void *data,
2086 struct lustre_handle *lockh, int unref)
2088 struct obd_device *obd = exp->exp_obd;
2089 __u64 lflags = *flags;
2092 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2095 /* Filesystem lock extents are extended to page boundaries so that
2096 * dealing with the page cache is a little smoother
2098 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2099 policy->l_extent.end |= ~PAGE_MASK;
2101 /* Next, search for already existing extent locks that will cover us */
2102 /* If we're trying to read, we also search for an existing PW lock. The
2103 * VFS and page cache already protect us locally, so lots of readers/
2104 * writers can share a single PW lock.
2109 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2110 res_id, type, policy, rc, lockh, unref);
2111 if (!rc || lflags & LDLM_FL_TEST_LOCK)
/* if we can't attach our data, drop the reference we just matched */
2115 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2118 if (!osc_set_lock_data(lock, data)) {
2119 ldlm_lock_decref(lockh, rc);
2122 LDLM_LOCK_PUT(lock);
/*
 * Reply-interpret callback for an async OST_STATFS: copy the server's
 * obd_statfs into the caller's buffer and run the oi_cb_up callback.
 */
2127 static int osc_statfs_interpret(const struct lu_env *env,
2128 struct ptlrpc_request *req,
2129 struct osc_async_args *aa, int rc)
2131 struct obd_statfs *msfs;
2134 /* The request has in fact never been sent
2135 * due to issues at a higher level (LOV).
2136 * Exit immediately since the caller is
2137 * aware of the problem and takes care
/* NODELAY statfs is best-effort: connection errors are not fatal */
2142 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2143 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2151 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2157 *aa->aa_oi->oi_osfs = *msfs;
2159 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_STATFS RPC on @rqset; the reply is handled
 * by osc_statfs_interpret().  @max_age appears unused in the request
 * itself -- see the comment below about passing it to the target.
 */
2163 static int osc_statfs_async(struct obd_export *exp,
2164 struct obd_info *oinfo, __u64 max_age,
2165 struct ptlrpc_request_set *rqset)
2167 struct obd_device *obd = class_exp2obd(exp);
2168 struct ptlrpc_request *req;
2169 struct osc_async_args *aa;
2172 /* We could possibly pass max_age in the request (as an absolute
2173 * timestamp or a "seconds.usec ago") so the target can avoid doing
2174 * extra calls into the filesystem if that isn't necessary (e.g.
2175 * during mount that would help a bit). Having relative timestamps
2176 * is not so great if request processing is slow, while absolute
2177 * timestamps are not ideal because they need time synchronization.
2179 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2183 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2185 ptlrpc_request_free(req);
2188 ptlrpc_request_set_replen(req);
2189 req->rq_request_portal = OST_CREATE_PORTAL;
2190 ptlrpc_at_set_req_timeout(req);
2192 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2193 /* procfs requests not want stat in wait for avoid deadlock */
2194 req->rq_no_resend = 1;
2195 req->rq_no_delay = 1;
2198 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2199 BUILD_BUG_ON(sizeof(*aa) > sizeof(req->rq_async_args));
2200 aa = ptlrpc_req_async_args(req);
2203 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the RPC, wait, and copy the result into
 * @osfs.  Takes an import reference under cl_sem to stay safe against a
 * concurrent disconnect.
 */
2207 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2208 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2210 struct obd_device *obd = class_exp2obd(exp);
2211 struct obd_statfs *msfs;
2212 struct ptlrpc_request *req;
2213 struct obd_import *imp = NULL;
2216 /* Since the request might also come from lprocfs, so we need
2217 * sync this with client_disconnect_export Bug15684
2219 down_read(&obd->u.cli.cl_sem);
2220 if (obd->u.cli.cl_import)
2221 imp = class_import_get(obd->u.cli.cl_import);
2222 up_read(&obd->u.cli.cl_sem);
2226 /* We could possibly pass max_age in the request (as an absolute
2227 * timestamp or a "seconds.usec ago") so the target can avoid doing
2228 * extra calls into the filesystem if that isn't necessary (e.g.
2229 * during mount that would help a bit). Having relative timestamps
2230 * is not so great if request processing is slow, while absolute
2231 * timestamps are not ideal because they need time synchronization.
2233 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* the import reference is only needed for allocation; drop it now */
2235 class_import_put(imp);
2240 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2242 ptlrpc_request_free(req);
2245 ptlrpc_request_set_replen(req);
2246 req->rq_request_portal = OST_CREATE_PORTAL;
2247 ptlrpc_at_set_req_timeout(req);
2249 if (flags & OBD_STATFS_NODELAY) {
2250 /* procfs requests not want stat in wait for avoid deadlock */
2251 req->rq_no_resend = 1;
2252 req->rq_no_delay = 1;
2255 rc = ptlrpc_queue_wait(req);
2259 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2268 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call so
 * it cannot be unloaded mid-ioctl.
 */
2272 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2273 void *karg, void __user *uarg)
2275 struct obd_device *obd = exp->exp_obd;
2276 struct obd_ioctl_data *data = karg;
2279 if (!try_module_get(THIS_MODULE)) {
2280 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2281 module_name(THIS_MODULE));
2285 case OBD_IOC_CLIENT_RECOVER:
2286 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2287 data->ioc_inlbuf1, 0);
2291 case IOC_OSC_SET_ACTIVE:
2292 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2295 case OBD_IOC_PING_TARGET:
2296 err = ptlrpc_obd_ping(obd);
/* unknown command: log and fall through to the cleanup below */
2299 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2300 cmd, current_comm());
2305 module_put(THIS_MODULE);
/*
 * Handle set_info requests: some keys are consumed locally (checksum,
 * sptlrpc, LRU cache wiring), the rest are forwarded to the OST as an
 * OST_SET_INFO RPC.  NOTE(review): some lines are elided in this extract.
 */
2309 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2310 u32 keylen, void *key, u32 vallen,
2311 void *val, struct ptlrpc_request_set *set)
2313 struct ptlrpc_request *req;
2314 struct obd_device *obd = exp->exp_obd;
2315 struct obd_import *imp = class_exp2cliimp(exp);
2319 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* locally handled key: toggle client-side checksumming */
2321 if (KEY_IS(KEY_CHECKSUM)) {
2322 if (vallen != sizeof(int))
2324 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2328 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2329 sptlrpc_conf_client_adapt(obd);
2333 if (KEY_IS(KEY_FLUSH_CTX)) {
2334 sptlrpc_import_flush_my_ctx(imp);
/* locally handled key: wire this OSC into the shared LRU cache */
2338 if (KEY_IS(KEY_CACHE_SET)) {
2339 struct client_obd *cli = &obd->u.cli;
2341 LASSERT(!cli->cl_cache); /* only once */
2342 cli->cl_cache = val;
2343 cl_cache_incref(cli->cl_cache);
2344 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2346 /* add this osc into entity list */
2347 LASSERT(list_empty(&cli->cl_lru_osc));
2348 spin_lock(&cli->cl_cache->ccc_lru_lock);
2349 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2350 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2355 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2356 struct client_obd *cli = &obd->u.cli;
2357 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2358 long target = *(long *)val;
2360 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* all remaining keys need a request set, except grant shrink */
2365 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2368 /* We pass all other commands directly to OST. Since nobody calls osc
2369 * methods directly and everybody is supposed to go through LOV, we
2370 * assume lov checked invalid values for us.
2371 * The only recognised values so far are evict_by_nid and mds_conn.
2372 * Even if something bad goes through, we'd get a -EINVAL from OST
2376 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2377 &RQF_OST_SET_GRANT_INFO :
2382 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2383 RCL_CLIENT, keylen);
2384 if (!KEY_IS(KEY_GRANT_SHRINK))
2385 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2386 RCL_CLIENT, vallen);
2387 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2389 ptlrpc_request_free(req);
2393 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2394 memcpy(tmp, key, keylen);
2395 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2398 memcpy(tmp, val, vallen);
/* grant shrink carries an obdo and its own interpret callback */
2400 if (KEY_IS(KEY_GRANT_SHRINK)) {
2401 struct osc_brw_async_args *aa;
2404 BUILD_BUG_ON(sizeof(*aa) > sizeof(req->rq_async_args));
2405 aa = ptlrpc_req_async_args(req);
2406 oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
2408 ptlrpc_req_finished(req);
2411 *oa = ((struct ost_body *)val)->oa;
2413 req->rq_interpret_reply = osc_shrink_grant_interpret;
2416 ptlrpc_request_set_replen(req);
2417 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2419 ptlrpc_set_add_req(set, req);
2420 ptlrpc_check_set(NULL, set);
2422 ptlrpcd_add_req(req);
/*
 * Reconnect hook: report our current grant (available + dirty) back to
 * the server in ocd_grant and reset the lost-grant counter.
 */
2428 static int osc_reconnect(const struct lu_env *env,
2429 struct obd_export *exp, struct obd_device *obd,
2430 struct obd_uuid *cluuid,
2431 struct obd_connect_data *data,
2434 struct client_obd *cli = &obd->u.cli;
2436 if (data && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2439 spin_lock(&cli->cl_loi_list_lock);
/* if we hold no grant at all, ask for two full BRW RPCs' worth */
2440 data->ocd_grant = (cli->cl_avail_grant +
2441 (cli->cl_dirty_pages << PAGE_SHIFT)) ?:
2442 2 * cli_brw_size(obd);
2443 lost_grant = cli->cl_lost_grant;
2444 cli->cl_lost_grant = 0;
2445 spin_unlock(&cli->cl_loi_list_lock);
2447 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2448 data->ocd_connect_flags,
2449 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect the export, then remove this client from the grant-shrink
 * list -- only after the import is gone, for the race described below.
 */
2455 static int osc_disconnect(struct obd_export *exp)
2457 struct obd_device *obd = class_exp2obd(exp);
2460 rc = client_disconnect_export(exp);
2462 * Initially we put del_shrink_grant before disconnect_export, but it
2463 * causes the following problem if setup (connect) and cleanup
2464 * (disconnect) are tangled together.
2465 * connect p1 disconnect p2
2466 * ptlrpc_connect_import
2467 * ............... class_manual_cleanup
2470 * ptlrpc_connect_interrupt
2472 * add this client to shrink list
2474 * Bang! pinger trigger the shrink.
2475 * So the osc should be disconnected from the shrink list, after we
2476 * are sure the import has been destroyed. BUG18662
2478 if (!obd->u.cli.cl_import)
2479 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_ldlm_resource_invalidate(): cfs_hash iterator callback used from
 * osc_import_event(IMP_EVENT_INVALIDATE) to walk every LDLM resource.
 *
 * For each resource: take a reference on the osc_object recorded in the
 * first granted lock's l_ast_data, clear LDLM_FL_CLEANED on all granted
 * locks so the second ldlm_namespace_cleanup() pass will cancel them,
 * then invalidate the osc object and drop the reference.
 *
 * @arg carries the struct lu_env * needed by the cl_object calls.
 */
2483 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2484 struct cfs_hash_bd *bd,
2485 struct hlist_node *hnode, void *arg)
2487 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2488 struct osc_object *osc = NULL;
2489 struct lu_env *env = arg;
2490 struct ldlm_lock *lock;
/* Only the first lock carrying l_ast_data contributes the osc object. */
2493 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2494 if (lock->l_ast_data && !osc) {
2495 osc = lock->l_ast_data;
2496 cl_object_get(osc2cl(osc));
2500 * clear LDLM_FL_CLEANED flag to make sure it will be canceled
2501 * by the 2nd round of ldlm_namespace_clean() call in
2502 * osc_import_event().
2504 ldlm_clear_cleaned(lock);
2509 osc_object_invalidate(env, osc);
2510 cl_object_put(env, osc2cl(osc));
/*
 * osc_import_event(): react to state changes of the import to the OST.
 * Most events are simply forwarded to the observer obd through
 * obd_notify_observer(); DISCON, INVALIDATE and OCD additionally adjust
 * OSC-local state.
 *
 * NOTE(review): this excerpt does not show all original lines (the
 * switch statement header, break/return statements and some declarations
 * are missing).
 */
2516 static int osc_import_event(struct obd_device *obd,
2517 struct obd_import *imp,
2518 enum obd_import_event event)
2520 struct client_obd *cli;
2523 LASSERT(imp->imp_obd == obd);
/* Connection lost: all outstanding grant is void. */
2526 case IMP_EVENT_DISCON: {
2528 spin_lock(&cli->cl_loi_list_lock);
2529 cli->cl_avail_grant = 0;
2530 cli->cl_lost_grant = 0;
2531 spin_unlock(&cli->cl_loi_list_lock);
2534 case IMP_EVENT_INACTIVE: {
2535 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/*
 * Import invalidated: clean the namespace twice, with an unplug of
 * pending I/O and per-resource invalidation in between (see
 * osc_ldlm_resource_invalidate()).
 */
2538 case IMP_EVENT_INVALIDATE: {
2539 struct ldlm_namespace *ns = obd->obd_namespace;
2543 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2545 env = cl_env_get(&refcheck);
2547 osc_io_unplug(env, &obd->u.cli, NULL);
2549 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2550 osc_ldlm_resource_invalidate,
2552 cl_env_put(env, &refcheck);
2554 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2560 case IMP_EVENT_ACTIVE: {
2561 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* Connect data negotiated: (re)initialize grant and request portal. */
2564 case IMP_EVENT_OCD: {
2565 struct obd_connect_data *ocd = &imp->imp_connect_data;
2567 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2568 osc_init_grant(&obd->u.cli, ocd);
2571 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2572 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2574 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2577 case IMP_EVENT_DEACTIVATE: {
2578 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2581 case IMP_EVENT_ACTIVATE: {
2582 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2586 CERROR("Unknown import event %d\n", event);
2593 * Determine whether the lock can be canceled before replaying the lock
2594 * during recovery, see bug16774 for detailed information.
2596 * \retval zero the lock can't be canceled
2597 * \retval other ok to cancel
2599 static int osc_cancel_weight(struct ldlm_lock *lock)
2602 * Cancel all unused and granted extent lock.
/*
 * Only granted extent locks whose weight is zero — i.e. locks that
 * osc_ldlm_weigh_ast() reports as protecting nothing cached — are
 * cheap enough to cancel instead of replay.
 */
2604 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2605 lock->l_granted_mode == lock->l_req_mode &&
2606 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work(): ptlrpcd work callback that flushes pending writeback
 * for the client obd passed in @data via osc_io_unplug().
 */
2612 static int brw_queue_work(const struct lu_env *env, void *data)
2614 struct client_obd *cli = data;
2616 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2618 osc_io_unplug(env, cli, NULL);
/*
 * osc_setup(): one-time setup of an OSC obd device.
 *
 * Takes a ptlrpcd reference, runs generic client setup, allocates the
 * writeback and LRU ptlrpcd work items, initializes quota and procfs,
 * grows the shared OSC request pool (bounded by osc_reqpool_maxreqcount),
 * registers the cancel-weight callback on the namespace and adds this
 * client to the global osc_shrink_list.
 *
 * NOTE(review): error-path labels (out_client_setup, out_ptlrpcd_work)
 * and several return/brace lines are not visible in this excerpt.
 */
2622 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2624 struct lprocfs_static_vars lvars = { NULL };
2625 struct client_obd *cli = &obd->u.cli;
2632 rc = ptlrpcd_addref();
2636 rc = client_obd_setup(obd, lcfg);
/* Writeback work item, executed by ptlrpcd (see brw_queue_work()). */
2640 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2641 if (IS_ERR(handler)) {
2642 rc = PTR_ERR(handler);
2643 goto out_client_setup;
2645 cli->cl_writeback_work = handler;
/* LRU shrink work item. */
2647 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2648 if (IS_ERR(handler)) {
2649 rc = PTR_ERR(handler);
2650 goto out_ptlrpcd_work;
2653 cli->cl_lru_work = handler;
2655 rc = osc_quota_setup(obd);
2657 goto out_ptlrpcd_work;
2659 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2660 lprocfs_osc_init_vars(&lvars);
2661 if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
2662 lproc_osc_attach_seqstat(obd);
2663 sptlrpc_lprocfs_cliobd_attach(obd);
2664 ptlrpc_lprocfs_register_obd(obd);
2668 * We try to control the total number of requests with a upper limit
2669 * osc_reqpool_maxreqcount. There might be some race which will cause
2670 * over-limit allocation, but it is fine.
2672 req_count = atomic_read(&osc_pool_req_count);
2673 if (req_count < osc_reqpool_maxreqcount) {
2674 adding = cli->cl_max_rpcs_in_flight + 2;
2675 if (req_count + adding > osc_reqpool_maxreqcount)
2676 adding = osc_reqpool_maxreqcount - req_count;
2678 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2679 atomic_add(added, &osc_pool_req_count);
2682 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2683 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Make this client visible to the osc_cache_shrinker. */
2685 spin_lock(&osc_shrink_lock);
2686 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2687 spin_unlock(&osc_shrink_lock);
/* Error paths: undo work-item allocation and generic client setup. */
2692 if (cli->cl_writeback_work) {
2693 ptlrpcd_destroy_work(cli->cl_writeback_work);
2694 cli->cl_writeback_work = NULL;
2696 if (cli->cl_lru_work) {
2697 ptlrpcd_destroy_work(cli->cl_lru_work);
2698 cli->cl_lru_work = NULL;
2701 client_obd_cleanup(obd);
/*
 * osc_precleanup(): first stage of device teardown.
 *
 * Waits for zombie exports (required for the echo client, see the
 * comment below), destroys the writeback/LRU work items, drops the
 * client import and unregisters procfs entries.
 */
2707 static int osc_precleanup(struct obd_device *obd)
2709 struct client_obd *cli = &obd->u.cli;
2712 * for echo client, export may be on zombie list, wait for
2713 * zombie thread to cull it, because cli.cl_import will be
2714 * cleared in client_disconnect_export():
2715 * class_export_destroy() -> obd_cleanup() ->
2716 * echo_device_free() -> echo_client_cleanup() ->
2717 * obd_disconnect() -> osc_disconnect() ->
2718 * client_disconnect_export()
2720 obd_zombie_barrier();
2721 if (cli->cl_writeback_work) {
2722 ptlrpcd_destroy_work(cli->cl_writeback_work);
2723 cli->cl_writeback_work = NULL;
2726 if (cli->cl_lru_work) {
2727 ptlrpcd_destroy_work(cli->cl_lru_work);
2728 cli->cl_lru_work = NULL;
2731 obd_cleanup_client_import(obd);
2732 ptlrpc_lprocfs_unregister_obd(obd);
2733 lprocfs_obd_cleanup(obd);
/*
 * osc_cleanup(): final stage of device teardown.
 *
 * Removes the client from the global shrink list, detaches it from the
 * shared cl_cache LRU (if attached), frees the quota cache and finishes
 * with the generic client cleanup.
 */
2737 static int osc_cleanup(struct obd_device *obd)
2739 struct client_obd *cli = &obd->u.cli;
/* Stop the cache shrinker from seeing this client. */
2742 spin_lock(&osc_shrink_lock);
2743 list_del(&cli->cl_shrink_list);
2744 spin_unlock(&osc_shrink_lock);
/* lru cleanup: drop our slot in the shared cache's LRU accounting. */
2747 if (cli->cl_cache) {
2748 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2749 spin_lock(&cli->cl_cache->ccc_lru_lock);
2750 list_del_init(&cli->cl_lru_osc);
2751 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2752 cli->cl_lru_left = NULL;
2753 cl_cache_decref(cli->cl_cache);
2754 cli->cl_cache = NULL;
2757 /* free memory of osc quota cache */
2758 osc_quota_cleanup(obd);
2760 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base(): handle configuration log commands.  The
 * visible path forwards PARAM_OSC parameters to
 * class_process_proc_param() using the OSC procfs variable table.
 *
 * NOTE(review): the case label(s) of the switch and the function tail are
 * not visible in this excerpt.
 */
2766 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2768 struct lprocfs_static_vars lvars = { NULL };
2771 lprocfs_osc_init_vars(&lvars);
2773 switch (lcfg->lcfg_command) {
2775 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops hook: thin wrapper delegating to osc_process_config_base(). */
2785 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
2787 return osc_process_config_base(obd, buf);
/*
 * Method table wiring the generic obd layer to the OSC implementations
 * above; generic client_* helpers are used where no OSC-specific
 * behavior is needed (connect, add_conn, del_conn).
 */
2790 static struct obd_ops osc_obd_ops = {
2791 .owner = THIS_MODULE,
2793 .precleanup = osc_precleanup,
2794 .cleanup = osc_cleanup,
2795 .add_conn = client_import_add_conn,
2796 .del_conn = client_import_del_conn,
2797 .connect = client_connect_import,
2798 .reconnect = osc_reconnect,
2799 .disconnect = osc_disconnect,
2800 .statfs = osc_statfs,
2801 .statfs_async = osc_statfs_async,
2802 .create = osc_create,
2803 .destroy = osc_destroy,
2804 .getattr = osc_getattr,
2805 .setattr = osc_setattr,
2806 .iocontrol = osc_iocontrol,
2807 .set_info_async = osc_set_info_async,
2808 .import_event = osc_import_event,
2809 .process_config = osc_process_config,
2810 .quotactl = osc_quotactl,
/* Global list of clients eligible for cache shrinking, and its lock. */
2813 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2814 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Memory shrinker reclaiming pages cached by OSC; the count/scan
 * callbacks are implemented elsewhere (osc_cache_shrink_count/scan).
 */
2816 static struct shrinker osc_cache_shrinker = {
2817 .count_objects = osc_cache_shrink_count,
2818 .scan_objects = osc_cache_shrink_scan,
2819 .seeks = DEFAULT_SEEKS,
/*
 * osc_init(): module initialization.
 *
 * Sets up the lu_kmem caches, registers the OSC obd type and the cache
 * shrinker, then sizes and creates the shared request pool:
 * osc_reqpool_mem_max (MB, module parameter) bounds the pool's total
 * memory, and the per-request size is the smallest power of two
 * >= OST_MAXREQSIZE.
 *
 * NOTE(review): some error-path lines and the clamping of
 * osc_reqpool_mem_max are not visible in this excerpt.
 */
2822 static int __init osc_init(void)
2824 struct lprocfs_static_vars lvars = { NULL };
2825 unsigned int reqpool_size;
2826 unsigned int reqsize;
2829 /* print an address of _any_ initialized kernel symbol from this
2830 * module, to allow debugging with gdb that doesn't support data
2831 * symbols from modules.
2833 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2835 rc = lu_kmem_init(osc_caches);
2839 lprocfs_osc_init_vars(&lvars);
2841 rc = class_register_type(&osc_obd_ops, NULL,
2842 LUSTRE_OSC_NAME, &osc_device_type);
2846 register_shrinker(&osc_cache_shrinker);
2848 /* This is obviously too much memory, only prevent overflow here */
2849 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
/* Convert the MB module parameter to bytes. */
2854 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the request size up to the next power of two. */
2857 while (reqsize < OST_MAXREQSIZE)
2858 reqsize = reqsize << 1;
2861 * We don't enlarge the request count in OSC pool according to
2862 * cl_max_rpcs_in_flight. The allocation from the pool will only be
2863 * tried after normal allocation failed. So a small OSC pool won't
2864 * cause much performance degression in most of cases.
2866 osc_reqpool_maxreqcount = reqpool_size / reqsize;
2868 atomic_set(&osc_pool_req_count, 0);
2869 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
2870 ptlrpc_add_rqs_to_pool);
/* Unwind on failure: type registration first, then the kmem caches. */
2878 class_unregister_type(LUSTRE_OSC_NAME);
2880 lu_kmem_fini(osc_caches);
/*
 * osc_exit(): module teardown — mirror of osc_init(): unregister the
 * shrinker and obd type, release the kmem caches and the request pool.
 */
2884 static void /*__exit*/ osc_exit(void)
2886 unregister_shrinker(&osc_cache_shrinker);
2887 class_unregister_type(LUSTRE_OSC_NAME);
2888 lu_kmem_fini(osc_caches);
2889 ptlrpc_free_rq_pool(osc_rq_pool);
/* Module metadata and init/exit entry points. */
2892 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2893 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2894 MODULE_LICENSE("GPL");
2895 MODULE_VERSION(LUSTRE_VERSION_STRING);
2897 module_init(osc_init);
2898 module_exit(osc_exit);