GNU Linux-libre 4.14.302-gnu1: drivers/staging/lustre/lustre/obdclass/lu_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/lu_object.c
33  *
34  * Lustre Object.
35  * These are the only exported functions; they provide some generic
36  * infrastructure for managing object devices.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42
43 #include <linux/libcfs/libcfs.h>
44
45 #include <linux/module.h>
46
47 /* hash_long() */
48 #include <linux/libcfs/libcfs_hash.h>
49 #include <obd_class.h>
50 #include <obd_support.h>
51 #include <lustre_disk.h>
52 #include <lustre_fid.h>
53 #include <lu_object.h>
54 #include <cl_object.h>
55 #include <lu_ref.h>
56 #include <linux/list.h>
57
58 enum {
59         LU_CACHE_PERCENT_MAX     = 50,
60         LU_CACHE_PERCENT_DEFAULT = 20
61 };
62
63 #define LU_CACHE_NR_MAX_ADJUST          512
64 #define LU_CACHE_NR_UNLIMITED           -1
65 #define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
66 #define LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
67 #define LU_CACHE_NR_ZFS_LIMIT           256
68
69 #define LU_SITE_BITS_MIN        12
70 #define LU_SITE_BITS_MAX        24
71 #define LU_SITE_BITS_MAX_CL     19
72 /**
73  * 256 buckets in total; we don't want too many buckets because they:
74  * - consume too much memory
75  * - lead to unbalanced LRU lists
76  */
77 #define LU_SITE_BKT_BITS        8
78
79 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
80 module_param(lu_cache_percent, int, 0644);
81 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
82
83 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
84 module_param(lu_cache_nr, long, 0644);
85 MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
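/*
 * Tuning sketch (added commentary, not from the original source): both
 * parameters are declared with mode 0644, so they can also be changed at
 * runtime through sysfs, assuming the module is loaded as "obdclass":
 *
 *	echo 30    > /sys/module/obdclass/parameters/lu_cache_percent
 *	echo 65536 > /sys/module/obdclass/parameters/lu_cache_nr
 */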
86
87 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
88 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
89
90 /**
91  * Decrease the reference counter on an object. If the last reference is
92  * released, return the object to the cache, unless lu_object_is_dying(o)
93  * holds. In the latter case, free the object immediately.
94  */
95 void lu_object_put(const struct lu_env *env, struct lu_object *o)
96 {
97         struct lu_site_bkt_data *bkt;
98         struct lu_object_header *top;
99         struct lu_site    *site;
100         struct lu_object        *orig;
101         struct cfs_hash_bd          bd;
102         const struct lu_fid     *fid;
103
104         top  = o->lo_header;
105         site = o->lo_dev->ld_site;
106         orig = o;
107
108         /*
109          * Until fids-on-OST is fully implemented, anonymous objects
110          * are possible in OSP. Such an object isn't listed in the site,
111          * so we should not remove it from the site.
112          */
113         fid = lu_object_fid(o);
114         if (fid_is_zero(fid)) {
115                 LASSERT(!top->loh_hash.next && !top->loh_hash.pprev);
116                 LASSERT(list_empty(&top->loh_lru));
117                 if (!atomic_dec_and_test(&top->loh_ref))
118                         return;
119                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
120                         if (o->lo_ops->loo_object_release)
121                                 o->lo_ops->loo_object_release(env, o);
122                 }
123                 lu_object_free(env, orig);
124                 return;
125         }
126
127         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
128         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
129
130         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
131                 if (lu_object_is_dying(top)) {
132                         /*
133                          * somebody may be waiting for this, currently only
134                          * used for cl_object, see cl_object_put_last().
135                          */
136                         wake_up_all(&bkt->lsb_marche_funebre);
137                 }
138                 return;
139         }
140
141         /*
142          * When the last reference is released, iterate over the object's
143          * layers and notify them that the object is no longer busy.
144          */
145         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
146                 if (o->lo_ops->loo_object_release)
147                         o->lo_ops->loo_object_release(env, o);
148         }
149
150         if (!lu_object_is_dying(top)) {
151                 LASSERT(list_empty(&top->loh_lru));
152                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
153                 bkt->lsb_lru_len++;
154                 percpu_counter_inc(&site->ls_lru_len_counter);
155                 CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, lru_len: %ld\n",
156                        o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
157                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
158                 return;
159         }
160
161         /*
162          * If the object is dying (will not be cached), then remove it
163          * from the hash table and LRU.
164          *
165          * This is done with the hash table and LRU lists locked. As the only
166          * ways to acquire the first reference to a previously unreferenced
167          * object are hash-table lookup (lu_object_find()) and
168          * LRU scanning (lu_site_purge()), both done under the hash-table
169          * and LRU lock, no race with a concurrent object lookup is possible
170          * and we can safely destroy the object below.
171          */
172         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
173                 cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
174         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
175         /*
176          * The object was already removed from the hash table and LRU
177          * above, so we can kill it.
178          */
179         lu_object_free(env, orig);
180 }
181 EXPORT_SYMBOL(lu_object_put);
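/*
 * Summary of the paths above (added commentary): for an anonymous (zero-FID)
 * object the last put frees it directly; for a hashed object the last put
 * either parks it on the per-bucket LRU (the normal, cached case) or, if the
 * object is dying, removes it from the hash table and frees it, waking any
 * thread parked in lsb_marche_funebre.
 */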
182
183 /**
184  * Kill the object and take it out of LRU cache.
185  * Currently used by client code for layout change.
186  */
187 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
188 {
189         struct lu_object_header *top;
190
191         top = o->lo_header;
192         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
193         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
194                 struct lu_site *site = o->lo_dev->ld_site;
195                 struct cfs_hash *obj_hash = site->ls_obj_hash;
196                 struct cfs_hash_bd bd;
197
198                 cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
199                 if (!list_empty(&top->loh_lru)) {
200                         struct lu_site_bkt_data *bkt;
201
202                         list_del_init(&top->loh_lru);
203                         bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
204                         bkt->lsb_lru_len--;
205                         percpu_counter_dec(&site->ls_lru_len_counter);
206                 }
207                 cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
208                 cfs_hash_bd_unlock(obj_hash, &bd, 1);
209         }
210 }
211 EXPORT_SYMBOL(lu_object_unhash);
212
213 /**
214  * Allocate new object.
215  *
216  * This follows object creation protocol, described in the comment within
217  * struct lu_device_operations definition.
218  */
219 static struct lu_object *lu_object_alloc(const struct lu_env *env,
220                                          struct lu_device *dev,
221                                          const struct lu_fid *f,
222                                          const struct lu_object_conf *conf)
223 {
224         struct lu_object *scan;
225         struct lu_object *top;
226         struct list_head *layers;
227         unsigned int init_mask = 0;
228         unsigned int init_flag;
229         int clean;
230         int result;
231
232         /*
233          * Create top-level object slice. This will also create
234          * lu_object_header.
235          */
236         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
237         if (!top)
238                 return ERR_PTR(-ENOMEM);
239         if (IS_ERR(top))
240                 return top;
241         /*
242          * This is the only place where object fid is assigned. It's constant
243          * after this point.
244          */
245         top->lo_header->loh_fid = *f;
246         layers = &top->lo_header->loh_layers;
247
248         do {
249                 /*
250                  * Call ->loo_object_init() repeatedly, until no more new
251                  * object slices are created.
252                  */
253                 clean = 1;
254                 init_flag = 1;
255                 list_for_each_entry(scan, layers, lo_linkage) {
256                         if (init_mask & init_flag)
257                                 goto next;
258                         clean = 0;
259                         scan->lo_header = top->lo_header;
260                         result = scan->lo_ops->loo_object_init(env, scan, conf);
261                         if (result != 0) {
262                                 lu_object_free(env, top);
263                                 return ERR_PTR(result);
264                         }
265                         init_mask |= init_flag;
266 next:
267                         init_flag <<= 1;
268                 }
269         } while (!clean);
270
271         list_for_each_entry_reverse(scan, layers, lo_linkage) {
272                 if (scan->lo_ops->loo_object_start) {
273                         result = scan->lo_ops->loo_object_start(env, scan);
274                         if (result != 0) {
275                                 lu_object_free(env, top);
276                                 return ERR_PTR(result);
277                         }
278                 }
279         }
280
281         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
282         return top;
283 }
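/*
 * Added note: the loop above converges because each pass records, in
 * init_mask, every slice it has already initialized, while a slice's
 * ->loo_object_init() may append new, not-yet-initialized slices below it
 * (e.g., on a client a vvp slice typically sits above lov, which sits above
 * osc). Iteration stops once a full pass finds no uninitialized slice.
 */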
284
285 /**
286  * Free an object.
287  */
288 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
289 {
290         struct lu_site_bkt_data *bkt;
291         struct lu_site    *site;
292         struct lu_object        *scan;
293         struct list_head              *layers;
294         struct list_head               splice;
295
296         site   = o->lo_dev->ld_site;
297         layers = &o->lo_header->loh_layers;
298         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
299         /*
300          * First call ->loo_object_delete() method to release all resources.
301          */
302         list_for_each_entry_reverse(scan, layers, lo_linkage) {
303                 if (scan->lo_ops->loo_object_delete)
304                         scan->lo_ops->loo_object_delete(env, scan);
305         }
306
307         /*
308          * Then, splice object layers into stand-alone list, and call
309          * ->loo_object_free() on all layers to free memory. Splice is
310          * necessary, because lu_object_header is freed together with the
311          * top-level slice.
312          */
313         INIT_LIST_HEAD(&splice);
314         list_splice_init(layers, &splice);
315         while (!list_empty(&splice)) {
316                 /*
317                  * Free layers in bottom-to-top order, so that object header
318                  * lives as long as possible and ->loo_object_free() methods
319                  * can look at its contents.
320                  */
321                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
322                 list_del_init(&o->lo_linkage);
323                 o->lo_ops->loo_object_free(env, o);
324         }
325
326         if (waitqueue_active(&bkt->lsb_marche_funebre))
327                 wake_up_all(&bkt->lsb_marche_funebre);
328 }
329
330 /**
331  * Free \a nr objects from the cold end of the site LRU list.
332  * If \a canblock is false, don't block waiting for another
333  * instance of lu_site_purge() to complete.
334  */
335 int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
336                           int nr, bool canblock)
337 {
338         struct lu_object_header *h;
339         struct lu_object_header *temp;
340         struct lu_site_bkt_data *bkt;
341         struct cfs_hash_bd          bd;
342         struct cfs_hash_bd          bd2;
343         struct list_head               dispose;
344         int                   did_sth;
345         unsigned int start = 0;
346         int                   count;
347         int                   bnr;
348         unsigned int i;
349
350         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
351                 return 0;
352
353         INIT_LIST_HEAD(&dispose);
354         /*
355          * Under LRU list lock, scan LRU list and move unreferenced objects to
356          * the dispose list, removing them from LRU and hash table.
357          */
358         if (nr != ~0)
359                 start = s->ls_purge_start;
360         bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1;
361  again:
362         /*
363          * It doesn't make sense to run purge threads in parallel; that can
364          * only bring trouble. See LU-5331.
365          */
366         if (canblock)
367                 mutex_lock(&s->ls_purge_mutex);
368         else if (!mutex_trylock(&s->ls_purge_mutex))
369                 goto out;
370
371         did_sth = 0;
372         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
373                 if (i < start)
374                         continue;
375                 count = bnr;
376                 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
377                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
378
379                 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
380                         LASSERT(atomic_read(&h->loh_ref) == 0);
381
382                         cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
383                         LASSERT(bd.bd_bucket == bd2.bd_bucket);
384
385                         cfs_hash_bd_del_locked(s->ls_obj_hash,
386                                                &bd2, &h->loh_hash);
387                         list_move(&h->loh_lru, &dispose);
388                         bkt->lsb_lru_len--;
389                         percpu_counter_dec(&s->ls_lru_len_counter);
390                         if (did_sth == 0)
391                                 did_sth = 1;
392
393                         if (nr != ~0 && --nr == 0)
394                                 break;
395
396                         if (count > 0 && --count == 0)
397                                 break;
398                 }
399                 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
400                 cond_resched();
401                 /*
402                  * Free everything on the dispose list. This is safe against
403                  * races for the reasons described in lu_object_put().
404                  */
405                 while (!list_empty(&dispose)) {
406                         h = container_of0(dispose.next,
407                                           struct lu_object_header, loh_lru);
408                         list_del_init(&h->loh_lru);
409                         lu_object_free(env, lu_object_top(h));
410                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
411                 }
412
413                 if (nr == 0)
414                         break;
415         }
416         mutex_unlock(&s->ls_purge_mutex);
417
418         if (nr != 0 && did_sth && start != 0) {
419                 start = 0; /* restart from the first bucket */
420                 goto again;
421         }
422         /* race on s->ls_purge_start, but nobody cares */
423         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
424 out:
425         return nr;
426 }
427 EXPORT_SYMBOL(lu_site_purge_objects);
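/*
 * Usage note (added commentary): within this file, lu_object_limit() below
 * calls this with canblock=false so that allocation paths can trim the cache
 * without stalling on the purge mutex, while lu_stack_fini() drains the
 * whole cache at teardown by purging with nr == ~0.
 */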
428
429 /*
430  * Object printing.
431  *
432  * The code below has to jump through certain hoops to output an object
433  * description into the libcfs_debug_msg()-based log. The problem is that
434  * lu_object_print() composes the object description from strings that are
435  * parts of _lines_ of output (i.e., strings not terminated by a newline).
436  * This doesn't fit very well into the libcfs_debug_msg() interface, which
437  * assumes that each message supplied to it is a self-contained output line.
438  *
439  * To work around this, strings are collected in a temporary buffer (the
440  * lu_global_key value, a struct lu_cdebug_data), until a terminating
441  * newline character is detected.
442  *
443  */
444
445 enum {
446         /**
447          * Maximal line size.
448          *
449          * XXX overflow is not handled correctly.
450          */
451         LU_CDEBUG_LINE = 512
452 };
453
454 struct lu_cdebug_data {
455         /**
456          * Temporary buffer.
457          */
458         char lck_area[LU_CDEBUG_LINE];
459 };
460
461 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
462 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
463
464 /**
465  * Key, holding temporary buffer. This key is registered very early by
466  * lu_global_init().
467  */
468 static struct lu_context_key lu_global_key = {
469         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
470                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
471         .lct_init = lu_global_key_init,
472         .lct_fini = lu_global_key_fini
473 };
474
475 /**
476  * Printer function emitting messages through libcfs_debug_msg().
477  */
478 int lu_cdebug_printer(const struct lu_env *env,
479                       void *cookie, const char *format, ...)
480 {
481         struct libcfs_debug_msg_data *msgdata = cookie;
482         struct lu_cdebug_data   *key;
483         int used;
484         int complete;
485         va_list args;
486
487         va_start(args, format);
488
489         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
490
491         used = strlen(key->lck_area);
492         complete = format[strlen(format) - 1] == '\n';
493         /*
494          * Append new chunk to the buffer.
495          */
496         vsnprintf(key->lck_area + used,
497                   ARRAY_SIZE(key->lck_area) - used, format, args);
498         if (complete) {
499                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
500                         libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
501                 key->lck_area[0] = 0;
502         }
503         va_end(args);
504         return 0;
505 }
506 EXPORT_SYMBOL(lu_cdebug_printer);
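/*
 * Usage sketch (an assumption based on the cookie cast above, not a verbatim
 * caller): the cookie is expected to be a struct libcfs_debug_msg_data *, so
 * a typical invocation pairs this printer with lu_object_print():
 *
 *	lu_object_print(env, &msgdata, lu_cdebug_printer, obj);
 *
 * where "msgdata" has been initialized for the desired debug mask and
 * subsystem, and "obj" is whatever object the caller wants logged.
 */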
507
508 /**
509  * Print object header.
510  */
511 void lu_object_header_print(const struct lu_env *env, void *cookie,
512                             lu_printer_t printer,
513                             const struct lu_object_header *hdr)
514 {
515         (*printer)(env, cookie, "header@%p[%#lx, %d, " DFID "%s%s%s]",
516                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
517                    PFID(&hdr->loh_fid),
518                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
519                    list_empty((struct list_head *)&hdr->loh_lru) ? \
520                    "" : " lru",
521                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
522 }
523 EXPORT_SYMBOL(lu_object_header_print);
524
525 /**
526  * Print a human-readable representation of \a o to \a printer.
527  */
528 void lu_object_print(const struct lu_env *env, void *cookie,
529                      lu_printer_t printer, const struct lu_object *o)
530 {
531         static const char ruler[] = "........................................";
532         struct lu_object_header *top;
533         int depth = 4;
534
535         top = o->lo_header;
536         lu_object_header_print(env, cookie, printer, top);
537         (*printer)(env, cookie, "{\n");
538
539         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
540                 /*
541                  * print `.' \a depth times followed by type name and address
542                  */
543                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
544                            o->lo_dev->ld_type->ldt_name, o);
545
546                 if (o->lo_ops->loo_object_print)
547                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
548
549                 (*printer)(env, cookie, "\n");
550         }
551
552         (*printer)(env, cookie, "} header@%p\n", top);
553 }
554 EXPORT_SYMBOL(lu_object_print);
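/*
 * Added note on the output shape produced above: a header line of the form
 * "header@<ptr>[flags, refcount, FID, ...]" followed by "{", one indented
 * "....<device type name>@<ptr>" line per slice (plus whatever that slice's
 * ->loo_object_print() emits), and a closing "} header@<ptr>" line.
 */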
555
556 static struct lu_object *htable_lookup(struct lu_site *s,
557                                        struct cfs_hash_bd *bd,
558                                        const struct lu_fid *f,
559                                        wait_queue_entry_t *waiter,
560                                        __u64 *version)
561 {
562         struct lu_site_bkt_data *bkt;
563         struct lu_object_header *h;
564         struct hlist_node       *hnode;
565         __u64  ver = cfs_hash_bd_version_get(bd);
566
567         if (*version == ver)
568                 return ERR_PTR(-ENOENT);
569
570         *version = ver;
571         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
572         /* cfs_hash_bd_peek_locked() is a somewhat "internal" function
573          * of cfs_hash; it doesn't take a refcount on the object.
574          */
575         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
576         if (!hnode) {
577                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
578                 return ERR_PTR(-ENOENT);
579         }
580
581         h = container_of0(hnode, struct lu_object_header, loh_hash);
582         if (likely(!lu_object_is_dying(h))) {
583                 cfs_hash_get(s->ls_obj_hash, hnode);
584                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
585                 if (!list_empty(&h->loh_lru)) {
586                         list_del_init(&h->loh_lru);
587                         bkt->lsb_lru_len--;
588                         percpu_counter_dec(&s->ls_lru_len_counter);
589                 }
590                 return lu_object_top(h);
591         }
592
593         /*
594          * Lookup found an object being destroyed; this object cannot be
595          * returned (to ensure that references to dying objects are eventually
596          * drained), and moreover, the lookup has to wait until the object is freed.
597          */
598
599         init_waitqueue_entry(waiter, current);
600         add_wait_queue(&bkt->lsb_marche_funebre, waiter);
601         set_current_state(TASK_UNINTERRUPTIBLE);
602         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
603         return ERR_PTR(-EAGAIN);
604 }
605
606 /**
607  * Search the cache for an object with fid \a f. If such an object is found,
608  * return it. Otherwise, create a new object, insert it into the cache and
609  * return it. In any case, an additional reference is acquired on the returned object.
610  */
611 static struct lu_object *lu_object_find(const struct lu_env *env,
612                                         struct lu_device *dev,
613                                         const struct lu_fid *f,
614                                         const struct lu_object_conf *conf)
615 {
616         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
617 }
618
619 /*
620  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
621  * the calculation of the number of objects to reclaim is not covered by
622  * a lock, the number purged at once is capped by LU_CACHE_NR_MAX_ADJUST.
623  * This ensures that many concurrent threads will not accidentally purge
624  * the entire cache.
625  */
626 static void lu_object_limit(const struct lu_env *env, struct lu_device *dev)
627 {
628         __u64 size, nr;
629
630         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
631                 return;
632
633         size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
634         nr = (__u64)lu_cache_nr;
635         if (size <= nr)
636                 return;
637
638         lu_site_purge_objects(env, dev->ld_site,
639                               min_t(__u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
640                               false);
641 }
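/*
 * Worked example (hypothetical numbers): with lu_cache_nr set to 1000 and
 * 1600 objects currently hashed, size - nr is 600, so a single call purges
 * min(600, LU_CACHE_NR_MAX_ADJUST) = 512 objects and leaves the rest for
 * later callers; this keeps many concurrent threads from emptying the cache.
 */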
642
643 static struct lu_object *lu_object_new(const struct lu_env *env,
644                                        struct lu_device *dev,
645                                        const struct lu_fid *f,
646                                        const struct lu_object_conf *conf)
647 {
648         struct lu_object        *o;
649         struct cfs_hash       *hs;
650         struct cfs_hash_bd          bd;
651
652         o = lu_object_alloc(env, dev, f, conf);
653         if (IS_ERR(o))
654                 return o;
655
656         hs = dev->ld_site->ls_obj_hash;
657         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
658         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
659         cfs_hash_bd_unlock(hs, &bd, 1);
660
661         lu_object_limit(env, dev);
662
663         return o;
664 }
665
666 /**
667  * Core logic of lu_object_find*() functions.
668  */
669 static struct lu_object *lu_object_find_try(const struct lu_env *env,
670                                             struct lu_device *dev,
671                                             const struct lu_fid *f,
672                                             const struct lu_object_conf *conf,
673                                             wait_queue_entry_t *waiter)
674 {
675         struct lu_object      *o;
676         struct lu_object      *shadow;
677         struct lu_site  *s;
678         struct cfs_hash     *hs;
679         struct cfs_hash_bd        bd;
680         __u64             version = 0;
681
682         /*
683          * This uses standard index maintenance protocol:
684          *
685          *     - search index under lock, and return object if found;
686          *     - otherwise, unlock index, allocate new object;
687          *     - lock index and search again;
688          *     - if nothing is found (usual case), insert newly created
689          *       object into index;
690          *     - otherwise (race: other thread inserted object), free
691          *       object just allocated.
692          *     - unlock index;
693          *     - return object.
694          *
695          * For the "LOC_F_NEW" case, we are sure the object is newly created,
696          * so it is unnecessary to perform lookup-alloc-lookup-insert; instead,
697          * just allocate and insert directly.
698          *
699          * If dying object is found during index search, add @waiter to the
700          * site wait-queue and return ERR_PTR(-EAGAIN).
701          */
702         if (conf && conf->loc_flags & LOC_F_NEW)
703                 return lu_object_new(env, dev, f, conf);
704
705         s  = dev->ld_site;
706         hs = s->ls_obj_hash;
707         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
708         o = htable_lookup(s, &bd, f, waiter, &version);
709         cfs_hash_bd_unlock(hs, &bd, 1);
710         if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
711                 return o;
712
713         /*
714          * Allocate new object. This may result in rather complicated
715          * operations, including fld queries, inode loading, etc.
716          */
717         o = lu_object_alloc(env, dev, f, conf);
718         if (IS_ERR(o))
719                 return o;
720
721         LASSERT(lu_fid_eq(lu_object_fid(o), f));
722
723         cfs_hash_bd_lock(hs, &bd, 1);
724
725         shadow = htable_lookup(s, &bd, f, waiter, &version);
726         if (likely(PTR_ERR(shadow) == -ENOENT)) {
727                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
728                 cfs_hash_bd_unlock(hs, &bd, 1);
729
730                 lu_object_limit(env, dev);
731
732                 return o;
733         }
734
735         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
736         cfs_hash_bd_unlock(hs, &bd, 1);
737         lu_object_free(env, o);
738         return shadow;
739 }
740
741 /**
742  * Much like lu_object_find(), but the top-level device of the object is
743  * specifically \a dev rather than the top-level device of the site. This
744  * interface allows objects of different "stacking" to be created within the same site.
745  */
746 struct lu_object *lu_object_find_at(const struct lu_env *env,
747                                     struct lu_device *dev,
748                                     const struct lu_fid *f,
749                                     const struct lu_object_conf *conf)
750 {
751         struct lu_site_bkt_data *bkt;
752         struct lu_object        *obj;
753         wait_queue_entry_t         wait;
754
755         while (1) {
756                 obj = lu_object_find_try(env, dev, f, conf, &wait);
757                 if (obj != ERR_PTR(-EAGAIN))
758                         return obj;
759                 /*
760                  * lu_object_find_try() already added waiter into the
761                  * wait queue.
762                  */
763                 schedule();
764                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
765                 remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
766         }
767 }
768 EXPORT_SYMBOL(lu_object_find_at);
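/*
 * Minimal usage sketch (not from the original source; "dev", "fid" and "ldt"
 * stand for whatever device, FID and device type the caller already has):
 *
 *	struct lu_object *o;
 *
 *	o = lu_object_find_at(env, dev, fid, NULL);
 *	if (IS_ERR(o))
 *		return PTR_ERR(o);
 *	... use the object, e.g. lu_object_locate(o->lo_header, ldt) ...
 *	lu_object_put(env, o);
 */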
769
770 /**
771  * Find object with given fid, and return its slice belonging to given device.
772  */
773 struct lu_object *lu_object_find_slice(const struct lu_env *env,
774                                        struct lu_device *dev,
775                                        const struct lu_fid *f,
776                                        const struct lu_object_conf *conf)
777 {
778         struct lu_object *top;
779         struct lu_object *obj;
780
781         top = lu_object_find(env, dev, f, conf);
782         if (IS_ERR(top))
783                 return top;
784
785         obj = lu_object_locate(top->lo_header, dev->ld_type);
786         if (unlikely(!obj)) {
787                 lu_object_put(env, top);
788                 obj = ERR_PTR(-ENOENT);
789         }
790
791         return obj;
792 }
793 EXPORT_SYMBOL(lu_object_find_slice);
794
795 /**
796  * Global list of all device types.
797  */
798 static LIST_HEAD(lu_device_types);
799
800 int lu_device_type_init(struct lu_device_type *ldt)
801 {
802         int result = 0;
803
804         atomic_set(&ldt->ldt_device_nr, 0);
805         INIT_LIST_HEAD(&ldt->ldt_linkage);
806         if (ldt->ldt_ops->ldto_init)
807                 result = ldt->ldt_ops->ldto_init(ldt);
808
809         if (!result) {
810                 spin_lock(&obd_types_lock);
811                 list_add(&ldt->ldt_linkage, &lu_device_types);
812                 spin_unlock(&obd_types_lock);
813         }
814
815         return result;
816 }
817 EXPORT_SYMBOL(lu_device_type_init);
818
819 void lu_device_type_fini(struct lu_device_type *ldt)
820 {
821         spin_lock(&obd_types_lock);
822         list_del_init(&ldt->ldt_linkage);
823         spin_unlock(&obd_types_lock);
824         if (ldt->ldt_ops->ldto_fini)
825                 ldt->ldt_ops->ldto_fini(ldt);
826 }
827 EXPORT_SYMBOL(lu_device_type_fini);
828
829 /**
830  * Global list of all sites on this node
831  */
832 static LIST_HEAD(lu_sites);
833 static DECLARE_RWSEM(lu_sites_guard);
834
835 /**
836  * Global environment used by site shrinker.
837  */
838 static struct lu_env lu_shrink_env;
839
840 struct lu_site_print_arg {
841         struct lu_env   *lsp_env;
842         void        *lsp_cookie;
843         lu_printer_t     lsp_printer;
844 };
845
846 static int
847 lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
848                   struct hlist_node *hnode, void *data)
849 {
850         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
851         struct lu_object_header  *h;
852
853         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
854         if (!list_empty(&h->loh_layers)) {
855                 const struct lu_object *o;
856
857                 o = lu_object_top(h);
858                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
859                                 arg->lsp_printer, o);
860         } else {
861                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
862                                        arg->lsp_printer, h);
863         }
864         return 0;
865 }
866
867 /**
868  * Print all objects in \a s.
869  */
870 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
871                    lu_printer_t printer)
872 {
873         struct lu_site_print_arg arg = {
874                 .lsp_env     = (struct lu_env *)env,
875                 .lsp_cookie  = cookie,
876                 .lsp_printer = printer,
877         };
878
879         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
880 }
881 EXPORT_SYMBOL(lu_site_print);
882
883 /**
884  * Return desired hash table order.
885  */
886 static unsigned long lu_htable_order(struct lu_device *top)
887 {
888         unsigned long bits_max = LU_SITE_BITS_MAX;
889         unsigned long cache_size;
890         unsigned long bits;
891
892         if (!strcmp(top->ld_type->ldt_name, LUSTRE_VVP_NAME))
893                 bits_max = LU_SITE_BITS_MAX_CL;
894
895         /*
896          * Calculate hash table size, assuming that we want reasonable
897          * performance when 20% of total memory is occupied by cache of
898          * lu_objects.
899          *
900  * The size of an lu_object is (arbitrarily) taken as 1K (together with inode).
901          */
902         cache_size = totalram_pages;
903
904 #if BITS_PER_LONG == 32
905         /* limit hashtable size for lowmem systems to low RAM */
906         if (cache_size > 1 << (30 - PAGE_SHIFT))
907                 cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
908 #endif
909
910         /* clear off unreasonable cache setting. */
911         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
912                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
913                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
914                       LU_CACHE_PERCENT_DEFAULT);
915
916                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
917         }
918         cache_size = cache_size / 100 * lu_cache_percent *
919                 (PAGE_SIZE / 1024);
920
921         for (bits = 1; (1 << bits) < cache_size; ++bits)
922                 ;
923         return clamp_t(typeof(bits), bits, LU_SITE_BITS_MIN, bits_max);
924 }
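/*
 * Worked example (hypothetical machine): with 4GB of RAM and 4KB pages,
 * totalram_pages is 2^20, so the default lu_cache_percent of 20 yields
 * 2^20 / 100 * 20 * (4096 / 1024) = 838800 with integer division; the
 * smallest bits with (1 << bits) >= 838800 is 20, which is then clamped to
 * the [LU_SITE_BITS_MIN, bits_max] range.
 */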
925
926 static unsigned int lu_obj_hop_hash(struct cfs_hash *hs,
927                                     const void *key, unsigned int mask)
928 {
929         struct lu_fid  *fid = (struct lu_fid *)key;
930         __u32      hash;
931
932         hash = fid_flatten32(fid);
933         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
934         hash = hash_long(hash, hs->hs_bkt_bits);
935
936         /* give me another random factor */
937         hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
938
939         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
940         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
941
942         return hash & mask;
943 }
944
945 static void *lu_obj_hop_object(struct hlist_node *hnode)
946 {
947         return hlist_entry(hnode, struct lu_object_header, loh_hash);
948 }
949
950 static void *lu_obj_hop_key(struct hlist_node *hnode)
951 {
952         struct lu_object_header *h;
953
954         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
955         return &h->loh_fid;
956 }
957
958 static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
959 {
960         struct lu_object_header *h;
961
962         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
963         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
964 }
965
966 static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
967 {
968         struct lu_object_header *h;
969
970         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
971         atomic_inc(&h->loh_ref);
972 }
973
974 static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
975 {
976         LBUG(); /* this should never be called */
977 }
978
979 static struct cfs_hash_ops lu_site_hash_ops = {
980         .hs_hash        = lu_obj_hop_hash,
981         .hs_key         = lu_obj_hop_key,
982         .hs_keycmp      = lu_obj_hop_keycmp,
983         .hs_object      = lu_obj_hop_object,
984         .hs_get         = lu_obj_hop_get,
985         .hs_put_locked  = lu_obj_hop_put_locked,
986 };
987
988 static void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
989 {
990         spin_lock(&s->ls_ld_lock);
991         if (list_empty(&d->ld_linkage))
992                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
993         spin_unlock(&s->ls_ld_lock);
994 }
995
996 /**
997  * Initialize site \a s, with \a top as the top-level device.
998  */
999 int lu_site_init(struct lu_site *s, struct lu_device *top)
1000 {
1001         struct lu_site_bkt_data *bkt;
1002         struct cfs_hash_bd bd;
1003         unsigned long bits;
1004         unsigned long i;
1005         char name[16];
1006         int rc;
1007
1008         memset(s, 0, sizeof(*s));
1009         mutex_init(&s->ls_purge_mutex);
1010
1011         rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
1012         if (rc)
1013                 return -ENOMEM;
1014
1015         snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
1016         for (bits = lu_htable_order(top); bits >= LU_SITE_BITS_MIN; bits--) {
1017                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
1018                                                  bits - LU_SITE_BKT_BITS,
1019                                                  sizeof(*bkt), 0, 0,
1020                                                  &lu_site_hash_ops,
1021                                                  CFS_HASH_SPIN_BKTLOCK |
1022                                                  CFS_HASH_NO_ITEMREF |
1023                                                  CFS_HASH_DEPTH |
1024                                                  CFS_HASH_ASSERT_EMPTY |
1025                                                  CFS_HASH_COUNTER);
1026                 if (s->ls_obj_hash)
1027                         break;
1028         }
1029
1030         if (!s->ls_obj_hash) {
1031                 CERROR("failed to create lu_site hash with bits: %lu\n", bits);
1032                 return -ENOMEM;
1033         }
1034
1035         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
1036                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
1037                 INIT_LIST_HEAD(&bkt->lsb_lru);
1038                 init_waitqueue_head(&bkt->lsb_marche_funebre);
1039         }
1040
1041         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1042         if (!s->ls_stats) {
1043                 cfs_hash_putref(s->ls_obj_hash);
1044                 s->ls_obj_hash = NULL;
1045                 return -ENOMEM;
1046         }
1047
1048         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1049                              0, "created", "created");
1050         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1051                              0, "cache_hit", "cache_hit");
1052         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1053                              0, "cache_miss", "cache_miss");
1054         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1055                              0, "cache_race", "cache_race");
1056         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1057                              0, "cache_death_race", "cache_death_race");
1058         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1059                              0, "lru_purged", "lru_purged");
1060
1061         INIT_LIST_HEAD(&s->ls_linkage);
1062         s->ls_top_dev = top;
1063         top->ld_site = s;
1064         lu_device_get(top);
1065         lu_ref_add(&top->ld_reference, "site-top", s);
1066
1067         INIT_LIST_HEAD(&s->ls_ld_linkage);
1068         spin_lock_init(&s->ls_ld_lock);
1069
1070         lu_dev_add_linkage(s, top);
1071
1072         return 0;
1073 }
1074 EXPORT_SYMBOL(lu_site_init);
1075
1076 /**
1077  * Finalize \a s and release its resources.
1078  */
1079 void lu_site_fini(struct lu_site *s)
1080 {
1081         down_write(&lu_sites_guard);
1082         list_del_init(&s->ls_linkage);
1083         up_write(&lu_sites_guard);
1084
1085         percpu_counter_destroy(&s->ls_lru_len_counter);
1086
1087         if (s->ls_obj_hash) {
1088                 cfs_hash_putref(s->ls_obj_hash);
1089                 s->ls_obj_hash = NULL;
1090         }
1091
1092         if (s->ls_top_dev) {
1093                 s->ls_top_dev->ld_site = NULL;
1094                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1095                 lu_device_put(s->ls_top_dev);
1096                 s->ls_top_dev = NULL;
1097         }
1098
1099         if (s->ls_stats)
1100                 lprocfs_free_stats(&s->ls_stats);
1101 }
1102 EXPORT_SYMBOL(lu_site_fini);
1103
1104 /**
1105  * Called when initialization of stack for this site is completed.
1106  */
1107 int lu_site_init_finish(struct lu_site *s)
1108 {
1109         int result;
1110
1111         down_write(&lu_sites_guard);
1112         result = lu_context_refill(&lu_shrink_env.le_ctx);
1113         if (result == 0)
1114                 list_add(&s->ls_linkage, &lu_sites);
1115         up_write(&lu_sites_guard);
1116         return result;
1117 }
1118 EXPORT_SYMBOL(lu_site_init_finish);
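/*
 * Life-cycle sketch (assembled from the functions in this file, not a
 * verbatim caller): a device stack sets up its site with lu_site_init(),
 * finishes building the stack, then calls lu_site_init_finish() to link the
 * site into the global lu_sites list; teardown purges the cache (see
 * lu_stack_fini() below) and ends with lu_site_fini().
 */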
1119
1120 /**
1121  * Acquire additional reference on device \a d
1122  */
1123 void lu_device_get(struct lu_device *d)
1124 {
1125         atomic_inc(&d->ld_ref);
1126 }
1127 EXPORT_SYMBOL(lu_device_get);
1128
1129 /**
1130  * Release reference on device \a d.
1131  */
1132 void lu_device_put(struct lu_device *d)
1133 {
1134         LASSERT(atomic_read(&d->ld_ref) > 0);
1135         atomic_dec(&d->ld_ref);
1136 }
1137 EXPORT_SYMBOL(lu_device_put);
1138
1139 /**
1140  * Initialize device \a d of type \a t.
1141  */
1142 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1143 {
1144         if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
1145             t->ldt_ops->ldto_start)
1146                 t->ldt_ops->ldto_start(t);
1147
1148         memset(d, 0, sizeof(*d));
1149         atomic_set(&d->ld_ref, 0);
1150         d->ld_type = t;
1151         lu_ref_init(&d->ld_reference);
1152         INIT_LIST_HEAD(&d->ld_linkage);
1153         return 0;
1154 }
1155 EXPORT_SYMBOL(lu_device_init);
1156
1157 /**
1158  * Finalize device \a d.
1159  */
1160 void lu_device_fini(struct lu_device *d)
1161 {
1162         struct lu_device_type *t = d->ld_type;
1163
1164         if (d->ld_obd) {
1165                 d->ld_obd->obd_lu_dev = NULL;
1166                 d->ld_obd = NULL;
1167         }
1168
1169         lu_ref_fini(&d->ld_reference);
1170         LASSERTF(atomic_read(&d->ld_ref) == 0,
1171                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1172         LASSERT(atomic_read(&t->ldt_device_nr) > 0);
1173
1174         if (atomic_dec_and_test(&t->ldt_device_nr) &&
1175             t->ldt_ops->ldto_stop)
1176                 t->ldt_ops->ldto_stop(t);
1177 }
1178 EXPORT_SYMBOL(lu_device_fini);
1179
1180 /**
1181  * Initialize object \a o that is part of compound object \a h and was created
1182  * by device \a d.
1183  */
1184 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1185                    struct lu_device *d)
1186 {
1187         memset(o, 0, sizeof(*o));
1188         o->lo_header = h;
1189         o->lo_dev = d;
1190         lu_device_get(d);
1191         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1192         INIT_LIST_HEAD(&o->lo_linkage);
1193
1194         return 0;
1195 }
1196 EXPORT_SYMBOL(lu_object_init);
1197
1198 /**
1199  * Finalize object and release its resources.
1200  */
1201 void lu_object_fini(struct lu_object *o)
1202 {
1203         struct lu_device *dev = o->lo_dev;
1204
1205         LASSERT(list_empty(&o->lo_linkage));
1206
1207         if (dev) {
1208                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1209                               "lu_object", o);
1210                 lu_device_put(dev);
1211                 o->lo_dev = NULL;
1212         }
1213 }
1214 EXPORT_SYMBOL(lu_object_fini);
1215
1216 /**
1217  * Add object \a o as first layer of compound object \a h
1218  *
1219  * This is typically called by the ->ldo_object_alloc() method of top-level
1220  * device.
1221  */
1222 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1223 {
1224         list_move(&o->lo_linkage, &h->loh_layers);
1225 }
1226 EXPORT_SYMBOL(lu_object_add_top);
1227
1228 /**
1229  * Add object \a o as a layer of compound object, going after \a before.
1230  *
1231  * This is typically called by the ->ldo_object_alloc() method of \a
1232  * before->lo_dev.
1233  */
1234 void lu_object_add(struct lu_object *before, struct lu_object *o)
1235 {
1236         list_move(&o->lo_linkage, &before->lo_linkage);
1237 }
1238 EXPORT_SYMBOL(lu_object_add);
1239
1240 /**
1241  * Initialize compound object.
1242  */
1243 int lu_object_header_init(struct lu_object_header *h)
1244 {
1245         memset(h, 0, sizeof(*h));
1246         atomic_set(&h->loh_ref, 1);
1247         INIT_HLIST_NODE(&h->loh_hash);
1248         INIT_LIST_HEAD(&h->loh_lru);
1249         INIT_LIST_HEAD(&h->loh_layers);
1250         lu_ref_init(&h->loh_reference);
1251         return 0;
1252 }
1253 EXPORT_SYMBOL(lu_object_header_init);
1254
1255 /**
1256  * Finalize compound object.
1257  */
1258 void lu_object_header_fini(struct lu_object_header *h)
1259 {
1260         LASSERT(list_empty(&h->loh_layers));
1261         LASSERT(list_empty(&h->loh_lru));
1262         LASSERT(hlist_unhashed(&h->loh_hash));
1263         lu_ref_fini(&h->loh_reference);
1264 }
1265 EXPORT_SYMBOL(lu_object_header_fini);
1266
1267 /**
1268  * Given a compound object, find its slice, corresponding to the device type
1269  * \a dtype.
1270  */
1271 struct lu_object *lu_object_locate(struct lu_object_header *h,
1272                                    const struct lu_device_type *dtype)
1273 {
1274         struct lu_object *o;
1275
1276         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1277                 if (o->lo_dev->ld_type == dtype)
1278                         return o;
1279         }
1280         return NULL;
1281 }
1282 EXPORT_SYMBOL(lu_object_locate);
1283
1284 /**
1285  * Finalize and free devices in the device stack.
1286  *
1287  * Finalize device stack by purging object cache, and calling
1288  * lu_device_type_operations::ldto_device_fini() and
1289  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1290  */
1291 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1292 {
1293         struct lu_site   *site = top->ld_site;
1294         struct lu_device *scan;
1295         struct lu_device *next;
1296
1297         lu_site_purge(env, site, ~0);
1298         for (scan = top; scan; scan = next) {
1299                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1300                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1301                 lu_device_put(scan);
1302         }
1303
1304         /* purge again. */
1305         lu_site_purge(env, site, ~0);
1306
1307         for (scan = top; scan; scan = next) {
1308                 const struct lu_device_type *ldt = scan->ld_type;
1309                 struct obd_type      *type;
1310
1311                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1312                 type = ldt->ldt_obd_type;
1313                 if (type) {
1314                         type->typ_refcnt--;
1315                         class_put_type(type);
1316                 }
1317         }
1318 }
1319
1320 enum {
1321         /**
1322          * Maximal number of tld slots.
1323          */
1324         LU_CONTEXT_KEY_NR = 40
1325 };
1326
1327 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1328
1329 static DEFINE_SPINLOCK(lu_keys_guard);
1330 static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
1331
1332 /**
1333  * Global counter incremented whenever a key is registered, unregistered,
1334  * revived or quiesced. This is used to avoid unnecessary calls to
1335  * lu_context_refill(). No locking is provided, as initialization and shutdown
1336  * are supposed to be externally serialized.
1337  */
1338 static unsigned int key_set_version;
1339
1340 /**
1341  * Register new key.
1342  */
1343 int lu_context_key_register(struct lu_context_key *key)
1344 {
1345         int result;
1346         unsigned int i;
1347
1348         LASSERT(key->lct_init);
1349         LASSERT(key->lct_fini);
1350         LASSERT(key->lct_tags != 0);
1351
1352         result = -ENFILE;
1353         spin_lock(&lu_keys_guard);
1354         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1355                 if (!lu_keys[i]) {
1356                         key->lct_index = i;
1357                         atomic_set(&key->lct_used, 1);
1358                         lu_keys[i] = key;
1359                         lu_ref_init(&key->lct_reference);
1360                         result = 0;
1361                         ++key_set_version;
1362                         break;
1363                 }
1364         }
1365         spin_unlock(&lu_keys_guard);
1366         return result;
1367 }
1368 EXPORT_SYMBOL(lu_context_key_register);
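/*
 * Declaration sketch (modeled on lu_global_key earlier in this file; the
 * "foo" names are hypothetical): a user generates the value constructor and
 * destructor with LU_KEY_INIT_FINI(foo, struct foo_thread_info), fills in a
 * struct lu_context_key with matching .lct_tags, .lct_init and .lct_fini,
 * and registers it, e.g. with lu_context_key_register_many(&foo_key, NULL).
 */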
1369
1370 static void key_fini(struct lu_context *ctx, int index)
1371 {
1372         if (ctx->lc_value && ctx->lc_value[index]) {
1373                 struct lu_context_key *key;
1374
1375                 key = lu_keys[index];
1376                 LASSERT(atomic_read(&key->lct_used) > 1);
1377
1378                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1379                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1380                 atomic_dec(&key->lct_used);
1381
1382                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1383 #ifdef CONFIG_MODULE_UNLOAD
1384                         LINVRNT(module_refcount(key->lct_owner) > 0);
1385 #endif
1386                         module_put(key->lct_owner);
1387                 }
1388                 ctx->lc_value[index] = NULL;
1389         }
1390 }
1391
1392 /**
1393  * Deregister key.
1394  */
1395 void lu_context_key_degister(struct lu_context_key *key)
1396 {
1397         LASSERT(atomic_read(&key->lct_used) >= 1);
1398         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1399
1400         lu_context_key_quiesce(key);
1401
1402         ++key_set_version;
1403         spin_lock(&lu_keys_guard);
1404         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1405
1406         /**
1407          * Wait until all transient contexts referencing this key have
1408          * run lu_context_key::lct_fini() method.
1409          */
1410         while (atomic_read(&key->lct_used) > 1) {
1411                 spin_unlock(&lu_keys_guard);
1412                 CDEBUG(D_INFO, "%s: \"%s\" %p, %d\n",
1413                        __func__, key->lct_owner ? key->lct_owner->name : "",
1414                        key, atomic_read(&key->lct_used));
1415                 schedule();
1416                 spin_lock(&lu_keys_guard);
1417         }
1418         if (lu_keys[key->lct_index]) {
1419                 lu_keys[key->lct_index] = NULL;
1420                 lu_ref_fini(&key->lct_reference);
1421         }
1422         spin_unlock(&lu_keys_guard);
1423
1424         LASSERTF(atomic_read(&key->lct_used) == 1,
1425                  "key has instances: %d\n",
1426                  atomic_read(&key->lct_used));
1427 }
1428 EXPORT_SYMBOL(lu_context_key_degister);
1429
1430 /**
1431  * Register a number of keys. This has to be called after all keys have been
1432  * initialized by a call to LU_CONTEXT_KEY_INIT().
1433  */
1434 int lu_context_key_register_many(struct lu_context_key *k, ...)
1435 {
1436         struct lu_context_key *key = k;
1437         va_list args;
1438         int result;
1439
1440         va_start(args, k);
1441         do {
1442                 result = lu_context_key_register(key);
1443                 if (result)
1444                         break;
1445                 key = va_arg(args, struct lu_context_key *);
1446         } while (key);
1447         va_end(args);
1448
1449         if (result != 0) {
1450                 va_start(args, k);
1451                 while (k != key) {
1452                         lu_context_key_degister(k);
1453                         k = va_arg(args, struct lu_context_key *);
1454                 }
1455                 va_end(args);
1456         }
1457
1458         return result;
1459 }
1460 EXPORT_SYMBOL(lu_context_key_register_many);
1461
1462 /**
1463  * De-register a number of keys. This is a dual to
1464  * lu_context_key_register_many().
1465  */
1466 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1467 {
1468         va_list args;
1469
1470         va_start(args, k);
1471         do {
1472                 lu_context_key_degister(k);
1473                 k = va_arg(args, struct lu_context_key*);
1474         } while (k);
1475         va_end(args);
1476 }
1477 EXPORT_SYMBOL(lu_context_key_degister_many);
1478
1479 /**
1480  * Revive a number of keys.
1481  */
1482 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1483 {
1484         va_list args;
1485
1486         va_start(args, k);
1487         do {
1488                 lu_context_key_revive(k);
1489                 k = va_arg(args, struct lu_context_key*);
1490         } while (k);
1491         va_end(args);
1492 }
1493 EXPORT_SYMBOL(lu_context_key_revive_many);
1494
1495 /**
1496  * Quiesce a number of keys.
1497  */
1498 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1499 {
1500         va_list args;
1501
1502         va_start(args, k);
1503         do {
1504                 lu_context_key_quiesce(k);
1505                 k = va_arg(args, struct lu_context_key*);
1506         } while (k);
1507         va_end(args);
1508 }
1509 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1510
1511 /**
1512  * Return value associated with key \a key in context \a ctx.
1513  */
1514 void *lu_context_key_get(const struct lu_context *ctx,
1515                          const struct lu_context_key *key)
1516 {
1517         LINVRNT(ctx->lc_state == LCS_ENTERED);
1518         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1519         LASSERT(lu_keys[key->lct_index] == key);
1520         return ctx->lc_value[key->lct_index];
1521 }
1522 EXPORT_SYMBOL(lu_context_key_get);
1523
1524 /**
1525  * List of remembered contexts. XXX document me.
1526  */
1527 static LIST_HEAD(lu_context_remembered);
1528
1529 /**
1530  * Destroy \a key in all remembered contexts. This is used to destroy key
1531  * values in "shared" contexts (like service threads), when a module owning
1532  * the key is about to be unloaded.
1533  */
1534 void lu_context_key_quiesce(struct lu_context_key *key)
1535 {
1536         struct lu_context *ctx;
1537
1538         if (!(key->lct_tags & LCT_QUIESCENT)) {
1539                 /*
1540                  * XXX memory barrier has to go here.
1541                  */
1542                 spin_lock(&lu_keys_guard);
1543                 key->lct_tags |= LCT_QUIESCENT;
1544
1545                  /*
1546                  * Wait until all lu_context_key::lct_init() methods
1547                  * have completed.
1548                  */
1549                 while (atomic_read(&lu_key_initing_cnt) > 0) {
1550                         spin_unlock(&lu_keys_guard);
1551                         CDEBUG(D_INFO, "%s: \"%s\" %p, %d (%d)\n",
1552                                __func__,
1553                                key->lct_owner ? key->lct_owner->name : "",
1554                                key, atomic_read(&key->lct_used),
1555                                atomic_read(&lu_key_initing_cnt));
1556                         schedule();
1557                         spin_lock(&lu_keys_guard);
1558                 }
1559
1560                 list_for_each_entry(ctx, &lu_context_remembered, lc_remember)
1561                         key_fini(ctx, key->lct_index);
1562                 spin_unlock(&lu_keys_guard);
1563                 ++key_set_version;
1564         }
1565 }
1566
1567 void lu_context_key_revive(struct lu_context_key *key)
1568 {
1569         key->lct_tags &= ~LCT_QUIESCENT;
1570         ++key_set_version;
1571 }
1572
1573 static void keys_fini(struct lu_context *ctx)
1574 {
1575         unsigned int i;
1576
1577         if (!ctx->lc_value)
1578                 return;
1579
1580         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1581                 key_fini(ctx, i);
1582
1583         kfree(ctx->lc_value);
1584         ctx->lc_value = NULL;
1585 }
1586
1587 static int keys_fill(struct lu_context *ctx)
1588 {
1589         unsigned int i;
1590
1591         /*
1592          * A serialisation with lu_context_key_quiesce() is needed, but some
1593          * "key->lct_init()" methods call kernel memory allocation routines
1594          * and therefore cannot run while a spin_lock is held.
1595          * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt"
1596          * so that the serialisation starts under the lock.
1597          * An atomic_t counter is still used so that the lock does not have
1598          * to be reacquired when the counter is decremented.
1599          */
1600         spin_lock(&lu_keys_guard);
1601         atomic_inc(&lu_key_initing_cnt);
1602         spin_unlock(&lu_keys_guard);
1603
1604         LINVRNT(ctx->lc_value);
1605         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1606                 struct lu_context_key *key;
1607
1608                 key = lu_keys[i];
1609                 if (!ctx->lc_value[i] && key &&
1610                     (key->lct_tags & ctx->lc_tags) &&
1611                     /*
1612                      * Don't create values for a LCT_QUIESCENT key, as this
1613                      * will pin module owning a key.
1614                      */
1615                     !(key->lct_tags & LCT_QUIESCENT)) {
1616                         void *value;
1617
1618                         LINVRNT(key->lct_init);
1619                         LINVRNT(key->lct_index == i);
1620
1621                         LASSERT(key->lct_owner);
1622                         if (!(ctx->lc_tags & LCT_NOREF) &&
1623                             !try_module_get(key->lct_owner)) {
1624                                 /* module is unloading, skip this key */
1625                                 continue;
1626                         }
1627
1628                         value = key->lct_init(ctx, key);
1629                         if (unlikely(IS_ERR(value))) {
                                     /* drop the module reference taken above */
                                     if (!(ctx->lc_tags & LCT_NOREF))
                                             module_put(key->lct_owner);
1630                                 atomic_dec(&lu_key_initing_cnt);
1631                                 return PTR_ERR(value);
1632                         }
1633
1634                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1635                         atomic_inc(&key->lct_used);
1636                         /*
1637                          * This is the only place in the code where an
1638                          * element of the ctx->lc_value[] array is set to a
1639                          * non-NULL value.
1640                          */
1641                         ctx->lc_value[i] = value;
1642                         if (key->lct_exit)
1643                                 ctx->lc_tags |= LCT_HAS_EXIT;
1644                 }
1645                 ctx->lc_version = key_set_version;
1646         }
1647         atomic_dec(&lu_key_initing_cnt);
1648         return 0;
1649 }
1650
1651 static int keys_init(struct lu_context *ctx)
1652 {
1653         ctx->lc_value = kcalloc(ARRAY_SIZE(lu_keys), sizeof(ctx->lc_value[0]),
1654                                 GFP_NOFS);
1655         if (likely(ctx->lc_value))
1656                 return keys_fill(ctx);
1657
1658         return -ENOMEM;
1659 }
1660
1661 /**
1662  * Initialize context data-structure. Create values for all keys.
1663  */
1664 int lu_context_init(struct lu_context *ctx, __u32 tags)
1665 {
1666         int     rc;
1667
1668         memset(ctx, 0, sizeof(*ctx));
1669         ctx->lc_state = LCS_INITIALIZED;
1670         ctx->lc_tags = tags;
1671         if (tags & LCT_REMEMBER) {
1672                 spin_lock(&lu_keys_guard);
1673                 list_add(&ctx->lc_remember, &lu_context_remembered);
1674                 spin_unlock(&lu_keys_guard);
1675         } else {
1676                 INIT_LIST_HEAD(&ctx->lc_remember);
1677         }
1678
1679         rc = keys_init(ctx);
1680         if (rc != 0)
1681                 lu_context_fini(ctx);
1682
1683         return rc;
1684 }
1685 EXPORT_SYMBOL(lu_context_init);
1686
1687 /**
1688  * Finalize context data-structure. Destroy key values.
1689  */
1690 void lu_context_fini(struct lu_context *ctx)
1691 {
1692         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1693         ctx->lc_state = LCS_FINALIZED;
1694
1695         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1696                 LASSERT(list_empty(&ctx->lc_remember));
1697                 keys_fini(ctx);
1698
1699         } else { /* could race with key degister */
1700                 spin_lock(&lu_keys_guard);
1701                 keys_fini(ctx);
1702                 list_del_init(&ctx->lc_remember);
1703                 spin_unlock(&lu_keys_guard);
1704         }
1705 }
1706 EXPORT_SYMBOL(lu_context_fini);
1707
1708 /**
1709  * Called before entering context.
1710  */
1711 void lu_context_enter(struct lu_context *ctx)
1712 {
1713         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1714         ctx->lc_state = LCS_ENTERED;
1715 }
1716 EXPORT_SYMBOL(lu_context_enter);
1717
1718 /**
1719  * Called after exiting from \a ctx
1720  */
1721 void lu_context_exit(struct lu_context *ctx)
1722 {
1723         unsigned int i;
1724
1725         LINVRNT(ctx->lc_state == LCS_ENTERED);
1726         ctx->lc_state = LCS_LEFT;
1727         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
1728                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1729                         /* could race with key quiescency */
1730                         if (ctx->lc_tags & LCT_REMEMBER)
1731                                 spin_lock(&lu_keys_guard);
1732                         if (ctx->lc_value[i]) {
1733                                 struct lu_context_key *key;
1734
1735                                 key = lu_keys[i];
1736                                 if (key->lct_exit)
1737                                         key->lct_exit(ctx,
1738                                                       key, ctx->lc_value[i]);
1739                         }
1740                         if (ctx->lc_tags & LCT_REMEMBER)
1741                                 spin_unlock(&lu_keys_guard);
1742                 }
1743         }
1744 }
1745 EXPORT_SYMBOL(lu_context_exit);
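
/*
 * Typical life cycle of a context (illustrative sketch; the tag is just an
 * example and caller-side error handling is trimmed). A context is
 * initialized once, entered and exited around each batch of work, and
 * finalized when no longer needed:
 *
 *        struct lu_context ctx;
 *        int rc;
 *
 *        rc = lu_context_init(&ctx, LCT_CL_THREAD);
 *        if (rc)
 *                return rc;
 *        lu_context_enter(&ctx);
 *        (values obtained with lu_context_key_get(&ctx, ...) are valid here)
 *        lu_context_exit(&ctx);
 *        lu_context_fini(&ctx);
 */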
1746
1747 /**
1748  * Allocate values in a context for all missing keys that were registered
1749  * after the context was created. key_set_version only changes in the rare
1750  * cases when modules are loaded or removed.
1751  */
1752 int lu_context_refill(struct lu_context *ctx)
1753 {
1754         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1755 }
1756
1757 /**
1758  * lu_context_tags_default/lu_session_tags_default are updated when new
1759  * types of obd device are added. Currently this is only used on the client
1760  * side, specifically for the echo device client; for other stacks (like
1761  * ptlrpc threads) the contexts are predefined when the lu_device type is
1762  * registered, during the module probe phase.
1763  */
1764 __u32 lu_context_tags_default;
1765 __u32 lu_session_tags_default;
1766
1767 int lu_env_init(struct lu_env *env, __u32 tags)
1768 {
1769         int result;
1770
1771         env->le_ses = NULL;
1772         result = lu_context_init(&env->le_ctx, tags);
1773         if (likely(result == 0))
1774                 lu_context_enter(&env->le_ctx);
1775         return result;
1776 }
1777 EXPORT_SYMBOL(lu_env_init);
1778
1779 void lu_env_fini(struct lu_env *env)
1780 {
1781         lu_context_exit(&env->le_ctx);
1782         lu_context_fini(&env->le_ctx);
1783         env->le_ses = NULL;
1784 }
1785 EXPORT_SYMBOL(lu_env_fini);
1786
1787 int lu_env_refill(struct lu_env *env)
1788 {
1789         int result;
1790
1791         result = lu_context_refill(&env->le_ctx);
1792         if (result == 0 && env->le_ses)
1793                 result = lu_context_refill(env->le_ses);
1794         return result;
1795 }
1796 EXPORT_SYMBOL(lu_env_refill);
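
/*
 * Illustrative sketch of lu_env usage (not taken from a real caller): an
 * environment wraps a context, and lu_env_refill() is called when keys may
 * have been registered since the environment was set up:
 *
 *        struct lu_env env;
 *        int rc;
 *
 *        rc = lu_env_init(&env, LCT_CL_THREAD);
 *        if (rc)
 *                return rc;
 *        rc = lu_env_refill(&env);
 *        (on success, pass &env down to code that needs it)
 *        lu_env_fini(&env);
 */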
1797
1798 struct lu_site_stats {
1799         unsigned        lss_populated;
1800         unsigned        lss_max_search;
1801         unsigned        lss_total;
1802         unsigned        lss_busy;
1803 };
1804
1805 static void lu_site_stats_get(struct cfs_hash *hs,
1806                               struct lu_site_stats *stats, int populated)
1807 {
1808         struct cfs_hash_bd bd;
1809         unsigned int i;
1810
1811         cfs_hash_for_each_bucket(hs, &bd, i) {
1812                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1813                 struct hlist_head       *hhead;
1814
1815                 cfs_hash_bd_lock(hs, &bd, 1);
1816                 stats->lss_busy  +=
1817                         cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
1818                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1819                 stats->lss_max_search = max((int)stats->lss_max_search,
1820                                             cfs_hash_bd_depmax_get(&bd));
1821                 if (!populated) {
1822                         cfs_hash_bd_unlock(hs, &bd, 1);
1823                         continue;
1824                 }
1825
1826                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1827                         if (!hlist_empty(hhead))
1828                                 stats->lss_populated++;
1829                 }
1830                 cfs_hash_bd_unlock(hs, &bd, 1);
1831         }
1832 }
1833
1834 /*
1835  * lu_cache_shrink_count() returns an approximate number of cached objects
1836  * that can be freed by shrink_slab(). A counter, which tracks the
1837  * number of items in the site's lru, is maintained in a percpu_counter
1838  * for each site. The percpu values are incremented and decremented as
1839  * objects are added or removed from the lru. The percpu values are summed
1840  * and saved whenever a percpu value exceeds a threshold. Thus the saved,
1841  * summed value at any given time may not accurately reflect the current
1842  * lru length. But this value is sufficiently accurate for the needs of
1843  * a shrinker.
1844  *
1845  * Using a per-cpu counter is a compromise solution for concurrent access:
1846  * lu_object_put() can update the counter without locking the site and
1847  * lu_cache_shrink_count() can sum the counters without locking each
1848  * ls_obj_hash bucket.
1849  */
1850 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
1851                                            struct shrink_control *sc)
1852 {
1853         struct lu_site *s;
1854         struct lu_site *tmp;
1855         unsigned long cached = 0;
1856
1857         if (!(sc->gfp_mask & __GFP_FS))
1858                 return 0;
1859
1860         down_read(&lu_sites_guard);
1861         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage)
1862                 cached += percpu_counter_read_positive(&s->ls_lru_len_counter);
1863         up_read(&lu_sites_guard);
1864
1865         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1866         CDEBUG(D_INODE, "%ld objects cached, cache pressure %d\n",
1867                cached, sysctl_vfs_cache_pressure);
1868
1869         return cached;
1870 }
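
/*
 * Generic sketch of the per-cpu counter pattern described above, using the
 * kernel percpu_counter API (the names here are made up for illustration and
 * are not the fields used by struct lu_site):
 *
 *        struct percpu_counter my_lru_len;
 *
 *        percpu_counter_init(&my_lru_len, 0, GFP_KERNEL);
 *        percpu_counter_add(&my_lru_len, 1);    (an object enters the LRU)
 *        percpu_counter_sub(&my_lru_len, 1);    (an object leaves the LRU)
 *        len = percpu_counter_read_positive(&my_lru_len);
 *        percpu_counter_destroy(&my_lru_len);
 */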
1871
1872 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
1873                                           struct shrink_control *sc)
1874 {
1875         struct lu_site *s;
1876         struct lu_site *tmp;
1877         unsigned long remain = sc->nr_to_scan, freed = 0;
1878         LIST_HEAD(splice);
1879
1880         if (!(sc->gfp_mask & __GFP_FS))
1881                 /* We must not take the lu_sites_guard lock when
1882                  * __GFP_FS is *not* set because of the deadlock
1883                  * possibility detailed above. Additionally,
1884                  * since we cannot determine the number of
1885                  * objects in the cache without taking this
1886                  * lock, we simply report that nothing can be
1887                  * freed. This _should_ be OK, as objects
1888                  * cannot be reclaimed when __GFP_FS is *not*
1889                  * set anyway.
1890                  */
1892                 return SHRINK_STOP;
1893
1894         down_write(&lu_sites_guard);
1895         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1896                 freed = lu_site_purge(&lu_shrink_env, s, remain);
1897                 remain -= freed;
1898                 /*
1899                  * Move the just-shrunk site to the tail of the site list
1900                  * to ensure shrinking fairness.
1901                  */
1902                 list_move_tail(&s->ls_linkage, &splice);
1903         }
1904         list_splice(&splice, lu_sites.prev);
1905         up_write(&lu_sites_guard);
1906
1907         return sc->nr_to_scan - remain;
1908 }
1909
1910 /**
1911  * Shrinker that reclaims cached lu_objects via the count/scan callbacks above.
1912  */
1913 static struct shrinker lu_site_shrinker = {
1914         .count_objects  = lu_cache_shrink_count,
1915         .scan_objects   = lu_cache_shrink_scan,
1916         .seeks          = DEFAULT_SEEKS,
1917 };
1918
1919 /**
1920  * Initialization of global lu_* data.
1921  */
1922 int lu_global_init(void)
1923 {
1924         int result;
1925
1926         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
1927
1928         result = lu_ref_global_init();
1929         if (result != 0)
1930                 return result;
1931
1932         LU_CONTEXT_KEY_INIT(&lu_global_key);
1933         result = lu_context_key_register(&lu_global_key);
1934         if (result != 0)
1935                 return result;
1936
1937         /*
1938          * At this level, we don't know what tags are needed, so allocate them
1939          * conservatively. This should not be too bad, because this
1940          * environment is global.
1941          */
1942         down_write(&lu_sites_guard);
1943         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
1944         up_write(&lu_sites_guard);
1945         if (result != 0)
1946                 return result;
1947
1948         /*
1949          * Seeks estimation: 3 seeks to read a record from the OI, one to
1950          * read the inode, one for the EA. Unfortunately, setting such a
1951          * high value results in the lu_object/inode cache consuming all
1952          * the memory, so DEFAULT_SEEKS is used instead.
1952          */
1953         register_shrinker(&lu_site_shrinker);
1954
1955         return result;
1956 }
1957
1958 /**
1959  * Dual to lu_global_init().
1960  */
1961 void lu_global_fini(void)
1962 {
1963         unregister_shrinker(&lu_site_shrinker);
1964         lu_context_key_degister(&lu_global_key);
1965
1966         /*
1967          * Tear shrinker environment down _after_ de-registering
1968          * lu_global_key, because the latter has a value in the former.
1969          */
1970         down_write(&lu_sites_guard);
1971         lu_env_fini(&lu_shrink_env);
1972         up_write(&lu_sites_guard);
1973
1974         lu_ref_global_fini();
1975 }
1976
1977 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
1978 {
1979         struct lprocfs_counter ret;
1980
1981         lprocfs_stats_collect(stats, idx, &ret);
1982         return (__u32)ret.lc_count;
1983 }
1984
1985 /**
1986  * Output site statistical counters into a buffer. Suitable for
1987  * lprocfs_rd_*()-style functions.
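 *
 * The columns printed below are, in order: busy/total cached objects,
 * populated/total hash chains, maximum hash-chain search depth, and the
 * created, cache_hit, cache_miss, cache_race, cache_death_race and
 * lru_purged counters.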
1988  */
1989 int lu_site_stats_print(const struct lu_site *s, struct seq_file *m)
1990 {
1991         struct lu_site_stats stats;
1992
1993         memset(&stats, 0, sizeof(stats));
1994         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
1995
1996         seq_printf(m, "%d/%d %d/%ld %d %d %d %d %d %d %d\n",
1997                    stats.lss_busy,
1998                    stats.lss_total,
1999                    stats.lss_populated,
2000                    CFS_HASH_NHLIST(s->ls_obj_hash),
2001                    stats.lss_max_search,
2002                    ls_stats_read(s->ls_stats, LU_SS_CREATED),
2003                    ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2004                    ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2005                    ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2006                    ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2007                    ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2008         return 0;
2009 }
2010 EXPORT_SYMBOL(lu_site_stats_print);
2011
2012 /**
2013  * Helper function to initialize a number of kmem slab caches at once.
2014  */
2015 int lu_kmem_init(struct lu_kmem_descr *caches)
2016 {
2017         int result;
2018         struct lu_kmem_descr *iter = caches;
2019
2020         for (result = 0; iter->ckd_cache; ++iter) {
2021                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2022                                                         iter->ckd_size,
2023                                                         0, 0, NULL);
2024                 if (!*iter->ckd_cache) {
2025                         result = -ENOMEM;
2026                         /* free all previously allocated caches */
2027                         lu_kmem_fini(caches);
2028                         break;
2029                 }
2030         }
2031         return result;
2032 }
2033 EXPORT_SYMBOL(lu_kmem_init);
2034
2035 /**
2036  * Helper function to finalize a number of kmem slab caches at once. Dual to
2037  * lu_kmem_init().
2038  */
2039 void lu_kmem_fini(struct lu_kmem_descr *caches)
2040 {
2041         for (; caches->ckd_cache; ++caches) {
2042                 kmem_cache_destroy(*caches->ckd_cache);
2043                 *caches->ckd_cache = NULL;
2044         }
2045 }
2046 EXPORT_SYMBOL(lu_kmem_fini);
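
/*
 * Illustrative sketch of a caller-side cache table (the "foo" names are
 * hypothetical). The array is terminated by an entry whose ckd_cache
 * pointer is NULL:
 *
 *        static struct kmem_cache *foo_object_kmem;
 *
 *        static struct lu_kmem_descr foo_caches[] = {
 *                {
 *                        .ckd_cache = &foo_object_kmem,
 *                        .ckd_name  = "foo_object_kmem",
 *                        .ckd_size  = sizeof(struct foo_object)
 *                },
 *                {
 *                        .ckd_cache = NULL
 *                }
 *        };
 *
 * lu_kmem_init(foo_caches) is then called from module init and
 * lu_kmem_fini(foo_caches) from module exit.
 */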
2047
2048 void lu_buf_free(struct lu_buf *buf)
2049 {
2050         LASSERT(buf);
2051         if (buf->lb_buf) {
2052                 LASSERT(buf->lb_len > 0);
2053                 kvfree(buf->lb_buf);
2054                 buf->lb_buf = NULL;
2055                 buf->lb_len = 0;
2056         }
2057 }
2058 EXPORT_SYMBOL(lu_buf_free);
2059
2060 void lu_buf_alloc(struct lu_buf *buf, size_t size)
2061 {
2062         LASSERT(buf);
2063         LASSERT(!buf->lb_buf);
2064         LASSERT(!buf->lb_len);
2065         buf->lb_buf = libcfs_kvzalloc(size, GFP_NOFS);
2066         if (likely(buf->lb_buf))
2067                 buf->lb_len = size;
2068 }
2069 EXPORT_SYMBOL(lu_buf_alloc);
2070
2071 void lu_buf_realloc(struct lu_buf *buf, size_t size)
2072 {
2073         lu_buf_free(buf);
2074         lu_buf_alloc(buf, size);
2075 }
2076 EXPORT_SYMBOL(lu_buf_realloc);
2077
2078 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, size_t len)
2079 {
2080         if (!buf->lb_buf && !buf->lb_len)
2081                 lu_buf_alloc(buf, len);
2082
2083         if ((len > buf->lb_len) && buf->lb_buf)
2084                 lu_buf_realloc(buf, len);
2085
2086         return buf;
2087 }
2088 EXPORT_SYMBOL(lu_buf_check_and_alloc);
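
/*
 * Illustrative sketch of lu_buf usage (not from a real caller): a buffer
 * starts out empty, is sized on demand, and is freed exactly once:
 *
 *        struct lu_buf buf = { .lb_buf = NULL, .lb_len = 0 };
 *
 *        lu_buf_check_and_alloc(&buf, PAGE_SIZE);
 *        if (!buf.lb_buf)
 *                return -ENOMEM;
 *        (fill buf.lb_buf with up to buf.lb_len bytes)
 *        lu_buf_free(&buf);
 */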
2089
2090 /**
2091  * Increase the size of \a buf.
2092  * The old data is preserved in the buffer; on error the old buffer is left
2093  * unchanged.
2094  * \retval 0 or -ENOMEM
2095  */
2096 int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
2097 {
2098         char *ptr;
2099
2100         if (len <= buf->lb_len)
2101                 return 0;
2102
2103         ptr = libcfs_kvzalloc(len, GFP_NOFS);
2104         if (!ptr)
2105                 return -ENOMEM;
2106
2107         /* Free the old buf */
2108         if (buf->lb_buf) {
2109                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2110                 kvfree(buf->lb_buf);
2111         }
2112
2113         buf->lb_buf = ptr;
2114         buf->lb_len = len;
2115         return 0;
2116 }